]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rook/module.py
6 from typing
import List
8 pass # just for type checking
11 from kubernetes
import client
, config
12 from kubernetes
.client
.rest
import ApiException
14 kubernetes_imported
= True
16 kubernetes_imported
= False
20 from mgr_module
import MgrModule
23 from .rook_cluster
import RookCluster
29 class RookReadCompletion(orchestrator
.ReadCompletion
):
31 All reads are simply API calls: avoid spawning
32 huge numbers of threads by just running them
33 inline when someone calls wait()
36 def __init__(self
, cb
):
37 super(RookReadCompletion
, self
).__init
__()
40 self
._complete
= False
42 self
.message
= "<read op>"
45 global all_completions
46 all_completions
.append(self
)
53 def is_complete(self
):
57 self
._result
= self
.cb()
61 class RookWriteCompletion(orchestrator
.WriteCompletion
):
63 Writes are a two-phase thing, firstly sending
64 the write to the k8s API (fast) and then waiting
65 for the corresponding change to appear in the
68 # XXX kubernetes bindings call_api already usefully has
69 # a completion= param that uses threads. Maybe just
71 def __init__(self
, execute_cb
, complete_cb
, message
):
72 super(RookWriteCompletion
, self
).__init
__()
73 self
.execute_cb
= execute_cb
74 self
.complete_cb
= complete_cb
76 # Executed means I executed my k8s API call, it may or may
80 # Result of k8s API call, this is set if executed==True
83 self
.effective
= False
85 self
.id = str(uuid
.uuid4())
87 self
.message
= message
92 global all_completions
93 all_completions
.append(self
)
103 def is_persistent(self
):
104 return (not self
.is_errored
) and self
.executed
107 def is_effective(self
):
108 return self
.effective
111 def is_errored(self
):
112 return self
.error
is not None
115 if not self
.executed
:
116 self
._result
= self
.execute_cb()
119 if not self
.effective
:
120 # TODO: check self.result for API errors
121 if self
.complete_cb
is None:
122 self
.effective
= True
124 self
.effective
= self
.complete_cb()
def deferred_read(f):
    """
    Decorator to make RookOrchestrator methods return
    a completion object that executes themselves.

    The wrapped call is deferred: invoking the decorated method only
    builds a RookReadCompletion whose callback performs the real work
    when the completion is driven.
    """
    def wrapper(*args, **kwargs):
        return RookReadCompletion(lambda: f(*args, **kwargs))

    # The extracted text lost this line, but a decorator must hand the
    # wrapper back to the caller.
    return wrapper
class RookEnv(object):
    """
    Capture the Rook/Kubernetes environment this mgr module runs in.

    All settings come from environment variables injected into the pod,
    falling back to the defaults used by stock Rook deployments when a
    variable is absent.
    """

    def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0; older Rook
        # used the namespace as the cluster name.
        self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME',
                                           self.namespace)

        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE',
                                                 "rook-ceph-system")
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION',
                                          'v1')
        # API group/version string used when talking to the Rook CRDs.
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        # Only the v1 CRD schema is supported by this module.
        return self.crd_version == 'v1'

    def has_namespace(self):
        # POD_NAMESPACE is only set when actually running inside a pod,
        # so this distinguishes in-cluster from development runs.
        return 'POD_NAMESPACE' in os.environ
159 class RookOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
161 # TODO: configure k8s API addr instead of assuming local
164 def wait(self
, completions
):
165 self
.log
.info("wait: completions={0}".format(completions
))
169 # Our `wait` implementation is very simple because everything's
171 for c
in completions
:
172 if not isinstance(c
, RookReadCompletion
) and \
173 not isinstance(c
, RookWriteCompletion
):
175 "wait() requires list of completions, not {0}".format(
184 except Exception as e
:
185 if not isinstance(e
, orchestrator
.OrchestratorError
):
186 self
.log
.exception("Completion {0} threw an exception:".format(
192 if not c
.is_complete
:
195 return not incomplete
199 if not kubernetes_imported
:
200 return False, "`kubernetes` python module not found"
201 if not RookEnv().api_version_match():
202 return False, "Rook version unsupported."
206 if not kubernetes_imported
:
207 return False, "`kubernetes` python module not found"
208 elif not self
._rook
_env
.has_namespace():
209 return False, "ceph-mgr not running in Rook cluster"
212 self
.k8s
.list_namespaced_pod(self
._rook
_env
.cluster_name
)
213 except ApiException
as e
:
214 return False, "Cannot reach Kubernetes API: {}".format(e
)
218 def __init__(self
, *args
, **kwargs
):
219 super(RookOrchestrator
, self
).__init
__(*args
, **kwargs
)
221 self
._initialized
= threading
.Event()
223 self
._rook
_cluster
= None
224 self
._rook
_env
= RookEnv()
226 self
._shutdown
= threading
.Event()
233 self
._initialized
.wait()
@property
def rook_cluster(self):
    # type: () -> RookCluster
    """
    Return the RookCluster handle, blocking until serve() has finished
    initialization.

    Exposed as a property: callers elsewhere in this file access it
    attribute-style (e.g. self.rook_cluster.get_discovered_devices).
    """
    self._initialized.wait()
    return self._rook_cluster
243 # For deployed clusters, we should always be running inside
244 # a Rook cluster. For development convenience, also support
245 # running outside (reading ~/.kube config)
247 if self
._rook
_env
.cluster_name
:
248 config
.load_incluster_config()
249 cluster_name
= self
._rook
_env
.cluster_name
251 self
.log
.warning("DEVELOPMENT ONLY: Reading kube config from ~")
252 config
.load_kube_config()
254 cluster_name
= "rook-ceph"
256 # So that I can do port forwarding from my workstation - jcsp
257 from kubernetes
.client
import configuration
258 configuration
.verify_ssl
= False
260 self
._k
8s
= client
.CoreV1Api()
263 # XXX mystery hack -- I need to do an API call from
264 # this context, or subsequent API usage from handle_command
265 # fails with SSLError('bad handshake'). Suspect some kind of
266 # thread context setup in SSL lib?
267 self
._k
8s
.list_namespaced_pod(cluster_name
)
269 # Ignore here to make self.available() fail with a proper error message
272 self
._rook
_cluster
= RookCluster(
276 self
._initialized
.set()
278 while not self
._shutdown
.is_set():
279 # XXX hack (or is it?) to kick all completions periodically,
280 # in case we had a caller that wait()'ed on them long enough
281 # to get persistence but not long enough to get completion
283 global all_completions
284 self
.wait(all_completions
)
285 all_completions
= [c
for c
in all_completions
if not c
.is_complete
]
287 self
._shutdown
.wait(5)
289 # TODO: watch Rook for config changes to complain/update if
290 # things look a bit out of sync?
293 def get_inventory(self
, node_filter
=None, refresh
=False):
295 if node_filter
and node_filter
.nodes
:
297 node_list
= node_filter
.nodes
298 elif node_filter
and node_filter
.labels
:
299 # TODO: query k8s API to resolve to node list, and pass
300 # it into RookCluster.get_discovered_devices
301 raise NotImplementedError()
303 devs
= self
.rook_cluster
.get_discovered_devices(node_list
)
306 for node_name
, node_devs
in devs
.items():
309 dev
= orchestrator
.InventoryDevice()
311 # XXX CAUTION! https://github.com/rook/rook/issues/1716
312 # Passing this through for the sake of completeness but it
313 # is not trustworthy!
314 dev
.blank
= d
['empty']
315 dev
.type = 'hdd' if d
['rotational'] else 'ssd'
319 if d
['filesystem'] == "" and not d
['rotational']:
320 # Empty or partitioned SSD
321 partitioned_space
= sum(
322 [p
['size'] for p
in d
['Partitions']])
323 dev
.metadata_space_free
= max(0, d
[
324 'size'] - partitioned_space
)
328 result
.append(orchestrator
.InventoryNode(node_name
, devs
))
333 def describe_service(self
, service_type
=None, service_id
=None, node_name
=None):
335 if service_type
not in ("mds", "osd", "mgr", "mon", "nfs", None):
336 raise orchestrator
.OrchestratorValidationError(service_type
+ " unsupported")
338 pods
= self
.rook_cluster
.describe_pods(service_type
, service_id
, node_name
)
342 sd
= orchestrator
.ServiceDescription()
343 sd
.nodename
= p
['nodename']
344 sd
.container_id
= p
['name']
345 sd
.service_type
= p
['labels']['app'].replace('rook-ceph-', '')
347 if sd
.service_type
== "osd":
348 sd
.service_instance
= "%s" % p
['labels']["ceph-osd-id"]
349 elif sd
.service_type
== "mds":
350 sd
.service
= p
['labels']['rook_file_system']
351 pfx
= "{0}-".format(sd
.service
)
352 sd
.service_instance
= p
['labels']['ceph_daemon_id'].replace(pfx
, '', 1)
353 elif sd
.service_type
== "mon":
354 sd
.service_instance
= p
['labels']["mon"]
355 elif sd
.service_type
== "mgr":
356 sd
.service_instance
= p
['labels']["mgr"]
357 elif sd
.service_type
== "nfs":
358 sd
.service
= p
['labels']['ceph_nfs']
359 sd
.service_instance
= p
['labels']['instance']
360 sd
.rados_config_location
= self
.rook_cluster
.get_nfs_conf_url(sd
.service
, sd
.service_instance
)
361 elif sd
.service_type
== "rgw":
362 sd
.service
= p
['labels']['rgw']
363 sd
.service_instance
= p
['labels']['ceph_daemon_id']
365 # Unknown type -- skip it
def _service_add_decorate(self, typename, spec, func):
    """
    Wrap a rook_cluster add_* call in a RookWriteCompletion.

    :param typename: human-readable service type used in the message
    :param spec: service spec; must expose a ``name`` attribute
    :param func: callable taking ``spec`` that performs the k8s write
    :return: RookWriteCompletion with no completion-check callback
    """
    return RookWriteCompletion(
        lambda: func(spec), None,
        "Creating {0} services for {1}".format(typename, spec.name))
def add_stateless_service(self, service_type, spec):
    """
    Create a stateless service via the corresponding Rook CRD.

    :param service_type: one of "mds", "rgw", "nfs"
    :param spec: service spec forwarded to the rook_cluster helper
    :raises NotImplementedError: for any other service type
    """
    # assert isinstance(spec, orchestrator.StatelessServiceSpec)
    if service_type == "mds":
        return self._service_add_decorate("Filesystem", spec,
                                          self.rook_cluster.add_filesystem)
    elif service_type == "rgw":
        return self._service_add_decorate("RGW", spec,
                                          self.rook_cluster.add_objectstore)
    elif service_type == "nfs":
        return self._service_add_decorate("NFS", spec,
                                          self.rook_cluster.add_nfsgw)
    else:
        raise NotImplementedError(service_type)
def remove_stateless_service(self, service_type, service_id):
    """
    Delete the Rook CRD object backing the given service instance.

    :param service_type: service kind understood by rook_cluster.rm_service
    :param service_id: identifier of the service instance to remove
    :return: RookWriteCompletion with no completion-check callback
    """
    return RookWriteCompletion(
        lambda: self.rook_cluster.rm_service(service_type, service_id), None,
        "Removing {0} services for {1}".format(service_type, service_id))
def update_mons(self, num, hosts):
    """
    Scale the monitor count via the Rook cluster CRD.

    :param num: desired number of mons
    :param hosts: must be empty/falsy — Rook places mons itself
    :raises RuntimeError: if an explicit host list is given
    """
    if hosts:
        raise RuntimeError("Host list is not supported by rook.")

    return RookWriteCompletion(
        lambda: self.rook_cluster.update_mon_count(num), None,
        "Updating mon count to {0}".format(num))
def update_stateless_service(self, svc_type, spec):
    """
    Update a stateless service; only NFS server scaling is implemented.

    :param svc_type: must be "nfs"
    :param spec: service spec; name and desired count are read from it
    :raises NotImplementedError: for any other service type
    """
    # only nfs is currently supported
    if svc_type != "nfs":
        raise NotImplementedError(svc_type)

    # NOTE(review): the extracted source lost the line defining `num`;
    # presumably the desired server count comes from the spec — confirm
    # against the original file.
    num = spec.count
    return RookWriteCompletion(
        lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
        "Updating NFS server count in {0} to {1}".format(spec.name, num))
413 def create_osds(self
, drive_group
, all_hosts
):
414 # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
416 assert len(drive_group
.hosts(all_hosts
)) == 1
418 if drive_group
.data_devices
:
419 targets
+= drive_group
.data_devices
.paths
420 if drive_group
.data_directories
:
421 targets
+= drive_group
.data_directories
423 if not self
.rook_cluster
.node_exists(drive_group
.hosts(all_hosts
)[0]):
424 raise RuntimeError("Node '{0}' is not in the Kubernetes "
425 "cluster".format(drive_group
.hosts(all_hosts
)))
427 # Validate whether cluster CRD can accept individual OSD
428 # creations (i.e. not useAllDevices)
429 if not self
.rook_cluster
.can_create_osd():
430 raise RuntimeError("Rook cluster configuration does not "
431 "support OSD creation.")
434 return self
.rook_cluster
.add_osds(drive_group
, all_hosts
)
437 # Find OSD pods on this host
439 pods
= self
._k
8s
.list_namespaced_pod(self
._rook
_env
.namespace
,
440 label_selector
="rook_cluster={},app=rook-ceph-osd".format(self
._rook
_env
.cluster_name
),
441 field_selector
="spec.nodeName={0}".format(
442 drive_group
.hosts(all_hosts
)[0]
445 pod_osd_ids
.add(int(p
.metadata
.labels
['ceph-osd-id']))
447 self
.log
.debug('pod_osd_ids={0}'.format(pod_osd_ids
))
450 osdmap
= self
.get("osd_map")
451 for osd
in osdmap
['osds']:
453 if osd_id
not in pod_osd_ids
:
456 metadata
= self
.get_metadata('osd', "%s" % osd_id
)
457 if metadata
and metadata
['devices'] in targets
:
460 self
.log
.info("ignoring osd {0} {1}".format(
461 osd_id
, metadata
['devices']
464 return found
is not None
466 return RookWriteCompletion(execute
, is_complete
,
467 "Creating OSD on {0}:{1}".format(
468 drive_group
.hosts(all_hosts
)[0], targets