# ceph/src/pybind/mgr/orchestrator/module.py (Ceph 15.2.1 "Octopus" source)
import datetime
import errno
import json
import yaml

import six

from ceph.deployment.inventory import Device
from prettytable import PrettyTable

from mgr_util import format_bytes, to_pretty_timedelta

try:
    from typing import List, Set, Optional, Dict, Iterator
except ImportError:
    pass  # just for type checking.


from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from mgr_module import MgrModule, HandleCommandResult

from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
    raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
    NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
    RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta

def nice_delta(now, t, suffix=''):
    if t:
        return to_pretty_timedelta(now - t) + suffix
    else:
        return '-'

@six.add_metaclass(CLICommandMeta)
class OrchestratorCli(OrchestratorClientMixin, MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'orchestrator',
            'type': 'str',
            'default': None,
            'desc': 'Orchestrator backend',
            'enum_allowed': ['cephadm', 'rook',
                             'test_orchestrator'],
            'runtime': True,
        },
    ]
    NATIVE_OPTIONS = []  # type: List[dict]

    def __init__(self, *args, **kwargs):
        super(OrchestratorCli, self).__init__(*args, **kwargs)
        self.ident = set()  # type: Set[str]
        self.fault = set()  # type: Set[str]
        self._load()
        self._refresh_health()

    def _load(self):
        active = self.get_store('active_devices')
        if active:
            decoded = json.loads(active)
            self.ident = set(decoded.get('ident', []))
            self.fault = set(decoded.get('fault', []))
        self.log.debug('ident {}, fault {}'.format(self.ident, self.fault))

    def _save(self):
        encoded = json.dumps({
            'ident': list(self.ident),
            'fault': list(self.fault),
        })
        self.set_store('active_devices', encoded)

    def _refresh_health(self):
        h = {}
        if self.ident:
            h['DEVICE_IDENT_ON'] = {
                'severity': 'warning',
                'summary': '%d devices have ident light turned on' % len(
                    self.ident),
                'detail': ['{} ident light enabled'.format(d) for d in self.ident]
            }
        if self.fault:
            h['DEVICE_FAULT_ON'] = {
                'severity': 'warning',
                'summary': '%d devices have fault light turned on' % len(
                    self.fault),
                'detail': ['{} fault light enabled'.format(d) for d in self.fault]
            }
        self.set_health_checks(h)

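    # Helper for the `ceph device light` handlers below: self.get('devices')
    # returns the mgr's device inventory, where each device entry carries a
    # list of 'location' dicts; the per-device lists are flattened
    # (sum(locs, [])) into DeviceLightLoc objects for blink_device_light().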
    def _get_device_locations(self, dev_id):
        # type: (str) -> List[DeviceLightLoc]
        locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
        return [DeviceLightLoc(**l) for l in sum(locs, [])]

    @_cli_read_command(
        prefix='device ls-lights',
        desc='List currently active device indicator lights')
    def _device_ls(self):
        return HandleCommandResult(
            stdout=json.dumps({
                'ident': list(self.ident),
                'fault': list(self.fault)
            }, indent=4, sort_keys=True))

    def light_on(self, fault_ident, devid):
        # type: (str, str) -> HandleCommandResult
        assert fault_ident in ("fault", "ident")
        locs = self._get_device_locations(devid)
        if not locs:
            return HandleCommandResult(stderr='device {} not found'.format(devid),
                                       retval=-errno.ENOENT)

        getattr(self, fault_ident).add(devid)
        self._save()
        self._refresh_health()
        completion = self.blink_device_light(fault_ident, True, locs)
        self._orchestrator_wait([completion])
        return HandleCommandResult(stdout=str(completion.result))

    def light_off(self, fault_ident, devid, force):
        # type: (str, str, bool) -> HandleCommandResult
        assert fault_ident in ("fault", "ident")
        locs = self._get_device_locations(devid)
        if not locs:
            return HandleCommandResult(stderr='device {} not found'.format(devid),
                                       retval=-errno.ENOENT)

        try:
            completion = self.blink_device_light(fault_ident, False, locs)
            self._orchestrator_wait([completion])

            if devid in getattr(self, fault_ident):
                getattr(self, fault_ident).remove(devid)
                self._save()
                self._refresh_health()
            return HandleCommandResult(stdout=str(completion.result))

        except Exception:
            # There are several reasons the try: block might fail:
            # 1. the device no longer exists
            # 2. the device is no longer known to Ceph
            # 3. the host is not reachable
            if force and devid in getattr(self, fault_ident):
                getattr(self, fault_ident).remove(devid)
                self._save()
                self._refresh_health()
            raise

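    # Device light CLI; illustrative invocations (following the usage string
    # in the decorator below):
    #
    #   ceph device light on <devid>                # defaults to the ident light
    #   ceph device light off <devid> fault --force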
    @_cli_write_command(
        prefix='device light',
        cmd_args='name=enable,type=CephChoices,strings=on|off '
                 'name=devid,type=CephString '
                 'name=light_type,type=CephChoices,strings=ident|fault,req=false '
                 'name=force,type=CephBool,req=false',
        desc='Enable or disable the device light. Default type is `ident`\n'
             'Usage: device light (on|off) <devid> [ident|fault] [--force]')
    def _device_light(self, enable, devid, light_type=None, force=False):
        # type: (str, str, Optional[str], bool) -> HandleCommandResult
        light_type = light_type or 'ident'
        on = enable == 'on'
        if on:
            return self.light_on(light_type, devid)
        else:
            return self.light_off(light_type, devid, force)

    def _select_orchestrator(self):
        return self.get_module_option("orchestrator")

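    # Host management commands. Illustrative example (only the hostname is
    # required; addr and labels are optional):
    #
    #   ceph orch host add node1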
    @_cli_write_command(
        'orch host add',
        'name=hostname,type=CephString,req=true '
        'name=addr,type=CephString,req=false '
        'name=labels,type=CephString,n=N,req=false',
        'Add a host')
    def _add_host(self, hostname: str, addr: Optional[str] = None, labels: Optional[List[str]] = None):
        s = HostSpec(hostname=hostname, addr=addr, labels=labels)
        completion = self.add_host(s)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch host rm',
        "name=hostname,type=CephString,req=true",
        'Remove a host')
    def _remove_host(self, hostname):
        completion = self.remove_host(hostname)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch host set-addr',
        'name=hostname,type=CephString '
        'name=addr,type=CephString',
        'Update a host address')
    def _update_set_addr(self, hostname, addr):
        completion = self.update_host_addr(hostname, addr)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch host ls',
        'name=format,type=CephChoices,strings=json|plain,req=false',
        'List hosts')
    def _get_hosts(self, format='plain'):
        completion = self.get_hosts()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        if format == 'json':
            hosts = [host.to_json()
                     for host in completion.result]
            output = json.dumps(hosts, sort_keys=True)
        else:
            table = PrettyTable(
                ['HOST', 'ADDR', 'LABELS', 'STATUS'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for host in sorted(completion.result, key=lambda h: h.hostname):
                table.add_row((host.hostname, host.addr, ' '.join(host.labels), host.status))
            output = table.get_string()
        return HandleCommandResult(stdout=output)

    @_cli_write_command(
        'orch host label add',
        'name=hostname,type=CephString '
        'name=label,type=CephString',
        'Add a host label')
    def _host_label_add(self, hostname, label):
        completion = self.add_host_label(hostname, label)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch host label rm',
        'name=hostname,type=CephString '
        'name=label,type=CephString',
        'Remove a host label')
    def _host_label_rm(self, hostname, label):
        completion = self.remove_host_label(hostname, label)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

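    # Inventory listing. Illustrative example: list devices on selected hosts
    # and force a refresh of the cached inventory (flag spellings assumed from
    # the declared arg names):
    #
    #   ceph orch device ls node1 node2 --format json --refresh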
    @_cli_read_command(
        'orch device ls',
        "name=hostname,type=CephString,n=N,req=false "
        "name=format,type=CephChoices,strings=json|plain,req=false "
        "name=refresh,type=CephBool,req=false",
        'List devices on a host')
    def _list_devices(self, hostname=None, format='plain', refresh=False):
        # type: (Optional[List[str]], str, bool) -> HandleCommandResult
        """
        Provide information about storage devices present in cluster hosts

        Note: this does not have to be completely synchronous. A slightly
        out-of-date hardware inventory is fine, as long as hardware ultimately
        appears in the output of this command.
        """
        nf = InventoryFilter(hosts=hostname) if hostname else None

        completion = self.get_inventory(host_filter=nf, refresh=refresh)

        self._orchestrator_wait([completion])
        raise_if_exception(completion)

        if format == 'json':
            data = [n.to_json() for n in completion.result]
            return HandleCommandResult(stdout=json.dumps(data))
        else:
            out = []

            table = PrettyTable(
                ['HOST', 'PATH', 'TYPE', 'SIZE', 'DEVICE', 'AVAIL',
                 'REJECT REASONS'],
                border=False)
            table.align = 'l'
            table._align['SIZE'] = 'r'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for host_ in completion.result:  # type: InventoryHost
                for d in host_.devices.devices:  # type: Device
                    table.add_row(
                        (
                            host_.name,
                            d.path,
                            d.human_readable_type,
                            format_bytes(d.sys_api.get('size', 0), 5),
                            d.device_id,
                            d.available,
                            ', '.join(d.rejected_reasons)
                        )
                    )
            out.append(table.get_string())
            return HandleCommandResult(stdout='\n'.join(out))

    @_cli_write_command(
        'orch device zap',
        'name=hostname,type=CephString '
        'name=path,type=CephString '
        'name=force,type=CephBool,req=false',
        'Zap (erase!) a device so it can be re-used')
    def _zap_device(self, hostname, path, force=False):
        if not force:
            raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA')
        completion = self.zap_device(hostname, path)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch ls',
        "name=service_type,type=CephString,req=false "
        "name=service_name,type=CephString,req=false "
        "name=format,type=CephChoices,strings=json|plain,req=false "
        "name=refresh,type=CephBool,req=false",
        'List services known to orchestrator')
    def _list_services(self, host=None, service_type=None, service_name=None, format='plain', refresh=False):
        completion = self.describe_service(service_type,
                                           service_name,
                                           refresh=refresh)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        services = completion.result

        def ukn(s):
            return '<unknown>' if s is None else s

        # Sort the list for display
        services.sort(key=lambda s: (ukn(s.service_name)))

        if len(services) == 0:
            return HandleCommandResult(stdout="No services reported")
        elif format == 'json':
            data = [s.to_json() for s in services]
            return HandleCommandResult(stdout=json.dumps(data))
        else:
            now = datetime.datetime.utcnow()
            table = PrettyTable(
                ['NAME', 'RUNNING', 'REFRESHED', 'AGE',
                 'PLACEMENT',
                 'IMAGE NAME', 'IMAGE ID',
                 ],
                border=False)
            table.align['NAME'] = 'l'
            table.align['RUNNING'] = 'r'
            table.align['REFRESHED'] = 'l'
            table.align['AGE'] = 'l'
            table.align['IMAGE NAME'] = 'l'
            table.align['IMAGE ID'] = 'l'
            table.align['PLACEMENT'] = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for s in sorted(services, key=lambda s: s.service_name):
                if not s.spec:
                    pl = '<no spec>'
                elif s.spec.unmanaged:
                    pl = '<unmanaged>'
                else:
                    pl = s.spec.placement.pretty_str()
                table.add_row((
                    s.service_name,
                    '%d/%d' % (s.running, s.size),
                    nice_delta(now, s.last_refresh, ' ago'),
                    nice_delta(now, s.created),
                    pl,
                    ukn(s.container_image_name),
                    ukn(s.container_image_id)[0:12],
                ))

            return HandleCommandResult(stdout=table.get_string())

    @_cli_read_command(
        'orch ps',
        "name=hostname,type=CephString,req=false "
        "name=service_name,type=CephString,req=false "
        "name=daemon_type,type=CephString,req=false "
        "name=daemon_id,type=CephString,req=false "
        "name=format,type=CephChoices,strings=json|plain,req=false "
        "name=refresh,type=CephBool,req=false",
        'List daemons known to orchestrator')
    def _list_daemons(self, hostname=None, service_name=None, daemon_type=None, daemon_id=None, format='plain', refresh=False):
        completion = self.list_daemons(service_name,
                                       daemon_type,
                                       daemon_id=daemon_id,
                                       host=hostname,
                                       refresh=refresh)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        daemons = completion.result

        def ukn(s):
            return '<unknown>' if s is None else s
        # Sort the list for display
        daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id)))

        if len(daemons) == 0:
            return HandleCommandResult(stdout="No daemons reported")
        elif format == 'json':
            data = [s.to_json() for s in daemons]
            return HandleCommandResult(stdout=json.dumps(data))
        else:
            now = datetime.datetime.utcnow()
            table = PrettyTable(
                ['NAME', 'HOST', 'STATUS', 'REFRESHED', 'AGE',
                 'VERSION', 'IMAGE NAME', 'IMAGE ID', 'CONTAINER ID'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for s in sorted(daemons, key=lambda s: s.name()):
                status = {
                    -1: 'error',
                    0: 'stopped',
                    1: 'running',
                    None: '<unknown>'
                }[s.status]
                if s.status == 1 and s.started:
                    status += ' (%s)' % to_pretty_timedelta(now - s.started)

                table.add_row((
                    s.name(),
                    ukn(s.hostname),
                    status,
                    nice_delta(now, s.last_refresh, ' ago'),
                    nice_delta(now, s.created),
                    ukn(s.version),
                    ukn(s.container_image_name),
                    ukn(s.container_image_id)[0:12],
                    ukn(s.container_id)))

            return HandleCommandResult(stdout=table.get_string())

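    # `orch apply osd` consumes DriveGroupSpecs, either generated from
    # --all-available-devices or read from stdin via `-i <file>`. A minimal,
    # illustrative YAML spec (field names taken from the DriveGroupSpec /
    # DeviceSelection usage below; the real schema offers more selectors):
    #
    #   service_type: osd
    #   service_id: example_drive_group
    #   placement:
    #     host_pattern: '*'
    #   data_devices:
    #     all: true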
    @_cli_write_command(
        'orch apply osd',
        'name=all_available_devices,type=CephBool,req=false',
        'Create OSD daemon(s) using a drive group spec')
    def _apply_osd(self, all_available_devices=False, inbuf=None):
        # type: (bool, Optional[str]) -> HandleCommandResult
        """Apply DriveGroupSpecs to create OSDs"""
        usage = """
Usage:
  ceph orch apply osd -i <json_file/yaml_file>
  ceph orch apply osd --all-available-devices
"""
        if not inbuf and not all_available_devices:
            return HandleCommandResult(-errno.EINVAL, stderr=usage)
        if inbuf:
            if all_available_devices:
                raise OrchestratorError('--all-available-devices cannot be combined with an osd spec')
            try:
                drivegroups = yaml.safe_load_all(inbuf)
                dg_specs = [ServiceSpec.from_json(dg) for dg in drivegroups]
            except ValueError as e:
                msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
                return HandleCommandResult(-errno.EINVAL, stderr=msg)
        else:
            dg_specs = [
                DriveGroupSpec(
                    service_id='all-available-devices',
                    placement=PlacementSpec(host_pattern='*'),
                    data_devices=DeviceSelection(all=True),
                )
            ]

        completion = self.apply_drivegroups(dg_specs)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

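    # `orch daemon add osd` takes a single host:devices argument, e.g.
    # (illustrative) node1:/dev/sdb,/dev/sdc, which is parsed below into a
    # DriveGroupSpec pinned to that host with those device paths.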
    @_cli_write_command(
        'orch daemon add osd',
        "name=svc_arg,type=CephString,req=false",
        'Create an OSD daemon on a specific host and device(s): --svc_arg=host:drives')
    def _daemon_add_osd(self, svc_arg=None):
        # type: (Optional[str]) -> HandleCommandResult
        """Create one or more OSDs"""

        usage = """
Usage:
  ceph orch daemon add osd host:device1,device2,...
"""
        if not svc_arg:
            return HandleCommandResult(-errno.EINVAL, stderr=usage)
        try:
            host_name, block_device = svc_arg.split(":")
            block_devices = block_device.split(',')
            devs = DeviceSelection(paths=block_devices)
            drive_group = DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)
        except (TypeError, KeyError, ValueError):
            msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
            return HandleCommandResult(-errno.EINVAL, stderr=msg)

        completion = self.create_osds(drive_group)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

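    # OSD removal. svc_id takes one or more OSD ids; illustrative example:
    #
    #   ceph orch osd rm 1 2 --replace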
    @_cli_write_command(
        'orch osd rm',
        "name=svc_id,type=CephString,n=N "
        "name=replace,type=CephBool,req=false "
        "name=force,type=CephBool,req=false",
        'Remove OSD services')
    def _osd_rm(self, svc_id: List[str],
                replace: bool = False,
                force: bool = False) -> HandleCommandResult:
        completion = self.remove_osds(svc_id, replace, force)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch osd rm status',
        desc='Status of OSD removal operation')
    def _osd_rm_status(self) -> HandleCommandResult:
        completion = self.remove_osds_status()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        report = completion.result
        if not report:
            return HandleCommandResult(stdout="No OSD remove/replace operations reported")
        table = PrettyTable(
            ['NAME', 'HOST', 'PGS', 'STARTED_AT'],
            border=False)
        table.align = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 1
        # TODO: re-add sorted and sort by pg_count
        for osd in report:
            table.add_row((osd.fullname, osd.nodename, osd.pg_count_str, osd.started_at))

        return HandleCommandResult(stdout=table.get_string())

    @_cli_write_command(
        'orch daemon add',
        'name=daemon_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
        'name=placement,type=CephString,req=false',
        'Add daemon(s)')
    def _daemon_add_misc(self, daemon_type=None, placement=None, inbuf=None):
        usage = f"""Usage:
  ceph orch daemon add -i <json_file>
  ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>"""
        if inbuf:
            if daemon_type or placement:
                raise OrchestratorValidationError(usage)
            spec = ServiceSpec.from_json(yaml.safe_load(inbuf))
        else:
            placement = PlacementSpec.from_string(placement)
            placement.validate()

            spec = ServiceSpec(daemon_type, placement=placement)

        if daemon_type == 'mon':
            completion = self.add_mon(spec)
        elif daemon_type == 'mgr':
            completion = self.add_mgr(spec)
        elif daemon_type == 'rbd-mirror':
            completion = self.add_rbd_mirror(spec)
        elif daemon_type == 'crash':
            completion = self.add_crash(spec)
        elif daemon_type == 'alertmanager':
            completion = self.add_alertmanager(spec)
        elif daemon_type == 'grafana':
            completion = self.add_grafana(spec)
        elif daemon_type == 'node-exporter':
            completion = self.add_node_exporter(spec)
        elif daemon_type == 'prometheus':
            completion = self.add_prometheus(spec)
        else:
            raise OrchestratorValidationError(f'unknown daemon type `{daemon_type}`')

        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add mds',
        'name=fs_name,type=CephString '
        'name=placement,type=CephString,req=false',
        'Start MDS daemon(s)')
    def _mds_add(self, fs_name, placement=None):
        spec = ServiceSpec(
            'mds', fs_name,
            placement=PlacementSpec.from_string(placement),
        )
        completion = self.add_mds(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add rgw',
        'name=realm_name,type=CephString '
        'name=zone_name,type=CephString '
        'name=placement,type=CephString,req=false',
        'Start RGW daemon(s)')
    def _rgw_add(self, realm_name, zone_name, placement=None, inbuf=None):
        usage = """
Usage:
  ceph orch daemon add rgw -i <json_file>
  ceph orch daemon add rgw <realm_name> <zone_name>
"""
        if inbuf:
            try:
                rgw_spec = RGWSpec.from_json(json.loads(inbuf))
            except ValueError as e:
                msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
                return HandleCommandResult(-errno.EINVAL, stderr=msg)
        else:
            rgw_spec = RGWSpec(
                rgw_realm=realm_name,
                rgw_zone=zone_name,
                placement=PlacementSpec.from_string(placement),
            )

        completion = self.add_rgw(rgw_spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add nfs',
        "name=svc_arg,type=CephString "
        "name=pool,type=CephString "
        "name=namespace,type=CephString,req=false "
        'name=placement,type=CephString,req=false',
        'Start NFS daemon(s)')
    def _nfs_add(self, svc_arg, pool, namespace=None, placement=None):
        spec = NFSServiceSpec(
            svc_arg,
            pool=pool,
            namespace=namespace,
            placement=PlacementSpec.from_string(placement),
        )
        spec.validate_add()
        completion = self.add_nfs(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch',
        "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
        "name=service_name,type=CephString",
        'Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)')
    def _service_action(self, action, service_name):
        completion = self.service_action(action, service_name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon',
        "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
        "name=name,type=CephString",
        'Start, stop, restart, redeploy, or reconfig a specific daemon')
    def _daemon_action(self, action, name):
        if '.' not in name:
            raise OrchestratorError('%s is not a valid daemon name' % name)
        (daemon_type, daemon_id) = name.split('.', 1)
        completion = self.daemon_action(action, daemon_type, daemon_id)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon rm',
        "name=names,type=CephString,n=N "
        'name=force,type=CephBool,req=false',
        'Remove specific daemon(s)')
    def _daemon_rm(self, names, force=False):
        for name in names:
            if '.' not in name:
                raise OrchestratorError('%s is not a valid daemon name' % name)
            daemon_type = name.split('.')[0]
            if not force and daemon_type in ['osd', 'mon', 'prometheus']:
                raise OrchestratorError('must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name)
        completion = self.remove_daemons(names)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch rm',
        'name=service_name,type=CephString '
        'name=force,type=CephBool,req=false',
        'Remove a service')
    def _service_rm(self, service_name, force=False):
        if service_name in ['mon', 'mgr'] and not force:
            raise OrchestratorError('The mon and mgr services cannot be removed')
        completion = self.remove_service(service_name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch spec dump',
        'name=service_name,type=CephString,req=false',
        desc='List all Service specs')
    def _get_service_specs(self, service_name=None):
        completion = self.list_specs(service_name=service_name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        specs = completion.result
        return HandleCommandResult(stdout=yaml.safe_dump_all(specs))

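    # `orch apply` either takes a service_type/placement pair on the command
    # line or one or more full service specs on stdin (`-i <file>`). A
    # minimal, illustrative YAML spec (keys mirror the ServiceSpec arguments
    # used below; the placement schema is only sketched here):
    #
    #   service_type: node-exporter
    #   placement:
    #     host_pattern: '*'
    #   unmanaged: false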
    @_cli_write_command(
        'orch apply',
        'name=service_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
        'name=placement,type=CephString,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Update the size or placement for a service or apply a large yaml spec')
    def _apply_misc(self, service_type=None, placement=None, unmanaged=False, inbuf=None):
        usage = """Usage:
  ceph orch apply -i <yaml spec>
  ceph orch apply <service_type> <placement> [--unmanaged]
"""
        if inbuf:
            if service_type or placement or unmanaged:
                raise OrchestratorValidationError(usage)
            content: Iterator = yaml.safe_load_all(inbuf)
            specs = [ServiceSpec.from_json(s) for s in content]
        else:
            placement = PlacementSpec.from_string(placement)
            placement.validate()

            specs = [ServiceSpec(service_type, placement=placement, unmanaged=unmanaged)]

        completion = self.apply(specs)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch apply mds',
        'name=fs_name,type=CephString '
        'name=placement,type=CephString,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Update the number of MDS instances for the given fs_name')
    def _apply_mds(self, fs_name, placement=None, unmanaged=False):
        placement = PlacementSpec.from_string(placement)
        placement.validate()
        spec = ServiceSpec(
            'mds', fs_name,
            placement=placement,
            unmanaged=unmanaged)
        completion = self.apply_mds(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch apply rgw',
        'name=realm_name,type=CephString '
        'name=zone_name,type=CephString '
        'name=subcluster,type=CephString,req=false '
        'name=port,type=CephInt,req=false '
        'name=ssl,type=CephBool,req=false '
        'name=placement,type=CephString,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Update the number of RGW instances for the given zone')
    def _apply_rgw(self, zone_name, realm_name,
                   subcluster=None,
                   port=None,
                   ssl=False,
                   placement=None,
                   unmanaged=False):
        spec = RGWSpec(
            rgw_realm=realm_name,
            rgw_zone=zone_name,
            subcluster=subcluster,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
            rgw_frontend_port=port,
            ssl=ssl,
        )
        completion = self.apply_rgw(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch apply nfs',
        'name=svc_id,type=CephString '
        'name=pool,type=CephString '
        'name=namespace,type=CephString,req=false '
        'name=placement,type=CephString,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Scale an NFS service')
    def _apply_nfs(self, svc_id, pool, namespace=None, placement=None, unmanaged=False):
        spec = NFSServiceSpec(
            svc_id,
            pool=pool,
            namespace=namespace,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
        )
        completion = self.apply_nfs(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

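    # Backend selection. Illustrative example, using one of the backends
    # allowed by MODULE_OPTIONS above:
    #
    #   ceph orch set backend cephadm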
    @_cli_write_command(
        'orch set backend',
        "name=module_name,type=CephString,req=true",
        'Select orchestrator module backend')
    def _set_backend(self, module_name):
        """
        We implement a setter command instead of just having the user
        modify the setting directly, so that we can validate that they are
        setting it to a module that really exists and is enabled.

        There isn't a mechanism for ensuring they don't *disable* the module
        later, but this is better than nothing.
        """
        mgr_map = self.get("mgr_map")

        if module_name is None or module_name == "":
            self.set_module_option("orchestrator", None)
            return HandleCommandResult()

        for module in mgr_map['available_modules']:
            if module['name'] != module_name:
                continue

            if not module['can_run']:
                continue

            enabled = module['name'] in mgr_map['modules']
            if not enabled:
                return HandleCommandResult(-errno.EINVAL,
                                           stderr="Module '{module_name}' is not enabled. \n Run "
                                                  "`ceph mgr module enable {module_name}` "
                                                  "to enable.".format(module_name=module_name))

            try:
                is_orchestrator = self.remote(module_name,
                                              "is_orchestrator_module")
            except NameError:
                is_orchestrator = False

            if not is_orchestrator:
                return HandleCommandResult(-errno.EINVAL,
                                           stderr="'{0}' is not an orchestrator module".format(module_name))

            self.set_module_option("orchestrator", module_name)

            return HandleCommandResult()

        return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))

    @_cli_write_command(
        'orch pause',
        desc='Pause orchestrator background work')
    def _pause(self):
        self.pause()
        return HandleCommandResult()

    @_cli_write_command(
        'orch resume',
        desc='Resume orchestrator background work (if paused)')
    def _resume(self):
        self.resume()
        return HandleCommandResult()

    @_cli_write_command(
        'orch cancel',
        desc='Cancel ongoing operations')
    def _cancel(self):
        """
        ProgressReferences might get stuck. Let's unstick them.
        """
        self.cancel_completions()
        return HandleCommandResult()

    @_cli_read_command(
        'orch status',
        desc='Report configured backend and its status')
    def _status(self):
        o = self._select_orchestrator()
        if o is None:
            raise NoOrchestrator()

        avail, why = self.available()
        if avail is None:
            # The module does not report its availability
            return HandleCommandResult(stdout="Backend: {0}".format(o))
        else:
            return HandleCommandResult(stdout="Backend: {0}\nAvailable: {1}{2}".format(
                o, avail,
                " ({0})".format(why) if not avail else ""
            ))

    def self_test(self):
        old_orch = self._select_orchestrator()
        self._set_backend('')
        assert self._select_orchestrator() is None
        self._set_backend(old_orch)

        e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
        try:
            raise_if_exception(e1)
            assert False
        except ZeroDivisionError as e:
            assert e.args == ('hello', 'world')

        e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
        try:
            raise_if_exception(e2)
            assert False
        except OrchestratorError as e:
            assert e.args == ('hello', 'world')

        c = TrivialReadCompletion(result=True)
        assert c.has_result

    @_cli_write_command(
        'orch upgrade check',
        'name=image,type=CephString,req=false '
        'name=ceph_version,type=CephString,req=false',
        desc='Check service versions vs available and target containers')
    def _upgrade_check(self, image=None, ceph_version=None):
        completion = self.upgrade_check(image=image, version=ceph_version)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade status',
        desc='Check the status of any ongoing upgrade operation')
    def _upgrade_status(self):
        completion = self.upgrade_status()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        r = {
            'target_image': completion.result.target_image,
            'in_progress': completion.result.in_progress,
            'services_complete': completion.result.services_complete,
            'message': completion.result.message,
        }
        out = json.dumps(r, indent=4)
        return HandleCommandResult(stdout=out)

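    # Start an upgrade either to a specific container image or to a Ceph
    # version. Illustrative example (flag spelling assumed from the declared
    # ceph_version arg):
    #
    #   ceph orch upgrade start --ceph-version 15.2.1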
    @_cli_write_command(
        'orch upgrade start',
        'name=image,type=CephString,req=false '
        'name=ceph_version,type=CephString,req=false',
        desc='Initiate upgrade')
    def _upgrade_start(self, image=None, ceph_version=None):
        completion = self.upgrade_start(image, ceph_version)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade pause',
        desc='Pause an in-progress upgrade')
    def _upgrade_pause(self):
        completion = self.upgrade_pause()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade resume',
        desc='Resume paused upgrade')
    def _upgrade_resume(self):
        completion = self.upgrade_resume()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade stop',
        desc='Stop an in-progress upgrade')
    def _upgrade_stop(self):
        completion = self.upgrade_stop()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())