]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/common.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / common.py
1 import re
2
3 from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING, Union
4
5
6 GLOBAL_POOL_KEY = (None, None)
7
8 class NotAuthorizedError(Exception):
9 pass
10
11
12 if TYPE_CHECKING:
13 from rbd_support.module import Module
14
15
16 def is_authorized(module: 'Module',
17 pool: Optional[str],
18 namespace: Optional[str]) -> bool:
19 return module.is_authorized({"pool": pool or '',
20 "namespace": namespace or ''})
21
22
23 def authorize_request(module: 'Module',
24 pool: Optional[str],
25 namespace: Optional[str]) -> None:
26 if not is_authorized(module, pool, namespace):
27 raise NotAuthorizedError("not authorized on pool={}, namespace={}".format(
28 pool, namespace))
29
30
31 PoolKeyT = Union[Tuple[str, str], Tuple[None, None]]
32
33
34 def extract_pool_key(pool_spec: Optional[str]) -> PoolKeyT:
35 if not pool_spec:
36 return GLOBAL_POOL_KEY
37
38 match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
39 if not match:
40 raise ValueError("Invalid pool spec: {}".format(pool_spec))
41 return (match.group(1), match.group(2) or '')
42
43
44 def get_rbd_pools(module: 'Module') -> Dict[int, str]:
45 osd_map = module.get('osd_map')
46 return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
47 if 'rbd' in pool.get('application_metadata', {})}
48