1 """
2 This module wrap's Rook + Kubernetes APIs to expose the calls
3 needed to implement an orchestrator module. While the orchestrator
4 module exposes an async API, this module simply exposes blocking API
5 call methods.
6
7 This module is runnable outside of ceph-mgr, useful for testing.
8 """
import logging
import json
from contextlib import contextmanager

from six.moves.urllib.parse import urljoin  # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
try:
    from kubernetes.client.rest import ApiException
except ImportError:
    ApiException = None

try:
    import orchestrator
    from rook.module import RookEnv
    from typing import List
except ImportError:
    pass  # just used for type checking.


log = logging.getLogger(__name__)

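# A rough sketch of standalone use outside ceph-mgr, for testing only.
# The RookEnv() construction below is an assumption, not the real signature:
#
#   from kubernetes import client, config
#   config.load_kube_config()
#   rook = RookCluster(client.CoreV1Api(), RookEnv())
#   print(rook.get_discovered_devices())
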

class ApplyException(Exception):
    """
    For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
    """


class RookCluster(object):
    def __init__(self, k8s, rook_env):
        self.rook_env = rook_env  # type: RookEnv
        self.k8s = k8s

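    # Fields of rook_env relied on throughout this class: crd_version,
    # namespace, operator_namespace, cluster_name and api_name.
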
    def rook_url(self, path):
        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
            self.rook_env.crd_version, self.rook_env.namespace)
        return urljoin(prefix, path)

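    # For example (a sketch; crd_version "v1" and namespace "rook-ceph" are
    # assumptions, not guaranteed values), rook_url("cephnfses/mynfs")
    # would resolve to:
    #   /apis/ceph.rook.io/v1/namespaces/rook-ceph/cephnfses/mynfs
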
    def rook_api_call(self, verb, path, **kwargs):
        full_path = self.rook_url(path)
        log.debug("[%s] %s" % (verb, full_path))

        return self.k8s.api_client.call_api(
            full_path,
            verb,
            auth_settings=['BearerToken'],
            response_type="object",
            _return_http_data_only=True,
            _preload_content=True,
            **kwargs)

    def rook_api_get(self, path, **kwargs):
        return self.rook_api_call("GET", path, **kwargs)

    def rook_api_delete(self, path):
        return self.rook_api_call("DELETE", path)

    def rook_api_patch(self, path, **kwargs):
        return self.rook_api_call("PATCH", path,
                                  header_params={"Content-Type": "application/json-patch+json"},
                                  **kwargs)

    def rook_api_post(self, path, **kwargs):
        return self.rook_api_call("POST", path, **kwargs)

    def get_discovered_devices(self, nodenames=None):
        # TODO: replace direct k8s calls with Rook API calls
        # when they're implemented
        label_selector = "app=rook-discover"
        if nodenames is not None:
            # FIXME: is there a practical or official limit on the
            # number of entries in a label selector?
            label_selector += ", rook.io/node in ({0})".format(
                ", ".join(nodenames))

        try:
            result = self.k8s.list_namespaced_config_map(
                self.rook_env.operator_namespace,
                label_selector=label_selector)
        except ApiException as e:
            log.exception("Failed to fetch device metadata: {0}".format(e))
            raise

        nodename_to_devices = {}
        for i in result.items:
            drives = json.loads(i.data['devices'])
            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives

        return nodename_to_devices

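    # The returned mapping is keyed by node name; each value is the raw
    # device list reported by that node's rook-discover agent, roughly
    # (a sketch -- the exact per-device fields come from Rook, not us):
    #   {"node1": [{"name": "sdb", ...}, ...], ...}
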
    def get_nfs_conf_url(self, nfs_cluster, instance):
        #
        # Fetch cephnfs object for "nfs_cluster" and then return a rados://
        # URL for the instance within that cluster. If the fetch fails, just
        # return None.
        #
        try:
            ceph_nfs = self.rook_api_get("cephnfses/{0}".format(nfs_cluster))
        except ApiException as e:
            log.info("Unable to fetch cephnfs object: {}".format(e.status))
            return None

        pool = ceph_nfs['spec']['rados']['pool']
        namespace = ceph_nfs['spec']['rados'].get('namespace', None)

        if namespace is None:
            url = "rados://{0}/conf-{1}.{2}".format(pool, nfs_cluster, instance)
        else:
            url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
        return url

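    # As an example (made-up names), a cluster "mynfs" backed by pool
    # "nfs-pool" and RADOS namespace "nfs-ns" yields, for instance "a":
    #   rados://nfs-pool/nfs-ns/conf-mynfs.a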

    def describe_pods(self, service_type, service_id, nodename):
        # Query the k8s API for the deployments and containers related to
        # this service

        # Inspect the Rook YAML, to decide whether this filesystem
        # is Ceph-managed or Rook-managed
        # TODO: extend Orchestrator interface to describe whether FS
        # is manageable by us or not

        # Example Rook Pod labels for a mgr daemon:
        # Labels:   app=rook-ceph-mgr
        #           pod-template-hash=2171958073
        #           rook_cluster=rook
        # And MDS containers additionally have a `rook_file_system` label

        # Label filter is rook_cluster=<cluster name>
        #                 rook_file_system=<self.fs_name>

        label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
        if service_type is not None:
            label_filter += ",app=rook-ceph-{0}".format(service_type)
        if service_id is not None:
            if service_type == "mds":
                label_filter += ",rook_file_system={0}".format(service_id)
            elif service_type == "osd":
                # Label added in https://github.com/rook/rook/pull/1698
                label_filter += ",ceph-osd-id={0}".format(service_id)
            elif service_type == "mon":
                # label like mon=rook-ceph-mon0
                label_filter += ",mon={0}".format(service_id)
            elif service_type == "mgr":
                label_filter += ",mgr={0}".format(service_id)
            elif service_type == "nfs":
                label_filter += ",ceph_nfs={0}".format(service_id)
            elif service_type == "rgw":
                # TODO: rgw
                pass

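        # For example (a sketch; the cluster name "rook" is an assumption),
        # describe_pods("mds", "myfs", None) would build the selector:
        #   rook_cluster=rook,app=rook-ceph-mds,rook_file_system=myfs
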
        field_filter = ""
        if nodename is not None:
            field_filter = "spec.nodeName={0}".format(nodename)

        pods = self.k8s.list_namespaced_pod(
            self.rook_env.namespace,
            label_selector=label_filter,
            field_selector=field_filter)

        # import json
        # print json.dumps(pods.items[0])

        pods_summary = []

        for p in pods.items:
            d = p.to_dict()
            # p['metadata']['creationTimestamp']
            # p['metadata']['nodeName']
            pods_summary.append({
                "name": d['metadata']['name'],
                "nodename": d['spec']['node_name'],
                "labels": d['metadata']['labels']
            })

        return pods_summary

    @contextmanager
    def ignore_409(self, what):
        try:
            yield
        except ApiException as e:
            if e.status == 409:
                # Idempotent, succeed.
                log.info("{} already exists".format(what))
            else:
                raise

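    # Typical use, as in the add_* methods below:
    #   with self.ignore_409("CephFilesystem '{0}'".format(name)):
    #       self.rook_api_post("cephfilesystems/", body=body)
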
    def add_filesystem(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.

        rook_fs = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephFilesystem",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "onlyManageDaemons": True,
                "metadataServer": {
                    "activeCount": spec.count,
                    "activeStandby": True
                }
            }
        }

        with self.ignore_409("CephFilesystem '{0}'".format(spec.name)):
            self.rook_api_post("cephfilesystems/", body=rook_fs)

    def add_nfsgw(self, spec):
        # TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
        # to action.

        rook_nfsgw = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephNFS",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "rados": {
                    "pool": spec.extended["pool"]
                },
                "server": {
                    "active": spec.count,
                }
            }
        }

        if "namespace" in spec.extended:
            rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"]

        with self.ignore_409("NFS cluster '{0}'".format(spec.name)):
            self.rook_api_post("cephnfses/", body=rook_nfsgw)

    def add_objectstore(self, spec):
        rook_os = {
            "apiVersion": self.rook_env.api_name,
            "kind": "CephObjectStore",
            "metadata": {
                "name": spec.name,
                "namespace": self.rook_env.namespace
            },
            "spec": {
                "metadataPool": {
                    "failureDomain": "host",
                    "replicated": {
                        "size": 1
                    }
                },
                "dataPool": {
                    "failureDomain": "osd",
                    "replicated": {
                        "size": 1
                    }
                },
                "gateway": {
                    "type": "s3",
                    "port": 80,
                    "instances": 1,
                    "allNodes": False
                }
            }
        }

        with self.ignore_409("CephObjectStore '{0}'".format(spec.name)):
            self.rook_api_post("cephobjectstores/", body=rook_os)

    def rm_service(self, service_type, service_id):
        assert service_type in ("mds", "rgw", "nfs")

        if service_type == "mds":
            rooktype = "cephfilesystems"
        elif service_type == "rgw":
            rooktype = "cephobjectstores"
        elif service_type == "nfs":
            rooktype = "cephnfses"

        objpath = "{0}/{1}".format(rooktype, service_id)

        try:
            self.rook_api_delete(objpath)
        except ApiException as e:
            if e.status == 404:
                log.info("{0} service '{1}' does not exist".format(service_type, service_id))
                # Idempotent, succeed.
            else:
                raise

    def can_create_osd(self):
        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))
        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)

        # If useAllNodes is set, then Rook will not be paying attention
        # to anything we put in 'nodes', so we can't do OSD creation.
        return not use_all_nodes

    def node_exists(self, node_name):
        try:
            self.k8s.read_node(node_name, exact=False, export=True)
        except ApiException as e:
            if e.status == 404:
                return False
            else:
                raise
        else:
            return True

    def update_mon_count(self, newcount):
        patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]

        try:
            self.rook_api_patch(
                "cephclusters/{0}".format(self.rook_env.cluster_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update mon count in Cluster CRD: {0}".format(e))

        return "Updated mon count to {0}".format(newcount)

    def update_nfs_count(self, svc_id, newcount):
        patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]

        try:
            self.rook_api_patch(
                "cephnfses/{0}".format(svc_id),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)

    def add_osds(self, drive_group, all_hosts):
        # type: (orchestrator.DriveGroupSpec, List[str]) -> str
        """
        Rook currently (0.8) can only do single-drive OSDs, so we
        treat all drive groups as just a list of individual OSDs.
        """
        block_devices = drive_group.data_devices.paths if drive_group.data_devices else None
        directories = drive_group.data_directories

        assert drive_group.objectstore in ("bluestore", "filestore")

        # The CRD looks something like this:
        #     nodes:
        #       - name: "gravel1.rockery"
        #         devices:
        #          - name: "sdb"
        #         config:
        #           storeType: bluestore

        current_cluster = self.rook_api_get(
            "cephclusters/{0}".format(self.rook_env.cluster_name))

        patch = []

        # FIXME: this is all not really atomic, because jsonpatch doesn't
        # let us do "test" operations that would check if items with
        # matching names were in existing lists.

        if 'nodes' not in current_cluster['spec']['storage']:
            patch.append({
                'op': 'add', 'path': '/spec/storage/nodes', 'value': []
            })

        current_nodes = current_cluster['spec']['storage'].get('nodes', [])

        if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
            pd = {"name": drive_group.hosts(all_hosts)[0],
                  "config": {"storeType": drive_group.objectstore}}

            if block_devices:
                pd["devices"] = [{'name': d} for d in block_devices]
            if directories:
                pd["directories"] = [{'path': p} for p in directories]

            patch.append({"op": "add", "path": "/spec/storage/nodes/-", "value": pd})
        else:
            # Extend existing node
            node_idx = None
            current_node = None
            for i, c in enumerate(current_nodes):
                if c['name'] == drive_group.hosts(all_hosts)[0]:
                    current_node = c
                    node_idx = i
                    break

            assert node_idx is not None
            assert current_node is not None

            new_devices = list(set(block_devices or []) - set(
                d['name'] for d in current_node.get('devices', [])))
            for n in new_devices:
                patch.append({
                    "op": "add",
                    "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
                    "value": {'name': n}
                })

            new_dirs = list(set(directories or []) - set(
                d['path'] for d in current_node.get('directories', [])))
            for p in new_dirs:
                patch.append({
                    "op": "add",
                    "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
                    "value": {'path': p}
                })

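        # As a rough illustration (made-up names), adding device "sdc" to an
        # existing node at index 0 would leave a single entry in `patch`:
        #   [{"op": "add",
        #     "path": "/spec/storage/nodes/0/devices/-",
        #     "value": {"name": "sdc"}}]
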
        if len(patch) == 0:
            return "No change"

        try:
            self.rook_api_patch(
                "cephclusters/{0}".format(self.rook_env.cluster_name),
                body=patch)
        except ApiException as e:
            log.exception("API exception: {0}".format(e))
            raise ApplyException(
                "Failed to create OSD entries in Cluster CRD: {0}".format(
                    e))

        return "Success"