]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rgw/module.py
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / pybind / mgr / rgw / module.py
1 import json
2 import threading
3 import yaml
4 import errno
5 import base64
6 import functools
7 import sys
8
9 from mgr_module import MgrModule, CLICommand, HandleCommandResult, Option
10 import orchestrator
11
12 from ceph.deployment.service_spec import RGWSpec, PlacementSpec, SpecValidationError
13 from typing import Any, Optional, Sequence, Iterator, List, Callable, TypeVar, cast, Dict, Tuple, Union, TYPE_CHECKING
14
15 from ceph.rgw.types import RGWAMException, RGWAMEnvMgr, RealmToken
16 from ceph.rgw.rgwam_core import EnvArgs, RGWAM
17 from orchestrator import OrchestratorClientMixin, OrchestratorError, DaemonDescription, OrchResult
18
19
# Type variable bound to "any callable"; lets a decorator be annotated as
# returning the same callable type it received.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])

if TYPE_CHECKING:
    # A version comparison (rather than try/except) is used on purpose:
    # mypy is happy with this form and is not with the try/except one.
    if sys.version_info >= (3, 8):
        from typing import Protocol
    else:
        from typing_extensions import Protocol

    class MgrModuleProtocol(Protocol):
        """Structural type describing the mgr methods used through ``mgr``."""

        def tool_exec(self, args: List[str]) -> Tuple[int, str, str]:
            """Execute the command line given in ``args``."""

        def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
            """Apply an RGW service spec."""

        def list_daemons(self, service_name: Optional[str] = None,
                         daemon_type: Optional[str] = None,
                         daemon_id: Optional[str] = None,
                         host: Optional[str] = None,
                         refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
            """List daemons, optionally filtered by the given fields."""
else:
    class MgrModuleProtocol:
        """Runtime placeholder; the real protocol only exists for type checking."""
46
47
class RGWSpecParsingError(Exception):
    """Raised when an RGW spec cannot be parsed or fails validation."""
50
51
class OrchestratorAPI(OrchestratorClientMixin):
    """Orchestrator client bound to a mgr module, used to probe availability."""

    def __init__(self, mgr: MgrModule):
        super().__init__()
        self.set_mgr(mgr)

    def status(self) -> Dict[str, Union[str, bool]]:
        """Return orchestrator availability as an ``available``/``message`` dict.

        RuntimeError, OrchestratorError and ImportError raised by the
        availability query are reported as ``available=False`` rather than
        propagated.
        """
        try:
            is_available, reason, _module_details = super().available()
        except (RuntimeError, OrchestratorError, ImportError) as exc:
            return {'available': False, 'message': f'Orchestrator is unavailable: {exc}'}
        return {'available': is_available, 'message': reason}
63
64
class RGWAMOrchMgr(RGWAMEnvMgr):
    """RGWAMEnvMgr implementation that forwards every call to a mgr module."""

    def __init__(self, mgr: MgrModuleProtocol):
        self.mgr = mgr

    def tool_exec(self, prog: str, args: List[str]) -> Tuple[List[str], int, str, str]:
        """Run ``prog`` with ``args`` via the mgr.

        Returns the full command line followed by the three values the mgr
        handed back for it.
        """
        full_cmd = [prog] + args
        retcode, out, err = self.mgr.tool_exec(args=full_cmd)
        return full_cmd, retcode, out, err

    def apply_rgw(self, spec: RGWSpec) -> None:
        """Apply ``spec`` through the mgr, raising if the completion failed."""
        orchestrator.raise_if_exception(self.mgr.apply_rgw(spec))

    def list_daemons(self, service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = True) -> List['DaemonDescription']:
        """List daemons through the mgr and unwrap the completion.

        Note that ``refresh`` defaults to True here.
        """
        pending = self.mgr.list_daemons(service_name,
                                        daemon_type,
                                        daemon_id=daemon_id,
                                        host=host,
                                        refresh=refresh)
        daemons = orchestrator.raise_if_exception(pending)
        return daemons
89
90
def check_orchestrator(func: FuncT) -> FuncT:
    """Decorator that only runs a command when the orchestrator is available.

    The wrapped method's instance must expose ``self.api.status()``. When the
    returned ``available`` entry is falsy the command is not invoked and an
    EINVAL result carrying an explanatory message is returned instead.
    """
    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
        # Guard clause: bail out early when there is no usable orchestrator.
        if not self.api.status()['available']:
            err_msg = "Cephadm is not available. Please enable cephadm by 'ceph mgr module enable cephadm'."
            return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)
        return func(self, *args, **kwargs)
    return cast(FuncT, wrapper)
101
102
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
    """Mgr module providing the ``rgw ...`` CLI commands.

    Most commands delegate to RGWAM, constructed from ``self.env``; the
    handlers translate arguments, or a YAML spec passed on the input buffer,
    into those calls and turn the outcome into a HandleCommandResult.
    """

    MODULE_OPTIONS: List[Option] = []

    # These are "native" Ceph options that this module cares about.
    NATIVE_OPTIONS: List[Option] = []

    def __init__(self, *args: Any, **kwargs: Any):
        self.inited = False
        self.lock = threading.Lock()
        super(Module, self).__init__(*args, **kwargs)
        self.api = OrchestratorAPI(self)

        # ensure config options members are initialized; see config_notify()
        self.config_notify()

        with self.lock:
            self.inited = True
            self.env = EnvArgs(RGWAMOrchMgr(self))

        # set up some members to enable the serve() method and shutdown()
        self.run = True
        self.event = threading.Event()

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.
        """
        # This is some boilerplate that stores MODULE_OPTIONS in a class
        # member, so that, for instance, the 'emphatic' option is always
        # available as 'self.emphatic'.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))
        # Do the same for the native options.
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

    def _rgwam_failure(self, e: RGWAMException) -> HandleCommandResult:
        """Log an RGWAMException and convert it into a command result."""
        self.log.error('cmd run exception: (%d) %s', e.retcode, e.message)
        return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr)

    @CLICommand('rgw admin', perm='rw')
    def _cmd_rgw_admin(self, params: Sequence[str]) -> HandleCommandResult:
        """rgw admin"""
        # RGWAMOrchMgr.tool_exec() builds the command with `[prog] + args`,
        # which needs a real list; params is only promised to be a Sequence.
        cmd, returncode, out, err = self.env.mgr.tool_exec('radosgw-admin', list(params or []))

        # Keep error-level logging for failures only; a successful command's
        # output is merely debug information.
        log = self.log.error if returncode != 0 else self.log.debug
        log('retcode=%d', returncode)
        log('out=%s', out)
        log('err=%s', err)

        return HandleCommandResult(retval=returncode, stdout=out, stderr=err)

    @CLICommand('rgw realm bootstrap', perm='rw')
    @check_orchestrator
    def _cmd_rgw_realm_bootstrap(self,
                                 realm_name: Optional[str] = None,
                                 zonegroup_name: Optional[str] = None,
                                 zone_name: Optional[str] = None,
                                 port: Optional[int] = None,
                                 placement: Optional[str] = None,
                                 zone_endpoints: Optional[str] = None,
                                 start_radosgw: Optional[bool] = True,
                                 inbuf: Optional[str] = None) -> HandleCommandResult:
        """Bootstrap new rgw realm, zonegroup, and zone"""

        # Specs come either from a YAML input buffer or from the individual
        # arguments; the input buffer wins when both are given.
        if inbuf:
            try:
                rgw_specs = self._parse_rgw_specs(inbuf)
            except RGWSpecParsingError as e:
                return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}')
        elif (realm_name and zonegroup_name and zone_name):
            try:
                placement_spec = PlacementSpec.from_string(placement) if placement else None
            except SpecValidationError as e:
                # NOTE(review): from_string() is expected to signal a malformed
                # placement this way; report it as a plain argument error.
                return HandleCommandResult(retval=-errno.EINVAL, stderr=f'Invalid placement: {e}')
            rgw_specs = [RGWSpec(rgw_realm=realm_name,
                                 rgw_zonegroup=zonegroup_name,
                                 rgw_zone=zone_name,
                                 rgw_frontend_port=port,
                                 placement=placement_spec,
                                 zone_endpoints=zone_endpoints)]
        else:
            err_msg = 'Invalid arguments: either pass a spec with -i or provide the realm, zonegroup and zone.'
            return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)

        try:
            for spec in rgw_specs:
                RGWAM(self.env).realm_bootstrap(spec, start_radosgw)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=0, stdout="Realm(s) created correctly. Please, use 'ceph rgw realm tokens' to get the token.", stderr='')

    def _parse_rgw_specs(self, inbuf: str) -> List[RGWSpec]:
        """Parse RGW specs from a YAML file.

        :param inbuf: YAML text holding one or more documents.
        :returns: one validated RGWSpec per non-empty document.
        :raises RGWSpecParsingError: on malformed YAML, on a document that is
            not a mapping, on an invalid realm token, or when a spec fails
            validation.
        """
        try:
            # safe_load_all() returns a lazy iterator, so YAML syntax errors
            # only surface while it is consumed; keep the iteration in the try.
            # YAML '---' document separator with no content generates
            # None entries in the output. Let's skip them silently.
            specs = [o for o in yaml.safe_load_all(inbuf) if o is not None]
        except yaml.YAMLError as e:
            raise RGWSpecParsingError(f'Invalid YAML: {e}') from e

        rgw_specs = []
        for spec in specs:
            # A scalar or list document cannot describe a spec; reject it
            # before it is treated like a dict below.
            if not isinstance(spec, dict):
                raise RGWSpecParsingError(
                    f'Invalid RGW spec: expected a YAML mapping, got {type(spec).__name__}')

            # A secondary zone spec normally contains only the zone and the realm token
            # since no rgw_realm is specified in this case we extract it from the token
            if 'rgw_realm_token' in spec:
                realm_token = RealmToken.from_base64_str(spec['rgw_realm_token'])
                if realm_token is None:
                    raise RGWSpecParsingError(f"Invalid realm token: {spec['rgw_realm_token']}")
                spec['rgw_realm'] = realm_token.realm_name

            try:
                rgw_spec = RGWSpec.from_json(spec)
                rgw_spec.validate()
            except SpecValidationError as e:
                raise RGWSpecParsingError(f'RGW Spec parsing/validation error: {e}') from e
            rgw_specs.append(rgw_spec)

        return rgw_specs

    @CLICommand('rgw realm zone-creds create', perm='rw')
    def _cmd_rgw_realm_new_zone_creds(self,
                                      realm_name: Optional[str] = None,
                                      endpoints: Optional[str] = None,
                                      sys_uid: Optional[str] = None) -> HandleCommandResult:
        """Create credentials for new zone creation"""

        try:
            retval, out, err = RGWAM(self.env).realm_new_zone_creds(realm_name, endpoints, sys_uid)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=retval, stdout=out, stderr=err)

    @CLICommand('rgw realm zone-creds remove', perm='rw')
    def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None) -> HandleCommandResult:
        """Remove credentials created for new zone creation"""

        try:
            retval, out, err = RGWAM(self.env).realm_rm_zone_creds(realm_token)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=retval, stdout=out, stderr=err)

    @CLICommand('rgw realm tokens', perm='r')
    def list_realm_tokens(self) -> HandleCommandResult:
        """List the token of each rgw realm"""
        try:
            realms_info = []
            for realm_info in RGWAM(self.env).get_realms_info():
                # A token can only be built when the realm has a master zone
                # with an endpoint and credentials; otherwise the 'token'
                # field carries the reason instead.
                if not realm_info['master_zone_id']:
                    token = 'realm has no master zone'
                elif not realm_info['endpoint']:
                    token = 'master zone has no endpoint'
                elif not (realm_info['access_key'] and realm_info['secret']):
                    token = 'master zone has no access/secret keys'
                else:
                    keys = ['realm_name', 'realm_id', 'endpoint', 'access_key', 'secret']
                    realm_token = RealmToken(**{k: realm_info[k] for k in keys})
                    realm_token_b = realm_token.to_json().encode('utf-8')
                    token = base64.b64encode(realm_token_b).decode('utf-8')
                realms_info.append({'realm': realm_info['realm_name'], 'token': token})
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=0, stdout=json.dumps(realms_info, indent=4), stderr='')

    @CLICommand('rgw zone modify', perm='rw')
    def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]) -> HandleCommandResult:
        """Modify an existing rgw zone"""
        try:
            # zone_modify() takes the endpoints before the token, unlike this
            # command's own parameter order.
            retval, out, err = RGWAM(self.env).zone_modify(realm_name,
                                                           zonegroup_name,
                                                           zone_name,
                                                           zone_endpoints,
                                                           realm_token)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        # Do not claim success when the call reported a failure; pass its
        # output through instead.
        if retval != 0:
            return HandleCommandResult(retval=retval, stdout=out, stderr=err)
        return HandleCommandResult(retval, 'Zone updated successfully', '')

    @CLICommand('rgw zone create', perm='rw')
    @check_orchestrator
    def _cmd_rgw_zone_create(self,
                             zone_name: Optional[str] = None,
                             realm_token: Optional[str] = None,
                             port: Optional[int] = None,
                             placement: Optional[str] = None,
                             start_radosgw: Optional[bool] = True,
                             zone_endpoints: Optional[str] = None,
                             inbuf: Optional[str] = None) -> HandleCommandResult:
        """Bootstrap new rgw zone that syncs with zone on another cluster in the same realm"""

        if inbuf:
            try:
                rgw_specs = self._parse_rgw_specs(inbuf)
            except RGWSpecParsingError as e:
                return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}')
        elif (zone_name and realm_token):
            token = RealmToken.from_base64_str(realm_token)
            # from_base64_str() yields None for a token it cannot decode
            # (_parse_rgw_specs checks for the same thing); fail cleanly
            # rather than dereferencing None below.
            if token is None:
                return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr='Invalid realm token.')
            try:
                placement_spec = PlacementSpec.from_string(placement) if placement else None
            except SpecValidationError as e:
                # NOTE(review): from_string() is expected to signal a malformed
                # placement this way; report it as a plain argument error.
                return HandleCommandResult(retval=-errno.EINVAL, stderr=f'Invalid placement: {e}')
            rgw_specs = [RGWSpec(rgw_realm=token.realm_name,
                                 rgw_zone=zone_name,
                                 rgw_realm_token=realm_token,
                                 rgw_frontend_port=port,
                                 placement=placement_spec,
                                 zone_endpoints=zone_endpoints)]
        else:
            err_msg = 'Invalid arguments: either pass a spec with -i or provide the zone_name and realm_token.'
            return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)

        try:
            created_zones = []
            for rgw_spec in rgw_specs:
                RGWAM(self.env).zone_create(rgw_spec, start_radosgw)
                if rgw_spec.rgw_zone is not None:
                    created_zones.append(rgw_spec.rgw_zone)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=0, stdout=f"Zones {', '.join(created_zones)} created successfully")

    @CLICommand('rgw realm reconcile', perm='rw')
    def _cmd_rgw_realm_reconcile(self,
                                 realm_name: Optional[str] = None,
                                 zonegroup_name: Optional[str] = None,
                                 zone_name: Optional[str] = None,
                                 update: Optional[bool] = False) -> HandleCommandResult:
        """Bootstrap new rgw zone that syncs with existing zone"""
        # NOTE(review): the description above looks copy-pasted from the zone
        # creation command; confirm what realm_reconcile() does and reword.

        try:
            retval, out, err = RGWAM(self.env).realm_reconcile(realm_name, zonegroup_name,
                                                               zone_name, update)
        except RGWAMException as e:
            return self._rgwam_failure(e)

        return HandleCommandResult(retval=retval, stdout=out, stderr=err)

    def shutdown(self) -> None:
        """
        This method is called by the mgr when the module needs to shut
        down (i.e., when the serve() function needs to exit).
        """
        self.log.info('Stopping')
        self.run = False
        self.event.set()