]>
Commit | Line | Data |
---|---|---|
1e59de90 | 1 | import json |
20effc67 | 2 | import threading |
1e59de90 TL |
3 | import yaml |
4 | import errno | |
5 | import base64 | |
6 | import functools | |
7 | import sys | |
20effc67 TL |
8 | |
9 | from mgr_module import MgrModule, CLICommand, HandleCommandResult, Option | |
10 | import orchestrator | |
11 | ||
1e59de90 TL |
12 | from ceph.deployment.service_spec import RGWSpec, PlacementSpec, SpecValidationError |
13 | from typing import Any, Optional, Sequence, Iterator, List, Callable, TypeVar, cast, Dict, Tuple, Union, TYPE_CHECKING | |
20effc67 | 14 | |
1e59de90 | 15 | from ceph.rgw.types import RGWAMException, RGWAMEnvMgr, RealmToken |
20effc67 | 16 | from ceph.rgw.rgwam_core import EnvArgs, RGWAM |
1e59de90 TL |
17 | from orchestrator import OrchestratorClientMixin, OrchestratorError, DaemonDescription, OrchResult |
18 | ||
19 | ||
20 | FuncT = TypeVar('FuncT', bound=Callable[..., Any]) | |
21 | ||
22 | if TYPE_CHECKING: | |
23 | # this uses a version check as opposed to a try/except because this | |
24 | # form makes mypy happy and try/except doesn't. | |
25 | if sys.version_info >= (3, 8): | |
26 | from typing import Protocol | |
27 | else: | |
28 | from typing_extensions import Protocol | |
29 | ||
30 | class MgrModuleProtocol(Protocol): | |
31 | def tool_exec(self, args: List[str]) -> Tuple[int, str, str]: | |
32 | ... | |
33 | ||
34 | def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: | |
35 | ... | |
36 | ||
37 | def list_daemons(self, service_name: Optional[str] = None, | |
38 | daemon_type: Optional[str] = None, | |
39 | daemon_id: Optional[str] = None, | |
40 | host: Optional[str] = None, | |
41 | refresh: bool = False) -> OrchResult[List['DaemonDescription']]: | |
42 | ... | |
43 | else: | |
44 | class MgrModuleProtocol: | |
45 | pass | |
46 | ||
47 | ||
48 | class RGWSpecParsingError(Exception): | |
49 | pass | |
50 | ||
51 | ||
52 | class OrchestratorAPI(OrchestratorClientMixin): | |
53 | def __init__(self, mgr: MgrModule): | |
54 | super(OrchestratorAPI, self).__init__() | |
55 | self.set_mgr(mgr) | |
56 | ||
57 | def status(self) -> Dict[str, Union[str, bool]]: | |
58 | try: | |
59 | status, message, _module_details = super().available() | |
60 | return dict(available=status, message=message) | |
61 | except (RuntimeError, OrchestratorError, ImportError) as e: | |
62 | return dict(available=False, message=f'Orchestrator is unavailable: {e}') | |
20effc67 TL |
63 | |
64 | ||
65 | class RGWAMOrchMgr(RGWAMEnvMgr): | |
1e59de90 | 66 | def __init__(self, mgr: MgrModuleProtocol): |
20effc67 TL |
67 | self.mgr = mgr |
68 | ||
1e59de90 TL |
69 | def tool_exec(self, prog: str, args: List[str]) -> Tuple[List[str], int, str, str]: |
70 | cmd = [prog] + args | |
71 | rc, stdout, stderr = self.mgr.tool_exec(args=cmd) | |
20effc67 TL |
72 | return cmd, rc, stdout, stderr |
73 | ||
1e59de90 | 74 | def apply_rgw(self, spec: RGWSpec) -> None: |
20effc67 TL |
75 | completion = self.mgr.apply_rgw(spec) |
76 | orchestrator.raise_if_exception(completion) | |
77 | ||
1e59de90 TL |
78 | def list_daemons(self, service_name: Optional[str] = None, |
79 | daemon_type: Optional[str] = None, | |
80 | daemon_id: Optional[str] = None, | |
81 | host: Optional[str] = None, | |
82 | refresh: bool = True) -> List['DaemonDescription']: | |
20effc67 TL |
83 | completion = self.mgr.list_daemons(service_name, |
84 | daemon_type, | |
85 | daemon_id=daemon_id, | |
86 | host=host, | |
87 | refresh=refresh) | |
88 | return orchestrator.raise_if_exception(completion) | |
89 | ||
90 | ||
1e59de90 TL |
91 | def check_orchestrator(func: FuncT) -> FuncT: |
92 | @functools.wraps(func) | |
93 | def wrapper(self: Any, *args: Any, **kwargs: Any) -> HandleCommandResult: | |
94 | available = self.api.status()['available'] | |
95 | if available: | |
96 | return func(self, *args, **kwargs) | |
97 | else: | |
98 | err_msg = "Cephadm is not available. Please enable cephadm by 'ceph mgr module enable cephadm'." | |
99 | return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) | |
100 | return cast(FuncT, wrapper) | |
101 | ||
102 | ||
20effc67 | 103 | class Module(orchestrator.OrchestratorClientMixin, MgrModule): |
1e59de90 | 104 | MODULE_OPTIONS: List[Option] = [] |
20effc67 TL |
105 | |
106 | # These are "native" Ceph options that this module cares about. | |
1e59de90 | 107 | NATIVE_OPTIONS: List[Option] = [] |
20effc67 TL |
108 | |
109 | def __init__(self, *args: Any, **kwargs: Any): | |
110 | self.inited = False | |
111 | self.lock = threading.Lock() | |
112 | super(Module, self).__init__(*args, **kwargs) | |
1e59de90 | 113 | self.api = OrchestratorAPI(self) |
20effc67 TL |
114 | |
115 | # ensure config options members are initialized; see config_notify() | |
116 | self.config_notify() | |
117 | ||
118 | with self.lock: | |
119 | self.inited = True | |
120 | self.env = EnvArgs(RGWAMOrchMgr(self)) | |
121 | ||
122 | # set up some members to enable the serve() method and shutdown() | |
123 | self.run = True | |
124 | self.event = threading.Event() | |
125 | ||
20effc67 TL |
126 | def config_notify(self) -> None: |
127 | """ | |
128 | This method is called whenever one of our config options is changed. | |
129 | """ | |
130 | # This is some boilerplate that stores MODULE_OPTIONS in a class | |
131 | # member, so that, for instance, the 'emphatic' option is always | |
132 | # available as 'self.emphatic'. | |
133 | for opt in self.MODULE_OPTIONS: | |
134 | setattr(self, | |
135 | opt['name'], | |
136 | self.get_module_option(opt['name'])) | |
137 | self.log.debug(' mgr option %s = %s', | |
138 | opt['name'], getattr(self, opt['name'])) | |
139 | # Do the same for the native options. | |
140 | for opt in self.NATIVE_OPTIONS: | |
141 | setattr(self, | |
1e59de90 | 142 | opt, # type: ignore |
20effc67 | 143 | self.get_ceph_option(opt)) |
1e59de90 | 144 | self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore |
20effc67 TL |
145 | |
146 | @CLICommand('rgw admin', perm='rw') | |
1e59de90 | 147 | def _cmd_rgw_admin(self, params: Sequence[str]) -> HandleCommandResult: |
20effc67 TL |
148 | """rgw admin""" |
149 | cmd, returncode, out, err = self.env.mgr.tool_exec('radosgw-admin', params or []) | |
150 | ||
151 | self.log.error('retcode=%d' % returncode) | |
152 | self.log.error('out=%s' % out) | |
153 | self.log.error('err=%s' % err) | |
154 | ||
155 | return HandleCommandResult(retval=returncode, stdout=out, stderr=err) | |
156 | ||
157 | @CLICommand('rgw realm bootstrap', perm='rw') | |
1e59de90 | 158 | @check_orchestrator |
20effc67 | 159 | def _cmd_rgw_realm_bootstrap(self, |
1e59de90 | 160 | realm_name: Optional[str] = None, |
20effc67 TL |
161 | zonegroup_name: Optional[str] = None, |
162 | zone_name: Optional[str] = None, | |
1e59de90 TL |
163 | port: Optional[int] = None, |
164 | placement: Optional[str] = None, | |
165 | zone_endpoints: Optional[str] = None, | |
166 | start_radosgw: Optional[bool] = True, | |
167 | inbuf: Optional[str] = None) -> HandleCommandResult: | |
20effc67 TL |
168 | """Bootstrap new rgw realm, zonegroup, and zone""" |
169 | ||
1e59de90 TL |
170 | if inbuf: |
171 | try: | |
172 | rgw_specs = self._parse_rgw_specs(inbuf) | |
173 | except RGWSpecParsingError as e: | |
174 | return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}') | |
175 | elif (realm_name and zonegroup_name and zone_name): | |
176 | placement_spec = PlacementSpec.from_string(placement) if placement else None | |
177 | rgw_specs = [RGWSpec(rgw_realm=realm_name, | |
178 | rgw_zonegroup=zonegroup_name, | |
179 | rgw_zone=zone_name, | |
180 | rgw_frontend_port=port, | |
181 | placement=placement_spec, | |
182 | zone_endpoints=zone_endpoints)] | |
183 | else: | |
184 | err_msg = 'Invalid arguments: either pass a spec with -i or provide the realm, zonegroup and zone.' | |
185 | return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) | |
20effc67 TL |
186 | |
187 | try: | |
1e59de90 TL |
188 | for spec in rgw_specs: |
189 | RGWAM(self.env).realm_bootstrap(spec, start_radosgw) | |
20effc67 TL |
190 | except RGWAMException as e: |
191 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
1e59de90 TL |
192 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) |
193 | ||
194 | return HandleCommandResult(retval=0, stdout="Realm(s) created correctly. Please, use 'ceph rgw realm tokens' to get the token.", stderr='') | |
195 | ||
196 | def _parse_rgw_specs(self, inbuf: str) -> List[RGWSpec]: | |
197 | """Parse RGW specs from a YAML file.""" | |
198 | # YAML '---' document separator with no content generates | |
199 | # None entries in the output. Let's skip them silently. | |
200 | yaml_objs: Iterator = yaml.safe_load_all(inbuf) | |
201 | specs = [o for o in yaml_objs if o is not None] | |
202 | rgw_specs = [] | |
203 | for spec in specs: | |
204 | # A secondary zone spec normally contains only the zone and the reaml token | |
205 | # since no rgw_realm is specified in this case we extract it from the token | |
206 | if 'rgw_realm_token' in spec: | |
207 | realm_token = RealmToken.from_base64_str(spec['rgw_realm_token']) | |
208 | if realm_token is None: | |
209 | raise RGWSpecParsingError(f"Invalid realm token: {spec['rgw_realm_token']}") | |
210 | spec['rgw_realm'] = realm_token.realm_name | |
211 | ||
212 | try: | |
213 | rgw_spec = RGWSpec.from_json(spec) | |
214 | rgw_spec.validate() | |
215 | except SpecValidationError as e: | |
216 | raise RGWSpecParsingError(f'RGW Spec parsing/validation error: {e}') | |
217 | else: | |
218 | rgw_specs.append(rgw_spec) | |
219 | ||
220 | return rgw_specs | |
20effc67 TL |
221 | |
222 | @CLICommand('rgw realm zone-creds create', perm='rw') | |
223 | def _cmd_rgw_realm_new_zone_creds(self, | |
1e59de90 TL |
224 | realm_name: Optional[str] = None, |
225 | endpoints: Optional[str] = None, | |
226 | sys_uid: Optional[str] = None) -> HandleCommandResult: | |
20effc67 TL |
227 | """Create credentials for new zone creation""" |
228 | ||
229 | try: | |
230 | retval, out, err = RGWAM(self.env).realm_new_zone_creds(realm_name, endpoints, sys_uid) | |
231 | except RGWAMException as e: | |
232 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
1e59de90 | 233 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) |
20effc67 TL |
234 | |
235 | return HandleCommandResult(retval=retval, stdout=out, stderr=err) | |
236 | ||
237 | @CLICommand('rgw realm zone-creds remove', perm='rw') | |
1e59de90 | 238 | def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None) -> HandleCommandResult: |
20effc67 TL |
239 | """Create credentials for new zone creation""" |
240 | ||
241 | try: | |
242 | retval, out, err = RGWAM(self.env).realm_rm_zone_creds(realm_token) | |
243 | except RGWAMException as e: | |
244 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
1e59de90 | 245 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) |
20effc67 TL |
246 | |
247 | return HandleCommandResult(retval=retval, stdout=out, stderr=err) | |
248 | ||
1e59de90 TL |
249 | @CLICommand('rgw realm tokens', perm='r') |
250 | def list_realm_tokens(self) -> HandleCommandResult: | |
251 | try: | |
aee94f69 | 252 | realms_info = self.get_realm_tokens() |
1e59de90 TL |
253 | except RGWAMException as e: |
254 | self.log.error(f'cmd run exception: ({e.retcode}) {e.message}') | |
255 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) | |
256 | ||
257 | return HandleCommandResult(retval=0, stdout=json.dumps(realms_info, indent=4), stderr='') | |
258 | ||
aee94f69 TL |
259 | def get_realm_tokens(self) -> List[Dict]: |
260 | realms_info = [] | |
261 | for realm_info in RGWAM(self.env).get_realms_info(): | |
262 | if not realm_info['master_zone_id']: | |
263 | realms_info.append({'realm': realm_info['realm_name'], 'token': 'realm has no master zone'}) | |
264 | elif not realm_info['endpoint']: | |
265 | realms_info.append({'realm': realm_info['realm_name'], 'token': 'master zone has no endpoint'}) | |
266 | elif not (realm_info['access_key'] and realm_info['secret']): | |
267 | realms_info.append({'realm': realm_info['realm_name'], 'token': 'master zone has no access/secret keys'}) | |
268 | else: | |
269 | keys = ['realm_name', 'realm_id', 'endpoint', 'access_key', 'secret'] | |
270 | realm_token = RealmToken(**{k: realm_info[k] for k in keys}) | |
271 | realm_token_b = realm_token.to_json().encode('utf-8') | |
272 | realm_token_s = base64.b64encode(realm_token_b).decode('utf-8') | |
273 | realms_info.append({'realm': realm_info['realm_name'], 'token': realm_token_s}) | |
274 | return realms_info | |
275 | ||
1e59de90 TL |
276 | @CLICommand('rgw zone modify', perm='rw') |
277 | def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]) -> HandleCommandResult: | |
278 | try: | |
279 | retval, out, err = RGWAM(self.env).zone_modify(realm_name, | |
280 | zonegroup_name, | |
281 | zone_name, | |
282 | zone_endpoints, | |
283 | realm_token) | |
284 | return HandleCommandResult(retval, 'Zone updated successfully', '') | |
285 | except RGWAMException as e: | |
286 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
287 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) | |
288 | ||
20effc67 | 289 | @CLICommand('rgw zone create', perm='rw') |
1e59de90 | 290 | @check_orchestrator |
20effc67 | 291 | def _cmd_rgw_zone_create(self, |
20effc67 | 292 | zone_name: Optional[str] = None, |
1e59de90 TL |
293 | realm_token: Optional[str] = None, |
294 | port: Optional[int] = None, | |
295 | placement: Optional[str] = None, | |
296 | start_radosgw: Optional[bool] = True, | |
297 | zone_endpoints: Optional[str] = None, | |
298 | inbuf: Optional[str] = None) -> HandleCommandResult: | |
299 | """Bootstrap new rgw zone that syncs with zone on another cluster in the same realm""" | |
300 | ||
aee94f69 TL |
301 | created_zones = self.rgw_zone_create(zone_name, realm_token, port, placement, |
302 | start_radosgw, zone_endpoints, inbuf) | |
303 | ||
304 | return HandleCommandResult(retval=0, stdout=f"Zones {', '.join(created_zones)} created successfully") | |
305 | ||
306 | def rgw_zone_create(self, | |
307 | zone_name: Optional[str] = None, | |
308 | realm_token: Optional[str] = None, | |
309 | port: Optional[int] = None, | |
310 | placement: Optional[Union[str, Dict[str, Any]]] = None, | |
311 | start_radosgw: Optional[bool] = True, | |
312 | zone_endpoints: Optional[str] = None, | |
313 | inbuf: Optional[str] = None) -> Any: | |
314 | ||
1e59de90 TL |
315 | if inbuf: |
316 | try: | |
317 | rgw_specs = self._parse_rgw_specs(inbuf) | |
318 | except RGWSpecParsingError as e: | |
319 | return HandleCommandResult(retval=-errno.EINVAL, stderr=f'{e}') | |
320 | elif (zone_name and realm_token): | |
321 | token = RealmToken.from_base64_str(realm_token) | |
aee94f69 TL |
322 | if isinstance(placement, dict): |
323 | placement_spec = PlacementSpec.from_json(placement) if placement else None | |
324 | elif isinstance(placement, str): | |
325 | placement_spec = PlacementSpec.from_string(placement) if placement else None | |
1e59de90 TL |
326 | rgw_specs = [RGWSpec(rgw_realm=token.realm_name, |
327 | rgw_zone=zone_name, | |
328 | rgw_realm_token=realm_token, | |
329 | rgw_frontend_port=port, | |
330 | placement=placement_spec, | |
331 | zone_endpoints=zone_endpoints)] | |
332 | else: | |
333 | err_msg = 'Invalid arguments: either pass a spec with -i or provide the zone_name and realm_token.' | |
334 | return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg) | |
20effc67 TL |
335 | |
336 | try: | |
1e59de90 TL |
337 | created_zones = [] |
338 | for rgw_spec in rgw_specs: | |
339 | RGWAM(self.env).zone_create(rgw_spec, start_radosgw) | |
340 | if rgw_spec.rgw_zone is not None: | |
341 | created_zones.append(rgw_spec.rgw_zone) | |
aee94f69 | 342 | return created_zones |
20effc67 TL |
343 | except RGWAMException as e: |
344 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
1e59de90 | 345 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) |
aee94f69 | 346 | return created_zones |
20effc67 TL |
347 | |
348 | @CLICommand('rgw realm reconcile', perm='rw') | |
349 | def _cmd_rgw_realm_reconcile(self, | |
1e59de90 TL |
350 | realm_name: Optional[str] = None, |
351 | zonegroup_name: Optional[str] = None, | |
352 | zone_name: Optional[str] = None, | |
353 | update: Optional[bool] = False) -> HandleCommandResult: | |
20effc67 TL |
354 | """Bootstrap new rgw zone that syncs with existing zone""" |
355 | ||
356 | try: | |
357 | retval, out, err = RGWAM(self.env).realm_reconcile(realm_name, zonegroup_name, | |
358 | zone_name, update) | |
359 | except RGWAMException as e: | |
360 | self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message)) | |
1e59de90 | 361 | return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr) |
20effc67 TL |
362 | |
363 | return HandleCommandResult(retval=retval, stdout=out, stderr=err) | |
364 | ||
365 | def shutdown(self) -> None: | |
366 | """ | |
367 | This method is called by the mgr when the module needs to shut | |
368 | down (i.e., when the serve() function needs to exit). | |
369 | """ | |
370 | self.log.info('Stopping') | |
371 | self.run = False | |
372 | self.event.set() | |
aee94f69 TL |
373 | |
374 | def import_realm_token(self, | |
375 | zone_name: Optional[str] = None, | |
376 | realm_token: Optional[str] = None, | |
377 | port: Optional[int] = None, | |
378 | placement: Optional[dict] = None, | |
379 | start_radosgw: Optional[bool] = True, | |
380 | zone_endpoints: Optional[str] = None) -> None: | |
381 | placement_spec = placement.get('placement') if placement else None | |
382 | self.rgw_zone_create(zone_name, realm_token, port, placement_spec, start_radosgw, | |
383 | zone_endpoints) |