]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/dashboard/services/ganesha.py
Import ceph 15.2.8
[ceph.git] / ceph / src / pybind / mgr / dashboard / services / ganesha.py
CommitLineData
11fdf7f2 1# -*- coding: utf-8 -*-
9f95a23c 2# pylint: disable=too-many-lines
11fdf7f2
TL
3from __future__ import absolute_import
4
9f95a23c 5import logging
f91f0fd5 6import os
11fdf7f2 7import re
f91f0fd5 8from typing import Any, Dict, List, Optional, cast
11fdf7f2 9
f91f0fd5
TL
10from ceph.deployment.service_spec import NFSServiceSpec
11from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
12
13from .. import mgr
14from ..exceptions import DashboardException
15from ..settings import Settings
11fdf7f2
TL
16from .cephfs import CephFS
17from .cephx import CephX
18from .orchestrator import OrchClient
19from .rgw_client import RgwClient, RequestException, NoCredentialsException
11fdf7f2
TL
20
21
9f95a23c
TL
22logger = logging.getLogger('ganesha')
23
24
11fdf7f2
TL
25class NFSException(DashboardException):
26 def __init__(self, msg):
27 super(NFSException, self).__init__(component="nfs", msg=msg)
28
29
30class Ganesha(object):
31 @classmethod
32 def _get_clusters_locations(cls):
f91f0fd5
TL
33 # pylint: disable=too-many-branches
34 # Get Orchestrator clusters
35 orch_result = cls._get_orch_clusters_locations()
36
37 # Get user-defined clusters
11fdf7f2 38 location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
f91f0fd5
TL
39 if not orch_result and not location_list_str:
40 raise NFSException("NFS-Ganesha cluster is not detected. "
11fdf7f2 41 "Please set the GANESHA_RADOS_POOL_NAMESPACE "
f91f0fd5
TL
42 "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
43 result = {} # type: ignore
44 location_list = [loc.strip() for loc in location_list_str.split(
45 ",")] if location_list_str else []
11fdf7f2
TL
46 for location in location_list:
47 cluster = None
48 pool = None
49 namespace = None
50 if not location:
51 raise NFSException("Invalid Ganesha cluster RADOS "
52 "[cluster_id:]pool/namespace setting: {}"
53 .format(location))
54 if location.count(':') < 1:
55 # default cluster_id
56 if location.count('/') > 1:
57 raise NFSException("Invalid Ganesha RADOS pool/namespace "
58 "setting: {}".format(location))
59 # in this case accept pool/namespace only
60 cluster = "_default_"
61 if location.count('/') == 0:
62 pool, namespace = location, None
63 else:
64 pool, namespace = location.split('/', 1)
65 else:
66 cluster = location[:location.find(':')]
67 pool_nm = location[location.find(':')+1:]
68 if pool_nm.count('/') == 0:
69 pool, namespace = pool_nm, None
70 else:
71 pool, namespace = pool_nm.split('/', 1)
72
f91f0fd5
TL
73 if cluster in orch_result:
74 # cephadm might have set same cluster settings, ask the user to remove it.
75 raise NFSException(
76 'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
77 'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
78 'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
79 'setting.'.format(cluster))
80
11fdf7f2
TL
81 if cluster in result:
82 raise NFSException("Duplicate Ganesha cluster definition in "
83 "the setting: {}".format(location_list_str))
f91f0fd5
TL
84 result[cluster] = {
85 'pool': pool,
86 'namespace': namespace,
87 'type': ClusterType.USER,
88 'daemon_conf': None
89 }
90 return {**orch_result, **result}
11fdf7f2 91
f91f0fd5
TL
92 @classmethod
93 def _get_orch_clusters_locations(cls):
94 orch_result = {} # type: ignore
95 services = cls._get_orch_nfs_services()
96 for service in services:
97 spec = cast(NFSServiceSpec, service.spec)
98 try:
99 orch_result[spec.service_id] = {
100 'pool': spec.pool,
101 'namespace': spec.namespace,
102 'type': ClusterType.ORCHESTRATOR,
103 'daemon_conf': spec.rados_config_name()
104 }
105 except AttributeError as ex:
106 logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
107 continue
108 return orch_result
11fdf7f2
TL
109
110 @classmethod
111 def get_ganesha_clusters(cls):
112 return [cluster_id for cluster_id in cls._get_clusters_locations()]
113
81eedcae 114 @staticmethod
f91f0fd5 115 def _get_orch_nfs_services() -> List[ServiceDescription]:
81eedcae 116 try:
f91f0fd5 117 return OrchClient.instance().services.list('nfs')
81eedcae
TL
118 except (RuntimeError, OrchestratorError, ImportError):
119 return []
120
11fdf7f2
TL
121 @classmethod
122 def parse_rados_url(cls, rados_url):
123 if not rados_url.startswith("rados://"):
124 raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}"
125 .format(rados_url))
126 rados_url = rados_url[8:]
127 url_comps = rados_url.split("/")
128 if len(url_comps) < 2 or len(url_comps) > 3:
129 raise NFSException("Invalid NFS Ganesha RADOS configuration URL: "
130 "rados://{}".format(rados_url))
131 if len(url_comps) == 2:
132 return url_comps[0], None, url_comps[1]
133 return url_comps
134
135 @classmethod
136 def make_rados_url(cls, pool, namespace, obj):
137 if namespace:
138 return "rados://{}/{}/{}".format(pool, namespace, obj)
139 return "rados://{}/{}".format(pool, obj)
140
141 @classmethod
f91f0fd5 142 def get_cluster(cls, cluster_id):
11fdf7f2
TL
143 locations = cls._get_clusters_locations()
144 if cluster_id not in locations:
145 raise NFSException("Cluster not found: cluster_id={}"
146 .format(cluster_id))
147 return locations[cluster_id]
148
11fdf7f2
TL
149 @classmethod
150 def fsals_available(cls):
151 result = []
152 if CephFS.list_filesystems():
153 result.append("CEPH")
154 try:
155 if RgwClient.admin_instance().is_service_online() and \
156 RgwClient.admin_instance().is_system_user():
157 result.append("RGW")
158 except (NoCredentialsException, RequestException, LookupError):
159 pass
160 return result
161
162
163class GaneshaConfParser(object):
164 def __init__(self, raw_config):
165 self.pos = 0
166 self.text = ""
167 self.clean_config(raw_config)
168
169 def clean_config(self, raw_config):
170 for line in raw_config.split("\n"):
171 cardinal_idx = line.find('#')
172 if cardinal_idx == -1:
173 self.text += line
174 else:
175 # remove comments
176 self.text += line[:cardinal_idx]
177 if line.startswith("%"):
178 self.text += "\n"
179
180 def remove_all_whitespaces(self):
181 new_text = ""
182 in_string = False
183 in_section = False
184 for i, cha in enumerate(self.text):
185 if in_section:
186 if cha != '"' and self.text[i-1] != '\\':
187 new_text += cha
188 elif cha == '\n':
189 new_text += cha
190 in_section = False
191 elif i == (len(self.text)-1):
192 if cha != '"' and self.text[i-1] != '\\':
193 new_text += cha
194 in_section = False
195 elif not in_section and (i == 0 or self.text[i-1] == '\n') and cha == '%':
196 in_section = True
197 new_text += cha
198 elif in_string or cha not in [' ', '\n', '\t']:
199 new_text += cha
200 elif cha == '"' and self.text[i-1] != '\\':
201 in_string = not in_string
202 self.text = new_text
203
204 def stream(self):
205 return self.text[self.pos:]
206
207 def parse_block_name(self):
208 idx = self.stream().find('{')
209 if idx == -1:
210 raise Exception("Cannot find block name")
211 block_name = self.stream()[:idx]
212 self.pos += idx+1
213 return block_name
214
215 def parse_block_or_section(self):
216 if self.stream().startswith("%url "):
217 # section line
218 self.pos += 5
219 idx = self.stream().find('\n')
220 if idx == -1:
221 value = self.stream()
222 self.pos += len(self.stream())
223 else:
224 value = self.stream()[:idx]
225 self.pos += idx+1
226 block_dict = {'block_name': '%url', 'value': value}
227 return block_dict
228
229 block_name = self.parse_block_name().upper()
230 block_dict = {'block_name': block_name}
231 self.parse_block_body(block_dict)
232 if self.stream()[0] != '}':
233 raise Exception("No closing bracket '}' found at the end of block")
234 self.pos += 1
235 return block_dict
236
237 def parse_parameter_value(self, raw_value):
238 colon_idx = raw_value.find(',')
239
240 if colon_idx == -1:
241 try:
242 return int(raw_value)
243 except ValueError:
244 if raw_value == "true":
245 return True
246 if raw_value == "false":
247 return False
248 if raw_value.find('"') == 0:
249 return raw_value[1:-1]
250 return raw_value
251 else:
252 return [self.parse_parameter_value(v.strip())
253 for v in raw_value.split(',')]
254
255 def parse_stanza(self, block_dict):
256 equal_idx = self.stream().find('=')
257 semicolon_idx = self.stream().find(';')
258 if equal_idx == -1:
259 raise Exception("Malformed stanza: no equal symbol found.")
260 parameter_name = self.stream()[:equal_idx].lower()
261 parameter_value = self.stream()[equal_idx+1:semicolon_idx]
262 block_dict[parameter_name] = self.parse_parameter_value(
263 parameter_value)
264 self.pos += semicolon_idx+1
265
266 def parse_block_body(self, block_dict):
267 last_pos = self.pos
268 while True:
269 semicolon_idx = self.stream().find(';')
270 lbracket_idx = self.stream().find('{')
271 rbracket_idx = self.stream().find('}')
272
273 if rbracket_idx == 0:
274 # block end
275 return
276
277 if (semicolon_idx != -1 and lbracket_idx != -1
278 and semicolon_idx < lbracket_idx) \
279 or (semicolon_idx != -1 and lbracket_idx == -1):
280 self.parse_stanza(block_dict)
281 elif (semicolon_idx != -1 and lbracket_idx != -1
282 and semicolon_idx > lbracket_idx) or (
283 semicolon_idx == -1 and lbracket_idx != -1):
284 if '_blocks_' not in block_dict:
285 block_dict['_blocks_'] = []
286 block_dict['_blocks_'].append(self.parse_block_or_section())
287 else:
288 raise Exception("Malformed stanza: no semicolon found.")
289
290 if last_pos == self.pos:
291 raise Exception("Infinite loop while parsing block content")
292 last_pos = self.pos
293
294 def parse(self):
295 self.remove_all_whitespaces()
296 blocks = []
297 while self.stream():
298 block_dict = self.parse_block_or_section()
299 blocks.append(block_dict)
300 return blocks
301
302 @staticmethod
303 def _indentation(depth, size=4):
304 conf_str = ""
305 for _ in range(0, depth*size):
306 conf_str += " "
307 return conf_str
308
309 @staticmethod
310 def write_block_body(block, depth=0):
311 def format_val(key, val):
312 if isinstance(val, list):
313 return ', '.join([format_val(key, v) for v in val])
314 if isinstance(val, bool):
315 return str(val).lower()
316 if isinstance(val, int) or (block['block_name'] == 'CLIENT'
317 and key == 'clients'):
318 return '{}'.format(val)
319 return '"{}"'.format(val)
320
321 conf_str = ""
322 for key, val in block.items():
323 if key == 'block_name':
324 continue
325 elif key == '_blocks_':
326 for blo in val:
327 conf_str += GaneshaConfParser.write_block(blo, depth)
328 elif val:
329 conf_str += GaneshaConfParser._indentation(depth)
330 conf_str += '{} = {};\n'.format(key, format_val(key, val))
331 return conf_str
332
333 @staticmethod
334 def write_block(block, depth):
335 if block['block_name'] == "%url":
336 return '%url "{}"\n\n'.format(block['value'])
337
338 conf_str = ""
339 conf_str += GaneshaConfParser._indentation(depth)
340 conf_str += format(block['block_name'])
341 conf_str += " {\n"
342 conf_str += GaneshaConfParser.write_block_body(block, depth+1)
343 conf_str += GaneshaConfParser._indentation(depth)
344 conf_str += "}\n\n"
345 return conf_str
346
347 @staticmethod
348 def write_conf(blocks):
349 if not isinstance(blocks, list):
350 blocks = [blocks]
351 conf_str = ""
352 for block in blocks:
353 conf_str += GaneshaConfParser.write_block(block, 0)
354 return conf_str
355
356
357class FSal(object):
358 def __init__(self, name):
359 self.name = name
360
361 @classmethod
362 def validate_path(cls, _):
363 raise NotImplementedError()
364
365 def validate(self):
366 raise NotImplementedError()
367
368 def fill_keys(self):
369 raise NotImplementedError()
370
371 def create_path(self, path):
372 raise NotImplementedError()
373
374 @staticmethod
375 def from_fsal_block(fsal_block):
376 if fsal_block['name'] == "CEPH":
377 return CephFSFSal.from_fsal_block(fsal_block)
378 if fsal_block['name'] == 'RGW':
379 return RGWFSal.from_fsal_block(fsal_block)
380 return None
381
382 def to_fsal_block(self):
383 raise NotImplementedError()
384
385 @staticmethod
386 def from_dict(fsal_dict):
387 if fsal_dict['name'] == "CEPH":
388 return CephFSFSal.from_dict(fsal_dict)
389 if fsal_dict['name'] == 'RGW':
390 return RGWFSal.from_dict(fsal_dict)
391 return None
392
393 def to_dict(self):
394 raise NotImplementedError()
395
396
397class RGWFSal(FSal):
398 def __init__(self, name, rgw_user_id, access_key, secret_key):
399 super(RGWFSal, self).__init__(name)
400 self.rgw_user_id = rgw_user_id
401 self.access_key = access_key
402 self.secret_key = secret_key
403
404 @classmethod
405 def validate_path(cls, path):
406 return path == "/" or re.match(r'^[^/><|&()#?]+$', path)
407
408 def validate(self):
409 if not self.rgw_user_id:
410 raise NFSException('RGW user must be specified')
411
412 if not RgwClient.admin_instance().user_exists(self.rgw_user_id):
413 raise NFSException("RGW user '{}' does not exist"
414 .format(self.rgw_user_id))
415
416 def fill_keys(self):
417 keys = RgwClient.admin_instance().get_user_keys(self.rgw_user_id)
418 self.access_key = keys['access_key']
419 self.secret_key = keys['secret_key']
420
421 def create_path(self, path):
422 if path == '/': # nothing to do
423 return
424 rgw = RgwClient.instance(self.rgw_user_id)
425 try:
426 exists = rgw.bucket_exists(path, self.rgw_user_id)
427 logger.debug('Checking existence of RGW bucket "%s" for user "%s": %s',
428 path, self.rgw_user_id, exists)
429 except RequestException as exp:
430 if exp.status_code == 403:
431 raise NFSException('Cannot create bucket "{}" as it already '
432 'exists, and belongs to other user.'
433 .format(path))
9f95a23c 434 raise exp
11fdf7f2
TL
435 if not exists:
436 logger.info('Creating new RGW bucket "%s" for user "%s"', path,
437 self.rgw_user_id)
438 rgw.create_bucket(path)
439
440 @classmethod
441 def from_fsal_block(cls, fsal_block):
442 return cls(fsal_block['name'],
443 fsal_block['user_id'],
444 fsal_block['access_key_id'],
445 fsal_block['secret_access_key'])
446
447 def to_fsal_block(self):
448 return {
449 'block_name': 'FSAL',
450 'name': self.name,
451 'user_id': self.rgw_user_id,
452 'access_key_id': self.access_key,
453 'secret_access_key': self.secret_key
454 }
455
456 @classmethod
457 def from_dict(cls, fsal_dict):
458 return cls(fsal_dict['name'], fsal_dict['rgw_user_id'], None, None)
459
460 def to_dict(self):
461 return {
462 'name': self.name,
463 'rgw_user_id': self.rgw_user_id
464 }
465
466
467class CephFSFSal(FSal):
468 def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
469 cephx_key=None):
470 super(CephFSFSal, self).__init__(name)
471 self.fs_name = fs_name
472 self.user_id = user_id
473 self.sec_label_xattr = sec_label_xattr
474 self.cephx_key = cephx_key
475
476 @classmethod
477 def validate_path(cls, path):
478 return re.match(r'^/[^><|&()?]*$', path)
479
480 def validate(self):
481 if self.user_id and self.user_id not in CephX.list_clients():
482 raise NFSException("cephx user '{}' does not exist"
483 .format(self.user_id))
484
485 def fill_keys(self):
486 if self.user_id:
487 self.cephx_key = CephX.get_client_key(self.user_id)
488
489 def create_path(self, path):
9f95a23c 490 cfs = CephFS(self.fs_name)
f91f0fd5
TL
491 if path == os.sep:
492 return
9f95a23c 493 cfs.mk_dirs(path)
11fdf7f2
TL
494
495 @classmethod
496 def from_fsal_block(cls, fsal_block):
497 return cls(fsal_block['name'],
498 fsal_block.get('user_id', None),
499 fsal_block.get('filesystem', None),
500 fsal_block.get('sec_label_xattr', None),
501 fsal_block.get('secret_access_key', None))
502
503 def to_fsal_block(self):
504 result = {
505 'block_name': 'FSAL',
506 'name': self.name,
507 }
508 if self.user_id:
509 result['user_id'] = self.user_id
510 if self.fs_name:
511 result['filesystem'] = self.fs_name
512 if self.sec_label_xattr:
513 result['sec_label_xattr'] = self.sec_label_xattr
514 if self.cephx_key:
515 result['secret_access_key'] = self.cephx_key
516 return result
517
518 @classmethod
519 def from_dict(cls, fsal_dict):
520 return cls(fsal_dict['name'], fsal_dict['user_id'],
521 fsal_dict['fs_name'], fsal_dict['sec_label_xattr'], None)
522
523 def to_dict(self):
524 return {
525 'name': self.name,
526 'user_id': self.user_id,
527 'fs_name': self.fs_name,
528 'sec_label_xattr': self.sec_label_xattr
529 }
530
531
532class Client(object):
533 def __init__(self, addresses, access_type=None, squash=None):
534 self.addresses = addresses
535 self.access_type = access_type
536 self.squash = GaneshaConf.format_squash(squash)
537
538 @classmethod
539 def from_client_block(cls, client_block):
540 addresses = client_block['clients']
541 if not isinstance(addresses, list):
542 addresses = [addresses]
543 return cls(addresses,
544 client_block.get('access_type', None),
545 client_block.get('squash', None))
546
547 def to_client_block(self):
548 result = {
549 'block_name': 'CLIENT',
550 'clients': self.addresses,
551 }
552 if self.access_type:
553 result['access_type'] = self.access_type
554 if self.squash:
555 result['squash'] = self.squash
556 return result
557
558 @classmethod
559 def from_dict(cls, client_dict):
560 return cls(client_dict['addresses'], client_dict['access_type'],
561 client_dict['squash'])
562
563 def to_dict(self):
564 return {
565 'addresses': self.addresses,
566 'access_type': self.access_type,
567 'squash': self.squash
568 }
569
570
571class Export(object):
572 # pylint: disable=R0902
573 def __init__(self, export_id, path, fsal, cluster_id, daemons, pseudo=None,
574 tag=None, access_type=None, squash=None,
575 attr_expiration_time=None, security_label=False,
576 protocols=None, transports=None, clients=None):
577 self.export_id = export_id
578 self.path = GaneshaConf.format_path(path)
579 self.fsal = fsal
580 self.cluster_id = cluster_id
581 self.daemons = set(daemons)
582 self.pseudo = GaneshaConf.format_path(pseudo)
583 self.tag = tag
584 self.access_type = access_type
585 self.squash = GaneshaConf.format_squash(squash)
586 if attr_expiration_time is None:
587 self.attr_expiration_time = 0
588 else:
589 self.attr_expiration_time = attr_expiration_time
590 self.security_label = security_label
591 self.protocols = {GaneshaConf.format_protocol(p) for p in protocols}
592 self.transports = set(transports)
593 self.clients = clients
594
f91f0fd5 595 def validate(self):
11fdf7f2 596 # pylint: disable=R0912
11fdf7f2
TL
597 if not self.fsal.validate_path(self.path):
598 raise NFSException("Export path ({}) is invalid.".format(self.path))
599
600 if not self.protocols:
601 raise NFSException(
602 "No NFS protocol version specified for the export.")
603
604 if not self.transports:
605 raise NFSException(
606 "No network transport type specified for the export.")
607
608 for t in self.transports:
609 match = re.match(r'^TCP$|^UDP$', t)
610 if not match:
611 raise NFSException(
612 "'{}' is an invalid network transport type identifier"
613 .format(t))
614
615 self.fsal.validate()
616
617 if 4 in self.protocols:
618 if not self.pseudo:
619 raise NFSException(
620 "Pseudo path is required when NFSv4 protocol is used")
621 match = re.match(r'^/[^><|&()]*$', self.pseudo)
622 if not match:
623 raise NFSException(
624 "Export pseudo path ({}) is invalid".format(self.pseudo))
625
626 if self.tag:
627 match = re.match(r'^[^/><|:&()]+$', self.tag)
628 if not match:
629 raise NFSException(
630 "Export tag ({}) is invalid".format(self.tag))
631
632 if self.fsal.name == 'RGW' and 4 not in self.protocols and not self.tag:
633 raise NFSException(
634 "Tag is mandatory for RGW export when using only NFSv3")
635
636 @classmethod
637 def from_export_block(cls, export_block, cluster_id, defaults):
9f95a23c 638 logger.debug("parsing export block: %s", export_block)
11fdf7f2
TL
639
640 fsal_block = [b for b in export_block['_blocks_']
641 if b['block_name'] == "FSAL"]
642
643 protocols = export_block.get('protocols', defaults['protocols'])
644 if not isinstance(protocols, list):
645 protocols = [protocols]
646
647 transports = export_block.get('transports', defaults['transports'])
648 if not isinstance(transports, list):
649 transports = [transports]
650
651 client_blocks = [b for b in export_block['_blocks_']
652 if b['block_name'] == "CLIENT"]
653
654 return cls(export_block['export_id'],
655 export_block['path'],
656 FSal.from_fsal_block(fsal_block[0]),
657 cluster_id,
658 [],
659 export_block.get('pseudo', None),
660 export_block.get('tag', None),
661 export_block.get('access_type', defaults['access_type']),
662 export_block.get('squash', defaults['squash']),
663 export_block.get('attr_expiration_time', None),
664 export_block.get('security_label', False),
665 protocols,
666 transports,
667 [Client.from_client_block(client)
668 for client in client_blocks])
669
670 def to_export_block(self, defaults):
671 # pylint: disable=too-many-branches
672 result = {
673 'block_name': 'EXPORT',
674 'export_id': self.export_id,
675 'path': self.path
676 }
677 if self.pseudo:
678 result['pseudo'] = self.pseudo
679 if self.tag:
680 result['tag'] = self.tag
681 if 'access_type' not in defaults \
682 or self.access_type != defaults['access_type']:
683 result['access_type'] = self.access_type
684 if 'squash' not in defaults or self.squash != defaults['squash']:
685 result['squash'] = self.squash
686 if self.fsal.name == 'CEPH':
687 result['attr_expiration_time'] = self.attr_expiration_time
688 result['security_label'] = self.security_label
689 if 'protocols' not in defaults:
690 result['protocols'] = [p for p in self.protocols]
691 else:
692 def_proto = defaults['protocols']
693 if not isinstance(def_proto, list):
694 def_proto = set([def_proto])
695 if self.protocols != def_proto:
696 result['protocols'] = [p for p in self.protocols]
697 if 'transports' not in defaults:
698 result['transports'] = [t for t in self.transports]
699 else:
700 def_transp = defaults['transports']
701 if not isinstance(def_transp, list):
702 def_transp = set([def_transp])
703 if self.transports != def_transp:
704 result['transports'] = [t for t in self.transports]
705
706 result['_blocks_'] = [self.fsal.to_fsal_block()]
707 result['_blocks_'].extend([client.to_client_block()
708 for client in self.clients])
709 return result
710
711 @classmethod
712 def from_dict(cls, export_id, ex_dict, old_export=None):
713 return cls(export_id,
714 ex_dict['path'],
715 FSal.from_dict(ex_dict['fsal']),
716 ex_dict['cluster_id'],
717 ex_dict['daemons'],
718 ex_dict['pseudo'],
719 ex_dict['tag'],
720 ex_dict['access_type'],
721 ex_dict['squash'],
722 old_export.attr_expiration_time if old_export else None,
723 ex_dict['security_label'],
724 ex_dict['protocols'],
725 ex_dict['transports'],
726 [Client.from_dict(client) for client in ex_dict['clients']])
727
728 def to_dict(self):
729 return {
730 'export_id': self.export_id,
731 'path': self.path,
732 'fsal': self.fsal.to_dict(),
733 'cluster_id': self.cluster_id,
734 'daemons': sorted([d for d in self.daemons]),
735 'pseudo': self.pseudo,
736 'tag': self.tag,
737 'access_type': self.access_type,
738 'squash': self.squash,
739 'security_label': self.security_label,
740 'protocols': sorted([p for p in self.protocols]),
741 'transports': sorted([t for t in self.transports]),
742 'clients': [client.to_dict() for client in self.clients]
743 }
744
745
f91f0fd5
TL
746class ClusterType(object):
747
748 # Ganesha clusters deployed by the Orchestrator.
749 ORCHESTRATOR = 'orchestrator'
750
751 # Ganesha clusters deployed manually by the user. Specified by using the
752 # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
753 USER = 'user'
754
755
11fdf7f2
TL
756class GaneshaConf(object):
757 # pylint: disable=R0902
758
f91f0fd5 759 def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
11fdf7f2
TL
760 self.cluster_id = cluster_id
761 self.rados_pool = rados_pool
762 self.rados_namespace = rados_namespace
f91f0fd5 763 self.daemon_confs = daemon_confs if daemon_confs is not None else []
9f95a23c
TL
764 self.export_conf_blocks = [] # type: ignore
765 self.daemons_conf_blocks = {} # type: ignore
11fdf7f2
TL
766 self._defaults = {}
767 self.exports = {}
768
769 self._read_raw_config()
770
771 # load defaults
772 def_block = [b for b in self.export_conf_blocks
773 if b['block_name'] == "EXPORT_DEFAULTS"]
774 self.export_defaults = def_block[0] if def_block else {}
775 self._defaults = self.ganesha_defaults(self.export_defaults)
776
777 for export_block in [block for block in self.export_conf_blocks
778 if block['block_name'] == "EXPORT"]:
779 export = Export.from_export_block(export_block, cluster_id,
780 self._defaults)
781 self.exports[export.export_id] = export
782
783 # link daemons to exports
f91f0fd5
TL
784 self._link_daemons_to_exports()
785
786 def _link_daemons_to_exports(self):
787 raise NotImplementedError()
11fdf7f2
TL
788
789 @classmethod
790 def instance(cls, cluster_id):
f91f0fd5
TL
791 cluster = Ganesha.get_cluster(cluster_id)
792 if cluster['type'] == ClusterType.ORCHESTRATOR:
793 return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
794 [cluster['daemon_conf']])
795 if cluster['type'] == ClusterType.USER:
796 return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
797 raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
798 cluster['type'], cluster_id))
11fdf7f2
TL
799
800 def _read_raw_config(self):
f91f0fd5
TL
801
802 def _read_rados_obj(_obj):
803 size, _ = _obj.stat()
804 return _obj.read(size).decode("utf-8")
805
11fdf7f2
TL
806 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
807 if self.rados_namespace:
808 ioctx.set_namespace(self.rados_namespace)
809 objs = ioctx.list_objects()
810 for obj in objs:
811 if obj.key.startswith("export-"):
f91f0fd5 812 raw_config = _read_rados_obj(obj)
9f95a23c 813 logger.debug("read export configuration from rados "
11fdf7f2
TL
814 "object %s/%s/%s:\n%s", self.rados_pool,
815 self.rados_namespace, obj.key, raw_config)
816 self.export_conf_blocks.extend(
817 GaneshaConfParser(raw_config).parse())
f91f0fd5
TL
818 elif not self.daemon_confs and obj.key.startswith("conf-"):
819 # Read all `conf-xxx` for daemon configs.
820 raw_config = _read_rados_obj(obj)
9f95a23c 821 logger.debug("read daemon configuration from rados "
11fdf7f2
TL
822 "object %s/%s/%s:\n%s", self.rados_pool,
823 self.rados_namespace, obj.key, raw_config)
824 idx = obj.key.find('-')
825 self.daemons_conf_blocks[obj.key[idx+1:]] = \
826 GaneshaConfParser(raw_config).parse()
827
f91f0fd5
TL
828 if self.daemon_confs:
829 # When daemon configs are provided.
830 for conf in self.daemon_confs:
831 size, _ = ioctx.stat(conf)
832 raw_config = ioctx.read(conf, size).decode("utf-8")
833 logger.debug("read daemon configuration from rados "
834 "object %s/%s/%s:\n%s", self.rados_pool,
835 self.rados_namespace, conf, raw_config)
836 self.daemons_conf_blocks[conf] = \
837 GaneshaConfParser(raw_config).parse()
838
11fdf7f2
TL
839 def _write_raw_config(self, conf_block, obj):
840 raw_config = GaneshaConfParser.write_conf(conf_block)
841 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
842 if self.rados_namespace:
843 ioctx.set_namespace(self.rados_namespace)
844 ioctx.write_full(obj, raw_config.encode('utf-8'))
845 logger.debug(
9f95a23c 846 "write configuration into rados object %s/%s/%s:\n%s",
11fdf7f2
TL
847 self.rados_pool, self.rados_namespace, obj, raw_config)
848
849 @classmethod
850 def ganesha_defaults(cls, export_defaults):
851 """
852 According to
853 https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
854 """
855 return {
856 'access_type': export_defaults.get('access_type', 'NONE'),
857 'protocols': export_defaults.get('protocols', [3, 4]),
858 'transports': export_defaults.get('transports', ['TCP', 'UDP']),
859 'squash': export_defaults.get('squash', 'root_squash')
860 }
861
862 @classmethod
863 def format_squash(cls, squash):
864 if squash is None:
865 return None
866 if squash.lower() in ["no_root_squash", "noidsquash", "none"]:
867 return "no_root_squash"
868 if squash.lower() in ["rootid", "root_id_squash", "rootidsquash"]:
869 return "root_id_squash"
870 if squash.lower() in ["root", "root_squash", "rootsquash"]:
871 return "root_squash"
872 if squash.lower() in ["all", "all_squash", "allsquash",
873 "all_anonymous", "allanonymous"]:
874 return "all_squash"
9f95a23c 875 logger.error("could not parse squash value: %s", squash)
11fdf7f2
TL
876 raise NFSException("'{}' is an invalid squash option".format(squash))
877
878 @classmethod
879 def format_protocol(cls, protocol):
880 if str(protocol) in ["NFSV3", "3", "V3", "NFS3"]:
881 return 3
882 if str(protocol) in ["NFSV4", "4", "V4", "NFS4"]:
883 return 4
9f95a23c 884 logger.error("could not parse protocol value: %s", protocol)
11fdf7f2
TL
885 raise NFSException("'{}' is an invalid NFS protocol version identifier"
886 .format(protocol))
887
888 @classmethod
889 def format_path(cls, path):
890 if path is not None:
891 path = path.strip()
892 if len(path) > 1 and path[-1] == '/':
893 path = path[:-1]
894 return path
895
f91f0fd5
TL
896 def validate(self, export: Export):
897 export.validate()
11fdf7f2
TL
898
899 if 4 in export.protocols: # NFSv4 protocol
900 len_prefix = 1
901 parent_export = None
902 for ex in self.list_exports():
903 if export.tag and ex.tag == export.tag:
904 raise NFSException(
905 "Another export exists with the same tag: {}"
906 .format(export.tag))
907
908 if export.pseudo and ex.pseudo == export.pseudo:
909 raise NFSException(
910 "Another export exists with the same pseudo path: {}"
911 .format(export.pseudo))
912
913 if not ex.pseudo:
914 continue
915
916 if export.pseudo[:export.pseudo.rfind('/')+1].startswith(ex.pseudo):
917 if export.pseudo[len(ex.pseudo)] == '/':
918 if len(ex.pseudo) > len_prefix:
919 len_prefix = len(ex.pseudo)
920 parent_export = ex
921
922 if len_prefix > 1:
923 # validate pseudo path
9f95a23c 924 idx = len(parent_export.pseudo) # type: ignore
11fdf7f2 925 idx = idx + 1 if idx > 1 else idx
9f95a23c
TL
926 real_path = "{}/{}".format(
927 parent_export.path # type: ignore
928 if len(parent_export.path) > 1 else "", # type: ignore
929 export.pseudo[idx:])
11fdf7f2
TL
930 if export.fsal.name == 'CEPH':
931 cfs = CephFS()
932 if export.path != real_path and not cfs.dir_exists(real_path):
933 raise NFSException(
934 "Pseudo path ({}) invalid, path {} does not exist."
935 .format(export.pseudo, real_path))
936
937 def _gen_export_id(self):
938 exports = sorted(self.exports)
939 nid = 1
940 for e_id in exports:
941 if e_id == nid:
942 nid += 1
943 else:
944 break
945 return nid
946
947 def _persist_daemon_configuration(self):
f91f0fd5 948 raise NotImplementedError()
11fdf7f2
TL
949
950 def _save_export(self, export):
951 self.validate(export)
952 export.fsal.create_path(export.path)
953 export.fsal.fill_keys()
954 self.exports[export.export_id] = export
955 conf_block = export.to_export_block(self.export_defaults)
956 self._write_raw_config(conf_block, "export-{}".format(export.export_id))
957 self._persist_daemon_configuration()
958
959 def _delete_export(self, export_id):
960 self._persist_daemon_configuration()
961 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
962 if self.rados_namespace:
963 ioctx.set_namespace(self.rados_namespace)
964 ioctx.remove_object("export-{}".format(export_id))
965
966 def list_exports(self):
967 return [ex for _, ex in self.exports.items()]
968
969 def create_export(self, ex_dict):
970 ex_id = self._gen_export_id()
971 export = Export.from_dict(ex_id, ex_dict)
972 self._save_export(export)
973 return ex_id
974
975 def has_export(self, export_id):
976 return export_id in self.exports
977
978 def update_export(self, ex_dict):
979 if ex_dict['export_id'] not in self.exports:
980 return None
981 old_export = self.exports[ex_dict['export_id']]
982 del self.exports[ex_dict['export_id']]
983 export = Export.from_dict(ex_dict['export_id'], ex_dict, old_export)
984 self._save_export(export)
985 self.exports[export.export_id] = export
986 return old_export
987
988 def remove_export(self, export_id):
989 if export_id not in self.exports:
990 return None
991 export = self.exports[export_id]
992 del self.exports[export_id]
993 self._delete_export(export_id)
994 return export
995
996 def get_export(self, export_id):
997 if export_id in self.exports:
998 return self.exports[export_id]
999 return None
1000
f91f0fd5
TL
1001 def list_daemons(self) -> List[Dict[str, Any]]:
1002 raise NotImplementedError()
1003
1004 def list_daemon_confs(self):
1005 return self.daemons_conf_blocks.keys()
11fdf7f2
TL
1006
1007 def reload_daemons(self, daemons):
1008 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
1009 if self.rados_namespace:
1010 ioctx.set_namespace(self.rados_namespace)
1011 for daemon_id in daemons:
1012 ioctx.notify("conf-{}".format(daemon_id))
f91f0fd5
TL
1013
1014
1015class GaneshaConfOrchestrator(GaneshaConf):
1016 @classmethod
1017 def _get_orch_nfs_instances(cls,
1018 service_name: Optional[str] = None) -> List[DaemonDescription]:
1019 try:
1020 return OrchClient.instance().services.\
1021 list_daemons(service_name=service_name, daemon_type="nfs")
1022 except (RuntimeError, OrchestratorError, ImportError):
1023 return []
1024
1025 def _link_daemons_to_exports(self):
1026 instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
1027 daemon_ids = {instance.daemon_id for instance in instances}
1028 for _, daemon_blocks in self.daemons_conf_blocks.items():
1029 for block in daemon_blocks:
1030 if block['block_name'] == "%url":
1031 rados_url = block['value']
1032 _, _, obj = Ganesha.parse_rados_url(rados_url)
1033 if obj.startswith("export-"):
1034 export_id = int(obj[obj.find('-')+1:])
1035 self.exports[export_id].daemons.update(daemon_ids)
1036
1037 def validate(self, export: Export):
1038 daemons_list = {d['daemon_id'] for d in self.list_daemons()}
1039 if export.daemons and set(export.daemons) != daemons_list:
1040 raise NFSException('Export should be linked to all daemons.')
1041 super().validate(export)
1042
1043 def _persist_daemon_configuration(self):
1044 daemon_map = {} # type: ignore
1045 for daemon_id in self.list_daemon_confs():
1046 daemon_map[daemon_id] = []
1047
1048 for daemon_id in self.list_daemon_confs():
1049 for _, ex in self.exports.items():
1050 if ex.daemons:
1051 daemon_map[daemon_id].append({
1052 'block_name': "%url",
1053 'value': Ganesha.make_rados_url(
1054 self.rados_pool, self.rados_namespace,
1055 "export-{}".format(ex.export_id))
1056 })
1057 for daemon_id, conf_blocks in daemon_map.items():
1058 self._write_raw_config(conf_blocks, daemon_id)
1059
1060 def list_daemons(self) -> List[Dict[str, Any]]:
1061 instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
1062 return [{
1063 'cluster_id': self.cluster_id,
1064 'daemon_id': instance.daemon_id,
1065 'cluster_type': ClusterType.ORCHESTRATOR,
1066 'status': instance.status,
1067 'status_desc': instance.status_desc
1068 } for instance in instances]
1069
1070 def reload_daemons(self, daemons):
1071 with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
1072 if self.rados_namespace:
1073 ioctx.set_namespace(self.rados_namespace)
1074 for daemon_id in self.list_daemon_confs():
1075 ioctx.notify(daemon_id)
1076
1077
1078class GaneshaConfUser(GaneshaConf):
1079
1080 def _link_daemons_to_exports(self):
1081 for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
1082 for block in daemon_blocks:
1083 if block['block_name'] == "%url":
1084 rados_url = block['value']
1085 _, _, obj = Ganesha.parse_rados_url(rados_url)
1086 if obj.startswith("export-"):
1087 export_id = int(obj[obj.find('-')+1:])
1088 self.exports[export_id].daemons.add(daemon_id)
1089
1090 def validate(self, export: Export):
1091 daemons_list = [d['daemon_id'] for d in self.list_daemons()]
1092 for daemon_id in export.daemons:
1093 if daemon_id not in daemons_list:
1094 raise NFSException("Daemon '{}' does not exist".format(daemon_id))
1095 super().validate(export)
1096
1097 def _persist_daemon_configuration(self):
1098 daemon_map = {} # type: ignore
1099 for daemon_id in self.list_daemon_confs():
1100 daemon_map[daemon_id] = []
1101
1102 for _, ex in self.exports.items():
1103 for daemon in ex.daemons:
1104 daemon_map[daemon].append({
1105 'block_name': "%url",
1106 'value': Ganesha.make_rados_url(
1107 self.rados_pool, self.rados_namespace,
1108 "export-{}".format(ex.export_id))
1109 })
1110 for daemon_id, conf_blocks in daemon_map.items():
1111 self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
1112
1113 def list_daemons(self) -> List[Dict[str, Any]]:
1114 return [{
1115 'cluster_id': self.cluster_id,
1116 'cluster_type': ClusterType.USER,
1117 'daemon_id': daemon_id,
1118 'status': 1,
1119 'status_desc': 'running'
1120 } for daemon_id in self.list_daemon_confs()]