# ceph/src/pybind/mgr/rook/module.py
import datetime
import threading
import functools
import json
import os

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec

try:
    from typing import List, Dict, Optional, Callable, Any
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking

try:
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    kubernetes_imported = True

    # https://github.com/kubernetes-client/python/issues/895
    from kubernetes.client.models.v1_container_image import V1ContainerImage

    def names(self, names):
        self._names = names

    V1ContainerImage.names = V1ContainerImage.names.setter(names)
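    # Per the issue linked above, the stock client declares
    # V1ContainerImage.names as required, so listing nodes that report an
    # image without names raises ValueError; replacing the property setter
    # with a permissive one lets those responses deserialize.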

except ImportError:
    kubernetes_imported = False
    client = None
    config = None

from mgr_module import MgrModule
import orchestrator

from .rook_cluster import RookCluster


class RookCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., RookCompletion]
    """
    Decorator to make RookOrchestrator methods return
    a completion object that executes itself when evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper


def write_completion(on_complete,  # type: Callable
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
                     ):
    # type: (...) -> RookCompletion
    return RookCompletion.with_progress(
        message=message,
        mgr=mgr,
        on_complete=lambda _: on_complete(),
        calc_percent=calc_percent,
    )


class RookEnv(object):
    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        return self.crd_version == 'v1'

    def has_namespace(self):
        return 'POD_NAMESPACE' in os.environ
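# In a stock Rook deployment the defaults above resolve to namespace and
# cluster_name 'rook-ceph' and api_name 'ceph.rook.io/v1'.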


class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Writes are a two-phase thing: first we send the write to the k8s API
    (fast), then we wait for the corresponding change to appear in the
    Ceph cluster (slow).

    Right now, we are calling the k8s API synchronously.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]  # type: List[Dict[str, Any]]

    def process(self, completions):
        # type: (List[RookCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @staticmethod
    def can_run():
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.namespace)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._k8s_CoreV1_api = None
        self._k8s_BatchV1_api = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        self._shutdown = threading.Event()

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    @property
    def k8s(self):
        # type: () -> client.CoreV1Api
        self._initialized.wait()
        assert self._k8s_CoreV1_api is not None
        return self._k8s_CoreV1_api

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        self._initialized.wait()
        assert self._rook_cluster is not None
        return self._rook_cluster

    def serve(self):
        # For deployed clusters, we should always be running inside
        # a Rook cluster. For development convenience, also support
        # running outside (reading ~/.kube config).

        if self._rook_env.has_namespace():
            config.load_incluster_config()
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s_CoreV1_api = client.CoreV1Api()
        self._k8s_BatchV1_api = client.BatchV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake'). Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s_CoreV1_api.list_namespaced_pod(self._rook_env.namespace)
        except ApiException:
            # Ignore here to make self.available() fail with a proper error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s_CoreV1_api,
            self._k8s_BatchV1_api,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion.

            self.all_progress_references = [p for p in self.all_progress_references
                                            if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def cancel_completions(self):
        for p in self.all_progress_references:
            p.fail()
        self.all_progress_references.clear()

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        host_list = None
        if host_filter and host_filter.hosts:
            # Explicit host list
            host_list = host_filter.hosts
        elif host_filter and host_filter.labels:
            # TODO: query k8s API to resolve to host list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered = self.rook_cluster.get_discovered_devices(host_list)

        result = []
        for host_name, host_devs in discovered.items():
            devs = []
            for d in host_devs:
                if 'cephVolumeData' in d and d['cephVolumeData']:
                    devs.append(inventory.Device.from_json(json.loads(d['cephVolumeData'])))
                else:
                    devs.append(inventory.Device(
                        path='/dev/' + d['name'],
                        sys_api=dict(
                            rotational='1' if d['rotational'] else '0',
                            size=d['size']
                        ),
                        available=False,
                        rejected_reasons=['device data coming from ceph-volume not provided'],
                    ))

            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))

        return result

    @deferred_read
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
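    # Hosts are simply the Kubernetes node names reported by
    # RookCluster.get_node_names(); there is no separate host list to manage.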

    @deferred_read
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        now = datetime.datetime.utcnow()

        # CephCluster
        cl = self.rook_cluster.rook_api_get(
            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
        self.log.debug('CephCluster %s' % cl)
        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
        num_nodes = len(self.rook_cluster.get_node_names())

        spec = {}
        spec['mon'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mon',
                placement=PlacementSpec(
                    count=cl['spec'].get('mon', {}).get('count', 1),
                ),
            ),
            size=cl['spec'].get('mon', {}).get('count', 1),
            container_image_name=image_name,
            last_refresh=now,
        )
        spec['mgr'] = orchestrator.ServiceDescription(
            spec=ServiceSpec(
                'mgr',
                placement=PlacementSpec.from_string('count:1'),
            ),
            size=1,
            container_image_name=image_name,
            last_refresh=now,
        )
        if not cl['spec'].get('crashCollector', {}).get('disable', False):
            spec['crash'] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    'crash',
                    placement=PlacementSpec.from_string('*'),
                ),
                size=num_nodes,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephFilesystems
        all_fs = self.rook_cluster.rook_api_get(
            "cephfilesystems/")
        self.log.debug('CephFilesystems %s' % all_fs)
        for fs in all_fs.get('items', []):
            svc = 'mds.' + fs['metadata']['name']
            if svc in spec:
                continue
            # FIXME: we are conflating active (+ standby) with count
            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
            total_mds = active
            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                total_mds = active * 2
            spec[svc] = orchestrator.ServiceDescription(
                spec=ServiceSpec(
                    service_type='mds',
                    service_id=fs['metadata']['name'],
                    placement=PlacementSpec(count=active),
                ),
                size=total_mds,
                container_image_name=image_name,
                last_refresh=now,
            )

        # CephObjectstores
        all_zones = self.rook_cluster.rook_api_get(
            "cephobjectstores/")
        self.log.debug('CephObjectstores %s' % all_zones)
        for zone in all_zones.get('items', []):
            rgw_realm = zone['metadata']['name']
            rgw_zone = rgw_realm
            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
            if svc in spec:
                continue
            active = zone['spec']['gateway']['instances']
            if 'securePort' in zone['spec']['gateway']:
                ssl = True
                port = zone['spec']['gateway']['securePort']
            else:
                ssl = False
                port = zone['spec']['gateway']['port'] or 80
            spec[svc] = orchestrator.ServiceDescription(
                spec=RGWSpec(
                    service_id=rgw_realm + '.' + rgw_zone,
                    rgw_realm=rgw_realm,
                    rgw_zone=rgw_zone,
                    ssl=ssl,
                    rgw_frontend_port=port,
                    placement=PlacementSpec(count=active),
                ),
                size=active,
                container_image_name=image_name,
                last_refresh=now,
            )
        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
            spec[dd.service_name()].running += 1

        return list(spec.values())

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                     refresh=False):
        return self._list_daemons(service_name=service_name,
                                  daemon_type=daemon_type,
                                  daemon_id=daemon_id,
                                  host=host)

    def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
        self.log.debug('pods %s' % pods)
        result = []
        for p in pods:
            sd = orchestrator.DaemonDescription()
            sd.hostname = p['hostname']
            sd.container_id = p['name']
            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
            status = {
                'Pending': -1,
                'Running': 1,
                'Succeeded': 0,
                'Failed': -1,
                'Unknown': -1,
            }[p['phase']]
            sd.status = status
            sd.status_desc = p['phase']

            if 'ceph_daemon_id' in p['labels']:
                sd.daemon_id = p['labels']['ceph_daemon_id']
            elif 'ceph-osd-id' in p['labels']:
                sd.daemon_id = p['labels']['ceph-osd-id']
            else:
                # Unknown type -- skip it
                continue

            if service_name is not None and service_name != sd.service_name():
                continue
            sd.container_image_name = p['container_image_name']
            sd.created = p['created']
            sd.last_configured = p['created']
            sd.last_deployed = p['created']
            sd.started = p['started']
            sd.last_refresh = p['refreshed']

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        return write_completion(
            on_complete=lambda: func(spec),
            message="Creating {} services for {}".format(typename, spec.service_id),
            mgr=self
        )

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)

    def _service_rm_decorate(self, typename, name, func):
        return write_completion(
            on_complete=lambda: func(name),
            message="Removing {} services for {}".format(typename, name),
            mgr=self
        )

    def remove_service(self, service_type, service_name):
        if service_type == 'mds':
            return self._service_rm_decorate(
                'MDS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephfilesystems', service_name)
            )
        elif service_type == 'rgw':
            return self._service_rm_decorate(
                'RGW', service_name, lambda: self.rook_cluster.rm_service(
                    'cephobjectstores', service_name)
            )
        elif service_type == 'nfs':
            return self._service_rm_decorate(
                'NFS', service_name, lambda: self.rook_cluster.rm_service(
                    'cephnfses', service_name)
            )
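    # Each service type maps onto the Rook CRD that backs it:
    # mds -> cephfilesystems, rgw -> cephobjectstores, nfs -> cephnfses.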

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        if spec.placement.hosts or spec.placement.label:
            raise RuntimeError("Host list or label is not supported by rook.")

        return write_completion(
            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
            "Updating mon count to {0}".format(spec.placement.count),
            mgr=self
        )

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> RookCompletion
        return self._service_add_decorate('MDS', spec,
                                          self.rook_cluster.apply_filesystem)

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> RookCompletion
        return self._service_add_decorate('RGW', spec,
                                          self.rook_cluster.apply_objectstore)

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> RookCompletion
        num = spec.placement.count
        return write_completion(
            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
            mgr=self
        )

    def remove_daemons(self, names):
        return write_completion(
            lambda: self.rook_cluster.remove_pods(names),
            "Removing daemons {}".format(','.join(names)),
            mgr=self
        )

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> RookCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
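        # A minimal drive group file for the command above might look like
        # this (hypothetical host and device names):
        #
        #   service_type: osd
        #   service_id: example_osds
        #   placement:
        #     host_pattern: 'node-1'
        #   data_devices:
        #     paths:
        #       - /dev/vdb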

        targets = []  # type: List[str]
        if drive_group.data_devices and drive_group.data_devices.paths:
            targets += [d.path for d in drive_group.data_devices.paths]
        if drive_group.data_directories:
            targets += drive_group.data_directories

        def execute(all_hosts_):
            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
            matching_hosts = drive_group.placement.filter_matching_hosts(
                lambda label=None, as_hostspec=None: all_hosts_)

            assert len(matching_hosts) == 1

            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))

            # Validate whether cluster CRD can accept individual OSD
            # creations (i.e. not useAllDevices)
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")

            return orchestrator.Completion.with_progress(
                message="Creating OSD on {0}:{1}".format(
                    matching_hosts,
                    targets),
                mgr=self,
                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, matching_hosts),
                calc_percent=lambda: has_osds(matching_hosts)
            )

        @deferred_read
        def has_osds(matching_hosts):
            # Find OSD pods on this host
            pod_osd_ids = set()
            pods = self.k8s.list_namespaced_pod(
                self._rook_env.namespace,
                label_selector="rook_cluster={},app=rook-ceph-osd".format(
                    self._rook_env.cluster_name),
                field_selector="spec.nodeName={0}".format(
                    matching_hosts[0])).items
            for p in pods:
                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                if metadata and metadata['devices'] in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, metadata['devices'] if metadata else '(no metadata)'))

            # report done once matching OSDs have appeared in the osd map
            return bool(found)
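        # get_hosts() itself returns a RookCompletion, so chaining execute()
        # onto it defers OSD creation until the host list has been fetched.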

        c = self.get_hosts().then(execute)
        return c

    def blink_device_light(self, ident_fault: str, on: bool,
                           locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
        return write_completion(
            on_complete=lambda: self.rook_cluster.blink_light(
                ident_fault, on, locs),
            message="Switching <{}> identification light in {}".format(
                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
            mgr=self
        )