8 from ceph
.deployment
.inventory
import Device
9 from prettytable
import PrettyTable
11 from mgr_util
import format_bytes
, to_pretty_timedelta
14 from typing
import List
, Set
, Optional
, Dict
, Iterator
16 pass # just for type checking.
19 from ceph
.deployment
.drive_group
import DriveGroupSpec
, DeviceSelection
21 from ceph
.deployment
.service_spec
import PlacementSpec
, ServiceSpec
22 from mgr_module
import MgrModule
, HandleCommandResult
24 from ._interface
import OrchestratorClientMixin
, DeviceLightLoc
, _cli_read_command
, \
25 raise_if_exception
, _cli_write_command
, TrivialReadCompletion
, OrchestratorError
, \
26 NoOrchestrator
, OrchestratorValidationError
, NFSServiceSpec
, \
27 RGWSpec
, InventoryFilter
, InventoryHost
, HostSpec
, CLICommandMeta
def nice_delta(now, t, suffix=''):
    """Return a short human-readable delta between *now* and *t*.

    :param now: the current datetime
    :param t: an earlier datetime; may be None/unset
    :param suffix: text appended to the rendered delta (e.g. ' ago')

    Fix: callers pass fields such as ``last_refresh``/``created`` that can
    be unset; render those as '-' instead of raising TypeError from the
    ``now - t`` subtraction.
    """
    if t:
        return to_pretty_timedelta(now - t) + suffix
    return '-'
35 @six.add_metaclass(CLICommandMeta
)
36 class OrchestratorCli(OrchestratorClientMixin
, MgrModule
):
39 'name': 'orchestrator',
42 'desc': 'Orchestrator backend',
43 'enum_allowed': ['cephadm', 'rook',
48 NATIVE_OPTIONS
= [] # type: List[dict]
# Constructor for the orchestrator CLI module.
# Chains to MgrModule/OrchestratorClientMixin __init__, then initializes the
# sets tracking devices whose ident/fault indicator lights are on, and
# recomputes the corresponding health checks.
# NOTE(review): this source is line-mangled and original line 54 is absent
# here — presumably persisted light state is reloaded from the module store
# before the refresh; confirm against the unmangled file.
50 def __init__(self
, *args
, **kwargs
):
51 super(OrchestratorCli
, self
).__init
__(*args
, **kwargs
)
52 self
.ident
= set() # type: Set[str]
53 self
.fault
= set() # type: Set[str]
55 self
._refresh
_health
()
58 active
= self
.get_store('active_devices')
60 decoded
= json
.loads(active
)
61 self
.ident
= set(decoded
.get('ident', []))
62 self
.fault
= set(decoded
.get('fault', []))
63 self
.log
.debug('ident {}, fault {}'.format(self
.ident
, self
.fault
))
66 encoded
= json
.dumps({
67 'ident': list(self
.ident
),
68 'fault': list(self
.fault
),
70 self
.set_store('active_devices', encoded
)
def _refresh_health(self):
    """Publish or clear DEVICE_IDENT_ON / DEVICE_FAULT_ON health warnings.

    Builds a health-check dict from the in-memory ``self.ident`` and
    ``self.fault`` device sets and hands it to ``set_health_checks``.
    NOTE(review): reconstructed from a line-mangled source; the ``h = {}``
    initialization and the two ``if`` guards were not visible — confirm
    against the unmangled file.
    """
    h = {}
    if self.ident:
        h['DEVICE_IDENT_ON'] = {
            'severity': 'warning',
            'summary': '%d devices have ident light turned on' % len(
                self.ident),
            'detail': ['{} ident light enabled'.format(d) for d in self.ident]
        }
    if self.fault:
        h['DEVICE_FAULT_ON'] = {
            'severity': 'warning',
            'summary': '%d devices have fault light turned on' % len(
                self.fault),
            # Fix: the detail list previously iterated self.ident, so the
            # fault warning listed ident-lit devices; iterate self.fault.
            'detail': ['{} fault light enabled'.format(d) for d in self.fault]
        }
    self.set_health_checks(h)
def _get_device_locations(self, dev_id):
    # type: (str) -> List[DeviceLightLoc]
    """Collect every indicator-light location recorded for device *dev_id*.

    Scans the mgr 'devices' dump and flattens the per-entry location
    lists into a single list of DeviceLightLoc.
    """
    location_lists = [
        dev['location']
        for dev in self.get('devices')['devices']
        if dev['devid'] == dev_id
    ]
    return [DeviceLightLoc(**loc)
            for one_list in location_lists
            for loc in one_list]
96 prefix
='device ls-lights',
97 desc
='List currently active device indicator lights')
99 return HandleCommandResult(
101 'ident': list(self
.ident
),
102 'fault': list(self
.fault
)
103 }, indent
=4, sort_keys
=True))
105 def light_on(self
, fault_ident
, devid
):
106 # type: (str, str) -> HandleCommandResult
107 assert fault_ident
in ("fault", "ident")
108 locs
= self
._get
_device
_locations
(devid
)
110 return HandleCommandResult(stderr
='device {} not found'.format(devid
),
111 retval
=-errno
.ENOENT
)
113 getattr(self
, fault_ident
).add(devid
)
115 self
._refresh
_health
()
116 completion
= self
.blink_device_light(fault_ident
, True, locs
)
117 self
._orchestrator
_wait
([completion
])
118 return HandleCommandResult(stdout
=str(completion
.result
))
120 def light_off(self
, fault_ident
, devid
, force
):
121 # type: (str, str, bool) -> HandleCommandResult
122 assert fault_ident
in ("fault", "ident")
123 locs
= self
._get
_device
_locations
(devid
)
125 return HandleCommandResult(stderr
='device {} not found'.format(devid
),
126 retval
=-errno
.ENOENT
)
129 completion
= self
.blink_device_light(fault_ident
, False, locs
)
130 self
._orchestrator
_wait
([completion
])
132 if devid
in getattr(self
, fault_ident
):
133 getattr(self
, fault_ident
).remove(devid
)
135 self
._refresh
_health
()
136 return HandleCommandResult(stdout
=str(completion
.result
))
139 # There are several reasons the try: block might fail:
140 # 1. the device no longer exist
141 # 2. the device is no longer known to Ceph
142 # 3. the host is not reachable
143 if force
and devid
in getattr(self
, fault_ident
):
144 getattr(self
, fault_ident
).remove(devid
)
146 self
._refresh
_health
()
150 prefix
='device light',
151 cmd_args
='name=enable,type=CephChoices,strings=on|off '
152 'name=devid,type=CephString '
153 'name=light_type,type=CephChoices,strings=ident|fault,req=false '
154 'name=force,type=CephBool,req=false',
155 desc
='Enable or disable the device light. Default type is `ident`\n'
156 'Usage: device light (on|off) <devid> [ident|fault] [--force]')
# CLI handler for `device light (on|off) <devid> [ident|fault] [--force]`.
# Defaults the light type to 'ident' and dispatches to light_on()/light_off().
# NOTE(review): the branch lines choosing between the two returns (original
# lines 160-163, presumably `if enable == 'on': ... else: ...`) are missing
# from this line-mangled view — confirm against the unmangled file.
157 def _device_light(self
, enable
, devid
, light_type
=None, force
=False):
158 # type: (str, str, Optional[str], bool) -> HandleCommandResult
159 light_type
= light_type
or 'ident'
162 return self
.light_on(light_type
, devid
)
164 return self
.light_off(light_type
, devid
, force
)
def _select_orchestrator(self):
    """Return the configured orchestrator backend name (None when unset)."""
    backend = self.get_module_option("orchestrator")
    return backend
171 'name=hostname,type=CephString,req=true '
172 'name=addr,type=CephString,req=false '
173 'name=labels,type=CephString,n=N,req=false',
def _add_host(self, hostname: str, addr: Optional[str] = None, labels: Optional[List[str]] = None):
    """Register *hostname* (optionally with address and labels) with the backend."""
    host_spec = HostSpec(hostname=hostname, addr=addr, labels=labels)
    cmpl = self.add_host(host_spec)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
184 "name=hostname,type=CephString,req=true",
def _remove_host(self, hostname):
    """Drop *hostname* from orchestrator management and report the result."""
    cmpl = self.remove_host(hostname)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
193 'orch host set-addr',
194 'name=hostname,type=CephString '
195 'name=addr,type=CephString',
196 'Update a host address')
def _update_set_addr(self, hostname, addr):
    """Set a new address for an already-managed host."""
    cmpl = self.update_host_addr(hostname, addr)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
205 'name=format,type=CephChoices,strings=json|plain,req=false',
207 def _get_hosts(self
, format
='plain'):
208 completion
= self
.get_hosts()
209 self
._orchestrator
_wait
([completion
])
210 raise_if_exception(completion
)
212 hosts
= [host
.to_json()
213 for host
in completion
.result
]
214 output
= json
.dumps(hosts
, sort_keys
=True)
217 ['HOST', 'ADDR', 'LABELS', 'STATUS'],
220 table
.left_padding_width
= 0
221 table
.right_padding_width
= 2
222 for host
in sorted(completion
.result
, key
=lambda h
: h
.hostname
):
223 table
.add_row((host
.hostname
, host
.addr
, ' '.join(host
.labels
), host
.status
))
224 output
= table
.get_string()
225 return HandleCommandResult(stdout
=output
)
228 'orch host label add',
229 'name=hostname,type=CephString '
230 'name=label,type=CephString',
def _host_label_add(self, hostname, label):
    """Attach *label* to the given host."""
    cmpl = self.add_host_label(hostname, label)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
239 'orch host label rm',
240 'name=hostname,type=CephString '
241 'name=label,type=CephString',
242 'Remove a host label')
def _host_label_rm(self, hostname, label):
    """Detach *label* from the given host."""
    cmpl = self.remove_host_label(hostname, label)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
251 "name=hostname,type=CephString,n=N,req=false "
252 "name=format,type=CephChoices,strings=json|plain,req=false "
253 "name=refresh,type=CephBool,req=false",
254 'List devices on a host')
255 def _list_devices(self
, hostname
=None, format
='plain', refresh
=False):
256 # type: (Optional[List[str]], str, bool) -> HandleCommandResult
258 Provide information about storage devices present in cluster hosts
260 Note: this does not have to be completely synchronous. Slightly out of
261 date hardware inventory is fine as long as hardware ultimately appears
262 in the output of this command.
264 nf
= InventoryFilter(hosts
=hostname
) if hostname
else None
266 completion
= self
.get_inventory(host_filter
=nf
, refresh
=refresh
)
268 self
._orchestrator
_wait
([completion
])
269 raise_if_exception(completion
)
272 data
= [n
.to_json() for n
in completion
.result
]
273 return HandleCommandResult(stdout
=json
.dumps(data
))
278 ['HOST', 'PATH', 'TYPE', 'SIZE', 'DEVICE', 'AVAIL',
282 table
._align
['SIZE'] = 'r'
283 table
.left_padding_width
= 0
284 table
.right_padding_width
= 2
285 for host_
in completion
.result
: # type: InventoryHost
286 for d
in host_
.devices
.devices
: # type: Device
291 d
.human_readable_type
,
292 format_bytes(d
.sys_api
.get('size', 0), 5),
295 ', '.join(d
.rejected_reasons
)
298 out
.append(table
.get_string())
299 return HandleCommandResult(stdout
='\n'.join(out
))
303 'name=hostname,type=CephString '
304 'name=path,type=CephString '
305 'name=force,type=CephBool,req=false',
306 'Zap (erase!) a device so it can be re-used')
# CLI handler for `orch device zap <hostname> <path> [--force]`.
# Erases the given device via the backend so it can be re-used; the visible
# raise shows --force is required before any data is destroyed.
# NOTE(review): the guard before the raise (original line 308, presumably
# `if not force:`) is missing from this line-mangled view — confirm against
# the unmangled file.
307 def _zap_device(self
, hostname
, path
, force
=False):
309 raise OrchestratorError('must pass --force to PERMANENTLY ERASE DEVICE DATA')
310 completion
= self
.zap_device(hostname
, path
)
311 self
._orchestrator
_wait
([completion
])
312 raise_if_exception(completion
)
313 return HandleCommandResult(stdout
=completion
.result_str())
317 "name=service_type,type=CephString,req=false "
318 "name=service_name,type=CephString,req=false "
319 "name=format,type=CephChoices,strings=json|plain,req=false "
320 "name=refresh,type=CephBool,req=false",
321 'List services known to orchestrator')
322 def _list_services(self
, host
=None, service_type
=None, service_name
=None, format
='plain', refresh
=False):
323 completion
= self
.describe_service(service_type
,
326 self
._orchestrator
_wait
([completion
])
327 raise_if_exception(completion
)
328 services
= completion
.result
331 return '<unknown>' if s
is None else s
333 # Sort the list for display
334 services
.sort(key
=lambda s
: (ukn(s
.service_name
)))
336 if len(services
) == 0:
337 return HandleCommandResult(stdout
="No services reported")
338 elif format
== 'json':
339 data
= [s
.to_json() for s
in services
]
340 return HandleCommandResult(stdout
=json
.dumps(data
))
342 now
= datetime
.datetime
.utcnow()
344 ['NAME', 'RUNNING', 'REFRESHED', 'AGE',
346 'IMAGE NAME', 'IMAGE ID',
349 table
.align
['NAME'] = 'l'
350 table
.align
['RUNNING'] = 'r'
351 table
.align
['REFRESHED'] = 'l'
352 table
.align
['AGE'] = 'l'
353 table
.align
['IMAGE NAME'] = 'l'
354 table
.align
['IMAGE ID'] = 'l'
355 table
.align
['PLACEMENT'] = 'l'
356 table
.left_padding_width
= 0
357 table
.right_padding_width
= 2
358 for s
in sorted(services
, key
=lambda s
: s
.service_name
):
361 elif s
.spec
.unmanaged
:
364 pl
= s
.spec
.placement
.pretty_str()
367 '%d/%d' % (s
.running
, s
.size
),
368 nice_delta(now
, s
.last_refresh
, ' ago'),
369 nice_delta(now
, s
.created
),
371 ukn(s
.container_image_name
),
372 ukn(s
.container_image_id
)[0:12],
375 return HandleCommandResult(stdout
=table
.get_string())
379 "name=hostname,type=CephString,req=false "
380 "name=service_name,type=CephString,req=false "
381 "name=daemon_type,type=CephString,req=false "
382 "name=daemon_id,type=CephString,req=false "
383 "name=format,type=CephChoices,strings=json|plain,req=false "
384 "name=refresh,type=CephBool,req=false",
385 'List daemons known to orchestrator')
386 def _list_daemons(self
, hostname
=None, service_name
=None, daemon_type
=None, daemon_id
=None, format
='plain', refresh
=False):
387 completion
= self
.list_daemons(service_name
,
392 self
._orchestrator
_wait
([completion
])
393 raise_if_exception(completion
)
394 daemons
= completion
.result
397 return '<unknown>' if s
is None else s
398 # Sort the list for display
399 daemons
.sort(key
=lambda s
: (ukn(s
.daemon_type
), ukn(s
.hostname
), ukn(s
.daemon_id
)))
401 if len(daemons
) == 0:
402 return HandleCommandResult(stdout
="No daemons reported")
403 elif format
== 'json':
404 data
= [s
.to_json() for s
in daemons
]
405 return HandleCommandResult(stdout
=json
.dumps(data
))
407 now
= datetime
.datetime
.utcnow()
409 ['NAME', 'HOST', 'STATUS', 'REFRESHED', 'AGE',
410 'VERSION', 'IMAGE NAME', 'IMAGE ID', 'CONTAINER ID'],
413 table
.left_padding_width
= 0
414 table
.right_padding_width
= 2
415 for s
in sorted(daemons
, key
=lambda s
: s
.name()):
422 if s
.status
== 1 and s
.started
:
423 status
+= ' (%s)' % to_pretty_timedelta(now
- s
.started
)
429 nice_delta(now
, s
.last_refresh
, ' ago'),
430 nice_delta(now
, s
.created
),
432 ukn(s
.container_image_name
),
433 ukn(s
.container_image_id
)[0:12],
434 ukn(s
.container_id
)))
436 return HandleCommandResult(stdout
=table
.get_string())
440 'name=all_available_devices,type=CephBool,req=false',
441 'Create OSD daemon(s) using a drive group spec')
442 def _apply_osd(self
, all_available_devices
=False, inbuf
=None):
443 # type: (bool, Optional[str]) -> HandleCommandResult
444 """Apply DriveGroupSpecs to create OSDs"""
447 ceph orch apply osd -i <json_file/yaml_file>
448 ceph orch apply osd --use-all-devices
450 if not inbuf
and not all_available_devices
:
451 return HandleCommandResult(-errno
.EINVAL
, stderr
=usage
)
453 if all_available_devices
:
454 raise OrchestratorError('--all-available-devices cannot be combined with an osd spec')
456 drivegroups
= yaml
.load_all(inbuf
)
457 dg_specs
= [ServiceSpec
.from_json(dg
) for dg
in drivegroups
]
458 except ValueError as e
:
459 msg
= 'Failed to read JSON/YAML input: {}'.format(str(e
)) + usage
460 return HandleCommandResult(-errno
.EINVAL
, stderr
=msg
)
464 service_id
='all-available-devices',
465 placement
=PlacementSpec(host_pattern
='*'),
466 data_devices
=DeviceSelection(all
=True),
470 completion
= self
.apply_drivegroups(dg_specs
)
471 self
._orchestrator
_wait
([completion
])
472 raise_if_exception(completion
)
473 return HandleCommandResult(stdout
=completion
.result_str())
476 'orch daemon add osd',
477 "name=svc_arg,type=CephString,req=false",
478 'Create an OSD service. Either --svc_arg=host:drives')
479 def _daemon_add_osd(self
, svc_arg
=None):
480 # type: (Optional[str]) -> HandleCommandResult
481 """Create one or more OSDs"""
485 ceph orch daemon add osd host:device1,device2,...
488 return HandleCommandResult(-errno
.EINVAL
, stderr
=usage
)
490 host_name
, block_device
= svc_arg
.split(":")
491 block_devices
= block_device
.split(',')
492 devs
= DeviceSelection(paths
=block_devices
)
493 drive_group
= DriveGroupSpec(placement
=PlacementSpec(host_pattern
=host_name
), data_devices
=devs
)
494 except (TypeError, KeyError, ValueError):
495 msg
= "Invalid host:device spec: '{}'".format(svc_arg
) + usage
496 return HandleCommandResult(-errno
.EINVAL
, stderr
=msg
)
498 completion
= self
.create_osds(drive_group
)
499 self
._orchestrator
_wait
([completion
])
500 raise_if_exception(completion
)
501 return HandleCommandResult(stdout
=completion
.result_str())
505 "name=svc_id,type=CephString,n=N "
506 "name=replace,type=CephBool,req=false "
507 "name=force,type=CephBool,req=false",
508 'Remove OSD services')
def _osd_rm(self, svc_id: List[str],
            replace: bool = False,
            force: bool = False) -> HandleCommandResult:
    """Schedule removal (or replacement, with --replace) of the given OSD ids."""
    cmpl = self.remove_osds(svc_id, replace, force)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
518 'orch osd rm status',
519 desc
='status of OSD removal operation')
520 def _osd_rm_status(self
) -> HandleCommandResult
:
521 completion
= self
.remove_osds_status()
522 self
._orchestrator
_wait
([completion
])
523 raise_if_exception(completion
)
524 report
= completion
.result
526 return HandleCommandResult(stdout
="No OSD remove/replace operations reported")
528 ['NAME', 'HOST', 'PGS', 'STARTED_AT'],
531 table
.left_padding_width
= 0
532 table
.right_padding_width
= 1
533 # TODO: re-add sorted and sort by pg_count
535 table
.add_row((osd
.fullname
, osd
.nodename
, osd
.pg_count_str
, osd
.started_at
))
537 return HandleCommandResult(stdout
=table
.get_string())
541 'name=daemon_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
542 'name=placement,type=CephString,req=false',
544 def _daemon_add_misc(self
, daemon_type
=None, placement
=None, inbuf
=None):
546 ceph orch daemon add -i <json_file>
547 ceph orch daemon add {daemon_type or '<daemon_type>'} <placement>"""
549 if daemon_type
or placement
:
550 raise OrchestratorValidationError(usage
)
551 spec
= ServiceSpec
.from_json(yaml
.safe_load(inbuf
))
553 placement
= PlacementSpec
.from_string(placement
)
556 spec
= ServiceSpec(daemon_type
, placement
=placement
)
558 if daemon_type
== 'mon':
559 completion
= self
.add_mon(spec
)
560 elif daemon_type
== 'mgr':
561 completion
= self
.add_mgr(spec
)
562 elif daemon_type
== 'rbd-mirror':
563 completion
= self
.add_rbd_mirror(spec
)
564 elif daemon_type
== 'crash':
565 completion
= self
.add_crash(spec
)
566 elif daemon_type
== 'alertmanager':
567 completion
= self
.add_alertmanager(spec
)
568 elif daemon_type
== 'grafana':
569 completion
= self
.add_grafana(spec
)
570 elif daemon_type
== 'node-exporter':
571 completion
= self
.add_node_exporter(spec
)
572 elif daemon_type
== 'prometheus':
573 completion
= self
.add_prometheus(spec
)
575 raise OrchestratorValidationError(f
'unknown daemon type `{daemon_type}`')
577 self
._orchestrator
_wait
([completion
])
578 raise_if_exception(completion
)
579 return HandleCommandResult(stdout
=completion
.result_str())
582 'orch daemon add mds',
583 'name=fs_name,type=CephString '
584 'name=placement,type=CephString,req=false',
585 'Start MDS daemon(s)')
586 def _mds_add(self
, fs_name
, placement
=None):
589 placement
=PlacementSpec
.from_string(placement
),
591 completion
= self
.add_mds(spec
)
592 self
._orchestrator
_wait
([completion
])
593 raise_if_exception(completion
)
594 return HandleCommandResult(stdout
=completion
.result_str())
597 'orch daemon add rgw',
598 'name=realm_name,type=CephString '
599 'name=zone_name,type=CephString '
600 'name=placement,type=CephString,req=false',
601 'Start RGW daemon(s)')
602 def _rgw_add(self
, realm_name
, zone_name
, placement
=None, inbuf
=None):
605 ceph orch daemon rgw add -i <json_file>
606 ceph orch daemon rgw add <realm_name> <zone_name>
610 rgw_spec
= RGWSpec
.from_json(json
.loads(inbuf
))
611 except ValueError as e
:
612 msg
= 'Failed to read JSON input: {}'.format(str(e
)) + usage
613 return HandleCommandResult(-errno
.EINVAL
, stderr
=msg
)
615 rgw_realm
=realm_name
,
617 placement
=PlacementSpec
.from_string(placement
),
620 completion
= self
.add_rgw(rgw_spec
)
621 self
._orchestrator
_wait
([completion
])
622 raise_if_exception(completion
)
623 return HandleCommandResult(stdout
=completion
.result_str())
626 'orch daemon add nfs',
627 "name=svc_arg,type=CephString "
628 "name=pool,type=CephString "
629 "name=namespace,type=CephString,req=false "
630 'name=placement,type=CephString,req=false',
631 'Start NFS daemon(s)')
632 def _nfs_add(self
, svc_arg
, pool
, namespace
=None, placement
=None):
633 spec
= NFSServiceSpec(
637 placement
=PlacementSpec
.from_string(placement
),
640 completion
= self
.add_nfs(spec
)
641 self
._orchestrator
_wait
([completion
])
642 raise_if_exception(completion
)
643 return HandleCommandResult(stdout
=completion
.result_str())
647 "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
648 "name=service_name,type=CephString",
649 'Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)')
def _service_action(self, action, service_name):
    """Run *action* (start/stop/restart/redeploy/reconfig) on a whole service."""
    cmpl = self.service_action(action, service_name)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
658 "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
659 "name=name,type=CephString",
660 'Start, stop, restart, redeploy, or reconfig a specific daemon')
661 def _daemon_action(self
, action
, name
):
663 raise OrchestratorError('%s is not a valid daemon name' % name
)
664 (daemon_type
, daemon_id
) = name
.split('.', 1)
665 completion
= self
.daemon_action(action
, daemon_type
, daemon_id
)
666 self
._orchestrator
_wait
([completion
])
667 raise_if_exception(completion
)
668 return HandleCommandResult(stdout
=completion
.result_str())
672 "name=names,type=CephString,n=N "
673 'name=force,type=CephBool,req=false',
674 'Remove specific daemon(s)')
675 def _daemon_rm(self
, names
, force
=False):
678 raise OrchestratorError('%s is not a valid daemon name' % name
)
679 (daemon_type
) = name
.split('.')[0]
680 if not force
and daemon_type
in ['osd', 'mon', 'prometheus']:
681 raise OrchestratorError('must pass --force to REMOVE daemon with potentially PRECIOUS DATA for %s' % name
)
682 completion
= self
.remove_daemons(names
)
683 self
._orchestrator
_wait
([completion
])
684 raise_if_exception(completion
)
685 return HandleCommandResult(stdout
=completion
.result_str())
689 'name=service_name,type=CephString '
690 'name=force,type=CephBool,req=false',
def _service_rm(self, service_name, force=False):
    """Remove an entire service; refuses to touch mon/mgr unless forced."""
    if not force and service_name in ('mon', 'mgr'):
        raise OrchestratorError('The mon and mgr services cannot be removed')
    cmpl = self.remove_service(service_name)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
702 'name=service_name,type=CephString,req=false',
703 desc
='List all Service specs')
def _get_service_specs(self, service_name=None):
    """Dump all stored service specs (optionally filtered by name) as YAML."""
    cmpl = self.list_specs(service_name=service_name)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=yaml.safe_dump_all(cmpl.result))
713 'name=service_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
714 'name=placement,type=CephString,req=false '
715 'name=unmanaged,type=CephBool,req=false',
716 'Update the size or placement for a service or apply a large yaml spec')
717 def _apply_misc(self
, service_type
=None, placement
=None, unmanaged
=False, inbuf
=None):
719 ceph orch apply -i <yaml spec>
720 ceph orch apply <service_type> <placement> [--unmanaged]
723 if service_type
or placement
or unmanaged
:
724 raise OrchestratorValidationError(usage
)
725 content
: Iterator
= yaml
.load_all(inbuf
)
726 specs
= [ServiceSpec
.from_json(s
) for s
in content
]
728 placement
= PlacementSpec
.from_string(placement
)
731 specs
= [ServiceSpec(service_type
, placement
=placement
, unmanaged
=unmanaged
)]
733 completion
= self
.apply(specs
)
734 self
._orchestrator
_wait
([completion
])
735 raise_if_exception(completion
)
736 return HandleCommandResult(stdout
=completion
.result_str())
740 'name=fs_name,type=CephString '
741 'name=placement,type=CephString,req=false '
742 'name=unmanaged,type=CephBool,req=false',
743 'Update the number of MDS instances for the given fs_name')
744 def _apply_mds(self
, fs_name
, placement
=None, unmanaged
=False):
745 placement
= PlacementSpec
.from_string(placement
)
751 completion
= self
.apply_mds(spec
)
752 self
._orchestrator
_wait
([completion
])
753 raise_if_exception(completion
)
754 return HandleCommandResult(stdout
=completion
.result_str())
758 'name=realm_name,type=CephString '
759 'name=zone_name,type=CephString '
760 'name=subcluster,type=CephString,req=false '
761 'name=port,type=CephInt,req=false '
762 'name=ssl,type=CephBool,req=false '
763 'name=placement,type=CephString,req=false '
764 'name=unmanaged,type=CephBool,req=false',
765 'Update the number of RGW instances for the given zone')
766 def _apply_rgw(self
, zone_name
, realm_name
,
773 rgw_realm
=realm_name
,
775 subcluster
=subcluster
,
776 placement
=PlacementSpec
.from_string(placement
),
778 rgw_frontend_port
=port
,
781 completion
= self
.apply_rgw(spec
)
782 self
._orchestrator
_wait
([completion
])
783 raise_if_exception(completion
)
784 return HandleCommandResult(stdout
=completion
.result_str())
788 'name=svc_id,type=CephString '
789 'name=pool,type=CephString '
790 'name=namespace,type=CephString,req=false '
791 'name=placement,type=CephString,req=false '
792 'name=unmanaged,type=CephBool,req=false',
793 'Scale an NFS service')
# CLI handler for `orch apply nfs ...` — builds an NFSServiceSpec and hands
# it to the backend via apply_nfs().
# NOTE(review): unlike the sibling handlers (_apply_mds/_apply_rgw/...),
# no raise_if_exception(completion) call is visible between the wait and
# the return, so backend failures may be reported as success — confirm
# whether this is deliberate or a line missing from this mangled view.
794 def _apply_nfs(self
, svc_id
, pool
, namespace
=None, placement
=None, unmanaged
=False):
795 spec
= NFSServiceSpec(
799 placement
=PlacementSpec
.from_string(placement
),
802 completion
= self
.apply_nfs(spec
)
803 self
._orchestrator
_wait
([completion
])
804 return HandleCommandResult(stdout
=completion
.result_str())
808 "name=module_name,type=CephString,req=true",
809 'Select orchestrator module backend')
810 def _set_backend(self
, module_name
):
812 We implement a setter command instead of just having the user
813 modify the setting directly, so that we can validate they're setting
814 it to a module that really exists and is enabled.
816 There isn't a mechanism for ensuring they don't *disable* the module
817 later, but this is better than nothing.
819 mgr_map
= self
.get("mgr_map")
821 if module_name
is None or module_name
== "":
822 self
.set_module_option("orchestrator", None)
823 return HandleCommandResult()
825 for module
in mgr_map
['available_modules']:
826 if module
['name'] != module_name
:
829 if not module
['can_run']:
832 enabled
= module
['name'] in mgr_map
['modules']
834 return HandleCommandResult(-errno
.EINVAL
,
835 stderr
="Module '{module_name}' is not enabled. \n Run "
836 "`ceph mgr module enable {module_name}` "
837 "to enable.".format(module_name
=module_name
))
840 is_orchestrator
= self
.remote(module_name
,
841 "is_orchestrator_module")
843 is_orchestrator
= False
845 if not is_orchestrator
:
846 return HandleCommandResult(-errno
.EINVAL
,
847 stderr
="'{0}' is not an orchestrator module".format(module_name
))
849 self
.set_module_option("orchestrator", module_name
)
851 return HandleCommandResult()
853 return HandleCommandResult(-errno
.EINVAL
, stderr
="Module '{0}' not found".format(module_name
))
857 desc
='Pause orchestrator background work')
860 return HandleCommandResult()
864 desc
='Resume orchestrator background work (if paused)')
867 return HandleCommandResult()
871 desc
='cancels ongoing operations')
874 ProgressReferences might get stuck. Let's unstuck them.
876 self
.cancel_completions()
877 return HandleCommandResult()
881 desc
='Report configured backend and its status')
883 o
= self
._select
_orchestrator
()
885 raise NoOrchestrator()
887 avail
, why
= self
.available()
889 # The module does not report its availability
890 return HandleCommandResult(stdout
="Backend: {0}".format(o
))
892 return HandleCommandResult(stdout
="Backend: {0}\nAvailable: {1}{2}".format(
894 " ({0})".format(why
) if not avail
else ""
898 old_orch
= self
._select
_orchestrator
()
899 self
._set
_backend
('')
900 assert self
._select
_orchestrator
() is None
901 self
._set
_backend
(old_orch
)
903 e1
= self
.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
905 raise_if_exception(e1
)
907 except ZeroDivisionError as e
:
908 assert e
.args
== ('hello', 'world')
910 e2
= self
.remote('selftest', 'remote_from_orchestrator_cli_self_test', "OrchestratorError")
912 raise_if_exception(e2
)
914 except OrchestratorError
as e
:
915 assert e
.args
== ('hello', 'world')
917 c
= TrivialReadCompletion(result
=True)
921 'orch upgrade check',
922 'name=image,type=CephString,req=false '
923 'name=ceph_version,type=CephString,req=false',
924 desc
='Check service versions vs available and target containers')
def _upgrade_check(self, image=None, ceph_version=None):
    """Report how running daemon versions compare to a candidate image/version."""
    cmpl = self.upgrade_check(image=image, version=ceph_version)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
932 'orch upgrade status',
933 desc
='Check service versions vs available and target containers')
934 def _upgrade_status(self
):
935 completion
= self
.upgrade_status()
936 self
._orchestrator
_wait
([completion
])
937 raise_if_exception(completion
)
939 'target_image': completion
.result
.target_image
,
940 'in_progress': completion
.result
.in_progress
,
941 'services_complete': completion
.result
.services_complete
,
942 'message': completion
.result
.message
,
944 out
= json
.dumps(r
, indent
=4)
945 return HandleCommandResult(stdout
=out
)
948 'orch upgrade start',
949 'name=image,type=CephString,req=false '
950 'name=ceph_version,type=CephString,req=false',
951 desc
='Initiate upgrade')
def _upgrade_start(self, image=None, ceph_version=None):
    """Begin upgrading the cluster to *image* or *ceph_version*."""
    cmpl = self.upgrade_start(image, ceph_version)
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
959 'orch upgrade pause',
960 desc
='Pause an in-progress upgrade')
def _upgrade_pause(self):
    """Pause the in-progress upgrade."""
    cmpl = self.upgrade_pause()
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
968 'orch upgrade resume',
969 desc
='Resume paused upgrade')
def _upgrade_resume(self):
    """Resume a previously paused upgrade."""
    cmpl = self.upgrade_resume()
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())
978 desc
='Stop an in-progress upgrade')
def _upgrade_stop(self):
    """Abort the in-progress upgrade."""
    cmpl = self.upgrade_stop()
    self._orchestrator_wait([cmpl])
    raise_if_exception(cmpl)
    return HandleCommandResult(stdout=cmpl.result_str())