]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/common.py
3 from typing
import Any
, Dict
, Optional
, Tuple
, TYPE_CHECKING
, Union
# Pool key in which neither the pool nor the namespace component is set.
GLOBAL_POOL_KEY = (None, None)
class NotAuthorizedError(Exception):
    """Raised by authorize_request when the pool/namespace check fails."""
13 from rbd_support
.module
import Module
def is_authorized(module: 'Module',
                  pool: Optional[str],
                  namespace: Optional[str]) -> bool:
    """Ask *module* whether access to *pool* / *namespace* is allowed.

    The request is forwarded to ``module.is_authorized`` as a dict with the
    keys ``"pool"`` and ``"namespace"``; a falsy value (``None`` or ``''``)
    is sent as the empty string.

    NOTE(review): the ``pool`` parameter line was absent from the visible
    signature although the body and the caller in ``authorize_request``
    both use it; its ``Optional[str]`` annotation mirrors ``namespace`` --
    confirm against the upstream file.

    :param module: object exposing an ``is_authorized(dict)`` method
    :param pool: pool name, or ``None``
    :param namespace: namespace name, or ``None``
    :returns: whatever ``module.is_authorized`` returns for the request
    """
    return module.is_authorized({"pool": pool or '',
                                 "namespace": namespace or ''})
def authorize_request(module: 'Module',
                      pool: Optional[str],
                      namespace: Optional[str]) -> None:
    """Raise NotAuthorizedError unless the pool/namespace check passes.

    Calls ``is_authorized(module, pool, namespace)`` and returns ``None``
    when it reports success.

    NOTE(review): the ``pool`` parameter line and the arguments of the
    ``format`` call were absent from the visible text; they are restored
    from the three-argument ``is_authorized`` call and the two ``{}``
    placeholders in the message -- confirm against the upstream file.

    :param module: object handed through to ``is_authorized``
    :param pool: pool name, or ``None``
    :param namespace: namespace name, or ``None``
    :raises NotAuthorizedError: if ``is_authorized`` returns a falsy value
    """
    if not is_authorized(module, pool, namespace):
        raise NotAuthorizedError(
            "not authorized on pool={}, namespace={}".format(
                pool, namespace))
# Type of a pool key: either a pair of strings or the (None, None) pair.
PoolKeyT = Union[Tuple[str, str], Tuple[None, None]]
def extract_pool_key(pool_spec: Optional[str]) -> PoolKeyT:
    """Split a ``pool[/namespace]`` spec into a ``(pool, namespace)`` key.

    The spec must be one non-empty segment without ``/``, optionally
    followed by ``/`` and a second such segment. When the second segment
    is absent the namespace component is the empty string.

    :param pool_spec: spec string, or ``None``
    :returns: ``GLOBAL_POOL_KEY`` when no spec is given, otherwise the
        ``(pool, namespace)`` pair
    :raises ValueError: if *pool_spec* does not match the expected form
    """
    # NOTE(review): the condition guarding this early return was absent
    # from the visible text. A falsy check (None or '') is assumed here;
    # confirm whether upstream tests `not pool_spec` or `is None`.
    if not pool_spec:
        return GLOBAL_POOL_KEY

    match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    # re.match returns None when the spec does not fit the pattern.
    if not match:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))
    return (match.group(1), match.group(2) or '')
def get_rbd_pools(module: 'Module') -> Dict[int, str]:
    """Map pool id to pool name for every pool tagged with ``rbd``.

    Reads ``module.get('osd_map')`` and walks its ``'pools'`` list. A pool
    is included when the key ``'rbd'`` is present in its
    ``'application_metadata'`` entry; pools without that entry are skipped.

    :param module: object exposing a ``get(name)`` method
    :returns: dict of each selected pool's ``'pool'`` value to its
        ``'pool_name'`` value
    """
    osd_map = module.get('osd_map')
    return {
        pool['pool']: pool['pool_name']
        for pool in osd_map['pools']
        if 'rbd' in pool.get('application_metadata', {})
    }