]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rook/rook_cluster.py
This module wraps Rook + Kubernetes APIs to expose the calls
needed to implement an orchestrator module. While the orchestrator
module exposes an async API, this module simply exposes blocking API
call methods.

This module is runnable outside of ceph-mgr, useful for testing.
11 from contextlib
import contextmanager
13 from six
.moves
.urllib
.parse
import urljoin
# pylint: disable=import-error
15 # Optional kubernetes imports to enable MgrModule.can_run
18 from kubernetes
.client
.rest
import ApiException
24 from rook
.module
import RookEnv
25 from typing
import List
27 pass # just used for type checking.
# Module-level logger, shared by all classes below.
log = logging.getLogger(__name__)
class ApplyException(Exception):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """
    pass
41 class RookCluster(object):
42 def __init__(self
, k8s
, rook_env
):
43 self
.rook_env
= rook_env
# type: RookEnv
46 def rook_url(self
, path
):
47 prefix
= "/apis/ceph.rook.io/%s/namespaces/%s/" % (
48 self
.rook_env
.crd_version
, self
.rook_env
.namespace
)
49 return urljoin(prefix
, path
)
51 def rook_api_call(self
, verb
, path
, **kwargs
):
52 full_path
= self
.rook_url(path
)
53 log
.debug("[%s] %s" % (verb
, full_path
))
55 return self
.k8s
.api_client
.call_api(
58 auth_settings
=['BearerToken'],
59 response_type
="object",
60 _return_http_data_only
=True,
61 _preload_content
=True,
64 def rook_api_get(self
, path
, **kwargs
):
65 return self
.rook_api_call("GET", path
, **kwargs
)
67 def rook_api_delete(self
, path
):
68 return self
.rook_api_call("DELETE", path
)
70 def rook_api_patch(self
, path
, **kwargs
):
71 return self
.rook_api_call("PATCH", path
,
72 header_params
={"Content-Type": "application/json-patch+json"},
75 def rook_api_post(self
, path
, **kwargs
):
76 return self
.rook_api_call("POST", path
, **kwargs
)
78 def get_discovered_devices(self
, nodenames
=None):
79 # TODO: replace direct k8s calls with Rook API calls
80 # when they're implemented
81 label_selector
= "app=rook-discover"
82 if nodenames
is not None:
83 # FIXME: is there a practical or official limit on the
84 # number of entries in a label selector
85 label_selector
+= ", rook.io/node in ({0})".format(
89 result
= self
.k8s
.list_namespaced_config_map(
90 self
.rook_env
.operator_namespace
,
91 label_selector
=label_selector
)
92 except ApiException
as e
:
93 log
.exception("Failed to fetch device metadata: {0}".format(e
))
96 nodename_to_devices
= {}
97 for i
in result
.items
:
98 drives
= json
.loads(i
.data
['devices'])
99 nodename_to_devices
[i
.metadata
.labels
['rook.io/node']] = drives
101 return nodename_to_devices
103 def get_nfs_conf_url(self
, nfs_cluster
, instance
):
105 # Fetch cephnfs object for "nfs_cluster" and then return a rados://
106 # URL for the instance within that cluster. If the fetch fails, just
110 ceph_nfs
= self
.rook_api_get("cephnfses/{0}".format(nfs_cluster
))
111 except ApiException
as e
:
112 log
.info("Unable to fetch cephnfs object: {}".format(e
.status
))
115 pool
= ceph_nfs
['spec']['rados']['pool']
116 namespace
= ceph_nfs
['spec']['rados'].get('namespace', None)
118 if namespace
== None:
119 url
= "rados://{0}/conf-{1}.{2}".format(pool
, nfs_cluster
, instance
)
121 url
= "rados://{0}/{1}/conf-{2}.{3}".format(pool
, namespace
, nfs_cluster
, instance
)
125 def describe_pods(self
, service_type
, service_id
, nodename
):
126 # Go query the k8s API about deployment, containers related to this
129 # Inspect the Rook YAML, to decide whether this filesystem
130 # is Ceph-managed or Rook-managed
131 # TODO: extend Orchestrator interface to describe whether FS
132 # is manageable by us or not
134 # Example Rook Pod labels for a mgr daemon:
135 # Labels: app=rook-ceph-mgr
136 # pod-template-hash=2171958073
138 # And MDS containers additionally have `rook_filesystem` label
140 # Label filter is rook_cluster=<cluster name>
141 # rook_file_system=<self.fs_name>
143 label_filter
= "rook_cluster={0}".format(self
.rook_env
.cluster_name
)
144 if service_type
!= None:
145 label_filter
+= ",app=rook-ceph-{0}".format(service_type
)
146 if service_id
!= None:
147 if service_type
== "mds":
148 label_filter
+= ",rook_file_system={0}".format(service_id
)
149 elif service_type
== "osd":
150 # Label added in https://github.com/rook/rook/pull/1698
151 label_filter
+= ",ceph-osd-id={0}".format(service_id
)
152 elif service_type
== "mon":
153 # label like mon=rook-ceph-mon0
154 label_filter
+= ",mon={0}".format(service_id
)
155 elif service_type
== "mgr":
156 label_filter
+= ",mgr={0}".format(service_id
)
157 elif service_type
== "nfs":
158 label_filter
+= ",ceph_nfs={0}".format(service_id
)
159 elif service_type
== "rgw":
165 field_filter
= "spec.nodeName={0}".format(nodename
)
167 pods
= self
.k8s
.list_namespaced_pod(
168 self
.rook_env
.namespace
,
169 label_selector
=label_filter
,
170 field_selector
=field_filter
)
173 # print json.dumps(pods.items[0])
179 # p['metadata']['creationTimestamp']
180 # p['metadata']['nodeName']
181 pods_summary
.append({
182 "name": d
['metadata']['name'],
183 "nodename": d
['spec']['node_name'],
184 "labels": d
['metadata']['labels']
191 def ignore_409(self
, what
):
194 except ApiException
as e
:
196 # Idempotent, succeed.
197 log
.info("{} already exists".format(what
))
201 def add_filesystem(self
, spec
):
202 # TODO use spec.placement
203 # TODO warn if spec.extended has entries we don't kow how
207 "apiVersion": self
.rook_env
.api_name
,
208 "kind": "CephFilesystem",
211 "namespace": self
.rook_env
.namespace
214 "onlyManageDaemons": True,
216 "activeCount": spec
.count
,
217 "activeStandby": True
223 with self
.ignore_409("CephFilesystem '{0}' already exists".format(spec
.name
)):
224 self
.rook_api_post("cephfilesystems/", body
=rook_fs
)
226 def add_nfsgw(self
, spec
):
227 # TODO use spec.placement
228 # TODO warn if spec.extended has entries we don't kow how
232 "apiVersion": self
.rook_env
.api_name
,
236 "namespace": self
.rook_env
.namespace
240 "pool": spec
.extended
["pool"]
243 "active": spec
.count
,
248 if "namespace" in spec
.extended
:
249 rook_nfsgw
["spec"]["rados"]["namespace"] = spec
.extended
["namespace"]
251 with self
.ignore_409("NFS cluster '{0}' already exists".format(spec
.name
)):
252 self
.rook_api_post("cephnfses/", body
=rook_nfsgw
)
254 def add_objectstore(self
, spec
):
256 "apiVersion": self
.rook_env
.api_name
,
257 "kind": "CephObjectStore",
260 "namespace": self
.rook_env
.namespace
264 "failureDomain": "host",
270 "failureDomain": "osd",
284 with self
.ignore_409("CephObjectStore '{0}' already exists".format(spec
.name
)):
285 self
.rook_api_post("cephobjectstores/", body
=rook_os
)
287 def rm_service(self
, service_type
, service_id
):
288 assert service_type
in ("mds", "rgw", "nfs")
290 if service_type
== "mds":
291 rooktype
= "cephfilesystems"
292 elif service_type
== "rgw":
293 rooktype
= "cephobjectstores"
294 elif service_type
== "nfs":
295 rooktype
= "cephnfses"
297 objpath
= "{0}/{1}".format(rooktype
, service_id
)
300 self
.rook_api_delete(objpath
)
301 except ApiException
as e
:
303 log
.info("{0} service '{1}' does not exist".format(service_type
, service_id
))
304 # Idempotent, succeed.
308 def can_create_osd(self
):
309 current_cluster
= self
.rook_api_get(
310 "cephclusters/{0}".format(self
.rook_env
.cluster_name
))
311 use_all_nodes
= current_cluster
['spec'].get('useAllNodes', False)
313 # If useAllNodes is set, then Rook will not be paying attention
314 # to anything we put in 'nodes', so can't do OSD creation.
315 return not use_all_nodes
317 def node_exists(self
, node_name
):
319 self
.k8s
.read_node(node_name
, exact
=False, export
=True)
320 except ApiException
as e
:
328 def update_mon_count(self
, newcount
):
329 patch
= [{"op": "replace", "path": "/spec/mon/count", "value": newcount
}]
333 "cephclusters/{0}".format(self
.rook_env
.cluster_name
),
335 except ApiException
as e
:
336 log
.exception("API exception: {0}".format(e
))
337 raise ApplyException(
338 "Failed to update mon count in Cluster CRD: {0}".format(e
))
340 return "Updated mon count to {0}".format(newcount
)
342 def update_nfs_count(self
, svc_id
, newcount
):
343 patch
= [{"op": "replace", "path": "/spec/server/active", "value": newcount
}]
347 "cephnfses/{0}".format(svc_id
),
349 except ApiException
as e
:
350 log
.exception("API exception: {0}".format(e
))
351 raise ApplyException(
352 "Failed to update NFS server count for {0}: {1}".format(svc_id
, e
))
353 return "Updated NFS server count for {0} to {1}".format(svc_id
, newcount
)
355 def add_osds(self
, drive_group
, all_hosts
):
356 # type: (orchestrator.DriveGroupSpec, List[str]) -> str
358 Rook currently (0.8) can only do single-drive OSDs, so we
359 treat all drive groups as just a list of individual OSDs.
361 block_devices
= drive_group
.data_devices
.paths
if drive_group
.data_devices
else None
362 directories
= drive_group
.data_directories
364 assert drive_group
.objectstore
in ("bluestore", "filestore")
366 # The CRD looks something like this:
368 # - name: "gravel1.rockery"
372 # storeType: bluestore
374 current_cluster
= self
.rook_api_get(
375 "cephclusters/{0}".format(self
.rook_env
.cluster_name
))
379 # FIXME: this is all not really atomic, because jsonpatch doesn't
380 # let us do "test" operations that would check if items with
381 # matching names were in existing lists.
383 if 'nodes' not in current_cluster
['spec']['storage']:
385 'op': 'add', 'path': '/spec/storage/nodes', 'value': []
388 current_nodes
= current_cluster
['spec']['storage'].get('nodes', [])
390 if drive_group
.hosts(all_hosts
)[0] not in [n
['name'] for n
in current_nodes
]:
391 pd
= { "name": drive_group
.hosts(all_hosts
)[0],
392 "config": { "storeType": drive_group
.objectstore
}}
395 pd
["devices"] = [{'name': d
} for d
in block_devices
]
397 pd
["directories"] = [{'path': p
} for p
in directories
]
399 patch
.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd
})
401 # Extend existing node
404 for i
, c
in enumerate(current_nodes
):
405 if c
['name'] == drive_group
.hosts(all_hosts
)[0]:
410 assert node_idx
is not None
411 assert current_node
is not None
413 new_devices
= list(set(block_devices
) - set([d
['name'] for d
in current_node
['devices']]))
414 for n
in new_devices
:
417 "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx
),
421 new_dirs
= list(set(directories
) - set(current_node
['directories']))
425 "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx
),
434 "cephclusters/{0}".format(self
.rook_env
.cluster_name
),
436 except ApiException
as e
:
437 log
.exception("API exception: {0}".format(e
))
438 raise ApplyException(
439 "Failed to create OSD entries in Cluster CRD: {0}".format(