import datetime
import functools
import os
import threading

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage

    def names(self, names):
        self._names = names

    V1ContainerImage.names = V1ContainerImage.names.setter(names)
except ImportError:
    kubernetes_imported = False
    client = None
    config = None
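
# The setter patch above works around kubernetes-client/python#895: the
# generated V1ContainerImage model rejects the "names: null" that the API can
# legitimately return, raising ValueError during deserialization and breaking
# pod/node listings; the replacement setter accepts the value as-is.
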
from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return
    a completion object that executes themselves.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper
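
# Usage note: the read-only methods below (get_inventory, get_hosts,
# describe_service, list_daemons) are wrapped with @deferred_read, so calling
# them returns a lazy RookCompletion; the wrapped body only runs once
# process() evaluates the completion.
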
def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )
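
# Writes, by contrast, go through write_completion(): the mutation supplied as
# on_complete is wrapped via Completion.with_progress so it reports a progress
# message against the given mgr module, optionally with a calc_percent
# callback to estimate completion.
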
class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ
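
# Example: in a default Rook deployment where only POD_NAMESPACE=rook-ceph is
# set, this resolves to namespace == cluster_name == operator_namespace ==
# 'rook-ceph' and api_name == 'ceph.rook.io/v1'.
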
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first we send the write to the k8s API
    (which is fast), then we wait for the corresponding change to appear
    in the Ceph cluster (which is slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config).
        if self._rook_env.has_namespace():
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in the SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()
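
        # From this point on, the k8s and rook_cluster properties (which
        # block on _initialized) are usable from other ceph-mgr threads.
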
        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion
            self.all_progress_references = [p for p in self.all_progress_references
                                            if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        devs = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in devs.items():
            devs = []
            for d in host_devs:
                dev = inventory.Device(
                    path='/dev/' + d['name'],
                    sys_api=dict(
                        rotational='1' if d['rotational'] else '0',
                        size=d['size']
                    ),
                    available=d['empty'],
                    rejected_reasons=[] if d['empty'] else ['not empty'],
                )
                devs.append(dev)

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())
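
        # Build a service-name -> ServiceDescription map from the Rook CRDs
        # (cluster, filesystems, object stores); running daemon counts are
        # filled in afterwards from the live pod list.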
        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            service_name='mon',
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            service_name='mgr',
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                service_name='crash',
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                service_name=svc,
                spec=ServiceSpec(
                    'mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                port = zone['spec']['gateway']['securePort']
            else:
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                service_name=svc,
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )

        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1

        return [v for k, v in spec.items()]

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                     host=None, refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host,
                                  refresh=refresh)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                      host=None, refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            sd.status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue

            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service(
                    'cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephnfses', service_name)
            )
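
        # Note: any other service_type falls through and implicitly returns
        # None here.
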
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
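        # A minimal sketch of such a spec file (illustrative values only;
        # field names follow ceph.deployment.drive_group.DriveGroupSpec):
        #
        #     service_type: osd
        #     service_id: example_drive_group
        #     placement:
        #       host_pattern: 'mynode'
        #     data_devices:
        #       paths:
        #         - /dev/vdb
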
        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            all_hosts = [h.hostname for h in all_hosts_]
            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts, targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, all_hosts),
                calc_percent=lambda: has_osds(all_hosts)
            )

        def has_osds(all_hosts):
            # type: (List[str]) -> bool
            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)

            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(
                self._rook_env.namespace,
                label_selector="rook_cluster={},app=rook-ceph-osd".format(
                    self._rook_env.cluster_name),
                field_selector="spec.nodeName={0}".format(
                    matching_hosts[0])).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else 'DNE'))

            return found is not None
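
        # FIXME: 'found' in has_osds above is a list, so 'found is not None'
        # is always True; bool(found) was presumably intended.
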
        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool,
                           locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )