]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/services/ganesha.py
Import ceph 15.2.8
[ceph.git] / ceph / src / pybind / mgr / dashboard / services / ganesha.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-lines
3 from __future__ import absolute_import
4
5 import logging
6 import os
7 import re
8 from typing import Any, Dict, List, Optional, cast
9
10 from ceph.deployment.service_spec import NFSServiceSpec
11 from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
12
13 from .. import mgr
14 from ..exceptions import DashboardException
15 from ..settings import Settings
16 from .cephfs import CephFS
17 from .cephx import CephX
18 from .orchestrator import OrchClient
19 from .rgw_client import RgwClient, RequestException, NoCredentialsException
20
21
22 logger = logging.getLogger('ganesha')
23
24
class NFSException(DashboardException):
    """Dashboard exception raised by the NFS-Ganesha service layer.

    Every instance is created with ``component="nfs"``; only the message
    varies between call sites.
    """

    def __init__(self, msg):
        """Build the exception.

        :param msg: human readable description of the failure.
        """
        super().__init__(component="nfs", msg=msg)
28
29
class Ganesha(object):
    """Helpers to discover NFS-Ganesha clusters and their RADOS locations.

    A cluster is either reported by the Orchestrator or declared by the user
    through the ``GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE`` setting.
    """

    @staticmethod
    def _split_pool_namespace(pool_nm):
        """Split ``pool[/namespace]`` into a ``(pool, namespace)`` pair.

        Only the first ``/`` separates the two parts; ``namespace`` is
        ``None`` when no ``/`` is present.
        """
        pool, sep, namespace = pool_nm.partition('/')
        return pool, (namespace if sep else None)

    @classmethod
    def _get_clusters_locations(cls):
        """Return a dict mapping every known cluster id to its location.

        Each value is a dict with the keys ``pool``, ``namespace``, ``type``
        (a ``ClusterType`` constant) and ``daemon_conf``.

        :raises NFSException: if no cluster is known at all, if an entry of
            the user setting is malformed or duplicated, or if a user-defined
            cluster id clashes with an Orchestrator one.
        """
        # Get Orchestrator clusters
        orch_result = cls._get_orch_clusters_locations()

        # Get user-defined clusters
        location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
        if not orch_result and not location_list_str:
            raise NFSException("NFS-Ganesha cluster is not detected. "
                               "Please set the GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE "
                               "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")

        result = {}  # type: ignore
        location_list = [loc.strip() for loc in location_list_str.split(",")] \
            if location_list_str else []
        for location in location_list:
            if not location:
                raise NFSException("Invalid Ganesha cluster RADOS "
                                   "[cluster_id:]pool/namespace setting: {}"
                                   .format(location))

            # Everything before the first ':' is the cluster id.
            cluster, has_cluster, pool_nm = location.partition(':')
            if not has_cluster:
                # No cluster id given: accept `pool[/namespace]` only and
                # file it under the default cluster id.
                if location.count('/') > 1:
                    raise NFSException("Invalid Ganesha RADOS pool/namespace "
                                       "setting: {}".format(location))
                cluster = "_default_"
                pool_nm = location
            pool, namespace = cls._split_pool_namespace(pool_nm)

            if cluster in orch_result:
                # cephadm might have set same cluster settings, ask the user to remove it.
                raise NFSException(
                    'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
                    'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
                    'Please remove or rename the cluster from the '
                    'GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.'.format(cluster))

            if cluster in result:
                raise NFSException("Duplicate Ganesha cluster definition in "
                                   "the setting: {}".format(location_list_str))
            result[cluster] = {
                'pool': pool,
                'namespace': namespace,
                'type': ClusterType.USER,
                'daemon_conf': None
            }
        return {**orch_result, **result}

    @classmethod
    def _get_orch_clusters_locations(cls):
        """Return the locations of the clusters known to the Orchestrator.

        Services whose spec lacks one of the expected attributes are skipped
        after logging a warning.
        """
        orch_result = {}  # type: ignore
        for service in cls._get_orch_nfs_services():
            spec = cast(NFSServiceSpec, service.spec)
            try:
                orch_result[spec.service_id] = {
                    'pool': spec.pool,
                    'namespace': spec.namespace,
                    'type': ClusterType.ORCHESTRATOR,
                    'daemon_conf': spec.rados_config_name()
                }
            except AttributeError as ex:
                logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
        return orch_result

    @classmethod
    def get_ganesha_clusters(cls):
        """Return the list of all known cluster ids."""
        return list(cls._get_clusters_locations())

    @staticmethod
    def _get_orch_nfs_services() -> 'List[ServiceDescription]':
        """List the `nfs` services of the Orchestrator.

        Returns an empty list when the Orchestrator call fails with
        ``RuntimeError``, ``OrchestratorError`` or ``ImportError``.
        """
        try:
            return OrchClient.instance().services.list('nfs')
        except (RuntimeError, OrchestratorError, ImportError):
            return []

    @classmethod
    def parse_rados_url(cls, rados_url):
        """Split a ``rados://pool[/namespace]/object`` URL.

        :return: a ``(pool, namespace, object)`` tuple; ``namespace`` is
            ``None`` when the URL has only two components.
        :raises NFSException: if the scheme is not ``rados://`` or the number
            of path components is not 2 or 3.
        """
        prefix = "rados://"
        if not rados_url.startswith(prefix):
            raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}"
                               .format(rados_url))
        rados_path = rados_url[len(prefix):]
        url_comps = rados_path.split("/")
        if len(url_comps) == 2:
            pool, obj = url_comps
            return pool, None, obj
        if len(url_comps) == 3:
            # Always hand back a tuple so both URL shapes share one type.
            return tuple(url_comps)
        raise NFSException("Invalid NFS Ganesha RADOS configuration URL: "
                           "rados://{}".format(rados_path))

    @classmethod
    def make_rados_url(cls, pool, namespace, obj):
        """Build a ``rados://`` URL; a falsy namespace is left out."""
        if namespace:
            return "rados://{}/{}/{}".format(pool, namespace, obj)
        return "rados://{}/{}".format(pool, obj)

    @classmethod
    def get_cluster(cls, cluster_id):
        """Return the location dict of `cluster_id`.

        :raises NFSException: if the cluster id is unknown.
        """
        locations = cls._get_clusters_locations()
        if cluster_id not in locations:
            raise NFSException("Cluster not found: cluster_id={}"
                               .format(cluster_id))
        return locations[cluster_id]

    @classmethod
    def fsals_available(cls):
        """Return the FSAL names (``"CEPH"``, ``"RGW"``) that can be used.

        ``"CEPH"`` is offered when at least one filesystem is listed and
        ``"RGW"`` when the RGW admin client reports the service online and
        the user as a system user. RGW credential, request and lookup
        errors simply leave ``"RGW"`` out.
        """
        result = []
        if CephFS.list_filesystems():
            result.append("CEPH")
        try:
            if RgwClient.admin_instance().is_service_online() and \
                    RgwClient.admin_instance().is_system_user():
                result.append("RGW")
        except (NoCredentialsException, RequestException, LookupError):
            pass
        return result
161
162
class GaneshaConfParser(object):
    """Small parser and serializer for NFS-Ganesha configuration text.

    Parsing produces a list of block dicts. Each block dict has a
    ``block_name`` key (upper-cased), one lower-cased key per parameter and
    an optional ``_blocks_`` list holding nested blocks. ``%url`` directives
    are returned as ``{'block_name': '%url', 'value': <url>}``.

    Known limitation: ``;``, ``{``, ``}``, ``=`` and ``,`` are not treated
    specially inside quoted strings.
    """

    _WHITESPACE = (' ', '\n', '\t')

    def __init__(self, raw_config):
        """Store a comment-free copy of `raw_config` ready to be parsed."""
        self.pos = 0
        self.text = ""
        self.clean_config(raw_config)

    @staticmethod
    def _strip_comment(line):
        """Return `line` without its trailing ``#`` comment.

        A ``#`` located inside a double-quoted string is not a comment.
        """
        in_string = False
        prev = ''
        for idx, cha in enumerate(line):
            if cha == '"' and prev != '\\':
                in_string = not in_string
            elif cha == '#' and not in_string:
                return line[:idx]
            prev = cha
        return line

    def clean_config(self, raw_config):
        """Append `raw_config` to ``self.text`` with comments removed.

        Line breaks are dropped, except around lines starting with ``%``:
        those directives are line oriented, so each one is kept on a line of
        its own.
        """
        parts = []
        at_line_start = not self.text or self.text.endswith("\n")
        for line in raw_config.split("\n"):
            line = self._strip_comment(line)
            if line.startswith("%"):
                if not at_line_start:
                    # Keep the directive from being glued to preceding text.
                    parts.append("\n")
                parts.append(line)
                parts.append("\n")
                at_line_start = True
            elif line:
                parts.append(line)
                at_line_start = False
        self.text += "".join(parts)

    def remove_all_whitespaces(self):
        """Strip insignificant whitespace from ``self.text``.

        * Inside a double-quoted string everything is preserved.
        * On a ``%`` directive line whitespace is preserved, unescaped
          double quotes are removed, and the line ends at the newline.
        * Everywhere else spaces, tabs and newlines are removed.
        """
        out = []
        in_string = False
        in_section = False
        prev = ''
        for cha in self.text:
            unescaped_quote = cha == '"' and prev != '\\'
            if in_section:
                if cha == '\n':
                    in_section = False
                    out.append(cha)
                elif not unescaped_quote:
                    out.append(cha)
            elif cha == '%' and prev in ('', '\n'):
                in_section = True
                out.append(cha)
            elif unescaped_quote:
                in_string = not in_string
                out.append(cha)
            elif in_string or cha not in self._WHITESPACE:
                out.append(cha)
            prev = cha
        self.text = "".join(out)

    def stream(self):
        """Return the part of the text that has not been consumed yet."""
        return self.text[self.pos:]

    def _find(self, token):
        """Return the offset of `token` relative to ``self.pos`` or -1."""
        idx = self.text.find(token, self.pos)
        return -1 if idx == -1 else idx - self.pos

    def parse_block_name(self):
        """Consume and return the text up to the next ``{``.

        :raises Exception: if no ``{`` is left in the text.
        """
        idx = self._find('{')
        if idx == -1:
            raise Exception("Cannot find block name")
        block_name = self.text[self.pos:self.pos + idx]
        self.pos += idx + 1
        return block_name

    def parse_block_or_section(self):
        """Parse one ``%url`` directive or one ``NAME { ... }`` block.

        :return: the dict describing the parsed element.
        :raises Exception: if the block is malformed.
        """
        if self.text.startswith("%url ", self.pos):
            # section line
            self.pos += 5
            idx = self._find('\n')
            if idx == -1:
                value = self.stream()
                self.pos = len(self.text)
            else:
                value = self.text[self.pos:self.pos + idx]
                self.pos += idx + 1
            # Padding around the URL is not part of the value.
            return {'block_name': '%url', 'value': value.strip()}

        block_name = self.parse_block_name().upper()
        block_dict = {'block_name': block_name}
        self.parse_block_body(block_dict)
        if not self.text.startswith('}', self.pos):
            raise Exception("No closing bracket '}' found at the end of block")
        self.pos += 1
        return block_dict

    def parse_parameter_value(self, raw_value):
        """Convert the textual value of a parameter to a Python value.

        Comma separated values become a list, integers become ``int``,
        ``true``/``false`` become ``bool`` and a value starting with a
        double quote loses its first and last characters. Anything else is
        returned unchanged.
        """
        if ',' in raw_value:
            return [self.parse_parameter_value(v.strip())
                    for v in raw_value.split(',')]
        try:
            return int(raw_value)
        except ValueError:
            pass
        if raw_value == "true":
            return True
        if raw_value == "false":
            return False
        if raw_value.startswith('"'):
            return raw_value[1:-1]
        return raw_value

    def parse_stanza(self, block_dict):
        """Consume one ``name = value;`` stanza and store it in `block_dict`.

        :raises Exception: if no ``=`` or no ``;`` is left in the text.
        """
        equal_idx = self._find('=')
        semicolon_idx = self._find(';')
        if equal_idx == -1:
            raise Exception("Malformed stanza: no equal symbol found.")
        if semicolon_idx == -1:
            raise Exception("Malformed stanza: no semicolon found.")
        start = self.pos
        parameter_name = self.text[start:start + equal_idx].lower()
        parameter_value = self.text[start + equal_idx + 1:start + semicolon_idx]
        block_dict[parameter_name] = self.parse_parameter_value(
            parameter_value)
        self.pos += semicolon_idx + 1

    def parse_block_body(self, block_dict):
        """Parse stanzas and nested blocks until the closing ``}`` is next.

        The closing bracket itself is left for the caller to consume.

        :raises Exception: on malformed content or when no progress is made.
        """
        last_pos = self.pos
        while True:
            semicolon_idx = self._find(';')
            lbracket_idx = self._find('{')
            rbracket_idx = self._find('}')

            if rbracket_idx == 0:
                # block end
                return

            if semicolon_idx != -1 and (lbracket_idx == -1
                                        or semicolon_idx < lbracket_idx):
                # a ';' comes first: plain parameter
                self.parse_stanza(block_dict)
            elif lbracket_idx != -1:
                # a '{' comes first: nested block
                block_dict.setdefault('_blocks_', []).append(
                    self.parse_block_or_section())
            else:
                raise Exception("Malformed stanza: no semicolon found.")

            if last_pos == self.pos:
                raise Exception("Infinite loop while parsing block content")
            last_pos = self.pos

    def parse(self):
        """Parse the whole text and return the list of top-level blocks."""
        self.remove_all_whitespaces()
        blocks = []
        while self.pos < len(self.text):
            blocks.append(self.parse_block_or_section())
        return blocks

    @staticmethod
    def _indentation(depth, size=4):
        """Return the leading spaces for nesting level `depth`."""
        return " " * (depth * size)

    @staticmethod
    def write_block_body(block, depth=0):
        """Serialize the parameters and nested blocks of `block`.

        ``None`` and empty strings/containers are omitted. ``0`` and
        ``False`` are real values and are written out.
        """
        def format_val(key, val):
            if isinstance(val, list):
                return ', '.join([format_val(key, v) for v in val])
            if isinstance(val, bool):
                return str(val).lower()
            if isinstance(val, int) or (block['block_name'] == 'CLIENT'
                                        and key == 'clients'):
                return '{}'.format(val)
            return '"{}"'.format(val)

        def is_unset(val):
            # Numbers (bool included) are never "unset", even when falsy.
            return not val and not isinstance(val, (int, float))

        indent = GaneshaConfParser._indentation(depth)
        chunks = []
        for key, val in block.items():
            if key == 'block_name':
                continue
            if key == '_blocks_':
                for blo in val:
                    chunks.append(GaneshaConfParser.write_block(blo, depth))
            elif not is_unset(val):
                chunks.append('{}{} = {};\n'.format(
                    indent, key, format_val(key, val)))
        return "".join(chunks)

    @staticmethod
    def write_block(block, depth):
        """Serialize one block (or ``%url`` directive) at level `depth`."""
        if block['block_name'] == "%url":
            return '%url "{}"\n\n'.format(block['value'])

        indent = GaneshaConfParser._indentation(depth)
        return "".join([
            indent,
            format(block['block_name']),
            " {\n",
            GaneshaConfParser.write_block_body(block, depth + 1),
            indent,
            "}\n\n",
        ])

    @staticmethod
    def write_conf(blocks):
        """Serialize a block dict or a list of block dicts."""
        if not isinstance(blocks, list):
            blocks = [blocks]
        return "".join(GaneshaConfParser.write_block(block, 0)
                       for block in blocks)
355
356
class FSal(object):
    """Base class of the FSAL (storage backend) description of an export.

    Concrete backends override the methods that raise
    ``NotImplementedError`` here. The two static factories dispatch on the
    ``name`` entry of their argument.
    """

    def __init__(self, name):
        self.name = name

    @classmethod
    def validate_path(cls, _):
        """Tell whether an export path is acceptable for this backend."""
        raise NotImplementedError()

    def validate(self):
        """Check the backend settings."""
        raise NotImplementedError()

    def fill_keys(self):
        """Populate the credentials of the backend."""
        raise NotImplementedError()

    def create_path(self, path):
        """Make sure `path` exists in the backend."""
        raise NotImplementedError()

    @staticmethod
    def from_fsal_block(fsal_block):
        """Build the matching subclass from a parsed ``FSAL`` block.

        Returns ``None`` when the block name is neither CEPH nor RGW.
        """
        if fsal_block['name'] == "CEPH":
            backend = CephFSFSal
        elif fsal_block['name'] == 'RGW':
            backend = RGWFSal
        else:
            return None
        return backend.from_fsal_block(fsal_block)

    def to_fsal_block(self):
        """Return the ``FSAL`` configuration block of this backend."""
        raise NotImplementedError()

    @staticmethod
    def from_dict(fsal_dict):
        """Build the matching subclass from its dict representation.

        Returns ``None`` when the name is neither CEPH nor RGW.
        """
        if fsal_dict['name'] == "CEPH":
            backend = CephFSFSal
        elif fsal_dict['name'] == 'RGW':
            backend = RGWFSal
        else:
            return None
        return backend.from_dict(fsal_dict)

    def to_dict(self):
        """Return the dict representation of this backend."""
        raise NotImplementedError()
395
396
class RGWFSal(FSal):
    """FSAL description for exports served from the RADOS Gateway."""

    def __init__(self, name, rgw_user_id, access_key, secret_key):
        super().__init__(name)
        self.rgw_user_id = rgw_user_id
        self.access_key = access_key
        self.secret_key = secret_key

    @classmethod
    def validate_path(cls, path):
        """Accept ``/`` or a name made of characters outside ``/><|&()#?``.

        The result is truthy for a valid path (``True`` or a match object)
        and falsy otherwise.
        """
        if path == "/":
            return True
        return re.match(r'^[^/><|&()#?]+$', path)

    def validate(self):
        """Check that an RGW user is set and known to the RGW admin client.

        :raises NFSException: if the user id is empty or does not exist.
        """
        user_id = self.rgw_user_id
        if not user_id:
            raise NFSException('RGW user must be specified')

        if not RgwClient.admin_instance().user_exists(user_id):
            raise NFSException("RGW user '{}' does not exist"
                               .format(user_id))

    def fill_keys(self):
        """Fetch the access and secret keys of the RGW user."""
        user_keys = RgwClient.admin_instance().get_user_keys(self.rgw_user_id)
        self.access_key = user_keys['access_key']
        self.secret_key = user_keys['secret_key']

    def create_path(self, path):
        """Create the bucket `path` for the RGW user unless it exists.

        :raises NFSException: if the existence check is answered with a 403
            status; other request errors are re-raised unchanged.
        """
        if path == '/':  # nothing to do
            return
        client = RgwClient.instance(self.rgw_user_id)
        try:
            bucket_found = client.bucket_exists(path, self.rgw_user_id)
            logger.debug('Checking existence of RGW bucket "%s" for user "%s": %s',
                         path, self.rgw_user_id, bucket_found)
        except RequestException as exp:
            if exp.status_code == 403:
                raise NFSException('Cannot create bucket "{}" as it already '
                                   'exists, and belongs to other user.'
                                   .format(path))
            raise exp
        if bucket_found:
            return
        logger.info('Creating new RGW bucket "%s" for user "%s"', path,
                    self.rgw_user_id)
        client.create_bucket(path)

    @classmethod
    def from_fsal_block(cls, fsal_block):
        """Build the object from a parsed ``FSAL`` block."""
        name = fsal_block['name']
        user_id = fsal_block['user_id']
        access_key = fsal_block['access_key_id']
        secret_key = fsal_block['secret_access_key']
        return cls(name, user_id, access_key, secret_key)

    def to_fsal_block(self):
        """Return the ``FSAL`` block, credentials included."""
        block = {'block_name': 'FSAL'}
        block['name'] = self.name
        block['user_id'] = self.rgw_user_id
        block['access_key_id'] = self.access_key
        block['secret_access_key'] = self.secret_key
        return block

    @classmethod
    def from_dict(cls, fsal_dict):
        """Build the object from its dict form; keys are left unset."""
        return cls(fsal_dict['name'], fsal_dict['rgw_user_id'], None, None)

    def to_dict(self):
        """Return the dict form, which carries no credentials."""
        return {'name': self.name, 'rgw_user_id': self.rgw_user_id}
465
466
class CephFSFSal(FSal):
    """FSAL description for exports served from a CephFS filesystem."""

    def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
                 cephx_key=None):
        super().__init__(name)
        self.fs_name = fs_name
        self.user_id = user_id
        self.sec_label_xattr = sec_label_xattr
        self.cephx_key = cephx_key

    @classmethod
    def validate_path(cls, path):
        """Accept a path starting with ``/`` without any of ``><|&()?``.

        Returns a match object for a valid path and ``None`` otherwise.
        """
        return re.match(r'^/[^><|&()?]*$', path)

    def validate(self):
        """Check that the cephx user, when given, is a listed client.

        :raises NFSException: if the user is not listed.
        """
        if not self.user_id:
            return
        if self.user_id not in CephX.list_clients():
            raise NFSException("cephx user '{}' does not exist"
                               .format(self.user_id))

    def fill_keys(self):
        """Fetch the cephx key of the user, when a user is set."""
        if self.user_id:
            self.cephx_key = CephX.get_client_key(self.user_id)

    def create_path(self, path):
        """Create the directory `path` in the filesystem unless it is root."""
        cfs = CephFS(self.fs_name)
        if path != os.sep:
            cfs.mk_dirs(path)

    @classmethod
    def from_fsal_block(cls, fsal_block):
        """Build the object from a parsed ``FSAL`` block.

        Every entry except ``name`` is optional and defaults to ``None``.
        """
        optional_keys = ('user_id', 'filesystem', 'sec_label_xattr',
                         'secret_access_key')
        optional = [fsal_block.get(key, None) for key in optional_keys]
        return cls(fsal_block['name'], *optional)

    def to_fsal_block(self):
        """Return the ``FSAL`` block; unset (falsy) attributes are left out."""
        block = {
            'block_name': 'FSAL',
            'name': self.name,
        }
        optional = (
            ('user_id', self.user_id),
            ('filesystem', self.fs_name),
            ('sec_label_xattr', self.sec_label_xattr),
            ('secret_access_key', self.cephx_key),
        )
        for key, value in optional:
            if value:
                block[key] = value
        return block

    @classmethod
    def from_dict(cls, fsal_dict):
        """Build the object from its dict form; the key is left unset."""
        return cls(fsal_dict['name'], fsal_dict['user_id'],
                   fsal_dict['fs_name'], fsal_dict['sec_label_xattr'], None)

    def to_dict(self):
        """Return the dict form, which does not carry the cephx key."""
        as_dict = {'name': self.name}
        as_dict['user_id'] = self.user_id
        as_dict['fs_name'] = self.fs_name
        as_dict['sec_label_xattr'] = self.sec_label_xattr
        return as_dict
530
531
class Client(object):
    """Per-client settings (``CLIENT`` sub-block) of an export."""

    def __init__(self, addresses, access_type=None, squash=None):
        self.addresses = addresses
        self.access_type = access_type
        # Stored in the canonical spelling produced by format_squash().
        self.squash = GaneshaConf.format_squash(squash)

    @classmethod
    def from_client_block(cls, client_block):
        """Build the object from a parsed ``CLIENT`` block.

        A single address is wrapped into a one-element list.
        """
        clients = client_block['clients']
        addresses = clients if isinstance(clients, list) else [clients]
        access_type = client_block.get('access_type', None)
        squash = client_block.get('squash', None)
        return cls(addresses, access_type, squash)

    def to_client_block(self):
        """Return the ``CLIENT`` block; unset options are left out."""
        block = {'block_name': 'CLIENT', 'clients': self.addresses}
        for key, value in (('access_type', self.access_type),
                           ('squash', self.squash)):
            if value:
                block[key] = value
        return block

    @classmethod
    def from_dict(cls, client_dict):
        """Build the object from its dict form (all three keys required)."""
        return cls(client_dict['addresses'], client_dict['access_type'],
                   client_dict['squash'])

    def to_dict(self):
        """Return the dict form of the client settings."""
        as_dict = {'addresses': self.addresses}
        as_dict['access_type'] = self.access_type
        as_dict['squash'] = self.squash
        return as_dict
569
570
class Export(object):
    # pylint: disable=R0902
    """An NFS export together with its FSAL and per-client settings.

    Paths, squash and protocol values are normalized on construction with
    the ``GaneshaConf.format_*`` helpers; ``daemons``, ``protocols`` and
    ``transports`` are kept as sets.
    """

    def __init__(self, export_id, path, fsal, cluster_id, daemons, pseudo=None,
                 tag=None, access_type=None, squash=None,
                 attr_expiration_time=None, security_label=False,
                 protocols=None, transports=None, clients=None):
        self.export_id = export_id
        self.path = GaneshaConf.format_path(path)
        self.fsal = fsal
        self.cluster_id = cluster_id
        self.daemons = set(daemons)
        self.pseudo = GaneshaConf.format_path(pseudo)
        self.tag = tag
        self.access_type = access_type
        self.squash = GaneshaConf.format_squash(squash)
        if attr_expiration_time is None:
            self.attr_expiration_time = 0
        else:
            self.attr_expiration_time = attr_expiration_time
        self.security_label = security_label
        # The keyword defaults are None: treat them as "nothing given"
        # instead of failing while iterating over None.
        self.protocols = {GaneshaConf.format_protocol(p)
                          for p in (protocols or [])}
        self.transports = set(transports or [])
        self.clients = clients if clients is not None else []

    @staticmethod
    def _as_set(value):
        """Return a scalar or a collection of defaults as a set."""
        if isinstance(value, (list, tuple, set, frozenset)):
            return set(value)
        return {value}

    def validate(self):
        """Check the export settings.

        :raises NFSException: if the path, protocols, transports, pseudo
            path or tag are missing or invalid. The FSAL is validated too.
        """
        # pylint: disable=R0912
        if not self.fsal.validate_path(self.path):
            raise NFSException("Export path ({}) is invalid.".format(self.path))

        if not self.protocols:
            raise NFSException(
                "No NFS protocol version specified for the export.")

        if not self.transports:
            raise NFSException(
                "No network transport type specified for the export.")

        for t in self.transports:
            match = re.match(r'^TCP$|^UDP$', t)
            if not match:
                raise NFSException(
                    "'{}' is an invalid network transport type identifier"
                    .format(t))

        self.fsal.validate()

        if 4 in self.protocols:
            if not self.pseudo:
                raise NFSException(
                    "Pseudo path is required when NFSv4 protocol is used")
            match = re.match(r'^/[^><|&()]*$', self.pseudo)
            if not match:
                raise NFSException(
                    "Export pseudo path ({}) is invalid".format(self.pseudo))

        if self.tag:
            match = re.match(r'^[^/><|:&()]+$', self.tag)
            if not match:
                raise NFSException(
                    "Export tag ({}) is invalid".format(self.tag))

        if self.fsal.name == 'RGW' and 4 not in self.protocols and not self.tag:
            raise NFSException(
                "Tag is mandatory for RGW export when using only NFSv3")

    @classmethod
    def from_export_block(cls, export_block, cluster_id, defaults):
        """Build an export from a parsed ``EXPORT`` block.

        :param export_block: parsed block; its ``_blocks_`` list must hold
            an ``FSAL`` block and may hold ``CLIENT`` blocks.
        :param cluster_id: id of the cluster the export belongs to.
        :param defaults: dict providing ``access_type``, ``protocols``,
            ``transports`` and ``squash`` for options missing in the block.
        """
        logger.debug("parsing export block: %s", export_block)

        fsal_block = [b for b in export_block['_blocks_']
                      if b['block_name'] == "FSAL"]

        protocols = export_block.get('protocols', defaults['protocols'])
        if not isinstance(protocols, list):
            protocols = [protocols]

        transports = export_block.get('transports', defaults['transports'])
        if not isinstance(transports, list):
            transports = [transports]

        client_blocks = [b for b in export_block['_blocks_']
                         if b['block_name'] == "CLIENT"]

        return cls(export_block['export_id'],
                   export_block['path'],
                   FSal.from_fsal_block(fsal_block[0]),
                   cluster_id,
                   [],
                   export_block.get('pseudo', None),
                   export_block.get('tag', None),
                   export_block.get('access_type', defaults['access_type']),
                   export_block.get('squash', defaults['squash']),
                   export_block.get('attr_expiration_time', None),
                   export_block.get('security_label', False),
                   protocols,
                   transports,
                   [Client.from_client_block(client)
                    for client in client_blocks])

    def to_export_block(self, defaults):
        """Return the ``EXPORT`` configuration block of this export.

        ``access_type``, ``squash``, ``protocols`` and ``transports`` are
        only written when `defaults` has no entry for them or when the
        export differs from that entry.
        """
        # pylint: disable=too-many-branches
        result = {
            'block_name': 'EXPORT',
            'export_id': self.export_id,
            'path': self.path
        }
        if self.pseudo:
            result['pseudo'] = self.pseudo
        if self.tag:
            result['tag'] = self.tag
        if 'access_type' not in defaults \
                or self.access_type != defaults['access_type']:
            result['access_type'] = self.access_type
        if 'squash' not in defaults or self.squash != defaults['squash']:
            result['squash'] = self.squash
        if self.fsal.name == 'CEPH':
            result['attr_expiration_time'] = self.attr_expiration_time
            result['security_label'] = self.security_label
        # Compare set with set: a list default never equals a set, which
        # would force these options to be written every time.
        if 'protocols' not in defaults \
                or self.protocols != self._as_set(defaults['protocols']):
            result['protocols'] = list(self.protocols)
        if 'transports' not in defaults \
                or self.transports != self._as_set(defaults['transports']):
            result['transports'] = list(self.transports)

        result['_blocks_'] = [self.fsal.to_fsal_block()]
        result['_blocks_'].extend([client.to_client_block()
                                   for client in self.clients])
        return result

    @classmethod
    def from_dict(cls, export_id, ex_dict, old_export=None):
        """Build an export from its dict form.

        ``attr_expiration_time`` is not part of the dict; it is taken over
        from `old_export` when one is given.
        """
        return cls(export_id,
                   ex_dict['path'],
                   FSal.from_dict(ex_dict['fsal']),
                   ex_dict['cluster_id'],
                   ex_dict['daemons'],
                   ex_dict['pseudo'],
                   ex_dict['tag'],
                   ex_dict['access_type'],
                   ex_dict['squash'],
                   old_export.attr_expiration_time if old_export else None,
                   ex_dict['security_label'],
                   ex_dict['protocols'],
                   ex_dict['transports'],
                   [Client.from_dict(client) for client in ex_dict['clients']])

    def to_dict(self):
        """Return the dict form; set-valued attributes become sorted lists."""
        return {
            'export_id': self.export_id,
            'path': self.path,
            'fsal': self.fsal.to_dict(),
            'cluster_id': self.cluster_id,
            'daemons': sorted(self.daemons),
            'pseudo': self.pseudo,
            'tag': self.tag,
            'access_type': self.access_type,
            'squash': self.squash,
            'security_label': self.security_label,
            'protocols': sorted(self.protocols),
            'transports': sorted(self.transports),
            'clients': [client.to_dict() for client in self.clients]
        }
744
745
class ClusterType(object):
    """String constants naming how a Ganesha cluster was set up."""

    # Ganesha clusters deployed manually by the user. Specified by using the
    # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
    USER = 'user'

    # Ganesha clusters deployed by the Orchestrator.
    ORCHESTRATOR = 'orchestrator'
754
755
class GaneshaConf(object):
    # pylint: disable=R0902
    """In-memory view of the Ganesha configuration of one cluster.

    On construction all ``export-*`` objects and the daemon configuration
    objects are read from the RADOS pool/namespace of the cluster and
    parsed. Subclasses define how daemons are linked to exports and how
    the daemon configuration is written back.
    """

    def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
        """Load and parse the configuration of `cluster_id`.

        :param daemon_confs: names of the daemon configuration objects to
            read; when empty, every ``conf-*`` object is read instead.
        """
        self.cluster_id = cluster_id
        self.rados_pool = rados_pool
        self.rados_namespace = rados_namespace
        self.daemon_confs = daemon_confs if daemon_confs is not None else []
        self.export_conf_blocks = []  # type: ignore
        self.daemons_conf_blocks = {}  # type: ignore
        self._defaults = {}
        self.exports = {}

        self._read_raw_config()

        # load defaults
        def_block = [b for b in self.export_conf_blocks
                     if b['block_name'] == "EXPORT_DEFAULTS"]
        self.export_defaults = def_block[0] if def_block else {}
        self._defaults = self.ganesha_defaults(self.export_defaults)

        for export_block in self.export_conf_blocks:
            if export_block['block_name'] != "EXPORT":
                continue
            export = Export.from_export_block(export_block, cluster_id,
                                              self._defaults)
            self.exports[export.export_id] = export

        # link daemons to exports
        self._link_daemons_to_exports()

    def _link_daemons_to_exports(self):
        """Fill the ``daemons`` set of every export (subclass hook)."""
        raise NotImplementedError()

    @classmethod
    def instance(cls, cluster_id):
        """Return the configuration object matching the cluster type.

        :raises NFSException: if the cluster is unknown or its type is not
            one of the ``ClusterType`` constants.
        """
        cluster = Ganesha.get_cluster(cluster_id)
        if cluster['type'] == ClusterType.ORCHESTRATOR:
            return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
                                           [cluster['daemon_conf']])
        if cluster['type'] == ClusterType.USER:
            return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
        raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
            cluster['type'], cluster_id))

    def _read_raw_config(self):
        """Read and parse the export and daemon objects from RADOS."""

        def _read_rados_obj(_obj):
            size, _ = _obj.stat()
            return _obj.read(size).decode("utf-8")

        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            objs = ioctx.list_objects()
            for obj in objs:
                if obj.key.startswith("export-"):
                    raw_config = _read_rados_obj(obj)
                    logger.debug("read export configuration from rados "
                                 "object %s/%s/%s:\n%s", self.rados_pool,
                                 self.rados_namespace, obj.key, raw_config)
                    self.export_conf_blocks.extend(
                        GaneshaConfParser(raw_config).parse())
                elif not self.daemon_confs and obj.key.startswith("conf-"):
                    # Read all `conf-xxx` for daemon configs.
                    raw_config = _read_rados_obj(obj)
                    logger.debug("read daemon configuration from rados "
                                 "object %s/%s/%s:\n%s", self.rados_pool,
                                 self.rados_namespace, obj.key, raw_config)
                    # The daemon id is whatever follows the first '-'.
                    idx = obj.key.find('-')
                    self.daemons_conf_blocks[obj.key[idx+1:]] = \
                        GaneshaConfParser(raw_config).parse()

            if self.daemon_confs:
                # When daemon configs are provided.
                for conf in self.daemon_confs:
                    size, _ = ioctx.stat(conf)
                    raw_config = ioctx.read(conf, size).decode("utf-8")
                    logger.debug("read daemon configuration from rados "
                                 "object %s/%s/%s:\n%s", self.rados_pool,
                                 self.rados_namespace, conf, raw_config)
                    self.daemons_conf_blocks[conf] = \
                        GaneshaConfParser(raw_config).parse()

    def _write_raw_config(self, conf_block, obj):
        """Serialize `conf_block` and overwrite the RADOS object `obj`."""
        raw_config = GaneshaConfParser.write_conf(conf_block)
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            ioctx.write_full(obj, raw_config.encode('utf-8'))
            logger.debug(
                "write configuration into rados object %s/%s/%s:\n%s",
                self.rados_pool, self.rados_namespace, obj, raw_config)

    @classmethod
    def ganesha_defaults(cls, export_defaults):
        """
        Return the effective export defaults: the values of
        `export_defaults` where present, the Ganesha defaults otherwise.

        According to
        https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
        """
        return {
            'access_type': export_defaults.get('access_type', 'NONE'),
            'protocols': export_defaults.get('protocols', [3, 4]),
            'transports': export_defaults.get('transports', ['TCP', 'UDP']),
            'squash': export_defaults.get('squash', 'root_squash')
        }

    @classmethod
    def format_squash(cls, squash):
        """Map a squash spelling (case-insensitive) to its canonical name.

        ``None`` is passed through.

        :raises NFSException: if the value is not a known squash option.
        """
        if squash is None:
            return None
        key = squash.lower()
        if key in ["no_root_squash", "noidsquash", "none"]:
            return "no_root_squash"
        if key in ["rootid", "root_id_squash", "rootidsquash"]:
            return "root_id_squash"
        if key in ["root", "root_squash", "rootsquash"]:
            return "root_squash"
        if key in ["all", "all_squash", "allsquash",
                   "all_anonymous", "allanonymous"]:
            return "all_squash"
        logger.error("could not parse squash value: %s", squash)
        raise NFSException("'{}' is an invalid squash option".format(squash))

    @classmethod
    def format_protocol(cls, protocol):
        """Map a protocol identifier to the integer ``3`` or ``4``.

        Accepts integers and the spellings ``3``/``V3``/``NFS3``/``NFSV3``
        (same for 4), matched case-insensitively like the squash values.

        :raises NFSException: if the identifier is not recognized.
        """
        key = str(protocol).upper()
        if key in ["NFSV3", "3", "V3", "NFS3"]:
            return 3
        if key in ["NFSV4", "4", "V4", "NFS4"]:
            return 4
        logger.error("could not parse protocol value: %s", protocol)
        raise NFSException("'{}' is an invalid NFS protocol version identifier"
                           .format(protocol))

    @classmethod
    def format_path(cls, path):
        """Strip whitespace and one trailing ``/`` (except for the root)."""
        if path is not None:
            path = path.strip()
            if len(path) > 1 and path[-1] == '/':
                path = path[:-1]
        return path

    def validate(self, export: 'Export'):
        """Validate `export` on its own and against the other exports.

        For NFSv4 exports the tag and the pseudo path must be unique. When
        the pseudo path lies below the pseudo path of another export, the
        corresponding real path must exist for CEPH exports.

        :raises NFSException: if one of the checks fails.
        """
        export.validate()

        if 4 in export.protocols:  # NFSv4 protocol
            len_prefix = 1
            parent_export = None
            for ex in self.list_exports():
                if export.tag and ex.tag == export.tag:
                    raise NFSException(
                        "Another export exists with the same tag: {}"
                        .format(export.tag))

                if export.pseudo and ex.pseudo == export.pseudo:
                    raise NFSException(
                        "Another export exists with the same pseudo path: {}"
                        .format(export.pseudo))

                if not ex.pseudo:
                    continue

                # Keep the export with the longest pseudo path that is a
                # directory prefix of the new pseudo path.
                if export.pseudo[:export.pseudo.rfind('/')+1].startswith(ex.pseudo):
                    if export.pseudo[len(ex.pseudo)] == '/':
                        if len(ex.pseudo) > len_prefix:
                            len_prefix = len(ex.pseudo)
                            parent_export = ex

            if len_prefix > 1:
                # validate pseudo path
                idx = len(parent_export.pseudo)  # type: ignore
                idx = idx + 1 if idx > 1 else idx
                real_path = "{}/{}".format(
                    parent_export.path  # type: ignore
                    if len(parent_export.path) > 1 else "",  # type: ignore
                    export.pseudo[idx:])
                if export.fsal.name == 'CEPH':
                    cfs = CephFS()
                    if export.path != real_path and not cfs.dir_exists(real_path):
                        raise NFSException(
                            "Pseudo path ({}) invalid, path {} does not exist."
                            .format(export.pseudo, real_path))

    def _gen_export_id(self):
        """Return the smallest positive integer not used as export id."""
        nid = 1
        for e_id in sorted(self.exports):
            if e_id != nid:
                break
            nid += 1
        return nid

    def _persist_daemon_configuration(self):
        """Write the daemon configuration objects (subclass hook)."""
        raise NotImplementedError()

    def _save_export(self, export):
        """Validate `export`, prepare its backend and write it to RADOS."""
        self.validate(export)
        export.fsal.create_path(export.path)
        export.fsal.fill_keys()
        self.exports[export.export_id] = export
        conf_block = export.to_export_block(self.export_defaults)
        self._write_raw_config(conf_block, "export-{}".format(export.export_id))
        self._persist_daemon_configuration()

    def _delete_export(self, export_id):
        """Rewrite the daemon configuration, then remove the export object."""
        self._persist_daemon_configuration()
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            ioctx.remove_object("export-{}".format(export_id))

    def list_exports(self):
        """Return the list of all exports."""
        return list(self.exports.values())

    def create_export(self, ex_dict):
        """Create an export from `ex_dict` and return its new id."""
        ex_id = self._gen_export_id()
        export = Export.from_dict(ex_id, ex_dict)
        self._save_export(export)
        return ex_id

    def has_export(self, export_id):
        """Tell whether an export with this id exists."""
        return export_id in self.exports

    def update_export(self, ex_dict):
        """Replace the export named by ``ex_dict['export_id']``.

        :return: the previous export, or ``None`` if the id is unknown.
        """
        export_id = ex_dict['export_id']
        if export_id not in self.exports:
            return None
        # The old export must be out of the way while validating, otherwise
        # the new one would clash with its own tag and pseudo path.
        old_export = self.exports.pop(export_id)
        try:
            export = Export.from_dict(export_id, ex_dict, old_export)
            self._save_export(export)
        except Exception:
            # Do not lose the previous export when the update fails.
            self.exports[export_id] = old_export
            raise
        return old_export

    def remove_export(self, export_id):
        """Remove an export.

        :return: the removed export, or ``None`` if the id is unknown.
        """
        if export_id not in self.exports:
            return None
        export = self.exports.pop(export_id)
        self._delete_export(export_id)
        return export

    def get_export(self, export_id):
        """Return the export with this id, or ``None``."""
        if export_id in self.exports:
            return self.exports[export_id]
        return None

    def list_daemons(self) -> List[Dict[str, Any]]:
        """Return one dict per daemon of the cluster (subclass hook)."""
        raise NotImplementedError()

    def list_daemon_confs(self):
        """Return the keys under which daemon configurations were loaded."""
        return self.daemons_conf_blocks.keys()

    def reload_daemons(self, daemons):
        """Send a RADOS notify on the ``conf-<id>`` object of each daemon."""
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            for daemon_id in daemons:
                ioctx.notify("conf-{}".format(daemon_id))
1013
1014
class GaneshaConfOrchestrator(GaneshaConf):
    """Configuration of a Ganesha cluster deployed by the Orchestrator.

    Every export that is linked to daemons is published in every daemon
    configuration object of the cluster.
    """

    @classmethod
    def _get_orch_nfs_instances(cls,
                                service_name: Optional[str] = None) -> List[DaemonDescription]:
        """List the `nfs` daemons of a service.

        Returns an empty list when the Orchestrator call fails with
        ``RuntimeError``, ``OrchestratorError`` or ``ImportError``.
        """
        try:
            services = OrchClient.instance().services
            return services.list_daemons(service_name=service_name, daemon_type="nfs")
        except (RuntimeError, OrchestratorError, ImportError):
            return []

    def _link_daemons_to_exports(self):
        """Attach all daemon ids of the service to each referenced export.

        An export is referenced by a ``%url`` block whose RADOS object name
        starts with ``export-``.
        """
        service_name = 'nfs.{}'.format(self.cluster_id)
        daemon_ids = {inst.daemon_id
                      for inst in self._get_orch_nfs_instances(service_name)}
        for daemon_blocks in self.daemons_conf_blocks.values():
            for block in daemon_blocks:
                if block['block_name'] != "%url":
                    continue
                _, _, obj = Ganesha.parse_rados_url(block['value'])
                if not obj.startswith("export-"):
                    continue
                export_id = int(obj[obj.find('-')+1:])
                self.exports[export_id].daemons.update(daemon_ids)

    def validate(self, export: Export):
        """Require the export to list either no daemon or all of them.

        :raises NFSException: if only a subset of the daemons is listed.
        """
        known_daemons = {daemon['daemon_id'] for daemon in self.list_daemons()}
        if export.daemons and set(export.daemons) != known_daemons:
            raise NFSException('Export should be linked to all daemons.')
        super().validate(export)

    def _persist_daemon_configuration(self):
        """Write one ``%url`` entry per linked export into every daemon conf."""
        daemon_map = {conf_name: [] for conf_name in self.list_daemon_confs()}  # type: ignore

        for conf_name in self.list_daemon_confs():
            for ex in self.exports.values():
                if not ex.daemons:
                    continue
                export_url = Ganesha.make_rados_url(
                    self.rados_pool, self.rados_namespace,
                    "export-{}".format(ex.export_id))
                daemon_map[conf_name].append({
                    'block_name': "%url",
                    'value': export_url
                })
        for conf_name, conf_blocks in daemon_map.items():
            self._write_raw_config(conf_blocks, conf_name)

    def list_daemons(self) -> List[Dict[str, Any]]:
        """Return one dict per `nfs` daemon of the ``nfs.<cluster_id>`` service."""
        service_name = 'nfs.{}'.format(self.cluster_id)
        daemons = []
        for instance in self._get_orch_nfs_instances(service_name):
            daemons.append({
                'cluster_id': self.cluster_id,
                'daemon_id': instance.daemon_id,
                'cluster_type': ClusterType.ORCHESTRATOR,
                'status': instance.status,
                'status_desc': instance.status_desc
            })
        return daemons

    def reload_daemons(self, daemons):
        """Send a RADOS notify on every daemon configuration object.

        The `daemons` argument is not used: all configuration objects of
        the cluster are notified.
        """
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            for conf_name in self.list_daemon_confs():
                ioctx.notify(conf_name)
1076
1077
class GaneshaConfUser(GaneshaConf):
    """Configuration of a Ganesha cluster declared manually by the user.

    Each ``conf-<daemon_id>`` object lists the exports served by that
    daemon.
    """

    def _link_daemons_to_exports(self):
        """Add each daemon id to the exports its configuration references.

        An export is referenced by a ``%url`` block whose RADOS object name
        starts with ``export-``.
        """
        for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
            for block in daemon_blocks:
                if block['block_name'] != "%url":
                    continue
                _, _, obj = Ganesha.parse_rados_url(block['value'])
                if not obj.startswith("export-"):
                    continue
                export_id = int(obj[obj.find('-')+1:])
                self.exports[export_id].daemons.add(daemon_id)

    def validate(self, export: Export):
        """Require every daemon of the export to be a known daemon.

        :raises NFSException: if the export names an unknown daemon.
        """
        known_daemons = [daemon['daemon_id'] for daemon in self.list_daemons()]
        for daemon_id in export.daemons:
            if daemon_id in known_daemons:
                continue
            raise NFSException("Daemon '{}' does not exist".format(daemon_id))
        super().validate(export)

    def _persist_daemon_configuration(self):
        """Write to each ``conf-<daemon_id>`` object the exports it serves."""
        daemon_map = {daemon_id: [] for daemon_id in self.list_daemon_confs()}  # type: ignore

        for ex in self.exports.values():
            for daemon in ex.daemons:
                export_url = Ganesha.make_rados_url(
                    self.rados_pool, self.rados_namespace,
                    "export-{}".format(ex.export_id))
                daemon_map[daemon].append({
                    'block_name': "%url",
                    'value': export_url
                })
        for daemon_id, conf_blocks in daemon_map.items():
            self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))

    def list_daemons(self) -> List[Dict[str, Any]]:
        """Return one dict per daemon configuration, always flagged running."""
        daemons = []
        for daemon_id in self.list_daemon_confs():
            daemons.append({
                'cluster_id': self.cluster_id,
                'cluster_type': ClusterType.USER,
                'daemon_id': daemon_id,
                'status': 1,
                'status_desc': 'running'
            })
        return daemons