]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/services/ganesha.py
bump version to 16.2.6-pve2
[ceph.git] / ceph / src / pybind / mgr / dashboard / services / ganesha.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-lines
3 from __future__ import absolute_import
4
5 import logging
6 import os
7 import re
8 from typing import Any, Dict, List, Optional, cast
9
10 from ceph.deployment.service_spec import NFSServiceSpec
11 from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
12
13 from .. import mgr
14 from ..exceptions import DashboardException
15 from ..settings import Settings
16 from .cephfs import CephFS
17 from .cephx import CephX
18 from .orchestrator import OrchClient
19 from .rgw_client import NoCredentialsException, NoRgwDaemonsException, RequestException, RgwClient
20
21 logger = logging.getLogger('ganesha')
22
23
class NFSException(DashboardException):
    """Dashboard exception tagged with the ``nfs`` component."""

    def __init__(self, msg):
        super().__init__(component="nfs", msg=msg)
27
28
class Ganesha(object):
    """Discovery and location of NFS-Ganesha clusters.

    Clusters come from two sources: services deployed through the
    Orchestrator and user-defined locations configured through the
    ``GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE`` setting.  A cluster
    "location" is the RADOS pool/namespace holding its config objects.
    """

    @classmethod
    def _get_clusters_locations(cls):
        """Return a dict mapping cluster_id to its location info.

        Each value is a dict with ``pool``, ``namespace``, ``type`` and
        ``daemon_conf`` keys.

        :raises NFSException: if no cluster is found, or the user-defined
            setting is malformed or collides with an Orchestrator cluster.
        """
        # pylint: disable=too-many-branches
        # Get Orchestrator clusters
        orch_result = cls._get_orch_clusters_locations()

        # Get user-defined clusters
        location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
        if not orch_result and not location_list_str:
            raise NFSException("NFS-Ganesha cluster is not detected. "
                               "Please set the GANESHA_RADOS_POOL_NAMESPACE "
                               "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
        result = {}  # type: ignore
        location_list = [loc.strip() for loc in location_list_str.split(
            ",")] if location_list_str else []
        for location in location_list:
            if not location:
                raise NFSException("Invalid Ganesha cluster RADOS "
                                   "[cluster_id:]pool/namespace setting: {}"
                                   .format(location))
            if location.count(':') < 1:
                # default cluster_id
                if location.count('/') > 1:
                    raise NFSException("Invalid Ganesha RADOS pool/namespace "
                                       "setting: {}".format(location))
                # in this case accept pool/namespace only
                cluster = "_default_"
                if location.count('/') == 0:
                    pool, namespace = location, None
                else:
                    pool, namespace = location.split('/', 1)
            else:
                # "cluster_id:pool[/namespace]" form
                cluster, pool_nm = location.split(':', 1)
                if pool_nm.count('/') == 0:
                    pool, namespace = pool_nm, None
                else:
                    pool, namespace = pool_nm.split('/', 1)

            # Check pool/namespace collision.
            for clusters in [orch_result, result]:
                for cluster_name, cluster_data in clusters.items():
                    if cluster_data['pool'] == pool and cluster_data['namespace'] == namespace:
                        orch_hint = (' that is deployed by the Orchestrator'
                                     if cluster_data['type'] == ClusterType.ORCHESTRATOR
                                     else '')
                        raise NFSException(
                            f'Pool `{pool}` and namespace `{namespace}` are already in use by '
                            f'NFS-Ganesha cluster called `{cluster_name}`{orch_hint}. '
                            'Please update GANESHA_RADOS_POOL_NAMESPACE setting.'
                        )

            if cluster in orch_result:
                # cephadm might have set same cluster settings, ask the user to remove it.
                raise NFSException(
                    'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
                    'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
                    'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
                    'setting.'.format(cluster))

            if cluster in result:
                raise NFSException("Duplicate Ganesha cluster definition in "
                                   "the setting: {}".format(location_list_str))
            result[cluster] = {
                'pool': pool,
                'namespace': namespace,
                'type': ClusterType.USER,
                'daemon_conf': None
            }
        return {**orch_result, **result}

    @classmethod
    def _get_orch_clusters_locations(cls):
        """Return the locations of NFS clusters deployed by the Orchestrator."""
        orch_result = {}  # type: ignore
        services = cls._get_orch_nfs_services()
        for service in services:
            spec = cast(NFSServiceSpec, service.spec)
            try:
                orch_result[spec.service_id] = {
                    'pool': spec.pool,
                    'namespace': spec.namespace,
                    'type': ClusterType.ORCHESTRATOR,
                    'daemon_conf': spec.rados_config_name()
                }
            except AttributeError as ex:
                logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
                continue
        return orch_result

    @classmethod
    def get_ganesha_clusters(cls):
        """Return the list of known cluster IDs."""
        return list(cls._get_clusters_locations())

    @staticmethod
    def _get_orch_nfs_services() -> 'List[ServiceDescription]':
        # String annotation: avoids evaluating orchestrator types at class
        # definition time.
        try:
            return OrchClient.instance().services.list('nfs')
        except (RuntimeError, OrchestratorError, ImportError):
            # Orchestrator unavailable: behave as if there are no services.
            return []

    @classmethod
    def parse_rados_url(cls, rados_url):
        """Split a ``rados://pool[/namespace]/object`` URL.

        :return: a ``(pool, namespace, object)`` tuple; ``namespace`` is
            ``None`` when the URL has no namespace component.
        :raises NFSException: if the URL is malformed.
        """
        if not rados_url.startswith("rados://"):
            raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}"
                               .format(rados_url))
        rados_url = rados_url[8:]
        url_comps = rados_url.split("/")
        if len(url_comps) < 2 or len(url_comps) > 3:
            raise NFSException("Invalid NFS Ganesha RADOS configuration URL: "
                               "rados://{}".format(rados_url))
        if len(url_comps) == 2:
            return url_comps[0], None, url_comps[1]
        # BUGFIX: return a tuple here as well (previously a list), so the
        # return type is consistent across both branches.
        return tuple(url_comps)

    @classmethod
    def make_rados_url(cls, pool, namespace, obj):
        """Build a ``rados://`` URL from its components."""
        if namespace:
            return "rados://{}/{}/{}".format(pool, namespace, obj)
        return "rados://{}/{}".format(pool, obj)

    @classmethod
    def get_cluster(cls, cluster_id):
        """Return the location info of *cluster_id*.

        :raises NFSException: if the cluster does not exist.
        """
        locations = cls._get_clusters_locations()
        if cluster_id not in locations:
            raise NFSException("Cluster not found: cluster_id={}"
                               .format(cluster_id))
        return locations[cluster_id]

    @classmethod
    def fsals_available(cls):
        """Return the FSAL backends ("CEPH", "RGW") usable on this cluster."""
        result = []
        if CephFS.list_filesystems():
            result.append("CEPH")
        try:
            if RgwClient.admin_instance().is_service_online() and \
                    RgwClient.admin_instance().is_system_user():
                result.append("RGW")
        except (DashboardException, NoCredentialsException, RequestException,
                NoRgwDaemonsException):
            # RGW not reachable/configured: simply do not offer it.
            pass
        return result
170
171
class GaneshaConfParser(object):
    """Parser/serializer for the NFS-Ganesha configuration format.

    The format consists of nested ``BLOCK { key = value; ... }`` blocks plus
    line-oriented ``%``-directives (e.g. ``%url``).  Parsing operates on a
    comment- and whitespace-stripped copy kept in ``self.text`` with
    ``self.pos`` acting as a cursor into it.
    """

    def __init__(self, raw_config):
        # Cursor into self.text consumed by the parse_* methods.
        self.pos = 0
        self.text = ""
        self.clean_config(raw_config)

    def clean_config(self, raw_config):
        """Strip ``#`` comments from *raw_config* into ``self.text``."""
        for line in raw_config.split("\n"):
            cardinal_idx = line.find('#')
            if cardinal_idx == -1:
                self.text += line
            else:
                # remove comments
                self.text += line[:cardinal_idx]
            if line.startswith("%"):
                # %-directives are line-oriented: keep their trailing newline.
                self.text += "\n"

    def remove_all_whitespaces(self):
        """Drop whitespace except inside strings and %-directive lines.

        Character-by-character state machine: ``in_section`` tracks a
        %-directive line (kept verbatim up to its newline), ``in_string``
        is meant to track double-quoted strings.
        """
        new_text = ""
        in_string = False
        in_section = False
        for i, cha in enumerate(self.text):
            if in_section:
                if cha != '"' and self.text[i-1] != '\\':
                    new_text += cha
                elif cha == '\n':
                    new_text += cha
                    in_section = False
                elif i == (len(self.text)-1):
                    # last character of the text closes the directive section
                    if cha != '"' and self.text[i-1] != '\\':
                        new_text += cha
                    in_section = False
            elif not in_section and (i == 0 or self.text[i-1] == '\n') and cha == '%':
                # '%' at the start of a line opens a directive section
                in_section = True
                new_text += cha
            elif in_string or cha not in [' ', '\n', '\t']:
                new_text += cha
            elif cha == '"' and self.text[i-1] != '\\':
                # NOTE(review): this branch is only reached when cha is
                # whitespace (previous elif filtered non-whitespace), so a
                # '"' can never match here and in_string appears to never
                # toggle — confirm whether string handling is intentional.
                in_string = not in_string
        self.text = new_text

    def stream(self):
        """Return the not-yet-consumed remainder of the text."""
        return self.text[self.pos:]

    def parse_block_name(self):
        """Consume and return the name preceding the next ``{``."""
        idx = self.stream().find('{')
        if idx == -1:
            raise Exception("Cannot find block name")
        block_name = self.stream()[:idx]
        self.pos += idx+1
        return block_name

    def parse_block_or_section(self):
        """Parse either a ``%url`` directive or a ``NAME { ... }`` block.

        :return: a dict with at least a ``block_name`` key; ``%url`` entries
            carry a ``value`` key with the rados URL.
        """
        if self.stream().startswith("%url"):
            # section line
            self.pos += self.stream().find('rados://')
            idx = self.stream().find('\n')
            if idx == -1:
                value = self.stream()
                self.pos += len(self.stream())
            else:
                value = self.stream()[:idx]
                self.pos += idx+1
            block_dict = {'block_name': '%url', 'value': value}
            return block_dict

        # Block names are case-insensitive; normalize to upper case.
        block_name = self.parse_block_name().upper()
        block_dict = {'block_name': block_name}
        self.parse_block_body(block_dict)
        if self.stream()[0] != '}':
            raise Exception("No closing bracket '}' found at the end of block")
        self.pos += 1
        return block_dict

    def parse_parameter_value(self, raw_value):
        """Convert a raw RHS string into int/bool/str, or a list of those
        for comma-separated values."""
        # NOTE(review): despite the name, this index locates a comma.
        colon_idx = raw_value.find(',')

        if colon_idx == -1:
            try:
                return int(raw_value)
            except ValueError:
                if raw_value == "true":
                    return True
                if raw_value == "false":
                    return False
                if raw_value.find('"') == 0:
                    # quoted string: strip the surrounding quotes
                    return raw_value[1:-1]
                return raw_value
        else:
            # comma-separated list: parse each element recursively
            return [self.parse_parameter_value(v.strip())
                    for v in raw_value.split(',')]

    def parse_stanza(self, block_dict):
        """Parse one ``key = value;`` stanza into *block_dict* and advance
        the cursor past the semicolon."""
        equal_idx = self.stream().find('=')
        semicolon_idx = self.stream().find(';')
        if equal_idx == -1:
            raise Exception("Malformed stanza: no equal symbol found.")
        # Keys are case-insensitive; normalize to lower case.
        parameter_name = self.stream()[:equal_idx].lower()
        parameter_value = self.stream()[equal_idx+1:semicolon_idx]
        block_dict[parameter_name] = self.parse_parameter_value(
            parameter_value)
        self.pos += semicolon_idx+1

    def parse_block_body(self, block_dict):
        """Parse stanzas and nested blocks until the enclosing ``}``.

        Nested blocks are accumulated under the ``_blocks_`` key.
        """
        last_pos = self.pos
        while True:
            semicolon_idx = self.stream().find(';')
            lbracket_idx = self.stream().find('{')
            rbracket_idx = self.stream().find('}')

            if rbracket_idx == 0:
                # block end
                return

            # Whichever delimiter comes first decides: stanza or nested block.
            if (semicolon_idx != -1 and lbracket_idx != -1
                    and semicolon_idx < lbracket_idx) \
                    or (semicolon_idx != -1 and lbracket_idx == -1):
                self.parse_stanza(block_dict)
            elif (semicolon_idx != -1 and lbracket_idx != -1
                  and semicolon_idx > lbracket_idx) or (
                      semicolon_idx == -1 and lbracket_idx != -1):
                if '_blocks_' not in block_dict:
                    block_dict['_blocks_'] = []
                block_dict['_blocks_'].append(self.parse_block_or_section())
            else:
                raise Exception("Malformed stanza: no semicolon found.")

            # Guard against a parse step that consumed nothing.
            if last_pos == self.pos:
                raise Exception("Infinite loop while parsing block content")
            last_pos = self.pos

    def parse(self):
        """Parse the whole config; return the list of top-level block dicts."""
        self.remove_all_whitespaces()
        blocks = []
        while self.stream():
            block_dict = self.parse_block_or_section()
            blocks.append(block_dict)
        return blocks

    @staticmethod
    def _indentation(depth, size=4):
        """Return ``depth * size`` spaces."""
        conf_str = ""
        for _ in range(0, depth*size):
            conf_str += " "
        return conf_str

    @staticmethod
    def write_block_body(block, depth=0):
        """Serialize the stanzas and nested blocks of *block* at *depth*."""
        def format_val(key, val):
            # Render a stanza value in ganesha syntax.
            if isinstance(val, list):
                return ', '.join([format_val(key, v) for v in val])
            if isinstance(val, bool):
                # ganesha expects lowercase true/false
                return str(val).lower()
            if isinstance(val, int) or (block['block_name'] == 'CLIENT'
                                        and key == 'clients'):
                # ints and CLIENT address lists are written unquoted
                return '{}'.format(val)
            return '"{}"'.format(val)

        conf_str = ""
        for key, val in block.items():
            if key == 'block_name':
                continue
            if key == '_blocks_':
                for blo in val:
                    conf_str += GaneshaConfParser.write_block(blo, depth)
            elif val:
                conf_str += GaneshaConfParser._indentation(depth)
                conf_str += '{} = {};\n'.format(key, format_val(key, val))
        return conf_str

    @staticmethod
    def write_block(block, depth):
        """Serialize one block dict (or %url entry) at *depth*."""
        if block['block_name'] == "%url":
            return '%url "{}"\n\n'.format(block['value'])

        conf_str = ""
        conf_str += GaneshaConfParser._indentation(depth)
        conf_str += format(block['block_name'])
        conf_str += " {\n"
        conf_str += GaneshaConfParser.write_block_body(block, depth+1)
        conf_str += GaneshaConfParser._indentation(depth)
        conf_str += "}\n\n"
        return conf_str

    @staticmethod
    def write_conf(blocks):
        """Serialize a block dict (or list of them) into config text."""
        if not isinstance(blocks, list):
            blocks = [blocks]
        conf_str = ""
        for block in blocks:
            conf_str += GaneshaConfParser.write_block(block, 0)
        return conf_str
364
365
class FSal(object):
    """Abstract base for the FSAL (file-system abstraction layer) settings
    of an export; concrete backends are CephFSFSal and RGWFSal."""

    def __init__(self, name):
        # Backend identifier, e.g. "CEPH" or "RGW".
        self.name = name

    @classmethod
    def validate_path(cls, _):
        raise NotImplementedError()

    def validate(self):
        raise NotImplementedError()

    def fill_keys(self):
        raise NotImplementedError()

    def create_path(self, path):
        raise NotImplementedError()

    @staticmethod
    def from_fsal_block(fsal_block):
        """Instantiate the concrete subclass matching the block's name,
        or None for an unknown backend."""
        name = fsal_block['name']
        if name == "CEPH":
            return CephFSFSal.from_fsal_block(fsal_block)
        if name == 'RGW':
            return RGWFSal.from_fsal_block(fsal_block)
        return None

    def to_fsal_block(self):
        raise NotImplementedError()

    @staticmethod
    def from_dict(fsal_dict):
        """Instantiate the concrete subclass matching the dict's name,
        or None for an unknown backend."""
        name = fsal_dict['name']
        if name == "CEPH":
            return CephFSFSal.from_dict(fsal_dict)
        if name == 'RGW':
            return RGWFSal.from_dict(fsal_dict)
        return None

    def to_dict(self):
        raise NotImplementedError()
404
405
class RGWFSal(FSal):
    """FSAL settings for exports backed by the RADOS Gateway (RGW)."""

    def __init__(self, name, rgw_user_id, access_key, secret_key):
        super().__init__(name)
        # RGW user owning the exported bucket and its S3 credentials.
        self.rgw_user_id = rgw_user_id
        self.access_key = access_key
        self.secret_key = secret_key

    @classmethod
    def validate_path(cls, path):
        # Either the user's root ("/") or a bucket name without unsafe chars.
        return path == "/" or re.match(r'^[^/><|&()#?]+$', path)

    def validate(self):
        """Ensure an RGW user is configured and exists."""
        if not self.rgw_user_id:
            raise NFSException('RGW user must be specified')

        if not RgwClient.admin_instance().user_exists(self.rgw_user_id):
            raise NFSException("RGW user '{}' does not exist"
                               .format(self.rgw_user_id))

    def fill_keys(self):
        """Fetch and cache the S3 credentials of the configured RGW user."""
        keys = RgwClient.admin_instance().get_user_keys(self.rgw_user_id)
        self.access_key = keys['access_key']
        self.secret_key = keys['secret_key']

    def create_path(self, path):
        """Create bucket *path* for the RGW user unless it already exists."""
        if path == '/':  # nothing to do
            return
        rgw = RgwClient.instance(self.rgw_user_id)
        try:
            exists = rgw.bucket_exists(path, self.rgw_user_id)
            logger.debug('Checking existence of RGW bucket "%s" for user "%s": %s',
                         path, self.rgw_user_id, exists)
        except RequestException as exp:
            if exp.status_code != 403:
                raise exp
            # 403: the bucket exists but belongs to a different user.
            raise NFSException('Cannot create bucket "{}" as it already '
                               'exists, and belongs to other user.'
                               .format(path))
        if exists:
            return
        logger.info('Creating new RGW bucket "%s" for user "%s"', path,
                    self.rgw_user_id)
        rgw.create_bucket(path)

    @classmethod
    def from_fsal_block(cls, fsal_block):
        """Build an RGWFSal from a parsed FSAL conf block."""
        return cls(fsal_block['name'],
                   fsal_block['user_id'],
                   fsal_block['access_key_id'],
                   fsal_block['secret_access_key'])

    def to_fsal_block(self):
        """Serialize to an FSAL conf block dict."""
        block = {'block_name': 'FSAL'}
        block['name'] = self.name
        block['user_id'] = self.rgw_user_id
        block['access_key_id'] = self.access_key
        block['secret_access_key'] = self.secret_key
        return block

    @classmethod
    def from_dict(cls, fsal_dict):
        """Build an RGWFSal from its REST API dict (credentials unset)."""
        return cls(fsal_dict['name'], fsal_dict['rgw_user_id'], None, None)

    def to_dict(self):
        """Return the REST API dict representation (no credentials)."""
        return {'name': self.name, 'rgw_user_id': self.rgw_user_id}
474
475
class CephFSFSal(FSal):
    """FSAL settings for exports backed by CephFS."""

    def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
                 cephx_key=None):
        super().__init__(name)
        # Target filesystem, cephx client and optional security-label xattr.
        self.fs_name = fs_name
        self.user_id = user_id
        self.sec_label_xattr = sec_label_xattr
        self.cephx_key = cephx_key

    @classmethod
    def validate_path(cls, path):
        # Absolute path without shell-unsafe characters.
        return re.match(r'^/[^><|&()?]*$', path)

    def validate(self):
        """Ensure the configured cephx user exists (when one is set)."""
        if not self.user_id:
            return
        if self.user_id not in CephX.list_clients():
            raise NFSException("cephx user '{}' does not exist"
                               .format(self.user_id))

    def fill_keys(self):
        """Look up and cache the cephx key of the configured user."""
        if self.user_id:
            self.cephx_key = CephX.get_client_key(self.user_id)

    def create_path(self, path):
        """Create directory *path* on the filesystem (no-op for the root)."""
        cfs = CephFS(self.fs_name)
        if path != os.sep:
            cfs.mk_dirs(path)

    @classmethod
    def from_fsal_block(cls, fsal_block):
        """Build a CephFSFSal from a parsed FSAL conf block."""
        return cls(fsal_block['name'],
                   fsal_block.get('user_id', None),
                   fsal_block.get('filesystem', None),
                   fsal_block.get('sec_label_xattr', None),
                   fsal_block.get('secret_access_key', None))

    def to_fsal_block(self):
        """Serialize to an FSAL conf block dict, omitting unset fields."""
        result = {
            'block_name': 'FSAL',
            'name': self.name,
        }
        for key, value in (('user_id', self.user_id),
                           ('filesystem', self.fs_name),
                           ('sec_label_xattr', self.sec_label_xattr),
                           ('secret_access_key', self.cephx_key)):
            if value:
                result[key] = value
        return result

    @classmethod
    def from_dict(cls, fsal_dict):
        """Build a CephFSFSal from its REST API dict (key unset)."""
        return cls(fsal_dict['name'], fsal_dict['user_id'],
                   fsal_dict['fs_name'], fsal_dict['sec_label_xattr'], None)

    def to_dict(self):
        """Return the REST API dict representation (no cephx key)."""
        return {
            'name': self.name,
            'user_id': self.user_id,
            'fs_name': self.fs_name,
            'sec_label_xattr': self.sec_label_xattr
        }
539
540
class Client(object):
    """A group of client addresses with optional per-client overrides."""

    def __init__(self, addresses, access_type=None, squash=None):
        self.addresses = addresses
        self.access_type = access_type
        # Normalize the squash spelling (e.g. "root" -> "root_squash").
        self.squash = GaneshaConf.format_squash(squash)

    @classmethod
    def from_client_block(cls, client_block):
        """Build a Client from a parsed CLIENT conf block."""
        clients = client_block['clients']
        addresses = clients if isinstance(clients, list) else [clients]
        return cls(addresses,
                   client_block.get('access_type'),
                   client_block.get('squash'))

    def to_client_block(self):
        """Serialize to a CLIENT conf block dict, omitting unset fields."""
        block = {
            'block_name': 'CLIENT',
            'clients': self.addresses,
        }
        if self.access_type:
            block['access_type'] = self.access_type
        if self.squash:
            block['squash'] = self.squash
        return block

    @classmethod
    def from_dict(cls, client_dict):
        """Build a Client from its REST API dict representation."""
        return cls(client_dict['addresses'], client_dict['access_type'],
                   client_dict['squash'])

    def to_dict(self):
        """Return the REST API dict representation."""
        return {
            'addresses': self.addresses,
            'access_type': self.access_type,
            'squash': self.squash
        }
578
579
class Export(object):
    """An NFS-Ganesha export and its (de)serialization helpers.

    Instances can be built from, and written back to, both parsed ganesha
    EXPORT conf blocks and the dashboard REST API dict representation.
    """
    # pylint: disable=R0902

    def __init__(self, export_id, path, fsal, cluster_id, daemons, pseudo=None,
                 tag=None, access_type=None, squash=None,
                 attr_expiration_time=None, security_label=False,
                 protocols=None, transports=None, clients=None):
        self.export_id = export_id
        self.path = GaneshaConf.format_path(path)
        self.fsal = fsal
        self.cluster_id = cluster_id
        # IDs of the ganesha daemons serving this export.
        self.daemons = set(daemons)
        self.pseudo = GaneshaConf.format_path(pseudo)
        self.tag = tag
        self.access_type = access_type
        self.squash = GaneshaConf.format_squash(squash)
        if attr_expiration_time is None:
            self.attr_expiration_time = 0
        else:
            self.attr_expiration_time = attr_expiration_time
        self.security_label = security_label
        # Normalize protocol identifiers to the ints 3/4.
        self.protocols = {GaneshaConf.format_protocol(p) for p in protocols}
        self.transports = set(transports)
        self.clients = clients

    def validate(self):
        """Check the export's own consistency.

        :raises NFSException: on the first invalid field found.
        """
        # pylint: disable=R0912
        if not self.fsal.validate_path(self.path):
            raise NFSException("Export path ({}) is invalid.".format(self.path))

        if not self.protocols:
            raise NFSException(
                "No NFS protocol version specified for the export.")

        if not self.transports:
            raise NFSException(
                "No network transport type specified for the export.")

        for t in self.transports:
            match = re.match(r'^TCP$|^UDP$', t)
            if not match:
                raise NFSException(
                    "'{}' is an invalid network transport type identifier"
                    .format(t))

        self.fsal.validate()

        if 4 in self.protocols:
            # NFSv4 requires a valid pseudo-filesystem path.
            if not self.pseudo:
                raise NFSException(
                    "Pseudo path is required when NFSv4 protocol is used")
            match = re.match(r'^/[^><|&()]*$', self.pseudo)
            if not match:
                raise NFSException(
                    "Export pseudo path ({}) is invalid".format(self.pseudo))

        if self.tag:
            match = re.match(r'^[^/><|:&()]+$', self.tag)
            if not match:
                raise NFSException(
                    "Export tag ({}) is invalid".format(self.tag))

        # NFSv3 clients locate RGW exports by tag.
        if self.fsal.name == 'RGW' and 4 not in self.protocols and not self.tag:
            raise NFSException(
                "Tag is mandatory for RGW export when using only NFSv3")

    @classmethod
    def from_export_block(cls, export_block, cluster_id, defaults):
        """Build an Export from a parsed EXPORT conf block, falling back to
        *defaults* for unset fields."""
        logger.debug("parsing export block: %s", export_block)

        fsal_block = [b for b in export_block['_blocks_']
                      if b['block_name'] == "FSAL"]

        protocols = export_block.get('protocols', defaults['protocols'])
        if not isinstance(protocols, list):
            protocols = [protocols]

        transports = export_block.get('transports', defaults['transports'])
        if not isinstance(transports, list):
            transports = [transports]

        client_blocks = [b for b in export_block['_blocks_']
                         if b['block_name'] == "CLIENT"]

        return cls(export_block['export_id'],
                   export_block['path'],
                   FSal.from_fsal_block(fsal_block[0]),
                   cluster_id,
                   [],  # daemons are linked afterwards by GaneshaConf
                   export_block.get('pseudo', None),
                   export_block.get('tag', None),
                   export_block.get('access_type', defaults['access_type']),
                   export_block.get('squash', defaults['squash']),
                   export_block.get('attr_expiration_time', None),
                   export_block.get('security_label', False),
                   protocols,
                   transports,
                   [Client.from_client_block(client)
                    for client in client_blocks])

    def to_export_block(self, defaults):
        """Serialize to an EXPORT conf block dict, omitting values that
        match *defaults*."""
        # pylint: disable=too-many-branches
        result = {
            'block_name': 'EXPORT',
            'export_id': self.export_id,
            'path': self.path
        }
        if self.pseudo:
            result['pseudo'] = self.pseudo
        if self.tag:
            result['tag'] = self.tag
        if 'access_type' not in defaults \
                or self.access_type != defaults['access_type']:
            result['access_type'] = self.access_type
        if 'squash' not in defaults or self.squash != defaults['squash']:
            result['squash'] = self.squash
        if self.fsal.name == 'CEPH':
            result['attr_expiration_time'] = self.attr_expiration_time
            result['security_label'] = self.security_label
        if 'protocols' not in defaults:
            result['protocols'] = list(self.protocols)
        else:
            def_proto = defaults['protocols']
            if not isinstance(def_proto, list):
                def_proto = set([def_proto])
            # NOTE(review): when the default IS a list, a set is compared
            # against a list here, which is always unequal, so 'protocols'
            # is always emitted in that case — confirm intent.
            if self.protocols != def_proto:
                result['protocols'] = list(self.protocols)
        if 'transports' not in defaults:
            result['transports'] = list(self.transports)
        else:
            def_transp = defaults['transports']
            if not isinstance(def_transp, list):
                def_transp = set([def_transp])
            # NOTE(review): same set-vs-list comparison caveat as above.
            if self.transports != def_transp:
                result['transports'] = list(self.transports)

        result['_blocks_'] = [self.fsal.to_fsal_block()]
        result['_blocks_'].extend([client.to_client_block()
                                   for client in self.clients])
        return result

    @classmethod
    def from_dict(cls, export_id, ex_dict, old_export=None):
        """Build an Export from its REST API dict; *old_export* preserves
        the previous attribute expiration time on updates."""
        return cls(export_id,
                   ex_dict['path'],
                   FSal.from_dict(ex_dict['fsal']),
                   ex_dict['cluster_id'],
                   ex_dict['daemons'],
                   ex_dict['pseudo'],
                   ex_dict['tag'],
                   ex_dict['access_type'],
                   ex_dict['squash'],
                   old_export.attr_expiration_time if old_export else None,
                   ex_dict['security_label'],
                   ex_dict['protocols'],
                   ex_dict['transports'],
                   [Client.from_dict(client) for client in ex_dict['clients']])

    def to_dict(self):
        """Return the REST API dict representation."""
        return {
            'export_id': self.export_id,
            'path': self.path,
            'fsal': self.fsal.to_dict(),
            'cluster_id': self.cluster_id,
            'daemons': sorted(list(self.daemons)),
            'pseudo': self.pseudo,
            'tag': self.tag,
            'access_type': self.access_type,
            'squash': self.squash,
            'security_label': self.security_label,
            'protocols': sorted(list(self.protocols)),
            'transports': sorted(list(self.transports)),
            'clients': [client.to_dict() for client in self.clients]
        }
753
754
class ClusterType(object):
    """Enumeration of the ways an NFS-Ganesha cluster can be deployed."""

    # Ganesha clusters deployed by the Orchestrator.
    ORCHESTRATOR = 'orchestrator'

    # Ganesha clusters deployed manually by the user. Specified by using the
    # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
    USER = 'user'
763
764
765 class GaneshaConf(object):
766 # pylint: disable=R0902
767
768 def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
769 self.cluster_id = cluster_id
770 self.rados_pool = rados_pool
771 self.rados_namespace = rados_namespace
772 self.daemon_confs = daemon_confs if daemon_confs is not None else []
773 self.export_conf_blocks = [] # type: ignore
774 self.daemons_conf_blocks = {} # type: ignore
775 self._defaults = {}
776 self.exports = {}
777
778 self._read_raw_config()
779
780 # load defaults
781 def_block = [b for b in self.export_conf_blocks
782 if b['block_name'] == "EXPORT_DEFAULTS"]
783 self.export_defaults = def_block[0] if def_block else {}
784 self._defaults = self.ganesha_defaults(self.export_defaults)
785
786 for export_block in [block for block in self.export_conf_blocks
787 if block['block_name'] == "EXPORT"]:
788 export = Export.from_export_block(export_block, cluster_id,
789 self._defaults)
790 self.exports[export.export_id] = export
791
792 # link daemons to exports
793 self._link_daemons_to_exports()
794
795 def _link_daemons_to_exports(self):
796 raise NotImplementedError()
797
798 @classmethod
799 def instance(cls, cluster_id):
800 cluster = Ganesha.get_cluster(cluster_id)
801 if cluster['type'] == ClusterType.ORCHESTRATOR:
802 return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
803 [cluster['daemon_conf']])
804 if cluster['type'] == ClusterType.USER:
805 return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
806 raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
807 cluster['type'], cluster_id))
808
809 def _read_raw_config(self):
810
811 def _read_rados_obj(_obj):
812 size, _ = _obj.stat()
813 return _obj.read(size).decode("utf-8")
814
815 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
816 if self.rados_namespace:
817 ioctx.set_namespace(self.rados_namespace)
818 objs = ioctx.list_objects()
819 for obj in objs:
820 if obj.key.startswith("export-"):
821 raw_config = _read_rados_obj(obj)
822 logger.debug("read export configuration from rados "
823 "object %s/%s/%s:\n%s", self.rados_pool,
824 self.rados_namespace, obj.key, raw_config)
825 self.export_conf_blocks.extend(
826 GaneshaConfParser(raw_config).parse())
827 elif not self.daemon_confs and obj.key.startswith("conf-"):
828 # Read all `conf-xxx` for daemon configs.
829 raw_config = _read_rados_obj(obj)
830 logger.debug("read daemon configuration from rados "
831 "object %s/%s/%s:\n%s", self.rados_pool,
832 self.rados_namespace, obj.key, raw_config)
833 idx = obj.key.find('-')
834 self.daemons_conf_blocks[obj.key[idx+1:]] = \
835 GaneshaConfParser(raw_config).parse()
836
837 if self.daemon_confs:
838 # When daemon configs are provided.
839 for conf in self.daemon_confs:
840 size, _ = ioctx.stat(conf)
841 raw_config = ioctx.read(conf, size).decode("utf-8")
842 logger.debug("read daemon configuration from rados "
843 "object %s/%s/%s:\n%s", self.rados_pool,
844 self.rados_namespace, conf, raw_config)
845 self.daemons_conf_blocks[conf] = \
846 GaneshaConfParser(raw_config).parse()
847
848 def _write_raw_config(self, conf_block, obj):
849 raw_config = GaneshaConfParser.write_conf(conf_block)
850 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
851 if self.rados_namespace:
852 ioctx.set_namespace(self.rados_namespace)
853 ioctx.write_full(obj, raw_config.encode('utf-8'))
854 logger.debug(
855 "write configuration into rados object %s/%s/%s:\n%s",
856 self.rados_pool, self.rados_namespace, obj, raw_config)
857
858 @classmethod
859 def ganesha_defaults(cls, export_defaults):
860 """
861 According to
862 https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
863 """
864 return {
865 'access_type': export_defaults.get('access_type', 'NONE'),
866 'protocols': export_defaults.get('protocols', [3, 4]),
867 'transports': export_defaults.get('transports', ['TCP', 'UDP']),
868 'squash': export_defaults.get('squash', 'root_squash')
869 }
870
871 @classmethod
872 def format_squash(cls, squash):
873 if squash is None:
874 return None
875 if squash.lower() in ["no_root_squash", "noidsquash", "none"]:
876 return "no_root_squash"
877 if squash.lower() in ["rootid", "root_id_squash", "rootidsquash"]:
878 return "root_id_squash"
879 if squash.lower() in ["root", "root_squash", "rootsquash"]:
880 return "root_squash"
881 if squash.lower() in ["all", "all_squash", "allsquash",
882 "all_anonymous", "allanonymous"]:
883 return "all_squash"
884 logger.error("could not parse squash value: %s", squash)
885 raise NFSException("'{}' is an invalid squash option".format(squash))
886
887 @classmethod
888 def format_protocol(cls, protocol):
889 if str(protocol) in ["NFSV3", "3", "V3", "NFS3"]:
890 return 3
891 if str(protocol) in ["NFSV4", "4", "V4", "NFS4"]:
892 return 4
893 logger.error("could not parse protocol value: %s", protocol)
894 raise NFSException("'{}' is an invalid NFS protocol version identifier"
895 .format(protocol))
896
897 @classmethod
898 def format_path(cls, path):
899 if path is not None:
900 path = path.strip()
901 if len(path) > 1 and path[-1] == '/':
902 path = path[:-1]
903 return path
904
905 def validate(self, export: Export):
906 export.validate()
907
908 if 4 in export.protocols: # NFSv4 protocol
909 len_prefix = 1
910 parent_export = None
911 for ex in self.list_exports():
912 if export.tag and ex.tag == export.tag:
913 raise NFSException(
914 "Another export exists with the same tag: {}"
915 .format(export.tag))
916
917 if export.pseudo and ex.pseudo == export.pseudo:
918 raise NFSException(
919 "Another export exists with the same pseudo path: {}"
920 .format(export.pseudo))
921
922 if not ex.pseudo:
923 continue
924
925 if export.pseudo[:export.pseudo.rfind('/')+1].startswith(ex.pseudo):
926 if export.pseudo[len(ex.pseudo)] == '/':
927 if len(ex.pseudo) > len_prefix:
928 len_prefix = len(ex.pseudo)
929 parent_export = ex
930
931 if len_prefix > 1:
932 # validate pseudo path
933 idx = len(parent_export.pseudo) # type: ignore
934 idx = idx + 1 if idx > 1 else idx
935 real_path = "{}/{}".format(
936 parent_export.path # type: ignore
937 if len(parent_export.path) > 1 else "", # type: ignore
938 export.pseudo[idx:])
939 if export.fsal.name == 'CEPH':
940 cfs = CephFS()
941 if export.path != real_path and not cfs.dir_exists(real_path):
942 raise NFSException(
943 "Pseudo path ({}) invalid, path {} does not exist."
944 .format(export.pseudo, real_path))
945
946 def _gen_export_id(self):
947 exports = sorted(self.exports)
948 nid = 1
949 for e_id in exports:
950 if e_id == nid:
951 nid += 1
952 else:
953 break
954 return nid
955
956 def _persist_daemon_configuration(self):
957 raise NotImplementedError()
958
959 def _save_export(self, export):
960 self.validate(export)
961 export.fsal.create_path(export.path)
962 export.fsal.fill_keys()
963 self.exports[export.export_id] = export
964 conf_block = export.to_export_block(self.export_defaults)
965 self._write_raw_config(conf_block, "export-{}".format(export.export_id))
966 self._persist_daemon_configuration()
967
968 def _delete_export(self, export_id):
969 self._persist_daemon_configuration()
970 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
971 if self.rados_namespace:
972 ioctx.set_namespace(self.rados_namespace)
973 ioctx.remove_object("export-{}".format(export_id))
974
975 def list_exports(self):
976 return [ex for _, ex in self.exports.items()]
977
978 def create_export(self, ex_dict):
979 ex_id = self._gen_export_id()
980 export = Export.from_dict(ex_id, ex_dict)
981 self._save_export(export)
982 return ex_id
983
984 def has_export(self, export_id):
985 return export_id in self.exports
986
987 def update_export(self, ex_dict):
988 if ex_dict['export_id'] not in self.exports:
989 return None
990 old_export = self.exports[ex_dict['export_id']]
991 del self.exports[ex_dict['export_id']]
992 export = Export.from_dict(ex_dict['export_id'], ex_dict, old_export)
993 self._save_export(export)
994 self.exports[export.export_id] = export
995 return old_export
996
997 def remove_export(self, export_id):
998 if export_id not in self.exports:
999 return None
1000 export = self.exports[export_id]
1001 del self.exports[export_id]
1002 self._delete_export(export_id)
1003 return export
1004
1005 def get_export(self, export_id):
1006 if export_id in self.exports:
1007 return self.exports[export_id]
1008 return None
1009
    def list_daemons(self) -> List[Dict[str, Any]]:
        """Return one status dict per daemon of this cluster.

        Abstract hook; subclasses return dicts with at least ``cluster_id``,
        ``cluster_type``, ``daemon_id``, ``status`` and ``status_desc`` keys.
        """
        raise NotImplementedError()
1012
1013 def list_daemon_confs(self):
1014 return self.daemons_conf_blocks.keys()
1015
    def reload_daemons(self, daemons):
        """Send a RADOS notification on each given daemon's conf object.

        Notifies the ``conf-<daemon_id>`` object for every ID in *daemons*;
        presumably the Ganesha daemons watch these objects and reload their
        configuration on notification — behavior of the watcher side is not
        visible here.

        :param daemons: iterable of daemon IDs to notify
        """
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            for daemon_id in daemons:
                ioctx.notify("conf-{}".format(daemon_id))
1022
1023
class GaneshaConfOrchestrator(GaneshaConf):
    """Ganesha configuration for NFS clusters deployed via the Orchestrator.

    Daemon discovery goes through the Orchestrator API, and every export is
    expected to be linked to *all* daemons of the cluster (see ``validate``).
    """

    @classmethod
    def _get_orch_nfs_instances(cls,
                                service_name: Optional[str] = None) -> List[DaemonDescription]:
        """List the NFS daemons known to the Orchestrator.

        Returns an empty list when the orchestrator backend is unavailable
        or raises — callers treat that the same as "no daemons".
        """
        try:
            return OrchClient.instance().services.\
                list_daemons(service_name=service_name, daemon_type="nfs")
        except (RuntimeError, OrchestratorError, ImportError):
            return []

    def _link_daemons_to_exports(self):
        """Attach the full orchestrator daemon set to every export found in a %url block."""
        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
        daemon_ids = {instance.daemon_id for instance in instances}
        for _, daemon_blocks in self.daemons_conf_blocks.items():
            for block in daemon_blocks:
                if block['block_name'] == "%url":
                    rados_url = block['value']
                    _, _, obj = Ganesha.parse_rados_url(rados_url)
                    if obj.startswith("export-"):
                        # Object names look like "export-<id>"; in an
                        # orchestrator cluster every daemon serves every export.
                        export_id = int(obj[obj.find('-')+1:])
                        self.exports[export_id].daemons.update(daemon_ids)

    def validate(self, export: Export):
        """Validate *export*; orchestrator exports must be linked to all daemons.

        :raises NFSException: if the export's daemon set is non-empty and
            differs from the cluster's current daemon set.
        """
        daemons_list = {d['daemon_id'] for d in self.list_daemons()}
        if export.daemons and set(export.daemons) != daemons_list:
            raise NFSException('Export should be linked to all daemons.')
        super().validate(export)

    def _persist_daemon_configuration(self):
        """Rewrite every daemon conf object with one %url block per export.

        Each daemon conf receives the same list of RADOS URLs: one entry for
        every export that has at least one daemon linked.
        """
        daemon_map = {}  # type: ignore
        for daemon_id in self.list_daemon_confs():
            daemon_map[daemon_id] = []

        for daemon_id in self.list_daemon_confs():
            for _, ex in self.exports.items():
                if ex.daemons:
                    daemon_map[daemon_id].append({
                        'block_name': "%url",
                        'value': Ganesha.make_rados_url(
                            self.rados_pool, self.rados_namespace,
                            "export-{}".format(ex.export_id))
                    })
        for daemon_id, conf_blocks in daemon_map.items():
            # NOTE(review): unlike GaneshaConfUser, no "conf-" prefix is added
            # here — presumably list_daemon_confs() already yields the full
            # RADOS object names for orchestrator clusters; confirm upstream.
            self._write_raw_config(conf_blocks, daemon_id)

    def list_daemons(self) -> List[Dict[str, Any]]:
        """Return a status dict for each orchestrator-managed NFS daemon."""
        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
        return [{
            'cluster_id': self.cluster_id,
            'daemon_id': instance.daemon_id,
            'cluster_type': ClusterType.ORCHESTRATOR,
            'status': instance.status,
            'status_desc': instance.status_desc
        } for instance in instances]

    def reload_daemons(self, daemons):
        """Notify every daemon conf object; the *daemons* argument is ignored.

        Orchestrator clusters always notify all conf objects regardless of
        the requested subset; the parameter only keeps the base-class
        interface.
        """
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            for daemon_id in self.list_daemon_confs():
                ioctx.notify(daemon_id)
1085
1086
1087 class GaneshaConfUser(GaneshaConf):
1088
1089 def _link_daemons_to_exports(self):
1090 for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
1091 for block in daemon_blocks:
1092 if block['block_name'] == "%url":
1093 rados_url = block['value']
1094 _, _, obj = Ganesha.parse_rados_url(rados_url)
1095 if obj.startswith("export-"):
1096 export_id = int(obj[obj.find('-')+1:])
1097 self.exports[export_id].daemons.add(daemon_id)
1098
1099 def validate(self, export: Export):
1100 daemons_list = [d['daemon_id'] for d in self.list_daemons()]
1101 for daemon_id in export.daemons:
1102 if daemon_id not in daemons_list:
1103 raise NFSException("Daemon '{}' does not exist".format(daemon_id))
1104 super().validate(export)
1105
1106 def _persist_daemon_configuration(self):
1107 daemon_map = {} # type: ignore
1108 for daemon_id in self.list_daemon_confs():
1109 daemon_map[daemon_id] = []
1110
1111 for _, ex in self.exports.items():
1112 for daemon in ex.daemons:
1113 daemon_map[daemon].append({
1114 'block_name': "%url",
1115 'value': Ganesha.make_rados_url(
1116 self.rados_pool, self.rados_namespace,
1117 "export-{}".format(ex.export_id))
1118 })
1119 for daemon_id, conf_blocks in daemon_map.items():
1120 self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
1121
1122 def list_daemons(self) -> List[Dict[str, Any]]:
1123 return [{
1124 'cluster_id': self.cluster_id,
1125 'cluster_type': ClusterType.USER,
1126 'daemon_id': daemon_id,
1127 'status': 1,
1128 'status_desc': 'running'
1129 } for daemon_id in self.list_daemon_confs()]