import datetime
import errno
import json
import re

import six
import yaml

from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union

from prettytable import PrettyTable

from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec

from mgr_util import format_bytes, to_pretty_timedelta
from mgr_module import MgrModule, HandleCommandResult

from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
    raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
    NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
    RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
    ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec


def nice_delta(now, t, suffix=''):
    if t:
        return to_pretty_timedelta(now - t) + suffix
    else:
        return '-'
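
# Example (illustrative): with `t` five minutes in the past,
# nice_delta(now, t, ' ago') yields something like '5m ago' (exact wording
# depends on to_pretty_timedelta); a falsy `t` yields '-'.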


def to_format(what, format: str, many: bool, cls):
    def to_json_1(obj):
        if hasattr(obj, 'to_json'):
            return obj.to_json()
        return obj

    def to_json_n(objs):
        return [to_json_1(o) for o in objs]

    to_json = to_json_n if many else to_json_1

    if format == 'json':
        return json.dumps(to_json(what), sort_keys=True)
    elif format == 'json-pretty':
        return json.dumps(to_json(what), indent=2, sort_keys=True)
    elif format == 'yaml':
        # fun with subinterpreters again. pyyaml depends on object identity;
        # as `what` originates from a different subinterpreter we have to copy things here.
        if cls:
            flat = to_json(what)
            copy = [cls.from_json(o) for o in flat] if many else cls.from_json(flat)
        else:
            copy = what

        def to_yaml_1(obj):
            if hasattr(obj, 'yaml_representer'):
                return obj
            return to_json_1(obj)

        def to_yaml_n(objs):
            return [to_yaml_1(o) for o in objs]

        to_yaml = to_yaml_n if many else to_yaml_1

        if many:
            return yaml.dump_all(to_yaml(copy), default_flow_style=False)
        return yaml.dump(to_yaml(copy), default_flow_style=False)
    else:
        raise OrchestratorError(f'unsupported format type: {format}')
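
# Example (illustrative): serializing spec objects that implement to_json(),
# e.g. to_format([HostSpec('node1')], 'json', many=True, cls=HostSpec) returns
# a JSON array string; with format='yaml', objects are first round-tripped
# through cls.from_json() to sidestep the subinterpreter identity issue noted
# above.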


def generate_preview_tables(data):
    error = [x.get('error') for x in data if x.get('error')]
    if error:
        return json.dumps(error)
    warning = [x.get('warning') for x in data if x.get('warning')]
    osd_table = preview_table_osd(data)
    service_table = preview_table_services(data)
    tables = f"""
{''.join(warning)}

####################
SERVICESPEC PREVIEWS
####################
{service_table}

################
OSDSPEC PREVIEWS
################
{osd_table}
"""
    return tables


def preview_table_osd(data):
    table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True)
    table.field_names = "service name host data db wal".split()
    table.align = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 2
    for osd_data in data:
        if osd_data.get('service_type') != 'osd':
            continue
        for host, specs in osd_data.get('data').items():
            for spec in specs:
                if spec.get('error'):
                    return spec.get('message')
                dg_name = spec.get('osdspec')
                for osd in spec.get('data', {}).get('osds', []):
                    db_path = '-'
                    wal_path = '-'
                    block_db = osd.get('block.db', {}).get('path')
                    block_wal = osd.get('block.wal', {}).get('path')
                    block_data = osd.get('data', {}).get('path', '')
                    if not block_data:
                        continue
                    if block_db:
                        db_path = spec.get('data', {}).get('vg', {}).get('devices', [])
                    if block_wal:
                        wal_path = spec.get('data', {}).get('wal_vg', {}).get('devices', [])
                    table.add_row(('osd', dg_name, host, block_data, db_path, wal_path))
    return table.get_string()
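
# The preview payload consumed above, as implied by the .get() calls: a list of
# entries shaped roughly like
#   {'service_type': 'osd',
#    'data': {<hostname>: [{'osdspec': <name>, 'error': ..., 'message': ...,
#                           'data': {'osds': [...], 'vg': {...}, 'wal_vg': {...}}}]}}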


def preview_table_services(data):
    table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True)
    table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split()
    table.align = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 2
    for item in data:
        if item.get('warning'):
            continue
        if item.get('service_type') != 'osd':
            table.add_row((item.get('service_type'), item.get('service_name'),
                           " ".join(item.get('add')), " ".join(item.get('remove'))))
    return table.get_string()


@six.add_metaclass(CLICommandMeta)
class OrchestratorCli(OrchestratorClientMixin, MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'orchestrator',
            'type': 'str',
            'default': None,
            'desc': 'Orchestrator backend',
            'enum_allowed': ['cephadm', 'rook',
                             'test_orchestrator'],
            'runtime': True,
        },
    ]
    NATIVE_OPTIONS = []  # type: List[dict]

    def __init__(self, *args, **kwargs):
        super(OrchestratorCli, self).__init__(*args, **kwargs)
        self.ident = set()  # type: Set[str]
        self.fault = set()  # type: Set[str]
        self._load()
        self._refresh_health()

    def _load(self):
        active = self.get_store('active_devices')
        if active:
            decoded = json.loads(active)
            self.ident = set(decoded.get('ident', []))
            self.fault = set(decoded.get('fault', []))
        self.log.debug('ident {}, fault {}'.format(self.ident, self.fault))

    def _save(self):
        encoded = json.dumps({
            'ident': list(self.ident),
            'fault': list(self.fault),
        })
        self.set_store('active_devices', encoded)

    def _refresh_health(self):
        h = {}
        if self.ident:
            h['DEVICE_IDENT_ON'] = {
                'severity': 'warning',
                'summary': '%d devices have ident light turned on' % len(self.ident),
                'detail': ['{} ident light enabled'.format(d) for d in self.ident]
            }
        if self.fault:
            h['DEVICE_FAULT_ON'] = {
                'severity': 'warning',
                'summary': '%d devices have fault light turned on' % len(self.fault),
                'detail': ['{} fault light enabled'.format(d) for d in self.fault]
            }
        self.set_health_checks(h)

    def _get_device_locations(self, dev_id):
        # type: (str) -> List[DeviceLightLoc]
        locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
        return [DeviceLightLoc(**l) for l in sum(locs, [])]
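
    # Each entry of self.get('devices')['devices'] is expected to carry a
    # 'devid' and a list of 'location' dicts whose keys match DeviceLightLoc's
    # fields (shape inferred from the lookup above).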

    @_cli_read_command(
        prefix='device ls-lights',
        desc='List currently active device indicator lights')
    def _device_ls(self):
        return HandleCommandResult(
            stdout=json.dumps({
                'ident': list(self.ident),
                'fault': list(self.fault)
            }, indent=4, sort_keys=True))

    def light_on(self, fault_ident, devid):
        # type: (str, str) -> HandleCommandResult
        assert fault_ident in ("fault", "ident")
        locs = self._get_device_locations(devid)
        if locs is None:
            return HandleCommandResult(stderr='device {} not found'.format(devid),
                                       retval=-errno.ENOENT)

        getattr(self, fault_ident).add(devid)
        self._save()
        self._refresh_health()
        completion = self.blink_device_light(fault_ident, True, locs)
        self._orchestrator_wait([completion])
        return HandleCommandResult(stdout=str(completion.result))

    def light_off(self, fault_ident, devid, force):
        # type: (str, str, bool) -> HandleCommandResult
        assert fault_ident in ("fault", "ident")
        locs = self._get_device_locations(devid)
        if locs is None:
            return HandleCommandResult(stderr='device {} not found'.format(devid),
                                       retval=-errno.ENOENT)

        try:
            completion = self.blink_device_light(fault_ident, False, locs)
            self._orchestrator_wait([completion])

            if devid in getattr(self, fault_ident):
                getattr(self, fault_ident).remove(devid)
                self._save()
                self._refresh_health()
            return HandleCommandResult(stdout=str(completion.result))
        except Exception:
            # There are several reasons the try: block might fail:
            # 1. the device no longer exists
            # 2. the device is no longer known to Ceph
            # 3. the host is not reachable
            if force and devid in getattr(self, fault_ident):
                getattr(self, fault_ident).remove(devid)
                self._save()
                self._refresh_health()
            raise

    @_cli_write_command(
        prefix='device light',
        cmd_args='name=enable,type=CephChoices,strings=on|off '
                 'name=devid,type=CephString '
                 'name=light_type,type=CephChoices,strings=ident|fault,req=false '
                 'name=force,type=CephBool,req=false',
        desc='Enable or disable the device light. Default type is `ident`\n'
             'Usage: device light (on|off) <devid> [ident|fault] [--force]')
    def _device_light(self, enable, devid, light_type=None, force=False):
        # type: (str, str, Optional[str], bool) -> HandleCommandResult
        light_type = light_type or 'ident'
        on = enable == 'on'
        if on:
            return self.light_on(light_type, devid)
        else:
            return self.light_off(light_type, devid, force)

    def _select_orchestrator(self):
        return self.get_module_option("orchestrator")

    @_cli_write_command(
        'orch host add',
        'name=hostname,type=CephString,req=true '
        'name=addr,type=CephString,req=false '
        'name=labels,type=CephString,n=N,req=false',
        'Add a host')
    def _add_host(self, hostname: str, addr: Optional[str] = None, labels: Optional[List[str]] = None):
        s = HostSpec(hostname=hostname, addr=addr, labels=labels)
        completion = self.add_host(s)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())
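
    # e.g. `ceph orch host add node1 10.0.0.1 mylabel` -- addr and labels are
    # optional; labels take the n=N list form declared above.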

    @_cli_write_command(
        'orch host rm',
        "name=hostname,type=CephString,req=true",
        'Remove a host')
    def _remove_host(self, hostname):
        completion = self.remove_host(hostname)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch host set-addr',
        'name=hostname,type=CephString '
        'name=addr,type=CephString',
        'Update a host address')
    def _update_set_addr(self, hostname, addr):
        completion = self.update_host_addr(hostname, addr)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch host ls',
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
        'List hosts')
    def _get_hosts(self, format='plain'):
        completion = self.get_hosts()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        if format != 'plain':
            output = to_format(completion.result, format, many=True, cls=HostSpec)
        else:
            table = PrettyTable(
                ['HOST', 'ADDR', 'LABELS', 'STATUS'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for host in sorted(completion.result, key=lambda h: h.hostname):
                table.add_row((host.hostname, host.addr, ' '.join(host.labels), host.status))
            output = table.get_string()
        return HandleCommandResult(stdout=output)

    @_cli_write_command(
        'orch host label add',
        'name=hostname,type=CephString '
        'name=label,type=CephString',
        'Add a host label')
    def _host_label_add(self, hostname, label):
        completion = self.add_host_label(hostname, label)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch host label rm',
        'name=hostname,type=CephString '
        'name=label,type=CephString',
        'Remove a host label')
    def _host_label_rm(self, hostname, label):
        completion = self.remove_host_label(hostname, label)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch host ok-to-stop',
        'name=hostname,type=CephString',
        desc='Check if the specified host can be safely stopped without reducing availability')
    def _host_ok_to_stop(self, hostname: str):
        completion = self.host_ok_to_stop(hostname)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch device ls',
        "name=hostname,type=CephString,n=N,req=false "
        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
        "name=refresh,type=CephBool,req=false",
        'List devices on a host')
    def _list_devices(self, hostname=None, format='plain', refresh=False):
        # type: (Optional[List[str]], str, bool) -> HandleCommandResult
        """
        Provide information about storage devices present in cluster hosts

        Note: this does not have to be completely synchronous. Slightly out of
        date hardware inventory is fine as long as hardware ultimately appears
        in the output of this command.
        """
        nf = InventoryFilter(hosts=hostname) if hostname else None

        completion = self.get_inventory(host_filter=nf, refresh=refresh)

        self._orchestrator_wait([completion])
        raise_if_exception(completion)

        if format != 'plain':
            return HandleCommandResult(stdout=to_format(completion.result, format, many=True, cls=InventoryHost))
        else:
            out = []

            table = PrettyTable(
                ['HOST', 'PATH', 'TYPE', 'SIZE', 'DEVICE_ID', 'MODEL', 'VENDOR', 'ROTATIONAL', 'AVAIL',
                 'REJECT REASONS'],
                border=False)
            table.align = 'l'
            table._align['SIZE'] = 'r'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for host_ in completion.result:  # type: InventoryHost
                for d in host_.devices.devices:  # type: Device
                    table.add_row(
                        (
                            host_.name,
                            d.path,
                            d.human_readable_type,
                            format_bytes(d.sys_api.get('size', 0), 5),
                            d.device_id,
                            d.sys_api.get('model') or 'n/a',
                            d.sys_api.get('vendor') or 'n/a',
                            d.sys_api.get('rotational') or 'n/a',
                            d.available,
                            ', '.join(d.rejected_reasons)
                        )
                    )
            out.append(table.get_string())
            return HandleCommandResult(stdout='\n'.join(out))
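
    # e.g. `ceph orch device ls node1 node2 --format=json-pretty --refresh`
    # (hostname is a list, per the n=N declaration above).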

    @_cli_write_command(
        'orch device zap',
        'name=hostname,type=CephString '
        'name=path,type=CephString '
        'name=force,type=CephBool,req=false',
        'Zap (erase!) a device so it can be re-used')
    def _zap_device(self, hostname, path, force=False):
        if not force:
            raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA')
        completion = self.zap_device(hostname, path)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_read_command(
        'orch ls',
        "name=host,type=CephString,req=false "
        "name=service_type,type=CephString,req=false "
        "name=service_name,type=CephString,req=false "
        "name=export,type=CephBool,req=false "
        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
        "name=refresh,type=CephBool,req=false",
        'List services known to orchestrator')
    def _list_services(self, host=None, service_type=None, service_name=None, export=False, format='plain', refresh=False):

        if export and format == 'plain':
            format = 'yaml'

        completion = self.describe_service(service_type,
                                           service_name,
                                           refresh=refresh)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        services: List[ServiceDescription] = completion.result

        def ukn(s):
            return '<unknown>' if s is None else s

        # Sort the list for display
        services.sort(key=lambda s: (ukn(s.spec.service_name())))

        if len(services) == 0:
            return HandleCommandResult(stdout="No services reported")
        elif format != 'plain':
            if export:
                data = [s.spec for s in services]
                return HandleCommandResult(stdout=to_format(data, format, many=True, cls=ServiceSpec))
            else:
                return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription))
        else:
            now = datetime.datetime.utcnow()
            table = PrettyTable(
                ['NAME', 'RUNNING', 'REFRESHED', 'AGE',
                 'PLACEMENT',
                 'IMAGE NAME', 'IMAGE ID',
                 ],
                border=False)
            table.align['NAME'] = 'l'
            table.align['RUNNING'] = 'r'
            table.align['REFRESHED'] = 'l'
            table.align['AGE'] = 'l'
            table.align['IMAGE NAME'] = 'l'
            table.align['IMAGE ID'] = 'l'
            table.align['PLACEMENT'] = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for s in services:
                if not s.spec:
                    pl = '<no spec>'
                elif s.spec.unmanaged:
                    pl = '<unmanaged>'
                else:
                    pl = s.spec.placement.pretty_str()
                table.add_row((
                    s.spec.service_name(),
                    '%d/%d' % (s.running, s.size),
                    nice_delta(now, s.last_refresh, ' ago'),
                    nice_delta(now, s.created),
                    pl,
                    ukn(s.container_image_name),
                    ukn(s.container_image_id)[0:12],
                ))
            return HandleCommandResult(stdout=table.get_string())

    @_cli_read_command(
        'orch ps',
        "name=hostname,type=CephString,req=false "
        "name=service_name,type=CephString,req=false "
        "name=daemon_type,type=CephString,req=false "
        "name=daemon_id,type=CephString,req=false "
        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false "
        "name=refresh,type=CephBool,req=false",
        'List daemons known to orchestrator')
    def _list_daemons(self, hostname=None, service_name=None, daemon_type=None, daemon_id=None, format='plain', refresh=False):
        completion = self.list_daemons(service_name,
                                       daemon_type,
                                       daemon_id=daemon_id,
                                       host=hostname,
                                       refresh=refresh)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        daemons: List[DaemonDescription] = completion.result

        def ukn(s):
            return '<unknown>' if s is None else s
        # Sort the list for display
        daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.hostname), ukn(s.daemon_id)))

        if format != 'plain':
            return HandleCommandResult(stdout=to_format(daemons, format, many=True, cls=DaemonDescription))
        else:
            if len(daemons) == 0:
                return HandleCommandResult(stdout="No daemons reported")

            now = datetime.datetime.utcnow()
            table = PrettyTable(
                ['NAME', 'HOST', 'STATUS', 'REFRESHED', 'AGE',
                 'VERSION', 'IMAGE NAME', 'IMAGE ID', 'CONTAINER ID'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for s in sorted(daemons, key=lambda s: s.name()):
                if s.status_desc:
                    status = s.status_desc
                else:
                    status = {
                        -1: 'error',
                        0: 'stopped',
                        1: 'running',
                        None: '<unknown>'
                    }[s.status]
                if s.status == 1 and s.started:
                    status += ' (%s)' % to_pretty_timedelta(now - s.started)

                table.add_row((
                    s.name(),
                    ukn(s.hostname),
                    status,
                    nice_delta(now, s.last_refresh, ' ago'),
                    nice_delta(now, s.created),
                    ukn(s.version),
                    ukn(s.container_image_name),
                    ukn(s.container_image_id)[0:12],
                    ukn(s.container_id)))

            return HandleCommandResult(stdout=table.get_string())
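
    # e.g. `ceph orch ps --daemon_type=mgr --format=json-pretty`; every filter
    # argument declared above is optional.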

    @_cli_write_command(
        'orch apply osd',
        'name=all_available_devices,type=CephBool,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=unmanaged,type=CephBool,req=false '
        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
        'Create OSD daemon(s) using a drive group spec')
    def _apply_osd(self,
                   all_available_devices: bool = False,
                   format: str = 'plain',
                   unmanaged=None,
                   dry_run: bool = False,
                   inbuf: Optional[str] = None) -> HandleCommandResult:
        """Apply DriveGroupSpecs to create OSDs"""
        usage = """
Usage:
  ceph orch apply osd -i <json_file/yaml_file> [--dry-run]
  ceph orch apply osd --all-available-devices [--dry-run] [--unmanaged]

Restrictions:

  Mutexes:
  * -i, --all-available-devices
  * -i, --unmanaged (this would overwrite the osdspec loaded from a file)

  Parameters:

  * --unmanaged
    Only works with --all-available-devices.

Description:

  * -i
    An inbuf object like a file or a json/yaml blob containing a valid OSDSpec

  * --all-available-devices
    The simplest OSDSpec there is. Takes all devices marked as 'available'
    and creates standalone OSDs on them.

  * --unmanaged
    Sets the unmanaged flag for --all-available-devices (default is False)

Examples:

   # ceph orch apply osd -i <file.yml|json>

   Applies one or more OSDSpecs found in <file>

   # ceph orch apply osd --all-available-devices --unmanaged=true

   Creates and applies a simple OSDSpec with the unmanaged flag set to <true>
"""
        if inbuf and all_available_devices:
            # mutually exclusive
            return HandleCommandResult(-errno.EINVAL, stderr=usage)

        if not inbuf and not all_available_devices:
            # one parameter must be present
            return HandleCommandResult(-errno.EINVAL, stderr=usage)

        if inbuf:
            if unmanaged is not None:
                return HandleCommandResult(-errno.EINVAL, stderr=usage)
            try:
                drivegroups = yaml.safe_load_all(inbuf)

                dg_specs = []
                for dg in drivegroups:
                    spec = DriveGroupSpec.from_json(dg)
                    if dry_run:
                        spec.preview_only = True
                    dg_specs.append(spec)

                if not dry_run:
                    completion = self.apply(dg_specs)
                    self._orchestrator_wait([completion])
                    raise_if_exception(completion)
                    out = completion.result_str()
                else:
                    completion = self.plan(dg_specs)
                    self._orchestrator_wait([completion])
                    raise_if_exception(completion)
                    data = completion.result
                    if format == 'plain':
                        out = preview_table_osd(data)
                    else:
                        out = to_format(data, format, many=True, cls=None)
                return HandleCommandResult(stdout=out)
            except ValueError as e:
                msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
                return HandleCommandResult(-errno.EINVAL, stderr=msg)
        if all_available_devices:
            if unmanaged is None:
                unmanaged = False
            dg_specs = [
                DriveGroupSpec(
                    service_id='all-available-devices',
                    placement=PlacementSpec(host_pattern='*'),
                    data_devices=DeviceSelection(all=True),
                    unmanaged=unmanaged,
                    preview_only=dry_run
                )
            ]
            # This acts weird when abstracted to a function
            if not dry_run:
                completion = self.apply(dg_specs)
                self._orchestrator_wait([completion])
                raise_if_exception(completion)
                out = completion.result_str()
            else:
                completion = self.plan(dg_specs)
                self._orchestrator_wait([completion])
                data = completion.result
                if format == 'plain':
                    out = preview_table_osd(data)
                else:
                    out = to_format(data, format, many=True, cls=None)
            return HandleCommandResult(stdout=out)

        return HandleCommandResult(-errno.EINVAL, stderr=usage)
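
    # A minimal OSDSpec that the `-i` path above would accept -- a sketch only;
    # the exact field set is defined by DriveGroupSpec.from_json, and
    # `all-available-devices` mirrors the built-in spec constructed above:
    #
    #   service_type: osd
    #   service_id: example_drive_group
    #   placement:
    #     host_pattern: '*'
    #   data_devices:
    #     all: true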

    @_cli_write_command(
        'orch daemon add osd',
        "name=svc_arg,type=CephString,req=false",
        'Create an OSD service using --svc_arg=host:drives')
    def _daemon_add_osd(self, svc_arg=None):
        # type: (Optional[str]) -> HandleCommandResult
        """Create one or more OSDs"""

        usage = """
Usage:
  ceph orch daemon add osd host:device1,device2,...
"""
        if not svc_arg:
            return HandleCommandResult(-errno.EINVAL, stderr=usage)
        try:
            host_name, block_device = svc_arg.split(":")
            block_devices = block_device.split(',')
            devs = DeviceSelection(paths=block_devices)
            drive_group = DriveGroupSpec(placement=PlacementSpec(host_pattern=host_name), data_devices=devs)
        except (TypeError, KeyError, ValueError):
            msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
            return HandleCommandResult(-errno.EINVAL, stderr=msg)

        completion = self.create_osds(drive_group)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())
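
    # Example (illustrative): svc_arg 'node1:/dev/sda,/dev/sdb' parses into
    # DriveGroupSpec(placement=PlacementSpec(host_pattern='node1'),
    #                data_devices=DeviceSelection(paths=['/dev/sda', '/dev/sdb'])).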

    @_cli_write_command(
        'orch osd rm',
        "name=svc_id,type=CephString,n=N "
        "name=replace,type=CephBool,req=false "
        "name=force,type=CephBool,req=false",
        'Remove OSD services')
    def _osd_rm_start(self,
                      svc_id: List[str],
                      replace: bool = False,
                      force: bool = False) -> HandleCommandResult:
        completion = self.remove_osds(svc_id, replace=replace, force=force)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch osd rm stop',
        "name=svc_id,type=CephString,n=N",
        'Stop removing OSD services')
    def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult:
        completion = self.stop_remove_osds(svc_id)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch osd rm status',
        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
        desc='Status of OSD removal operation')
    def _osd_rm_status(self, format='plain') -> HandleCommandResult:
        completion = self.remove_osds_status()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        report = completion.result

        if not report:
            return HandleCommandResult(stdout="No OSD remove/replace operations reported")

        if format != 'plain':
            out = to_format(report, format, many=True, cls=None)
        else:
            table = PrettyTable(
                ['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for osd in sorted(report, key=lambda o: o.osd_id):
                table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(),
                               osd.get_pg_count(), osd.replace, osd.force, osd.drain_started_at])
            out = table.get_string()

        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch daemon add',
        'name=daemon_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
        'name=placement,type=CephString,req=false',
        'Add daemon(s)')
    def _daemon_add_misc(self,
                         daemon_type: Optional[str] = None,
                         placement: Optional[str] = None,
                         inbuf: Optional[str] = None) -> HandleCommandResult:
        usage = f"""Usage:
    ceph orch daemon add -i <json_file>
    ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>"""
        if inbuf:
            if daemon_type or placement:
                raise OrchestratorValidationError(usage)
            spec = ServiceSpec.from_json(yaml.safe_load(inbuf))
        else:
            spec = PlacementSpec.from_string(placement)
            assert daemon_type
            spec = ServiceSpec(daemon_type, placement=spec)

        daemon_type = spec.service_type

        if daemon_type == 'mon':
            completion = self.add_mon(spec)
        elif daemon_type == 'mgr':
            completion = self.add_mgr(spec)
        elif daemon_type == 'rbd-mirror':
            completion = self.add_rbd_mirror(spec)
        elif daemon_type == 'crash':
            completion = self.add_crash(spec)
        elif daemon_type == 'alertmanager':
            completion = self.add_alertmanager(spec)
        elif daemon_type == 'grafana':
            completion = self.add_grafana(spec)
        elif daemon_type == 'node-exporter':
            completion = self.add_node_exporter(spec)
        elif daemon_type == 'prometheus':
            completion = self.add_prometheus(spec)
        elif daemon_type == 'mds':
            completion = self.add_mds(spec)
        elif daemon_type == 'rgw':
            completion = self.add_rgw(spec)
        elif daemon_type == 'nfs':
            completion = self.add_nfs(spec)
        elif daemon_type == 'iscsi':
            completion = self.add_iscsi(spec)
        else:
            raise OrchestratorValidationError(f'unknown daemon type `{daemon_type}`')

        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add mds',
        'name=fs_name,type=CephString '
        'name=placement,type=CephString,req=false',
        'Start MDS daemon(s)')
    def _mds_add(self,
                 fs_name: str,
                 placement: Optional[str] = None,
                 inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = ServiceSpec(
            service_type='mds',
            service_id=fs_name,
            placement=PlacementSpec.from_string(placement),
        )

        completion = self.add_mds(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add rgw',
        'name=realm_name,type=CephString '
        'name=zone_name,type=CephString '
        'name=subcluster,type=CephString,req=false '
        'name=port,type=CephInt,req=false '
        'name=ssl,type=CephBool,req=false '
        'name=placement,type=CephString,req=false',
        'Start RGW daemon(s)')
    def _rgw_add(self,
                 realm_name: str,
                 zone_name: str,
                 subcluster: Optional[str] = None,
                 port: Optional[int] = None,
                 ssl: bool = False,
                 placement: Optional[str] = None,
                 inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = RGWSpec(
            rgw_realm=realm_name,
            rgw_zone=zone_name,
            subcluster=subcluster,
            rgw_frontend_port=port,
            ssl=ssl,
            placement=PlacementSpec.from_string(placement),
        )

        completion = self.add_rgw(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add nfs',
        "name=svc_id,type=CephString "
        "name=pool,type=CephString "
        "name=namespace,type=CephString,req=false "
        'name=placement,type=CephString,req=false',
        'Start NFS daemon(s)')
    def _nfs_add(self,
                 svc_id: str,
                 pool: str,
                 namespace: Optional[str] = None,
                 placement: Optional[str] = None,
                 inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = NFSServiceSpec(
            service_id=svc_id,
            pool=pool,
            namespace=namespace,
            placement=PlacementSpec.from_string(placement),
        )

        completion = self.add_nfs(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon add iscsi',
        'name=pool,type=CephString '
        'name=api_user,type=CephString '
        'name=api_password,type=CephString '
        'name=trusted_ip_list,type=CephString,req=false '
        'name=placement,type=CephString,req=false',
        'Start iscsi daemon(s)')
    def _iscsi_add(self,
                   pool: str,
                   api_user: str,
                   api_password: str,
                   trusted_ip_list: Optional[str] = None,
                   placement: Optional[str] = None,
                   inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = IscsiServiceSpec(
            service_id='iscsi',
            pool=pool,
            api_user=api_user,
            api_password=api_password,
            trusted_ip_list=trusted_ip_list,
            placement=PlacementSpec.from_string(placement),
        )

        completion = self.add_iscsi(spec)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch',
        "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
        "name=service_name,type=CephString",
        'Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)')
    def _service_action(self, action, service_name):
        completion = self.service_action(action, service_name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon',
        "name=action,type=CephChoices,strings=start|stop|restart|reconfig "
        "name=name,type=CephString",
        'Start, stop, restart, or reconfig a specific daemon')
    def _daemon_action(self, action, name):
        if '.' not in name:
            raise OrchestratorError('%s is not a valid daemon name' % name)
        completion = self.daemon_action(action, name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon redeploy',
        "name=name,type=CephString "
        "name=image,type=CephString,req=false",
        'Redeploy a daemon (with a specific image)')
    def _daemon_action_redeploy(self, name, image):
        if '.' not in name:
            raise OrchestratorError('%s is not a valid daemon name' % name)
        completion = self.daemon_action("redeploy", name, image=image)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch daemon rm',
        "name=names,type=CephString,n=N "
        'name=force,type=CephBool,req=false',
        'Remove specific daemon(s)')
    def _daemon_rm(self, names, force=False):
        for name in names:
            if '.' not in name:
                raise OrchestratorError('%s is not a valid daemon name' % name)
            daemon_type = name.split('.')[0]
            if not force and daemon_type in ['osd', 'mon', 'prometheus']:
                raise OrchestratorError('must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name)
        completion = self.remove_daemons(names)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch rm',
        'name=service_name,type=CephString '
        'name=force,type=CephBool,req=false',
        'Remove a service')
    def _service_rm(self, service_name, force=False):
        if service_name in ['mon', 'mgr'] and not force:
            raise OrchestratorError('The mon and mgr services cannot be removed')
        completion = self.remove_service(service_name)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch apply',
        'name=service_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
        'name=placement,type=CephString,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Update the size or placement for a service or apply a large yaml spec')
    def _apply_misc(self,
                    service_type: Optional[str] = None,
                    placement: Optional[str] = None,
                    dry_run: bool = False,
                    format: str = 'plain',
                    unmanaged: bool = False,
                    inbuf: Optional[str] = None) -> HandleCommandResult:
        usage = """Usage:
  ceph orch apply -i <yaml spec> [--dry-run]
  ceph orch apply <service_type> <placement> [--unmanaged]
"""
        if inbuf:
            if service_type or placement or unmanaged:
                raise OrchestratorValidationError(usage)
            content: Iterator = yaml.safe_load_all(inbuf)
            specs: List[Union[ServiceSpec, HostSpec]] = []
            for s in content:
                spec = json_to_generic_spec(s)
                if dry_run and not isinstance(spec, HostSpec):
                    spec.preview_only = dry_run
                specs.append(spec)
        else:
            placementspec = PlacementSpec.from_string(placement)
            assert service_type
            specs = [ServiceSpec(service_type, placement=placementspec, unmanaged=unmanaged, preview_only=dry_run)]

        if not dry_run:
            completion = self.apply(specs)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            out = completion.result_str()
        else:
            completion = self.plan(specs)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            data = completion.result
            if format == 'plain':
                out = generate_preview_tables(data)
            else:
                out = to_format(data, format, many=True, cls=None)
        return HandleCommandResult(stdout=out)
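
    # e.g. `ceph orch apply mgr '3'` or `ceph orch apply -i specs.yml --dry-run`;
    # placement strings are parsed by PlacementSpec.from_string.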

    @_cli_write_command(
        'orch apply mds',
        'name=fs_name,type=CephString '
        'name=placement,type=CephString,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=unmanaged,type=CephBool,req=false '
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
        'Update the number of MDS instances for the given fs_name')
    def _apply_mds(self,
                   fs_name: str,
                   placement: Optional[str] = None,
                   dry_run: bool = False,
                   unmanaged: bool = False,
                   format: str = 'plain',
                   inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = ServiceSpec(
            service_type='mds',
            service_id=fs_name,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
            preview_only=dry_run)

        if not dry_run:
            completion = self.apply_mds(spec)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            out = completion.result_str()
        else:
            completion_plan = self.plan([spec])
            self._orchestrator_wait([completion_plan])
            raise_if_exception(completion_plan)
            data = completion_plan.result
            if format == 'plain':
                out = preview_table_services(data)
            else:
                out = to_format(data, format, many=True, cls=None)
        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch apply rgw',
        'name=realm_name,type=CephString '
        'name=zone_name,type=CephString '
        'name=subcluster,type=CephString,req=false '
        'name=port,type=CephInt,req=false '
        'name=ssl,type=CephBool,req=false '
        'name=placement,type=CephString,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Update the number of RGW instances for the given zone')
    def _apply_rgw(self,
                   realm_name: str,
                   zone_name: str,
                   subcluster: Optional[str] = None,
                   port: Optional[int] = None,
                   ssl: bool = False,
                   placement: Optional[str] = None,
                   dry_run: bool = False,
                   format: str = 'plain',
                   unmanaged: bool = False,
                   inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = RGWSpec(
            rgw_realm=realm_name,
            rgw_zone=zone_name,
            subcluster=subcluster,
            rgw_frontend_port=port,
            ssl=ssl,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
            preview_only=dry_run
        )

        if not dry_run:
            completion = self.apply_rgw(spec)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            out = completion.result_str()
        else:
            completion_plan = self.plan([spec])
            self._orchestrator_wait([completion_plan])
            raise_if_exception(completion_plan)
            data = completion_plan.result
            if format == 'plain':
                out = preview_table_services(data)
            else:
                out = to_format(data, format, many=True, cls=None)
        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch apply nfs',
        'name=svc_id,type=CephString '
        'name=pool,type=CephString '
        'name=namespace,type=CephString,req=false '
        'name=placement,type=CephString,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Scale an NFS service')
    def _apply_nfs(self,
                   svc_id: str,
                   pool: str,
                   namespace: Optional[str] = None,
                   placement: Optional[str] = None,
                   format: str = 'plain',
                   dry_run: bool = False,
                   unmanaged: bool = False,
                   inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = NFSServiceSpec(
            service_id=svc_id,
            pool=pool,
            namespace=namespace,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
            preview_only=dry_run
        )

        if not dry_run:
            completion = self.apply_nfs(spec)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            out = completion.result_str()
        else:
            completion_plan = self.plan([spec])
            self._orchestrator_wait([completion_plan])
            raise_if_exception(completion_plan)
            data = completion_plan.result
            if format == 'plain':
                out = preview_table_services(data)
            else:
                out = to_format(data, format, many=True, cls=None)
        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch apply iscsi',
        'name=pool,type=CephString '
        'name=api_user,type=CephString '
        'name=api_password,type=CephString '
        'name=trusted_ip_list,type=CephString,req=false '
        'name=placement,type=CephString,req=false '
        'name=dry_run,type=CephBool,req=false '
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
        'name=unmanaged,type=CephBool,req=false',
        'Scale an iSCSI service')
    def _apply_iscsi(self,
                     pool: str,
                     api_user: str,
                     api_password: str,
                     trusted_ip_list: Optional[str] = None,
                     placement: Optional[str] = None,
                     unmanaged: bool = False,
                     dry_run: bool = False,
                     format: str = 'plain',
                     inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')

        spec = IscsiServiceSpec(
            service_id='iscsi',
            pool=pool,
            api_user=api_user,
            api_password=api_password,
            trusted_ip_list=trusted_ip_list,
            placement=PlacementSpec.from_string(placement),
            unmanaged=unmanaged,
            preview_only=dry_run
        )

        if not dry_run:
            completion = self.apply_iscsi(spec)
            self._orchestrator_wait([completion])
            raise_if_exception(completion)
            out = completion.result_str()
        else:
            completion_plan = self.plan([spec])
            self._orchestrator_wait([completion_plan])
            raise_if_exception(completion_plan)
            data = completion_plan.result
            if format == 'plain':
                out = preview_table_services(data)
            else:
                out = to_format(data, format, many=True, cls=None)
        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch set backend',
        "name=module_name,type=CephString,req=true",
        'Select orchestrator module backend')
    def _set_backend(self, module_name):
        """
        We implement a setter command instead of just having the user
        modify the setting directly, so that we can validate they're setting
        it to a module that really exists and is enabled.

        There isn't a mechanism for ensuring they don't *disable* the module
        later, but this is better than nothing.
        """
        mgr_map = self.get("mgr_map")

        if module_name is None or module_name == "":
            self.set_module_option("orchestrator", None)
            return HandleCommandResult()

        for module in mgr_map['available_modules']:
            if module['name'] != module_name:
                continue

            if not module['can_run']:
                continue

            enabled = module['name'] in mgr_map['modules']
            if not enabled:
                return HandleCommandResult(-errno.EINVAL,
                                           stderr="Module '{module_name}' is not enabled. \n Run "
                                                  "`ceph mgr module enable {module_name}` "
                                                  "to enable.".format(module_name=module_name))

            try:
                is_orchestrator = self.remote(module_name,
                                              "is_orchestrator_module")
            except NameError:
                is_orchestrator = False

            if not is_orchestrator:
                return HandleCommandResult(-errno.EINVAL,
                                           stderr="'{0}' is not an orchestrator module".format(module_name))

            self.set_module_option("orchestrator", module_name)

            return HandleCommandResult()

        return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
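
    # e.g. `ceph orch set backend cephadm`; accepted values are constrained by
    # the `enum_allowed` list in MODULE_OPTIONS above.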

    @_cli_write_command(
        'orch pause',
        desc='Pause orchestrator background work')
    def _pause(self):
        self.pause()
        return HandleCommandResult()

    @_cli_write_command(
        'orch resume',
        desc='Resume orchestrator background work (if paused)')
    def _resume(self):
        self.resume()
        return HandleCommandResult()

    @_cli_write_command(
        'orch cancel',
        desc='Cancel ongoing operations')
    def _cancel(self):
        """
        ProgressReferences might get stuck. Let's unstick them.
        """
        self.cancel_completions()
        return HandleCommandResult()

    @_cli_read_command(
        'orch status',
        'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
        desc='Report configured backend and its status')
    def _status(self, format='plain'):
        o = self._select_orchestrator()
        if o is None:
            raise NoOrchestrator()

        avail, why = self.available()
        result: Dict[str, Any] = {
            "backend": o
        }
        if avail is not None:
            result['available'] = avail
            if not avail:
                result['reason'] = why

        if format != 'plain':
            output = to_format(result, format, many=False, cls=None)
        else:
            output = "Backend: {0}".format(result['backend'])
            if 'available' in result:
                output += "\nAvailable: {0}".format(result['available'])
                if 'reason' in result:
                    output += ' ({0})'.format(result['reason'])
        return HandleCommandResult(stdout=output)

    def self_test(self):
        old_orch = self._select_orchestrator()
        self._set_backend('')
        assert self._select_orchestrator() is None
        self._set_backend(old_orch)

        e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
        try:
            raise_if_exception(e1)
            assert False
        except ZeroDivisionError as e:
            assert e.args == ('hello, world',)

        e2 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
        try:
            raise_if_exception(e2)
            assert False
        except OrchestratorError as e:
            assert e.args == ('hello, world',)

        c = TrivialReadCompletion(result=True)
        assert c.has_result

    @staticmethod
    def _upgrade_check_image_name(image, ceph_version):
        """
        >>> OrchestratorCli._upgrade_check_image_name('v15.2.0', None)
        Traceback (most recent call last):
        orchestrator._interface.OrchestratorValidationError: Error: unable to pull image name `v15.2.0`.
          Maybe you meant `--ceph-version 15.2.0`?

        """
        if image and re.match(r'^v?\d+\.\d+\.\d+$', image) and ceph_version is None:
            ver = image[1:] if image.startswith('v') else image
            s = f"Error: unable to pull image name `{image}`.\n" \
                f"  Maybe you meant `--ceph-version {ver}`?"
            raise OrchestratorValidationError(s)
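
    # Non-bare image names pass the check, e.g.
    # _upgrade_check_image_name('quay.io/ceph/ceph:v15.2.0', None) simply
    # returns None, since the anchored bare-version regex above does not match.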

    @_cli_write_command(
        'orch upgrade check',
        'name=image,type=CephString,req=false '
        'name=ceph_version,type=CephString,req=false',
        desc='Check service versions vs available and target containers')
    def _upgrade_check(self, image=None, ceph_version=None):
        self._upgrade_check_image_name(image, ceph_version)
        completion = self.upgrade_check(image=image, version=ceph_version)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade status',
        desc='Check the status of an ongoing upgrade')
    def _upgrade_status(self):
        completion = self.upgrade_status()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        r = {
            'target_image': completion.result.target_image,
            'in_progress': completion.result.in_progress,
            'services_complete': completion.result.services_complete,
            'message': completion.result.message,
        }
        out = json.dumps(r, indent=4)
        return HandleCommandResult(stdout=out)

    @_cli_write_command(
        'orch upgrade start',
        'name=image,type=CephString,req=false '
        'name=ceph_version,type=CephString,req=false',
        desc='Initiate upgrade')
    def _upgrade_start(self, image=None, ceph_version=None):
        self._upgrade_check_image_name(image, ceph_version)
        completion = self.upgrade_start(image, ceph_version)
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade pause',
        desc='Pause an in-progress upgrade')
    def _upgrade_pause(self):
        completion = self.upgrade_pause()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade resume',
        desc='Resume paused upgrade')
    def _upgrade_resume(self):
        completion = self.upgrade_resume()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())

    @_cli_write_command(
        'orch upgrade stop',
        desc='Stop an in-progress upgrade')
    def _upgrade_stop(self):
        completion = self.upgrade_stop()
        self._orchestrator_wait([completion])
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())