[ceph.git] ceph/src/pybind/mgr/orchestrator/module.py (15.2.5)
1 import datetime
2 import errno
3 import json
4 from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union
5 import re
6 import ast
7
8 import yaml
9 import six
10 from prettytable import PrettyTable
11
12 from ceph.deployment.inventory import Device
13 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
14 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
15
16 from mgr_util import format_bytes, to_pretty_timedelta
17 from mgr_module import MgrModule, HandleCommandResult
18
19 from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
20 raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
21 NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
22 RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
23 ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec
24
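# CLI front-end for the orchestrator: each method below is exposed as a
# `ceph orch ...` (or `ceph device ...`) command through the
# _cli_read_command / _cli_write_command decorators, delegates the real work
# to OrchestratorClientMixin, and formats the completion result for display.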
25
26 def nice_delta(now, t, suffix=''):
27 if t:
28 return to_pretty_timedelta(now - t) + suffix
29 else:
30 return '-'
31
32
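# Serialize `what` as 'json', 'json-pretty' or 'yaml'.  `many` switches
# between a single object and a list; `cls`, when given, is used to rebuild
# objects via from_json() before YAML dumping (see the subinterpreter note
# below).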
33 def to_format(what, format: str, many: bool, cls):
34 def to_json_1(obj):
35 if hasattr(obj, 'to_json'):
36 return obj.to_json()
37 return obj
38
39 def to_json_n(objs):
40 return [to_json_1(o) for o in objs]
41
42 to_json = to_json_n if many else to_json_1
43
44 if format == 'json':
45 return json.dumps(to_json(what), sort_keys=True)
46 elif format == 'json-pretty':
47 return json.dumps(to_json(what), indent=2, sort_keys=True)
48 elif format == 'yaml':
49 # fun with subinterpreters again. PyYAML depends on object identity;
50 # since `what` originates from a different subinterpreter, we have to copy things here.
51 if cls:
52 flat = to_json(what)
53 copy = [cls.from_json(o) for o in flat] if many else cls.from_json(flat)
54 else:
55 copy = what
56
57 def to_yaml_1(obj):
58 if hasattr(obj, 'yaml_representer'):
59 return obj
60 return to_json_1(obj)
61
62 def to_yaml_n(objs):
63 return [to_yaml_1(o) for o in objs]
64
65 to_yaml = to_yaml_n if many else to_yaml_1
66
67 if many:
68 return yaml.dump_all(to_yaml(copy), default_flow_style=False)
69 return yaml.dump(to_yaml(copy), default_flow_style=False)
70 else:
71 raise OrchestratorError(f'unsupported format type: {format}')
72
73
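# Build the plain-text preview shown by `orch apply ... --dry-run`: errors
# short-circuit to a JSON dump, warnings are prepended, and the service and
# OSD spec preview tables are rendered underneath.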
74 def generate_preview_tables(data):
75 error = [x.get('error') for x in data if x.get('error')]
76 if error:
77 return json.dumps(error)
78 warning = [x.get('warning') for x in data if x.get('warning')]
79 osd_table = preview_table_osd(data)
80 service_table = preview_table_services(data)
81 tables = f"""
82 {''.join(warning)}
83
84 ####################
85 SERVICESPEC PREVIEWS
86 ####################
87 {service_table}
88
89 ################
90 OSDSPEC PREVIEWS
91 ################
92 {osd_table}
93 """
94 return tables
95
96
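# Tabulate the OSD part of a dry-run preview: one row per prospective OSD
# with its OSDSpec name, host and data/db/wal devices.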
97 def preview_table_osd(data):
98 table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True)
99 table.field_names = "service name host data db wal".split()
100 table.align = 'l'
101 table.left_padding_width = 0
102 table.right_padding_width = 2
103 for osd_data in data:
104 if osd_data.get('service_type') != 'osd':
105 continue
106 for host, specs in osd_data.get('data').items():
107 for spec in specs:
108 if spec.get('error'):
109 return spec.get('message')
110 dg_name = spec.get('osdspec')
111 for osd in spec.get('data', {}).get('osds', []):
112 db_path = '-'
113 wal_path = '-'
114 block_db = osd.get('block.db', {}).get('path')
115 block_wal = osd.get('block.wal', {}).get('path')
116 block_data = osd.get('data', {}).get('path', '')
117 if not block_data:
118 continue
119 if block_db:
120 db_path = spec.get('data', {}).get('vg', {}).get('devices', [])
121 if block_wal:
122 wal_path = spec.get('data', {}).get('wal_vg', {}).get('devices', [])
123 table.add_row(('osd', dg_name, host, block_data, db_path, wal_path))
124 return table.get_string()
125
126
127 def preview_table_services(data):
128 table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True)
129 table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split()
130 table.align = 'l'
131 table.left_padding_width = 0
132 table.right_padding_width = 2
133 for item in data:
134 if item.get('warning'):
135 continue
136 if item.get('service_type') != 'osd':
137 table.add_row((item.get('service_type'), item.get('service_name'),
138 " ".join(item.get('add')), " ".join(item.get('remove'))))
139 return table.get_string()
140
141
142
143 @six.add_metaclass(CLICommandMeta)
144 class OrchestratorCli(OrchestratorClientMixin, MgrModule):
145 MODULE_OPTIONS = [
146 {
147 'name': 'orchestrator',
148 'type': 'str',
149 'default': None,
150 'desc': 'Orchestrator backend',
151 'enum_allowed': ['cephadm', 'rook',
152 'test_orchestrator'],
153 'runtime': True,
154 },
155 ]
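# The selected backend is read back via _select_orchestrator() and is
# normally changed with `ceph orch set backend <module>` (defined below).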
156 NATIVE_OPTIONS = [] # type: List[dict]
157
158 def __init__(self, *args, **kwargs):
159 super(OrchestratorCli, self).__init__(*args, **kwargs)
160 self.ident = set() # type: Set[str]
161 self.fault = set() # type: Set[str]
162 self._load()
163 self._refresh_health()
164
165 def _load(self):
166 active = self.get_store('active_devices')
167 if active:
168 decoded = json.loads(active)
169 self.ident = set(decoded.get('ident', []))
170 self.fault = set(decoded.get('fault', []))
171 self.log.debug('ident {}, fault {}'.format(self.ident, self.fault))
172
173 def _save(self):
174 encoded = json.dumps({
175 'ident': list(self.ident),
176 'fault': list(self.fault),
177 })
178 self.set_store('active_devices', encoded)
179
180 def _refresh_health(self):
181 h = {}
182 if self.ident:
183 h['DEVICE_IDENT_ON'] = {
184 'severity': 'warning',
185 'summary': '%d devices have ident light turned on' % len(
186 self.ident),
187 'detail': ['{} ident light enabled'.format(d) for d in self.ident]
188 }
189 if self.fault:
190 h['DEVICE_FAULT_ON'] = {
191 'severity': 'warning',
192 'summary': '%d devices have fault light turned on' % len(
193 self.fault),
194 'detail': ['{} fault light enabled'.format(d) for d in self.fault]
195 }
196 self.set_health_checks(h)
197
198 def _get_device_locations(self, dev_id):
199 # type: (str) -> List[DeviceLightLoc]
200 locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
201 return [DeviceLightLoc(**l) for l in sum(locs, [])]
202
203 @_cli_read_command(
204 prefix='device ls-lights',
205 desc='List currently active device indicator lights')
206 def _device_ls(self):
207 return HandleCommandResult(
208 stdout=json.dumps({
209 'ident': list(self.ident),
210 'fault': list(self.fault)
211 }, indent=4, sort_keys=True))
212
213 def light_on(self, fault_ident, devid):
214 # type: (str, str) -> HandleCommandResult
215 assert fault_ident in ("fault", "ident")
216 locs = self._get_device_locations(devid)
217 if locs is None:
218 return HandleCommandResult(stderr='device {} not found'.format(devid),
219 retval=-errno.ENOENT)
220
221 getattr(self, fault_ident).add(devid)
222 self._save()
223 self._refresh_health()
224 completion = self.blink_device_light(fault_ident, True, locs)
225 self._orchestrator_wait([completion])
226 return HandleCommandResult(stdout=str(completion.result))
227
228 def light_off(self, fault_ident, devid, force):
229 # type: (str, str, bool) -> HandleCommandResult
230 assert fault_ident in ("fault", "ident")
231 locs = self._get_device_locations(devid)
232 if locs is None:
233 return HandleCommandResult(stderr='device {} not found'.format(devid),
234 retval=-errno.ENOENT)
235
236 try:
237 completion = self.blink_device_light(fault_ident, False, locs)
238 self._orchestrator_wait([completion])
239
240 if devid in getattr(self, fault_ident):
241 getattr(self, fault_ident).remove(devid)
242 self._save()
243 self._refresh_health()
244 return HandleCommandResult(stdout=str(completion.result))
245
246 except:
247 # There are several reasons the try: block might fail:
248 # 1. the device no longer exists
249 # 2. the device is no longer known to Ceph
250 # 3. the host is not reachable
251 if force and devid in getattr(self, fault_ident):
252 getattr(self, fault_ident).remove(devid)
253 self._save()
254 self._refresh_health()
255 raise
256
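# Example invocations (cf. the usage string below):
#   ceph device light on <devid>                # ident light is the default
#   ceph device light off <devid> fault --force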
257 @_cli_write_command(
258 prefix='device light',
259 cmd_args='name=enable,type=CephChoices,strings=on|off '
260 'name=devid,type=CephString '
261 'name=light_type,type=CephChoices,strings=ident|fault,req=false '
262 'name=force,type=CephBool,req=false',
263 desc='Enable or disable the device light. Default type is `ident`\n'
264 'Usage: device light (on|off) <devid> [ident|fault] [--force]')
265 def _device_light(self, enable, devid, light_type=None, force=False):
266 # type: (str, str, Optional[str], bool) -> HandleCommandResult
267 light_type = light_type or 'ident'
268 on = enable == 'on'
269 if on:
270 return self.light_on(light_type, devid)
271 else:
272 return self.light_off(light_type, devid, force)
273
274 def _select_orchestrator(self):
275 return self.get_module_option("orchestrator")
276
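# e.g. `ceph orch host add <hostname> [<addr>] [<label> ...]`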
277 @_cli_write_command(
278 'orch host add',
279 'name=hostname,type=CephString,req=true '
280 'name=addr,type=CephString,req=false '
281 'name=labels,type=CephString,n=N,req=false',
282 'Add a host')
283 def _add_host(self, hostname:str, addr: Optional[str]=None, labels: Optional[List[str]]=None):
284 s = HostSpec(hostname=hostname, addr=addr, labels=labels)
285 completion = self.add_host(s)
286 self._orchestrator_wait([completion])
287 raise_if_exception(completion)
288 return HandleCommandResult(stdout=completion.result_str())
289
290 @_cli_write_command(
291 'orch host rm',
292 "name=hostname,type=CephString,req=true",
293 'Remove a host')
294 def _remove_host(self, hostname):
295 completion = self.remove_host(hostname)
296 self._orchestrator_wait([completion])
297 raise_if_exception(completion)
298 return HandleCommandResult(stdout=completion.result_str())
299
300 @_cli_write_command(
301 'orch host set-addr',
302 'name=hostname,type=CephString '
303 'name=addr,type=CephString',
304 'Update a host address')
305 def _update_set_addr(self, hostname, addr):
306 completion = self.update_host_addr(hostname, addr)
307 self._orchestrator_wait([completion])
308 raise_if_exception(completion)
309 return HandleCommandResult(stdout=completion.result_str())
310
311 @_cli_read_command(
312 'orch host ls',
313 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
314 'List hosts')
315 def _get_hosts(self, format='plain'):
316 completion = self.get_hosts()
317 self._orchestrator_wait([completion])
318 raise_if_exception(completion)
319 if format != 'plain':
320 output = to_format(completion.result, format, many=True, cls=HostSpec)
321 else:
322 table = PrettyTable(
323 ['HOST', 'ADDR', 'LABELS', 'STATUS'],
324 border=False)
325 table.align = 'l'
326 table.left_padding_width = 0
327 table.right_padding_width = 2
328 for host in sorted(completion.result, key=lambda h: h.hostname):
329 table.add_row((host.hostname, host.addr, ' '.join(host.labels), host.status))
330 output = table.get_string()
331 return HandleCommandResult(stdout=output)
332
333 @_cli_write_command(
334 'orch host label add',
335 'name=hostname,type=CephString '
336 'name=label,type=CephString',
337 'Add a host label')
338 def _host_label_add(self, hostname, label):
339 completion = self.add_host_label(hostname, label)
340 self._orchestrator_wait([completion])
341 raise_if_exception(completion)
342 return HandleCommandResult(stdout=completion.result_str())
343
344 @_cli_write_command(
345 'orch host label rm',
346 'name=hostname,type=CephString '
347 'name=label,type=CephString',
348 'Remove a host label')
349 def _host_label_rm(self, hostname, label):
350 completion = self.remove_host_label(hostname, label)
351 self._orchestrator_wait([completion])
352 raise_if_exception(completion)
353 return HandleCommandResult(stdout=completion.result_str())
354
355 @_cli_write_command(
356 'orch host ok-to-stop',
357 'name=hostname,type=CephString',
358 desc='Check if the specified host can be safely stopped without reducing availability')
359 def _host_ok_to_stop(self, hostname: str):
360 completion = self.host_ok_to_stop(hostname)
361 self._orchestrator_wait([completion])
362 raise_if_exception(completion)
363 return HandleCommandResult(stdout=completion.result_str())
364
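# e.g. `ceph orch device ls`, `ceph orch device ls <host> --format json-pretty`,
# or `ceph orch device ls --refresh` to request a fresh inventory.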
365 @_cli_read_command(
366 'orch device ls',
367 "name=hostname,type=CephString,n=N,req=false "
368 "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
369 "name=refresh,type=CephBool,req=false",
370 'List devices on a host')
371 def _list_devices(self, hostname=None, format='plain', refresh=False):
372 # type: (Optional[List[str]], str, bool) -> HandleCommandResult
373 """
374 Provide information about storage devices present in cluster hosts
375
376 Note: this does not have to be completely synchronous. Slightly out of
377 date hardware inventory is fine as long as hardware ultimately appears
378 in the output of this command.
379 """
380 nf = InventoryFilter(hosts=hostname) if hostname else None
381
382 completion = self.get_inventory(host_filter=nf, refresh=refresh)
383
384 self._orchestrator_wait([completion])
385 raise_if_exception(completion)
386
387 if format != 'plain':
388 return HandleCommandResult(stdout=to_format(completion.result, format, many=True, cls=InventoryHost))
389 else:
390 out = []
391
392 table = PrettyTable(
393 ['HOST', 'PATH', 'TYPE', 'SIZE', 'DEVICE_ID', 'MODEL', 'VENDOR', 'ROTATIONAL', 'AVAIL',
394 'REJECT REASONS'],
395 border=False)
396 table.align = 'l'
397 table._align['SIZE'] = 'r'
398 table.left_padding_width = 0
399 table.right_padding_width = 2
400 for host_ in completion.result: # type: InventoryHost
401 for d in host_.devices.devices: # type: Device
402 table.add_row(
403 (
404 host_.name,
405 d.path,
406 d.human_readable_type,
407 format_bytes(d.sys_api.get('size', 0), 5),
408 d.device_id,
409 d.sys_api.get('model') or 'n/a',
410 d.sys_api.get('vendor') or 'n/a',
411 d.sys_api.get('rotational') or 'n/a',
412 d.available,
413 ', '.join(d.rejected_reasons)
414 )
415 )
416 out.append(table.get_string())
417 return HandleCommandResult(stdout='\n'.join(out))
418
419 @_cli_write_command(
420 'orch device zap',
421 'name=hostname,type=CephString '
422 'name=path,type=CephString '
423 'name=force,type=CephBool,req=false',
424 'Zap (erase!) a device so it can be re-used')
425 def _zap_device(self, hostname, path, force=False):
426 if not force:
427 raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA')
428 completion = self.zap_device(hostname, path)
429 self._orchestrator_wait([completion])
430 raise_if_exception(completion)
431 return HandleCommandResult(stdout=completion.result_str())
432
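# e.g. `ceph orch ls`, `ceph orch ls <service_type>`, or
# `ceph orch ls --export --format yaml` to dump the stored service specs.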
433 @_cli_read_command(
434 'orch ls',
435 "name=service_type,type=CephString,req=false "
436 "name=service_name,type=CephString,req=false "
437 "name=export,type=CephBool,req=false "
438 "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
439 "name=refresh,type=CephBool,req=false",
440 'List services known to orchestrator')
441 def _list_services(self, host=None, service_type=None, service_name=None, export=False, format='plain', refresh=False):
442
443 if export and format == 'plain':
444 format = 'yaml'
445
446 completion = self.describe_service(service_type,
447 service_name,
448 refresh=refresh)
449 self._orchestrator_wait([completion])
450 raise_if_exception(completion)
451 services: List[ServiceDescription] = completion.result
452
453 def ukn(s):
454 return '<unknown>' if s is None else s
455
456 # Sort the list for display
457 services.sort(key=lambda s: (ukn(s.spec.service_name())))
458
459 if len(services) == 0:
460 return HandleCommandResult(stdout="No services reported")
461 elif format != 'plain':
462 if export:
463 data = [s.spec for s in services]
464 return HandleCommandResult(stdout=to_format(data, format, many=True, cls=ServiceSpec))
465 else:
466 return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription))
467 else:
468 now = datetime.datetime.utcnow()
469 table = PrettyTable(
470 ['NAME', 'RUNNING', 'REFRESHED', 'AGE',
471 'PLACEMENT',
472 'IMAGE NAME', 'IMAGE ID'
473 ],
474 border=False)
475 table.align['NAME'] = 'l'
476 table.align['RUNNING'] = 'r'
477 table.align['REFRESHED'] = 'l'
478 table.align['AGE'] = 'l'
479 table.align['IMAGE NAME'] = 'l'
480 table.align['IMAGE ID'] = 'l'
481 table.align['PLACEMENT'] = 'l'
482 table.left_padding_width = 0
483 table.right_padding_width = 2
484 for s in services:
485 if not s.spec:
486 pl = '<no spec>'
487 elif s.spec.unmanaged:
488 pl = '<unmanaged>'
489 else:
490 pl = s.spec.placement.pretty_str()
491 table.add_row((
492 s.spec.service_name(),
493 '%d/%d' % (s.running, s.size),
494 nice_delta(now, s.last_refresh, ' ago'),
495 nice_delta(now, s.created),
496 pl,
497 ukn(s.container_image_name),
498 ukn(s.container_image_id)[0:12],
499 ))
500
501 return HandleCommandResult(stdout=table.get_string())
502
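# e.g. `ceph orch ps`, `ceph orch ps <hostname>`, or
# `ceph orch ps --daemon_type mgr --format json-pretty`.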
503 @_cli_read_command(
504 'orch ps',
505 "name=hostname,type=CephString,req=false "
506 "name=service_name,type=CephString,req=false "
507 "name=daemon_type,type=CephString,req=false "
508 "name=daemon_id,type=CephString,req=false "
509 "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
510 "name=refresh,type=CephBool,req=false",
511 'List daemons known to orchestrator')
512 def _list_daemons(self, hostname=None, service_name=None, daemon_type=None, daemon_id=None, format='plain', refresh=False):
513 completion = self.list_daemons(service_name,
514 daemon_type,
515 daemon_id=daemon_id,
516 host=hostname,
517 refresh=refresh)
518 self._orchestrator_wait([completion])
519 raise_if_exception(completion)
520 daemons: List[DaemonDescription] = completion.result
521
522 def ukn(s):
523 return '<unknown>' if s is None else s
524 # Sort the list for display
525 daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id)))
526
527 if format != 'plain':
528 return HandleCommandResult(stdout=to_format(daemons, format, many=True, cls=DaemonDescription))
529 else:
530 if len(daemons) == 0:
531 return HandleCommandResult(stdout="No daemons reported")
532
533 now = datetime.datetime.utcnow()
534 table = PrettyTable(
535 ['NAME', 'HOST', 'STATUS', 'REFRESHED', 'AGE',
536 'VERSION', 'IMAGE NAME', 'IMAGE ID', 'CONTAINER ID'],
537 border=False)
538 table.align = 'l'
539 table.left_padding_width = 0
540 table.right_padding_width = 2
541 for s in sorted(daemons, key=lambda s: s.name()):
542 if s.status_desc:
543 status = s.status_desc
544 else:
545 status = {
546 -1: 'error',
547 0: 'stopped',
548 1: 'running',
549 None: '<unknown>'
550 }[s.status]
551 if s.status == 1 and s.started:
552 status += ' (%s)' % to_pretty_timedelta(now - s.started)
553
554 table.add_row((
555 s.name(),
556 ukn(s.hostname),
557 status,
558 nice_delta(now, s.last_refresh, ' ago'),
559 nice_delta(now, s.created),
560 ukn(s.version),
561 ukn(s.container_image_name),
562 ukn(s.container_image_id)[0:12],
563 ukn(s.container_id)))
564
565 return HandleCommandResult(stdout=table.get_string())
566
567 @_cli_write_command(
568 'orch apply osd',
569 'name=all_available_devices,type=CephBool,req=false '
570 'name=dry_run,type=CephBool,req=false '
571 'name=unmanaged,type=CephBool,req=false '
572 "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
573 'Create OSD daemon(s) using a drive group spec')
574 def _apply_osd(self,
575 all_available_devices: bool = False,
576 format: str = 'plain',
577 unmanaged=None,
578 dry_run=None,
579 inbuf: Optional[str] = None) -> HandleCommandResult:
580 """Apply DriveGroupSpecs to create OSDs"""
581 usage = """
582 usage:
583 ceph orch apply osd -i <json_file/yaml_file> [--dry-run]
584 ceph orch apply osd --all-available-devices [--dry-run] [--unmanaged]
585
586 Restrictions:
587
588 Mutexes:
589 * -i, --all-available-devices
590 * -i, --unmanaged (this would overwrite the osdspec loaded from a file)
591
592 Parameters:
593
594 * --unmanaged
595 Only works with --all-available-devices.
596
597 Description:
598
599 * -i
600 An inbuf object like a file or a json/yaml blob containing a valid OSDSpec
601
602 * --all-available-devices
603 The simplest OSDSpec there is. Takes all devices marked as 'available'
604 and creates standalone OSDs on them.
605
606 * --unmanaged
607 Set the unmanaged flag for --all-available-devices (default is False)
608
609 Examples:
610
611 # ceph orch apply osd -i <file.yml|json>
612
613 Applies one or more OSDSpecs found in <file>
614
615 # ceph orch apply osd --all-available-devices --unmanaged=true
616
617 Creates and applies simple OSDSpec with the unmanaged flag set to <true>
618 """
619
620 if inbuf and all_available_devices:
621 # mutually exclusive
622 return HandleCommandResult(-errno.EINVAL, stderr=usage)
623
624 if not inbuf and not all_available_devices:
625 # one parameter must be present
626 return HandleCommandResult(-errno.EINVAL, stderr=usage)
627
628 if inbuf:
629 if unmanaged is not None:
630 return HandleCommandResult(-errno.EINVAL, stderr=usage)
631 try:
632 drivegroups = yaml.safe_load_all(inbuf)
633
634 dg_specs = []
635 for dg in drivegroups:
636 spec = DriveGroupSpec.from_json(dg)
637 if dry_run:
638 spec.preview_only = True
639 dg_specs.append(spec)
640
641 completion = self.apply(dg_specs)
642 self._orchestrator_wait([completion])
643 raise_if_exception(completion)
644 out = completion.result_str()
645 if dry_run:
646 completion = self.plan(dg_specs)
647 self._orchestrator_wait([completion])
648 raise_if_exception(completion)
649 data = completion.result
650 if format == 'plain':
651 out = preview_table_osd(data)
652 else:
653 out = to_format(data, format, many=True, cls=None)
654 return HandleCommandResult(stdout=out)
655
656 except ValueError as e:
657 msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
658 return HandleCommandResult(-errno.EINVAL, stderr=msg)
659 if all_available_devices:
660 if unmanaged is None:
661 unmanaged = False
662 dg_specs = [
663 DriveGroupSpec(
664 service_id='all-available-devices',
665 placement=PlacementSpec(host_pattern='*'),
666 data_devices=DeviceSelection(all=True),
667 unmanaged=unmanaged,
668 preview_only=dry_run
669 )
670 ]
671 # This acts weird when abstracted to a function
672 completion = self.apply(dg_specs)
673 self._orchestrator_wait([completion])
674 raise_if_exception(completion)
675 out = completion.result_str()
676 if dry_run:
677 completion = self.plan(dg_specs)
678 self._orchestrator_wait([completion])
679 data = completion.result
680 if format == 'plain':
681 out = preview_table_osd(data)
682 else:
683 out = to_format(data, format, many=True, cls=None)
684 return HandleCommandResult(stdout=out)
685
686 return HandleCommandResult(-errno.EINVAL, stderr=usage)
687
688 @_cli_write_command(
689 'orch daemon add osd',
690 "name=svc_arg,type=CephString,req=false",
691 'Create OSD daemon(s) from a host:devices spec, e.g. --svc_arg=host:device1,device2')
692 def _daemon_add_osd(self, svc_arg=None):
693 # type: (Optional[str]) -> HandleCommandResult
694 """Create one or more OSDs"""
695
696 usage = """
697 Usage:
698 ceph orch daemon add osd host:device1,device2,...
699 """
700 if not svc_arg:
701 return HandleCommandResult(-errno.EINVAL, stderr=usage)
702 try:
703 host_name, block_device = svc_arg.split(":")
704 block_devices = block_device.split(',')
705 devs = DeviceSelection(paths=block_devices)
706 drive_group = DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)
707 except (TypeError, KeyError, ValueError):
708 msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
709 return HandleCommandResult(-errno.EINVAL, stderr=msg)
710
711 completion = self.create_osds(drive_group)
712 self._orchestrator_wait([completion])
713 raise_if_exception(completion)
714 return HandleCommandResult(stdout=completion.result_str())
715
716 @_cli_write_command(
717 'orch osd rm',
718 "name=svc_id,type=CephString,n=N "
719 "name=replace,type=CephBool,req=false "
720 "name=force,type=CephBool,req=false",
721 'Remove OSD services')
722 def _osd_rm_start(self,
723 svc_id: List[str],
724 replace: bool = False,
725 force: bool = False) -> HandleCommandResult:
726 completion = self.remove_osds(svc_id, replace=replace, force=force)
727 self._orchestrator_wait([completion])
728 raise_if_exception(completion)
729 return HandleCommandResult(stdout=completion.result_str())
730
731 @_cli_write_command(
732 'orch osd rm stop',
733 "name=svc_id,type=CephString,n=N",
734 'Cancel ongoing OSD removal operation(s)')
735 def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult:
736 completion = self.stop_remove_osds(svc_id)
737 self._orchestrator_wait([completion])
738 raise_if_exception(completion)
739 return HandleCommandResult(stdout=completion.result_str())
740
741 @_cli_write_command(
742 'orch osd rm status',
743 "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
744 desc='status of OSD removal operation')
745 def _osd_rm_status(self, format='plain') -> HandleCommandResult:
746 completion = self.remove_osds_status()
747 self._orchestrator_wait([completion])
748 raise_if_exception(completion)
749 report = completion.result
750
751 if not report:
752 return HandleCommandResult(stdout="No OSD remove/replace operations reported")
753
754 if format != 'plain':
755 out = to_format(report, format, many=True, cls=None)
756 else:
757 table = PrettyTable(
758 ['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'],
759 border=False)
760 table.align = 'l'
761 table.left_padding_width = 0
762 table.right_padding_width = 2
763 for osd in sorted(report, key=lambda o: o.osd_id):
764 table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(),
765 osd.get_pg_count(), osd.replace, osd.force, osd.drain_started_at])
766 out = table.get_string()
767
768 return HandleCommandResult(stdout=out)
769
770 @_cli_write_command(
771 'orch daemon add',
772 'name=daemon_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
773 'name=placement,type=CephString,req=false',
774 'Add daemon(s)')
775 def _daemon_add_misc(self,
776 daemon_type: Optional[str] = None,
777 placement: Optional[str] = None,
778 inbuf: Optional[str] = None) -> HandleCommandResult:
779 usage = f"""Usage:
780 ceph orch daemon add -i <json_file>
781 ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>"""
782 if inbuf:
783 if daemon_type or placement:
784 raise OrchestratorValidationError(usage)
785 spec = ServiceSpec.from_json(yaml.safe_load(inbuf))
786 else:
787 spec = PlacementSpec.from_string(placement)
788 assert daemon_type
789 spec = ServiceSpec(daemon_type, placement=spec)
790
791 daemon_type = spec.service_type
792
793 if daemon_type == 'mon':
794 completion = self.add_mon(spec)
795 elif daemon_type == 'mgr':
796 completion = self.add_mgr(spec)
797 elif daemon_type == 'rbd-mirror':
798 completion = self.add_rbd_mirror(spec)
799 elif daemon_type == 'crash':
800 completion = self.add_crash(spec)
801 elif daemon_type == 'alertmanager':
802 completion = self.add_alertmanager(spec)
803 elif daemon_type == 'grafana':
804 completion = self.add_grafana(spec)
805 elif daemon_type == 'node-exporter':
806 completion = self.add_node_exporter(spec)
807 elif daemon_type == 'prometheus':
808 completion = self.add_prometheus(spec)
809 elif daemon_type == 'mds':
810 completion = self.add_mds(spec)
811 elif daemon_type == 'rgw':
812 completion = self.add_rgw(spec)
813 elif daemon_type == 'nfs':
814 completion = self.add_nfs(spec)
815 elif daemon_type == 'iscsi':
816 completion = self.add_iscsi(spec)
817 else:
818 raise OrchestratorValidationError(f'unknown daemon type `{daemon_type}`')
819
820 self._orchestrator_wait([completion])
821 raise_if_exception(completion)
822 return HandleCommandResult(stdout=completion.result_str())
823
824 @_cli_write_command(
825 'orch daemon add mds',
826 'name=fs_name,type=CephString '
827 'name=placement,type=CephString,req=false',
828 'Start MDS daemon(s)')
829 def _mds_add(self,
830 fs_name: str,
831 placement: Optional[str] = None,
832 inbuf: Optional[str] = None) -> HandleCommandResult:
833 if inbuf:
834 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
835
836 spec = ServiceSpec(
837 service_type='mds',
838 service_id=fs_name,
839 placement=PlacementSpec.from_string(placement),
840 )
841
842 completion = self.add_mds(spec)
843 self._orchestrator_wait([completion])
844 raise_if_exception(completion)
845 return HandleCommandResult(stdout=completion.result_str())
846
847 @_cli_write_command(
848 'orch daemon add rgw',
849 'name=realm_name,type=CephString '
850 'name=zone_name,type=CephString '
851 'name=subcluster,type=CephString,req=false '
852 'name=port,type=CephInt,req=false '
853 'name=ssl,type=CephBool,req=false '
854 'name=placement,type=CephString,req=false',
855 'Start RGW daemon(s)')
856 def _rgw_add(self,
857 realm_name: str,
858 zone_name: str,
859 subcluster: Optional[str] = None,
860 port: Optional[int] = None,
861 ssl: bool = False,
862 placement: Optional[str] = None,
863 inbuf: Optional[str] = None) -> HandleCommandResult:
864 if inbuf:
865 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
866
867 spec = RGWSpec(
868 rgw_realm=realm_name,
869 rgw_zone=zone_name,
870 subcluster=subcluster,
871 rgw_frontend_port=port,
872 ssl=ssl,
873 placement=PlacementSpec.from_string(placement),
874 )
875
876 completion = self.add_rgw(spec)
877 self._orchestrator_wait([completion])
878 raise_if_exception(completion)
879 return HandleCommandResult(stdout=completion.result_str())
880
881 @_cli_write_command(
882 'orch daemon add nfs',
883 "name=svc_id,type=CephString "
884 "name=pool,type=CephString "
885 "name=namespace,type=CephString,req=false "
886 'name=placement,type=CephString,req=false',
887 'Start NFS daemon(s)')
888 def _nfs_add(self,
889 svc_id: str,
890 pool: str,
891 namespace: Optional[str] = None,
892 placement: Optional[str] = None,
893 inbuf: Optional[str] = None) -> HandleCommandResult:
894 if inbuf:
895 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
896
897 spec = NFSServiceSpec(
898 service_id=svc_id,
899 pool=pool,
900 namespace=namespace,
901 placement=PlacementSpec.from_string(placement),
902 )
903
904 completion = self.add_nfs(spec)
905 self._orchestrator_wait([completion])
906 raise_if_exception(completion)
907 return HandleCommandResult(stdout=completion.result_str())
908
909 @_cli_write_command(
910 'orch daemon add iscsi',
911 'name=pool,type=CephString '
912 'name=api_user,type=CephString '
913 'name=api_password,type=CephString '
914 'name=trusted_ip_list,type=CephString,req=false '
915 'name=placement,type=CephString,req=false',
916 'Start iscsi daemon(s)')
917 def _iscsi_add(self,
918 pool: str,
919 api_user: str,
920 api_password: str,
921 trusted_ip_list: Optional[str] = None,
922 placement: Optional[str] = None,
923 inbuf: Optional[str] = None) -> HandleCommandResult:
924 if inbuf:
925 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
926
927 spec = IscsiServiceSpec(
928 service_id='iscsi',
929 pool=pool,
930 api_user=api_user,
931 api_password=api_password,
932 trusted_ip_list=trusted_ip_list,
933 placement=PlacementSpec.from_string(placement),
934 )
935
936 completion = self.add_iscsi(spec)
937 self._orchestrator_wait([completion])
938 raise_if_exception(completion)
939 return HandleCommandResult(stdout=completion.result_str())
940
941 @_cli_write_command(
942 'orch',
943 "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
944 "name=service_name,type=CephString",
945 'Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)')
946 def _service_action(self, action, service_name):
947 completion = self.service_action(action, service_name)
948 self._orchestrator_wait([completion])
949 raise_if_exception(completion)
950 return HandleCommandResult(stdout=completion.result_str())
951
952 @_cli_write_command(
953 'orch daemon',
954 "name=action,type=CephChoices,strings=start|stop|restart|reconfig "
955 "name=name,type=CephString",
956 'Start, stop, restart, (redeploy,) or reconfig a specific daemon')
957 def _daemon_action(self, action, name):
958 if '.' not in name:
959 raise OrchestratorError('%s is not a valid daemon name' % name)
960 completion = self.daemon_action(action, name)
961 self._orchestrator_wait([completion])
962 raise_if_exception(completion)
963 return HandleCommandResult(stdout=completion.result_str())
964
965 @_cli_write_command(
966 'orch daemon redeploy',
967 "name=name,type=CephString "
968 "name=image,type=CephString,req=false",
969 'Redeploy a daemon (with a specific image)')
970 def _daemon_action_redeploy(self, name, image=None):
971 if '.' not in name:
972 raise OrchestratorError('%s is not a valid daemon name' % name)
973 completion = self.daemon_action("redeploy", name, image=image)
974 self._orchestrator_wait([completion])
975 raise_if_exception(completion)
976 return HandleCommandResult(stdout=completion.result_str())
977
978 @_cli_write_command(
979 'orch daemon rm',
980 "name=names,type=CephString,n=N "
981 'name=force,type=CephBool,req=false',
982 'Remove specific daemon(s)')
983 def _daemon_rm(self, names, force=False):
984 for name in names:
985 if '.' not in name:
986 raise OrchestratorError('%s is not a valid daemon name' % name)
987 daemon_type = name.split('.')[0]
988 if not force and daemon_type in ['osd', 'mon', 'prometheus']:
989 raise OrchestratorError('must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name)
990 completion = self.remove_daemons(names)
991 self._orchestrator_wait([completion])
992 raise_if_exception(completion)
993 return HandleCommandResult(stdout=completion.result_str())
994
995 @_cli_write_command(
996 'orch rm',
997 'name=service_name,type=CephString '
998 'name=force,type=CephBool,req=false',
999 'Remove a service')
1000 def _service_rm(self, service_name, force=False):
1001 if service_name in ['mon', 'mgr'] and not force:
1002 raise OrchestratorError('The mon and mgr services cannot be removed (pass --force to override)')
1003 completion = self.remove_service(service_name)
1004 self._orchestrator_wait([completion])
1005 raise_if_exception(completion)
1006 return HandleCommandResult(stdout=completion.result_str())
1007
1008 @_cli_write_command(
1009 'orch apply',
1010 'name=service_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
1011 'name=placement,type=CephString,req=false '
1012 'name=dry_run,type=CephBool,req=false '
1013 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
1014 'name=unmanaged,type=CephBool,req=false',
1015 'Update the size or placement for a service or apply a large yaml spec')
1016 def _apply_misc(self,
1017 service_type: Optional[str] = None,
1018 placement: Optional[str] = None,
1019 dry_run: bool = False,
1020 format: str = 'plain',
1021 unmanaged: bool = False,
1022 inbuf: Optional[str] = None) -> HandleCommandResult:
1023 usage = """Usage:
1024 ceph orch apply -i <yaml spec> [--dry-run]
1025 ceph orch apply <service_type> <placement> [--unmanaged]
1026 """
1027 if inbuf:
1028 if service_type or placement or unmanaged:
1029 raise OrchestratorValidationError(usage)
1030 content: Iterator = yaml.safe_load_all(inbuf)
1031 specs: List[Union[ServiceSpec, HostSpec]] = []
1032 for s in content:
1033 spec = json_to_generic_spec(s)
1034 if dry_run and not isinstance(spec, HostSpec):
1035 spec.preview_only = dry_run
1036 specs.append(spec)
1037 else:
1038 placementspec = PlacementSpec.from_string(placement)
1039 assert service_type
1040 specs = [ServiceSpec(service_type, placement=placementspec, unmanaged=unmanaged, preview_only=dry_run)]
1041
1042 completion = self.apply(specs)
1043 self._orchestrator_wait([completion])
1044 raise_if_exception(completion)
1045 out = completion.result_str()
1046 if dry_run:
1047 completion = self.plan(specs)
1048 self._orchestrator_wait([completion])
1049 raise_if_exception(completion)
1050 data = completion.result
1051 if format == 'plain':
1052 out = generate_preview_tables(data)
1053 else:
1054 out = to_format(data, format, many=True, cls=None)
1055 return HandleCommandResult(stdout=out)
1056
1057 @_cli_write_command(
1058 'orch apply mds',
1059 'name=fs_name,type=CephString '
1060 'name=placement,type=CephString,req=false '
1061 'name=dry_run,type=CephBool,req=false '
1062 'name=unmanaged,type=CephBool,req=false '
1063 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
1064 'Update the number of MDS instances for the given fs_name')
1065 def _apply_mds(self,
1066 fs_name: str,
1067 placement: Optional[str] = None,
1068 dry_run: bool = False,
1069 unmanaged: bool = False,
1070 format: str = 'plain',
1071 inbuf: Optional[str] = None) -> HandleCommandResult:
1072 if inbuf:
1073 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
1074
1075 spec = ServiceSpec(
1076 service_type='mds',
1077 service_id=fs_name,
1078 placement=PlacementSpec.from_string(placement),
1079 unmanaged=unmanaged,
1080 preview_only=dry_run)
1081
1082 completion = self.apply_mds(spec)
1083 self._orchestrator_wait([completion])
1084 raise_if_exception(completion)
1085 out = completion.result_str()
1086 if dry_run:
1087 completion_plan = self.plan([spec])
1088 self._orchestrator_wait([completion_plan])
1089 raise_if_exception(completion_plan)
1090 data = completion_plan.result
1091 if format == 'plain':
1092 out = preview_table_services(data)
1093 else:
1094 out = to_format(data, format, many=True, cls=None)
1095 return HandleCommandResult(stdout=out)
1096
1097 @_cli_write_command(
1098 'orch apply rgw',
1099 'name=realm_name,type=CephString '
1100 'name=zone_name,type=CephString '
1101 'name=subcluster,type=CephString,req=false '
1102 'name=port,type=CephInt,req=false '
1103 'name=ssl,type=CephBool,req=false '
1104 'name=placement,type=CephString,req=false '
1105 'name=dry_run,type=CephBool,req=false '
1106 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
1107 'name=unmanaged,type=CephBool,req=false',
1108 'Update the number of RGW instances for the given zone')
1109 def _apply_rgw(self,
1110 realm_name: str,
1111 zone_name: str,
1112 subcluster: Optional[str] = None,
1113 port: Optional[int] = None,
1114 ssl: bool = False,
1115 placement: Optional[str] = None,
1116 dry_run: bool = False,
1117 format: str = 'plain',
1118 unmanaged: bool = False,
1119 inbuf: Optional[str] = None) -> HandleCommandResult:
1120 if inbuf:
1121 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
1122
1123 spec = RGWSpec(
1124 rgw_realm=realm_name,
1125 rgw_zone=zone_name,
1126 subcluster=subcluster,
1127 rgw_frontend_port=port,
1128 ssl=ssl,
1129 placement=PlacementSpec.from_string(placement),
1130 unmanaged=unmanaged,
1131 preview_only=dry_run
1132 )
1133
1134 completion = self.apply_rgw(spec)
1135 self._orchestrator_wait([completion])
1136 raise_if_exception(completion)
1137 out = completion.result_str()
1138 if dry_run:
1139 completion_plan = self.plan([spec])
1140 self._orchestrator_wait([completion_plan])
1141 raise_if_exception(completion_plan)
1142 data = completion_plan.result
1143 if format == 'plain':
1144 out = preview_table_services(data)
1145 else:
1146 out = to_format(data, format, many=True, cls=None)
1147 return HandleCommandResult(stdout=out)
1148
1149 @_cli_write_command(
1150 'orch apply nfs',
1151 'name=svc_id,type=CephString '
1152 'name=pool,type=CephString '
1153 'name=namespace,type=CephString,req=false '
1154 'name=placement,type=CephString,req=false '
1155 'name=dry_run,type=CephBool,req=false '
1156 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
1157 'name=unmanaged,type=CephBool,req=false',
1158 'Scale an NFS service')
1159 def _apply_nfs(self,
1160 svc_id: str,
1161 pool: str,
1162 namespace: Optional[str] = None,
1163 placement: Optional[str] = None,
1164 format: str = 'plain',
1165 dry_run: bool = False,
1166 unmanaged: bool = False,
1167 inbuf: Optional[str] = None) -> HandleCommandResult:
1168 if inbuf:
1169 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
1170
1171 spec = NFSServiceSpec(
1172 service_id=svc_id,
1173 pool=pool,
1174 namespace=namespace,
1175 placement=PlacementSpec.from_string(placement),
1176 unmanaged=unmanaged,
1177 preview_only=dry_run
1178 )
1179
1180 completion = self.apply_nfs(spec)
1181 self._orchestrator_wait([completion])
1182 raise_if_exception(completion)
1183 out = completion.result_str()
1184 if dry_run:
1185 completion_plan = self.plan([spec])
1186 self._orchestrator_wait([completion_plan])
1187 raise_if_exception(completion_plan)
1188 data = completion_plan.result
1189 if format == 'plain':
1190 out = preview_table_services(data)
1191 else:
1192 out = to_format(data, format, many=True, cls=None)
1193 return HandleCommandResult(stdout=out)
1194
1195 @_cli_write_command(
1196 'orch apply iscsi',
1197 'name=pool,type=CephString '
1198 'name=api_user,type=CephString '
1199 'name=api_password,type=CephString '
1200 'name=trusted_ip_list,type=CephString,req=false '
1201 'name=placement,type=CephString,req=false '
1202 'name=dry_run,type=CephBool,req=false '
1203 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
1204 'name=unmanaged,type=CephBool,req=false',
1205 'Scale an iSCSI service')
1206 def _apply_iscsi(self,
1207 pool: str,
1208 api_user: str,
1209 api_password: str,
1210 trusted_ip_list: Optional[str] = None,
1211 placement: Optional[str] = None,
1212 unmanaged: bool = False,
1213 dry_run: bool = False,
1214 format: str = 'plain',
1215 inbuf: Optional[str] = None) -> HandleCommandResult:
1216 if inbuf:
1217 raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
1218
1219 spec = IscsiServiceSpec(
1220 service_id='iscsi',
1221 pool=pool,
1222 api_user=api_user,
1223 api_password=api_password,
1224 trusted_ip_list=trusted_ip_list,
1225 placement=PlacementSpec.from_string(placement),
1226 unmanaged=unmanaged,
1227 preview_only=dry_run
1228 )
1229
1230 completion = self.apply_iscsi(spec)
1231 self._orchestrator_wait([completion])
1232 raise_if_exception(completion)
1233 out = completion.result_str()
1234 if dry_run:
1235 completion_plan = self.plan([spec])
1236 self._orchestrator_wait([completion_plan])
1237 raise_if_exception(completion_plan)
1238 data = completion_plan.result
1239 if format == 'plain':
1240 out = preview_table_services(data)
1241 else:
1242 out = to_format(data, format, many=True, cls=None)
1243 return HandleCommandResult(stdout=out)
1244
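# e.g. `ceph orch set backend cephadm`; an empty module name clears the
# setting (used by self_test below).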
1245 @_cli_write_command(
1246 'orch set backend',
1247 "name=module_name,type=CephString,req=true",
1248 'Select orchestrator module backend')
1249 def _set_backend(self, module_name):
1250 """
1251 We implement a setter command instead of just having the user
1252 modify the setting directly, so that we can validate they're setting
1253 it to a module that really exists and is enabled.
1254
1255 There isn't a mechanism for ensuring they don't *disable* the module
1256 later, but this is better than nothing.
1257 """
1258 mgr_map = self.get("mgr_map")
1259
1260 if module_name is None or module_name == "":
1261 self.set_module_option("orchestrator", None)
1262 return HandleCommandResult()
1263
1264 for module in mgr_map['available_modules']:
1265 if module['name'] != module_name:
1266 continue
1267
1268 if not module['can_run']:
1269 continue
1270
1271 enabled = module['name'] in mgr_map['modules']
1272 if not enabled:
1273 return HandleCommandResult(-errno.EINVAL,
1274 stderr="Module '{module_name}' is not enabled. \n Run "
1275 "`ceph mgr module enable {module_name}` "
1276 "to enable.".format(module_name=module_name))
1277
1278 try:
1279 is_orchestrator = self.remote(module_name,
1280 "is_orchestrator_module")
1281 except NameError:
1282 is_orchestrator = False
1283
1284 if not is_orchestrator:
1285 return HandleCommandResult(-errno.EINVAL,
1286 stderr="'{0}' is not an orchestrator module".format(module_name))
1287
1288 self.set_module_option("orchestrator", module_name)
1289
1290 return HandleCommandResult()
1291
1292 return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
1293
1294 @_cli_write_command(
1295 'orch pause',
1296 desc='Pause orchestrator background work')
1297 def _pause(self):
1298 self.pause()
1299 return HandleCommandResult()
1300
1301 @_cli_write_command(
1302 'orch resume',
1303 desc='Resume orchestrator background work (if paused)')
1304 def _resume(self):
1305 self.resume()
1306 return HandleCommandResult()
1307
1308 @_cli_write_command(
1309 'orch cancel',
1310 desc='Cancel ongoing operations')
1311 def _cancel(self):
1312 """
1313 ProgressReferences might get stuck. Let's unstuck them.
1314 """
1315 self.cancel_completions()
1316 return HandleCommandResult()
1317
1318 @_cli_read_command(
1319 'orch status',
1320 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
1321 desc='Report configured backend and its status')
1322 def _status(self, format='plain'):
1323 o = self._select_orchestrator()
1324 if o is None:
1325 raise NoOrchestrator()
1326
1327 avail, why = self.available()
1328 result = {
1329 "backend": o
1330 }
1331 if avail is not None:
1332 result['available'] = avail
1333 if not avail:
1334 result['reason'] = why
1335
1336 if format != 'plain':
1337 output = to_format(result, format, many=False, cls=None)
1338 else:
1339 output = "Backend: {0}".format(result['backend'])
1340 if 'available' in result:
1341 output += "\nAvailable: {0}".format(result['available'])
1342 if 'reason' in result:
1343 output += ' ({0})'.format(result['reason'])
1344 return HandleCommandResult(stdout=output)
1345
1346 def self_test(self):
1347 old_orch = self._select_orchestrator()
1348 self._set_backend('')
1349 assert self._select_orchestrator() is None
1350 self._set_backend(old_orch)
1351
1352 e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
1353 try:
1354 raise_if_exception(e1)
1355 assert False
1356 except ZeroDivisionError as e:
1357 assert e.args == ('hello, world',)
1358
1359 e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
1360 try:
1361 raise_if_exception(e2)
1362 assert False
1363 except OrchestratorError as e:
1364 assert e.args == ('hello, world',)
1365
1366 c = TrivialReadCompletion(result=True)
1367 assert c.has_result
1368
1369 @staticmethod
1370 def _upgrade_check_image_name(image, ceph_version):
1371 """
1372 >>> OrchestratorCli._upgrade_check_image_name('v15.2.0', None)
1373 Traceback (most recent call last):
1374 orchestrator._interface.OrchestratorValidationError: Error: unable to pull image name `v15.2.0`.
1375 Maybe you meant `--ceph-version 15.2.0`?
1376
1377 """
1378 if image and re.match(r'^v?\d+\.\d+\.\d+$', image) and ceph_version is None:
1379 ver = image[1:] if image.startswith('v') else image
1380 s = f"Error: unable to pull image name `{image}`.\n" \
1381 f" Maybe you meant `--ceph-version {ver}`?"
1382 raise OrchestratorValidationError(s)
1383
1384 @_cli_write_command(
1385 'orch upgrade check',
1386 'name=image,type=CephString,req=false '
1387 'name=ceph_version,type=CephString,req=false',
1388 desc='Check service versions vs available and target containers')
1389 def _upgrade_check(self, image=None, ceph_version=None):
1390 self._upgrade_check_image_name(image, ceph_version)
1391 completion = self.upgrade_check(image=image, version=ceph_version)
1392 self._orchestrator_wait([completion])
1393 raise_if_exception(completion)
1394 return HandleCommandResult(stdout=completion.result_str())
1395
1396 @_cli_write_command(
1397 'orch upgrade status',
1398 desc='Show the status of an in-progress upgrade')
1399 def _upgrade_status(self):
1400 completion = self.upgrade_status()
1401 self._orchestrator_wait([completion])
1402 raise_if_exception(completion)
1403 r = {
1404 'target_image': completion.result.target_image,
1405 'in_progress': completion.result.in_progress,
1406 'services_complete': completion.result.services_complete,
1407 'message': completion.result.message,
1408 }
1409 out = json.dumps(r, indent=4)
1410 return HandleCommandResult(stdout=out)
1411
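# e.g. `ceph orch upgrade start --ceph-version 15.2.0` or
# `ceph orch upgrade start --image <container image>`.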
1412 @_cli_write_command(
1413 'orch upgrade start',
1414 'name=image,type=CephString,req=false '
1415 'name=ceph_version,type=CephString,req=false',
1416 desc='Initiate upgrade')
1417 def _upgrade_start(self, image=None, ceph_version=None):
1418 self._upgrade_check_image_name(image, ceph_version)
1419 completion = self.upgrade_start(image, ceph_version)
1420 self._orchestrator_wait([completion])
1421 raise_if_exception(completion)
1422 return HandleCommandResult(stdout=completion.result_str())
1423
1424 @_cli_write_command(
1425 'orch upgrade pause',
1426 desc='Pause an in-progress upgrade')
1427 def _upgrade_pause(self):
1428 completion = self.upgrade_pause()
1429 self._orchestrator_wait([completion])
1430 raise_if_exception(completion)
1431 return HandleCommandResult(stdout=completion.result_str())
1432
1433 @_cli_write_command(
1434 'orch upgrade resume',
1435 desc='Resume paused upgrade')
1436 def _upgrade_resume(self):
1437 completion = self.upgrade_resume()
1438 self._orchestrator_wait([completion])
1439 raise_if_exception(completion)
1440 return HandleCommandResult(stdout=completion.result_str())
1441
1442 @_cli_write_command(
1443 'orch upgrade stop',
1444 desc='Stop an in-progress upgrade')
1445 def _upgrade_stop(self):
1446 completion = self.upgrade_stop()
1447 self._orchestrator_wait([completion])
1448 raise_if_exception(completion)
1449 return HandleCommandResult(stdout=completion.result_str())