]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rook/module.py
64fcff99c8d90ee81d45ae57789e131113308250
7 from ceph
.deployment
import inventory
8 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, PlacementSpec
11 from typing
import List
, Dict
, Optional
, Callable
, Any
12 from ceph
.deployment
.drive_group
import DriveGroupSpec
14 pass # just for type checking
17 from kubernetes
import client
, config
18 from kubernetes
.client
.rest
import ApiException
20 kubernetes_imported
= True
22 # https://github.com/kubernetes-client/python/issues/895
23 from kubernetes
.client
.models
.v1_container_image
import V1ContainerImage
24 def names(self
, names
):
26 V1ContainerImage
.names
= V1ContainerImage
.names
.setter(names
)
29 kubernetes_imported
= False
33 from mgr_module
import MgrModule
36 from .rook_cluster
import RookCluster
39 class RookCompletion(orchestrator
.Completion
):
45 # type: (Callable) -> Callable[..., RookCompletion]
47 Decorator to make RookOrchestrator methods return
48 a completion object that executes themselves.
52 def wrapper(*args
, **kwargs
):
53 return RookCompletion(on_complete
=lambda _
: f(*args
, **kwargs
))
58 def write_completion(on_complete
, # type: Callable
61 calc_percent
=None # type: Optional[Callable[[], RookCompletion]]
63 # type: (...) -> RookCompletion
64 return RookCompletion
.with_progress(
67 on_complete
=lambda _
: on_complete(),
68 calc_percent
=calc_percent
,
class RookEnv(object):
    """Rook-related settings captured from environment variables.

    All values are read once, when the instance is constructed.

    Attributes:
        namespace: ``POD_NAMESPACE``, defaulting to ``'rook-ceph'``.
        cluster_name: ``ROOK_CEPH_CLUSTER_CRD_NAME``, defaulting to
            ``namespace``.
        operator_namespace: ``ROOK_OPERATOR_NAMESPACE``, defaulting to
            ``namespace``.
        crd_version: ``ROOK_CEPH_CLUSTER_CRD_VERSION``, defaulting to ``'v1'``.
        api_name: ``"ceph.rook.io/"`` followed by ``crd_version``.
    """

    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        """Return True when the configured CRD version is exactly ``'v1'``."""
        return self.crd_version == 'v1'

    def has_namespace(self):
        """Return True when ``POD_NAMESPACE`` is present in the environment.

        Unlike ``namespace``, this is evaluated against the live environment
        on every call rather than at construction time.
        """
        return 'POD_NAMESPACE' in os.environ
91 class RookOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
93 Writes are a two-phase thing, firstly sending
94 the write to the k8s API (fast) and then waiting
95 for the corresponding change to appear in the
98 Right now, we are calling the k8s API synchronously.
102 # TODO: configure k8s API addr instead of assuming local
103 ] # type: List[Dict[str, Any]]
105 def process(self
, completions
):
106 # type: (List[RookCompletion]) -> None
109 self
.log
.info("process: completions={0}".format(orchestrator
.pretty_print(completions
)))
111 for p
in completions
:
116 if not kubernetes_imported
:
117 return False, "`kubernetes` python module not found"
118 if not RookEnv().api_version_match():
119 return False, "Rook version unsupported."
123 if not kubernetes_imported
:
124 return False, "`kubernetes` python module not found"
125 elif not self
._rook
_env
.has_namespace():
126 return False, "ceph-mgr not running in Rook cluster"
129 self
.k8s
.list_namespaced_pod(self
._rook
_env
.cluster_name
)
130 except ApiException
as e
:
131 return False, "Cannot reach Kubernetes API: {}".format(e
)
def __init__(self, *args, **kwargs):
    """Initialise module state; the Kubernetes clients are created later.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initialiser.
    """
    super(RookOrchestrator, self).__init__(*args, **kwargs)

    # Set once the API clients and the RookCluster wrapper exist; accessors
    # in this class wait on it before returning them.
    self._initialized = threading.Event()
    self._k8s_CoreV1_api = None
    self._k8s_BatchV1_api = None
    self._rook_cluster = None
    self._rook_env = RookEnv()

    # Set to request that the background loop stop.
    self._shutdown = threading.Event()

    self.all_progress_references = []  # type: List[orchestrator.ProgressReference]
153 # type: () -> client.CoreV1Api
154 self
._initialized
.wait()
155 assert self
._k
8s
_CoreV
1_api
is not None
156 return self
._k
8s
_CoreV
1_api
@property
def rook_cluster(self):
    # type: () -> RookCluster
    """Return the RookCluster wrapper, blocking until it has been created.

    Exposed as a property because the rest of this module accesses it as an
    attribute (``self.rook_cluster.<method>(...)``).

    Raises:
        AssertionError: if initialisation was signalled but no cluster
            object was stored.
    """
    self._initialized.wait()
    assert self._rook_cluster is not None
    return self._rook_cluster
166 # For deployed clusters, we should always be running inside
167 # a Rook cluster. For development convenience, also support
168 # running outside (reading ~/.kube config)
170 if self
._rook
_env
.has_namespace():
171 config
.load_incluster_config()
172 cluster_name
= self
._rook
_env
.cluster_name
174 self
.log
.warning("DEVELOPMENT ONLY: Reading kube config from ~")
175 config
.load_kube_config()
177 cluster_name
= "rook-ceph"
179 # So that I can do port forwarding from my workstation - jcsp
180 from kubernetes
.client
import configuration
181 configuration
.verify_ssl
= False
183 self
._k
8s
_CoreV
1_api
= client
.CoreV1Api()
184 self
._k
8s
_BatchV
1_api
= client
.BatchV1Api()
187 # XXX mystery hack -- I need to do an API call from
188 # this context, or subsequent API usage from handle_command
189 # fails with SSLError('bad handshake'). Suspect some kind of
190 # thread context setup in SSL lib?
191 self
._k
8s
_CoreV
1_api
.list_namespaced_pod(cluster_name
)
193 # Ignore here to make self.available() fail with a proper error message
196 self
._rook
_cluster
= RookCluster(
197 self
._k
8s
_CoreV
1_api
,
198 self
._k
8s
_BatchV
1_api
,
201 self
._initialized
.set()
203 while not self
._shutdown
.is_set():
204 # XXX hack (or is it?) to kick all completions periodically,
205 # in case we had a caller that wait()'ed on them long enough
206 # to get persistence but not long enough to get completion
208 self
.all_progress_references
= [p
for p
in self
.all_progress_references
if not p
.effective
]
209 for p
in self
.all_progress_references
:
212 self
._shutdown
.wait(5)
214 def cancel_completions(self
):
215 for p
in self
.all_progress_references
:
217 self
.all_progress_references
.clear()
220 def get_inventory(self
, host_filter
=None, refresh
=False):
222 if host_filter
and host_filter
.hosts
:
224 host_list
= host_filter
.hosts
225 elif host_filter
and host_filter
.labels
:
226 # TODO: query k8s API to resolve to host list, and pass
227 # it into RookCluster.get_discovered_devices
228 raise NotImplementedError()
230 devs
= self
.rook_cluster
.get_discovered_devices(host_list
)
233 for host_name
, host_devs
in devs
.items():
236 if 'cephVolumeData' in d
and d
['cephVolumeData']:
237 devs
.append(inventory
.Device
.from_json(json
.loads(d
['cephVolumeData'])))
239 devs
.append(inventory
.Device(
240 path
= '/dev/' + d
['name'],
242 rotational
= '1' if d
['rotational'] else '0',
246 rejected_reasons
=['device data coming from ceph-volume not provided'],
249 result
.append(orchestrator
.InventoryHost(host_name
, inventory
.Devices(devs
)))
255 # type: () -> List[orchestrator.HostSpec]
256 return [orchestrator
.HostSpec(n
) for n
in self
.rook_cluster
.get_node_names()]
259 def describe_service(self
, service_type
=None, service_name
=None,
261 now
= datetime
.datetime
.utcnow()
264 cl
= self
.rook_cluster
.rook_api_get(
265 "cephclusters/{0}".format(self
.rook_cluster
.rook_env
.cluster_name
))
266 self
.log
.debug('CephCluster %s' % cl
)
267 image_name
= cl
['spec'].get('cephVersion', {}).get('image', None)
268 num_nodes
= len(self
.rook_cluster
.get_node_names())
271 spec
['mon'] = orchestrator
.ServiceDescription(
274 placement
=PlacementSpec(
275 count
=cl
['spec'].get('mon', {}).get('count', 1),
278 size
=cl
['spec'].get('mon', {}).get('count', 1),
279 container_image_name
=image_name
,
282 spec
['mgr'] = orchestrator
.ServiceDescription(
285 placement
=PlacementSpec
.from_string('count:1'),
288 container_image_name
=image_name
,
291 if not cl
['spec'].get('crashCollector', {}).get('disable', False):
292 spec
['crash'] = orchestrator
.ServiceDescription(
295 placement
=PlacementSpec
.from_string('*'),
298 container_image_name
=image_name
,
303 all_fs
= self
.rook_cluster
.rook_api_get(
305 self
.log
.debug('CephFilesystems %s' % all_fs
)
306 for fs
in all_fs
.get('items', []):
307 svc
= 'mds.' + fs
['metadata']['name']
310 # FIXME: we are conflating active (+ standby) with count
311 active
= fs
['spec'].get('metadataServer', {}).get('activeCount', 1)
313 if fs
['spec'].get('metadataServer', {}).get('activeStandby', False):
314 total_mds
= active
* 2
315 spec
[svc
] = orchestrator
.ServiceDescription(
318 service_id
=fs
['metadata']['name'],
319 placement
=PlacementSpec(count
=active
),
322 container_image_name
=image_name
,
327 all_zones
= self
.rook_cluster
.rook_api_get(
329 self
.log
.debug('CephObjectstores %s' % all_zones
)
330 for zone
in all_zones
.get('items', []):
331 rgw_realm
= zone
['metadata']['name']
333 svc
= 'rgw.' + rgw_realm
+ '.' + rgw_zone
336 active
= zone
['spec']['gateway']['instances'];
337 if 'securePort' in zone
['spec']['gateway']:
339 port
= zone
['spec']['gateway']['securePort']
342 port
= zone
['spec']['gateway']['port'] or 80
343 spec
[svc
] = orchestrator
.ServiceDescription(
345 service_id
=rgw_realm
+ '.' + rgw_zone
,
349 rgw_frontend_port
=port
,
350 placement
=PlacementSpec(count
=active
),
353 container_image_name
=image_name
,
357 for dd
in self
._list
_daemons
():
358 if dd
.service_name() not in spec
:
360 spec
[dd
.service_name()].running
+= 1
361 return [v
for k
, v
in spec
.items()]
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                 refresh=False):
    """Return the daemons matching the given filters.

    Delegates to ``_list_daemons``, forwarding every filter in the order
    that helper declares them (``service_name`` first). Previously
    ``service_name`` was dropped and the remaining arguments were passed one
    slot early, so ``daemon_type`` was treated as the service name,
    ``daemon_id`` as the daemon type, and so on.

    NOTE(review): the ``refresh`` default is assumed to be ``False`` (as for
    ``get_inventory``); that part of the original signature was not
    available -- confirm.
    """
    return self._list_daemons(service_name, daemon_type, daemon_id, host, refresh)
368 def _list_daemons(self
, service_name
=None, daemon_type
=None, daemon_id
=None, host
=None,
370 pods
= self
.rook_cluster
.describe_pods(daemon_type
, daemon_id
, host
)
371 self
.log
.debug('pods %s' % pods
)
374 sd
= orchestrator
.DaemonDescription()
375 sd
.hostname
= p
['hostname']
376 sd
.container_id
= p
['name']
377 sd
.daemon_type
= p
['labels']['app'].replace('rook-ceph-', '')
386 sd
.status_desc
= p
['phase']
388 if 'ceph_daemon_id' in p
['labels']:
389 sd
.daemon_id
= p
['labels']['ceph_daemon_id']
390 elif 'ceph-osd-id' in p
['labels']:
391 sd
.daemon_id
= p
['labels']['ceph-osd-id']
393 # Unknown type -- skip it
396 if service_name
is not None and service_name
!= sd
.service_name():
398 sd
.container_image_name
= p
['container_image_name']
399 sd
.created
= p
['created']
400 sd
.last_configured
= p
['created']
401 sd
.last_deployed
= p
['created']
402 sd
.started
= p
['started']
403 sd
.last_refresh
= p
['refreshed']
408 def _service_add_decorate(self
, typename
, spec
, func
):
409 return write_completion(
410 on_complete
=lambda : func(spec
),
411 message
="Creating {} services for {}".format(typename
, spec
.service_id
),
def add_nfs(self, spec):
    # type: (NFSServiceSpec) -> RookCompletion
    """Create an NFS service from ``spec``.

    Hands ``rook_cluster.add_nfsgw`` to ``_service_add_decorate`` and
    returns whatever that helper returns.
    """
    return self._service_add_decorate("NFS", spec,
                                      self.rook_cluster.add_nfsgw)
420 def _service_rm_decorate(self
, typename
, name
, func
):
421 return write_completion(
422 on_complete
=lambda : func(name
),
423 message
="Removing {} services for {}".format(typename
, name
),
def remove_service(self, service_type, service_name):
    """Remove the Rook resource backing an MDS, RGW or NFS service.

    Args:
        service_type: one of ``'mds'``, ``'rgw'`` or ``'nfs'``.
        service_name: name of the resource to remove.

    Returns:
        Whatever ``_service_rm_decorate`` returns for a known service type;
        ``None`` for any other ``service_type``.
    """
    # service type -> (label used in the progress message, Rook resource kind)
    known = {
        'mds': ('MDS', 'cephfilesystems'),
        'rgw': ('RGW', 'cephobjectstores'),
        'nfs': ('NFS', 'cephnfses'),
    }
    if service_type not in known:
        return None

    typename, resource = known[service_type]
    # _service_rm_decorate invokes the callback as func(name), so the
    # callback must accept that argument; the previous zero-argument lambdas
    # raised TypeError as soon as the completion ran.
    return self._service_rm_decorate(
        typename, service_name,
        lambda name: self.rook_cluster.rm_service(resource, name))
442 def apply_mon(self
, spec
):
443 # type: (ServiceSpec) -> RookCompletion
444 if spec
.placement
.hosts
or spec
.placement
.label
:
445 raise RuntimeError("Host list or label is not supported by rook.")
447 return write_completion(
448 lambda: self
.rook_cluster
.update_mon_count(spec
.placement
.count
),
449 "Updating mon count to {0}".format(spec
.placement
.count
),
def apply_mds(self, spec):
    # type: (ServiceSpec) -> RookCompletion
    """Apply an MDS service specification.

    Hands ``rook_cluster.apply_filesystem`` to ``_service_add_decorate``
    and returns whatever that helper returns.
    """
    return self._service_add_decorate('MDS', spec,
                                      self.rook_cluster.apply_filesystem)
def apply_rgw(self, spec):
    # type: (RGWSpec) -> RookCompletion
    """Apply an RGW service specification.

    Hands ``rook_cluster.apply_objectstore`` to ``_service_add_decorate``
    and returns whatever that helper returns.
    """
    return self._service_add_decorate('RGW', spec,
                                      self.rook_cluster.apply_objectstore)
463 def apply_nfs(self
, spec
):
464 # type: (NFSServiceSpec) -> RookCompletion
465 num
= spec
.placement
.count
466 return write_completion(
467 lambda: self
.rook_cluster
.update_nfs_count(spec
.service_id
, num
),
468 "Updating NFS server count in {0} to {1}".format(spec
.service_id
, num
),
472 def remove_daemons(self
, names
):
473 return write_completion(
474 lambda: self
.rook_cluster
.remove_pods(names
),
475 "Removing daemons {}".format(','.join(names
)),
479 def create_osds(self
, drive_group
):
480 # type: (DriveGroupSpec) -> RookCompletion
481 """ Creates OSDs from a drive group specification.
483 $: ceph orch osd create -i <dg.file>
485 The drivegroup file must only contain one spec at a time.
488 targets
= [] # type: List[str]
489 if drive_group
.data_devices
and drive_group
.data_devices
.paths
:
490 targets
+= [d
.path
for d
in drive_group
.data_devices
.paths
]
491 if drive_group
.data_directories
:
492 targets
+= drive_group
.data_directories
494 def execute(all_hosts_
):
495 # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
496 matching_hosts
= drive_group
.placement
.filter_matching_hosts(lambda label
=None, as_hostspec
=None: all_hosts_
)
498 assert len(matching_hosts
) == 1
500 if not self
.rook_cluster
.node_exists(matching_hosts
[0]):
501 raise RuntimeError("Node '{0}' is not in the Kubernetes "
502 "cluster".format(matching_hosts
))
504 # Validate whether cluster CRD can accept individual OSD
505 # creations (i.e. not useAllDevices)
506 if not self
.rook_cluster
.can_create_osd():
507 raise RuntimeError("Rook cluster configuration does not "
508 "support OSD creation.")
510 return orchestrator
.Completion
.with_progress(
511 message
="Creating OSD on {0}:{1}".format(
515 on_complete
=lambda _
:self
.rook_cluster
.add_osds(drive_group
, matching_hosts
),
516 calc_percent
=lambda: has_osds(matching_hosts
)
520 def has_osds(matching_hosts
):
522 # Find OSD pods on this host
524 pods
= self
.k8s
.list_namespaced_pod(self
._rook
_env
.namespace
,
525 label_selector
="rook_cluster={},app=rook-ceph-osd".format(self
._rook
_env
.cluster_name
),
526 field_selector
="spec.nodeName={0}".format(
530 pod_osd_ids
.add(int(p
.metadata
.labels
['ceph-osd-id']))
532 self
.log
.debug('pod_osd_ids={0}'.format(pod_osd_ids
))
535 osdmap
= self
.get("osd_map")
536 for osd
in osdmap
['osds']:
538 if osd_id
not in pod_osd_ids
:
541 metadata
= self
.get_metadata('osd', "%s" % osd_id
)
542 if metadata
and metadata
['devices'] in targets
:
545 self
.log
.info("ignoring osd {0} {1}".format(
546 osd_id
, metadata
['devices']
549 return found
is not None
551 c
= self
.get_hosts().then(execute
)
554 def blink_device_light(self
, ident_fault
: str, on
: bool, locs
: List
[orchestrator
.DeviceLightLoc
]) -> RookCompletion
:
555 return write_completion(
556 on_complete
=lambda: self
.rook_cluster
.blink_light(
557 ident_fault
, on
, locs
),
558 message
="Switching <{}> identification light in {}".format(
559 on
, ",".join(["{}:{}".format(loc
.host
, loc
.dev
) for loc
in locs
])),