6 from threading
import Event
7 from functools
import wraps
9 from mgr_util
import create_self_signed_cert
, verify_tls
, ServerConfigException
13 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, Type
, \
14 Any
, NamedTuple
, Iterator
, Set
, Sequence
15 from typing
import TYPE_CHECKING
, cast
17 TYPE_CHECKING
= False # just for type checking
25 import multiprocessing
.pool
31 from ceph
.deployment
import inventory
, translate
32 from ceph
.deployment
.drive_group
import DriveGroupSpec
33 from ceph
.deployment
.drive_selection
import selector
34 from ceph
.deployment
.service_spec
import \
35 HostPlacementSpec
, NFSServiceSpec
, ServiceSpec
, PlacementSpec
, assert_valid_host
37 from mgr_module
import MgrModule
, HandleCommandResult
39 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
44 from .nfs
import NFSGanesha
45 from .osd
import RemoveUtil
, OSDRemoval
51 import execnet
.gateway_bootstrap
52 except ImportError as e
:
54 remoto_import_error
= str(e
)
57 from typing
import List
61 logger
= logging
.getLogger(__name__
)
63 DEFAULT_SSH_CONFIG
= ('Host *\n'
65 'StrictHostKeyChecking no\n'
66 'UserKnownHostsFile /dev/null\n')
68 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%f'
69 CEPH_DATEFMT
= '%Y-%m-%dT%H:%M:%S.%fZ'
71 HOST_CACHE_PREFIX
= "host."
72 SPEC_STORE_PREFIX
= "spec."
74 # ceph daemon types that use the ceph container image.
75 # NOTE: listed in upgrade order!
76 CEPH_UPGRADE_ORDER
= ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
77 CEPH_TYPES
= set(CEPH_UPGRADE_ORDER
)
82 from tempfile
import TemporaryDirectory
# py3
84 # define a minimal (but sufficient) equivalent for <= py 3.2
85 class TemporaryDirectory(object): # type: ignore
87 self
.name
= tempfile
.mkdtemp()
91 self
.name
= tempfile
.mkdtemp()
95 shutil
.rmtree(self
.name
)
97 def __exit__(self
, exc_type
, exc_value
, traceback
):
102 def __init__(self
, mgr
):
103 # type: (CephadmOrchestrator) -> None
105 self
.specs
= {} # type: Dict[str, ServiceSpec]
106 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
110 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
)):
111 service_name
= k
[len(SPEC_STORE_PREFIX
):]
114 spec
= ServiceSpec
.from_json(v
['spec'])
115 created
= datetime
.datetime
.strptime(v
['created'], DATEFMT
)
116 self
.specs
[service_name
] = spec
117 self
.spec_created
[service_name
] = created
118 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
120 except Exception as e
:
121 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
125 def save(self
, spec
):
126 # type: (ServiceSpec) -> None
127 self
.specs
[spec
.service_name()] = spec
128 self
.spec_created
[spec
.service_name()] = datetime
.datetime
.utcnow()
130 SPEC_STORE_PREFIX
+ spec
.service_name(),
132 'spec': spec
.to_json(),
133 'created': self
.spec_created
[spec
.service_name()].strftime(DATEFMT
),
def rm(self, service_name):
    # type: (str) -> None
    """Forget the spec recorded under *service_name*.

    Drops both the in-memory spec and its creation timestamp, and clears
    the persisted copy from the mgr key/value store. A no-op when the
    service is unknown.
    """
    if service_name not in self.specs:
        return
    self.specs.pop(service_name)
    self.spec_created.pop(service_name)
    self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
144 def find(self
, service_name
=None):
145 # type: (Optional[str]) -> List[ServiceSpec]
147 for sn
, spec
in self
.specs
.items():
148 if not service_name
or \
149 sn
== service_name
or \
150 sn
.startswith(service_name
+ '.'):
152 self
.mgr
.log
.debug('SpecStore: find spec for %s returned: %s' % (
153 service_name
, specs
))
157 def __init__(self
, mgr
):
158 # type: (CephadmOrchestrator) -> None
160 self
.daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
161 self
.last_daemon_update
= {} # type: Dict[str, datetime.datetime]
162 self
.devices
= {} # type: Dict[str, List[inventory.Device]]
163 self
.networks
= {} # type: Dict[str, Dict[str, List[str]]]
164 self
.last_device_update
= {} # type: Dict[str, datetime.datetime]
165 self
.daemon_refresh_queue
= [] # type: List[str]
166 self
.device_refresh_queue
= [] # type: List[str]
167 self
.daemon_config_deps
= {} # type: Dict[str, Dict[str, Dict[str,Any]]]
168 self
.last_host_check
= {} # type: Dict[str, datetime.datetime]
172 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
)):
173 host
= k
[len(HOST_CACHE_PREFIX
):]
174 if host
not in self
.mgr
.inventory
:
175 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
177 self
.mgr
.set_store(k
, None)
180 if 'last_device_update' in j
:
181 self
.last_device_update
[host
] = datetime
.datetime
.strptime(
182 j
['last_device_update'], DATEFMT
)
184 self
.device_refresh_queue
.append(host
)
185 # for services, we ignore the persisted last_*_update
186 # and always trigger a new scrape on mgr restart.
187 self
.daemon_refresh_queue
.append(host
)
188 self
.daemons
[host
] = {}
189 self
.devices
[host
] = []
190 self
.networks
[host
] = {}
191 self
.daemon_config_deps
[host
] = {}
192 for name
, d
in j
.get('daemons', {}).items():
193 self
.daemons
[host
][name
] = \
194 orchestrator
.DaemonDescription
.from_json(d
)
195 for d
in j
.get('devices', []):
196 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
197 self
.networks
[host
] = j
.get('networks', {})
198 for name
, d
in j
.get('daemon_config_deps', {}).items():
199 self
.daemon_config_deps
[host
][name
] = {
200 'deps': d
.get('deps', []),
201 'last_config': datetime
.datetime
.strptime(
202 d
['last_config'], DATEFMT
),
204 if 'last_host_check' in j
:
205 self
.last_host_check
[host
] = datetime
.datetime
.strptime(
206 j
['last_host_check'], DATEFMT
)
208 'HostCache.load: host %s has %d daemons, '
209 '%d devices, %d networks' % (
210 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
211 len(self
.networks
[host
])))
212 except Exception as e
:
213 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemon map for *host* and stamp the refresh time."""
    now = datetime.datetime.utcnow()
    self.daemons[host] = dm
    self.last_daemon_update[host] = now
def update_host_devices_networks(self, host, dls, nets):
    # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
    """Replace cached device and network info for *host*; stamp the refresh time."""
    now = datetime.datetime.utcnow()
    self.devices[host], self.networks[host] = dls, nets
    self.last_device_update[host] = now
228 def update_daemon_config_deps(self
, host
, name
, deps
, stamp
):
229 self
.daemon_config_deps
[host
][name
] = {
231 'last_config': stamp
,
def update_last_host_check(self, host):
    # type: (str) -> None
    """Record that a host check just completed for *host*."""
    stamp = datetime.datetime.utcnow()
    self.last_host_check[host] = stamp
def prime_empty_host(self, host):
    # type: (str) -> None
    """Install an empty cache entry for a host.

    Initializes every per-host table to its empty value and queues the
    host for both a daemon and a device scrape.
    """
    self.daemons[host], self.devices[host] = {}, []
    self.networks[host], self.daemon_config_deps[host] = {}, {}
    for queue in (self.daemon_refresh_queue, self.device_refresh_queue):
        queue.append(host)
250 def invalidate_host_daemons(self
, host
):
251 # type: (str) -> None
252 self
.daemon_refresh_queue
.append(host
)
253 if host
in self
.last_daemon_update
:
254 del self
.last_daemon_update
[host
]
257 def invalidate_host_devices(self
, host
):
258 # type: (str) -> None
259 self
.device_refresh_queue
.append(host
)
260 if host
in self
.last_device_update
:
261 del self
.last_device_update
[host
]
264 def save_host(self
, host
):
265 # type: (str) -> None
269 'daemon_config_deps': {},
271 if host
in self
.last_daemon_update
:
272 j
['last_daemon_update'] = self
.last_daemon_update
[host
].strftime(DATEFMT
) # type: ignore
273 if host
in self
.last_device_update
:
274 j
['last_device_update'] = self
.last_device_update
[host
].strftime(DATEFMT
) # type: ignore
275 for name
, dd
in self
.daemons
[host
].items():
276 j
['daemons'][name
] = dd
.to_json() # type: ignore
277 for d
in self
.devices
[host
]:
278 j
['devices'].append(d
.to_json()) # type: ignore
279 j
['networks'] = self
.networks
[host
]
280 for name
, depi
in self
.daemon_config_deps
[host
].items():
281 j
['daemon_config_deps'][name
] = { # type: ignore
282 'deps': depi
.get('deps', []),
283 'last_config': depi
['last_config'].strftime(DATEFMT
),
285 if host
in self
.last_host_check
:
286 j
['last_host_check']= self
.last_host_check
[host
].strftime(DATEFMT
)
287 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
def rm_host(self, host):
    # type: (str) -> None
    """Drop every cached record for *host* and erase its persisted entry."""
    for table in (self.daemons,
                  self.devices,
                  self.networks,
                  self.last_daemon_update,
                  self.last_device_update,
                  self.daemon_config_deps):
        # pop with a default: each original `del` was guarded by membership
        table.pop(host, None)
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
306 # type: () -> List[str]
308 for host
, di
in self
.daemons
.items():
312 def get_daemons(self
):
313 # type: () -> List[orchestrator.DaemonDescription]
315 for host
, dm
in self
.daemons
.items():
316 for name
, dd
in dm
.items():
320 def get_daemons_by_service(self
, service_name
):
321 # type: (str) -> List[orchestrator.DaemonDescription]
322 result
= [] # type: List[orchestrator.DaemonDescription]
323 for host
, dm
in self
.daemons
.items():
324 for name
, d
in dm
.items():
325 if name
.startswith(service_name
+ '.'):
329 def get_daemon_names(self
):
330 # type: () -> List[str]
332 for host
, dm
in self
.daemons
.items():
333 for name
, dd
in dm
.items():
337 def get_daemon_last_config_deps(self
, host
, name
):
338 if host
in self
.daemon_config_deps
:
339 if name
in self
.daemon_config_deps
[host
]:
340 return self
.daemon_config_deps
[host
][name
].get('deps', []), \
341 self
.daemon_config_deps
[host
][name
].get('last_config', None)
344 def host_needs_daemon_refresh(self
, host
):
345 # type: (str) -> bool
346 if host
in self
.daemon_refresh_queue
:
347 self
.daemon_refresh_queue
.remove(host
)
349 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
350 seconds
=self
.mgr
.daemon_cache_timeout
)
351 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
355 def host_needs_device_refresh(self
, host
):
356 # type: (str) -> bool
357 if host
in self
.device_refresh_queue
:
358 self
.device_refresh_queue
.remove(host
)
360 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
361 seconds
=self
.mgr
.device_cache_timeout
)
362 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
def host_needs_check(self, host):
    # type: (str) -> bool
    """Return True when *host* was never checked or its last check is stale."""
    last = self.last_host_check.get(host)
    if last is None:
        return True
    interval = datetime.timedelta(seconds=self.mgr.host_check_interval)
    return last < datetime.datetime.utcnow() - interval
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Register daemon description *dd* under an already-cached host."""
    assert host in self.daemons  # host must have been primed first
    by_name = self.daemons[host]
    by_name[dd.name()] = dd
def rm_daemon(self, host, name):
    """Forget a single cached daemon entry, if present."""
    host_map = self.daemons.get(host)
    if host_map is not None:
        host_map.pop(name, None)
383 class AsyncCompletion(orchestrator
.Completion
):
385 _first_promise
=None, # type: Optional[orchestrator.Completion]
386 value
=orchestrator
._Promise
.NO_RESULT
, # type: Any
387 on_complete
=None, # type: Optional[Callable]
388 name
=None, # type: Optional[str]
389 many
=False, # type: bool
390 update_progress
=False, # type: bool
393 assert CephadmOrchestrator
.instance
is not None
395 self
.update_progress
= update_progress
396 if name
is None and on_complete
is not None:
397 name
= getattr(on_complete
, '__name__', None)
398 super(AsyncCompletion
, self
).__init
__(_first_promise
, value
, on_complete
, name
)
401 def _progress_reference(self
):
402 # type: () -> Optional[orchestrator.ProgressReference]
403 if hasattr(self
._on
_complete
_, 'progress_id'): # type: ignore
404 return self
._on
_complete
_ # type: ignore
408 def _on_complete(self
):
409 # type: () -> Optional[Callable]
410 if self
._on
_complete
_ is None:
413 def callback(result
):
415 if self
.update_progress
:
416 assert self
.progress_reference
417 self
.progress_reference
.progress
= 1.0
418 self
._on
_complete
_ = None
419 self
._finalize
(result
)
420 except Exception as e
:
424 logger
.exception(f
'failed to fail AsyncCompletion: >{repr(self)}<')
425 if 'UNITTEST' in os
.environ
:
428 def error_callback(e
):
432 def do_work(*args
, **kwargs
):
433 assert self
._on
_complete
_ is not None
435 res
= self
._on
_complete
_(*args
, **kwargs
)
436 if self
.update_progress
and self
.many
:
437 assert self
.progress_reference
438 self
.progress_reference
.progress
+= 1.0 / len(value
)
440 except Exception as e
:
444 assert CephadmOrchestrator
.instance
447 logger
.info('calling map_async without values')
450 CephadmOrchestrator
.instance
._worker
_pool
.map_async(do_work
, value
,
452 error_callback
=error_callback
)
454 CephadmOrchestrator
.instance
._worker
_pool
.map_async(do_work
, value
,
458 CephadmOrchestrator
.instance
._worker
_pool
.apply_async(do_work
, (value
,),
459 callback
=callback
, error_callback
=error_callback
)
461 CephadmOrchestrator
.instance
._worker
_pool
.apply_async(do_work
, (value
,),
463 return self
.ASYNC_RESULT
468 def _on_complete(self
, inner
):
469 # type: (Callable) -> None
470 self
._on
_complete
_ = inner
473 def ssh_completion(cls
=AsyncCompletion
, **c_kwargs
):
474 # type: (Type[orchestrator.Completion], Any) -> Callable
476 See ./HACKING.rst for a how-to
483 many
= c_kwargs
.get('many', False)
485 # Some weird logic to make calling functions with multiple arguments work.
488 if many
and value
and isinstance(value
[0], tuple):
489 return cls(on_complete
=lambda x
: f(*x
), value
=value
, name
=name
, **c_kwargs
)
491 return cls(on_complete
=f
, value
=value
, name
=name
, **c_kwargs
)
496 def call_self(inner_args
):
497 if not isinstance(inner_args
, tuple):
498 inner_args
= (inner_args
, )
499 return f(self
, *inner_args
)
501 return cls(on_complete
=call_self
, value
=value
, name
=name
, **c_kwargs
)
503 return cls(on_complete
=lambda x
: f(*x
), value
=args
, name
=name
, **c_kwargs
)
def async_completion(f):
    # type: (Callable) -> Callable[..., AsyncCompletion]
    """Decorate *f* so it runs asynchronously in the worker pool.

    See ./HACKING.rst for a how-to

    :param f: wrapped function
    """
    decorate = ssh_completion()
    return decorate(f)
def async_map_completion(f):
    # type: (Callable) -> Callable[..., AsyncCompletion]
    """Decorate *f* so it is mapped over a collection of values asynchronously.

    See ./HACKING.rst for a how-to

    :param f: wrapped function
    """
    decorate = ssh_completion(many=True)
    return decorate(f)
535 def trivial_completion(f
):
536 # type: (Callable) -> Callable[..., orchestrator.Completion]
538 def wrapper(*args
, **kwargs
):
539 return AsyncCompletion(value
=f(*args
, **kwargs
), name
=f
.__name
__)
543 @six.add_metaclass(CLICommandMeta
)
544 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
):
546 _STORE_HOST_PREFIX
= "host"
549 NATIVE_OPTIONS
= [] # type: List[Any]
552 'name': 'ssh_config_file',
555 'desc': 'customized SSH config file to connect to managed hosts',
558 'name': 'device_cache_timeout',
561 'desc': 'seconds to cache device inventory',
564 'name': 'daemon_cache_timeout',
567 'desc': 'seconds to cache service (daemon) inventory',
570 'name': 'host_check_interval',
573 'desc': 'how frequently to perform a host check',
578 'enum_allowed': ['root', 'cephadm-package'],
580 'desc': 'mode for remote execution of cephadm',
583 'name': 'container_image_base',
584 'default': 'docker.io/ceph/ceph',
585 'desc': 'Container image name, without the tag',
589 'name': 'warn_on_stray_hosts',
592 'desc': 'raise a health warning if daemons are detected on a host '
593 'that is not managed by cephadm',
596 'name': 'warn_on_stray_daemons',
599 'desc': 'raise a health warning if daemons are detected '
600 'that are not managed by cephadm',
603 'name': 'warn_on_failed_host_check',
606 'desc': 'raise a health warning if the host check fails',
609 'name': 'log_to_cluster',
612 'desc': 'log to the "cephadm" cluster log channel"',
615 'name': 'allow_ptrace',
618 'desc': 'allow SYS_PTRACE capability on ceph containers',
619 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
620 'process with gdb or strace. Enabling this options '
621 'can allow debugging daemons that encounter problems '
625 'name': 'prometheus_alerts_path',
627 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
628 'desc': 'location of alerts to include in prometheus deployments',
632 def __init__(self
, *args
, **kwargs
):
633 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
634 self
._cluster
_fsid
= self
.get('mon_map')['fsid']
640 if self
.get_store('pause'):
645 # for mypy which does not run the code
647 self
.ssh_config_file
= None # type: Optional[str]
648 self
.device_cache_timeout
= 0
649 self
.daemon_cache_timeout
= 0
650 self
.host_check_interval
= 0
652 self
.container_image_base
= ''
653 self
.warn_on_stray_hosts
= True
654 self
.warn_on_stray_daemons
= True
655 self
.warn_on_failed_host_check
= True
656 self
.allow_ptrace
= False
657 self
.prometheus_alerts_path
= ''
659 self
._cons
= {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
663 path
= self
.get_ceph_option('cephadm_path')
665 with
open(path
, 'r') as f
:
666 self
._cephadm
= f
.read()
667 except (IOError, TypeError) as e
:
668 raise RuntimeError("unable to read cephadm at '%s': %s" % (
671 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
675 CephadmOrchestrator
.instance
= self
677 t
= self
.get_store('upgrade_state')
679 self
.upgrade_state
= json
.loads(t
)
681 self
.upgrade_state
= None
683 self
.health_checks
= {}
685 self
.all_progress_references
= list() # type: List[orchestrator.ProgressReference]
688 i
= self
.get_store('inventory')
690 self
.inventory
: Dict
[str, dict] = json
.loads(i
)
692 self
.inventory
= dict()
693 self
.log
.debug('Loaded inventory %s' % self
.inventory
)
695 self
.cache
= HostCache(self
)
697 self
.rm_util
= RemoveUtil(self
)
699 self
.spec_store
= SpecStore(self
)
700 self
.spec_store
.load()
702 # ensure the host lists are in sync
703 for h
in self
.inventory
.keys():
704 if h
not in self
.cache
.daemons
:
705 self
.cache
.prime_empty_host(h
)
706 for h
in self
.cache
.get_hosts():
707 if h
not in self
.inventory
:
708 self
.cache
.rm_host(h
)
711 self
.log
.debug('shutdown')
712 self
._worker
_pool
.close()
713 self
._worker
_pool
.join()
717 def _kick_serve_loop(self
):
718 self
.log
.debug('_kick_serve_loop')
721 def _check_safe_to_destroy_mon(self
, mon_id
):
722 # type: (str) -> None
723 ret
, out
, err
= self
.mon_command({
724 'prefix': 'quorum_status',
727 raise OrchestratorError('failed to check mon quorum status')
730 except Exception as e
:
731 raise OrchestratorError('failed to parse quorum status')
733 mons
= [m
['name'] for m
in j
['monmap']['mons']]
734 if mon_id
not in mons
:
735 self
.log
.info('Safe to remove mon.%s: not in monmap (%s)' % (
738 new_mons
= [m
for m
in mons
if m
!= mon_id
]
739 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
740 if len(new_quorum
) > len(new_mons
) / 2:
741 self
.log
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id
, new_quorum
, new_mons
))
743 raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
745 def _wait_for_ok_to_stop(self
, s
):
746 # only wait a little bit; the service might go away for something
749 if s
.daemon_type
not in ['mon', 'osd', 'mds']:
750 self
.log
.info('Upgrade: It is presumed safe to stop %s.%s' %
751 (s
.daemon_type
, s
.daemon_id
))
753 ret
, out
, err
= self
.mon_command({
754 'prefix': '%s ok-to-stop' % s
.daemon_type
,
755 'ids': [s
.daemon_id
],
757 if not self
.upgrade_state
or self
.upgrade_state
.get('paused'):
760 self
.log
.info('Upgrade: It is NOT safe to stop %s.%s' %
761 (s
.daemon_type
, s
.daemon_id
))
765 self
.log
.info('Upgrade: It is safe to stop %s.%s' %
766 (s
.daemon_type
, s
.daemon_id
))
770 def _clear_upgrade_health_checks(self
):
771 for k
in ['UPGRADE_NO_STANDBY_MGR',
772 'UPGRADE_FAILED_PULL']:
773 if k
in self
.health_checks
:
774 del self
.health_checks
[k
]
775 self
.set_health_checks(self
.health_checks
)
777 def _fail_upgrade(self
, alert_id
, alert
):
778 self
.log
.error('Upgrade: Paused due to %s: %s' % (alert_id
,
780 self
.upgrade_state
['error'] = alert_id
+ ': ' + alert
['summary']
781 self
.upgrade_state
['paused'] = True
782 self
._save
_upgrade
_state
()
783 self
.health_checks
[alert_id
] = alert
784 self
.set_health_checks(self
.health_checks
)
786 def _update_upgrade_progress(self
, progress
):
787 if 'progress_id' not in self
.upgrade_state
:
788 self
.upgrade_state
['progress_id'] = str(uuid
.uuid4())
789 self
._save
_upgrade
_state
()
790 self
.remote('progress', 'update', self
.upgrade_state
['progress_id'],
791 ev_msg
='Upgrade to %s' % self
.upgrade_state
['target_name'],
792 ev_progress
=progress
)
794 def _do_upgrade(self
):
796 if not self
.upgrade_state
:
797 self
.log
.debug('_do_upgrade no state, exiting')
800 target_name
= self
.upgrade_state
.get('target_name')
801 target_id
= self
.upgrade_state
.get('target_id', None)
803 # need to learn the container hash
804 self
.log
.info('Upgrade: First pull of %s' % target_name
)
806 target_id
, target_version
= self
._get
_container
_image
_id
(target_name
)
807 except OrchestratorError
as e
:
808 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
809 'severity': 'warning',
810 'summary': 'Upgrade: failed to pull target image',
815 self
.upgrade_state
['target_id'] = target_id
816 self
.upgrade_state
['target_version'] = target_version
817 self
._save
_upgrade
_state
()
818 target_version
= self
.upgrade_state
.get('target_version')
819 self
.log
.info('Upgrade: Target is %s with id %s' % (target_name
,
822 # get all distinct container_image settings
824 ret
, out
, err
= self
.mon_command({
825 'prefix': 'config dump',
828 config
= json
.loads(out
)
830 if opt
['name'] == 'container_image':
831 image_settings
[opt
['section']] = opt
['value']
833 daemons
= self
.cache
.get_daemons()
835 for daemon_type
in CEPH_UPGRADE_ORDER
:
836 self
.log
.info('Upgrade: Checking %s daemons...' % daemon_type
)
837 need_upgrade_self
= False
839 if d
.daemon_type
!= daemon_type
:
841 if d
.container_image_id
== target_id
:
842 self
.log
.debug('daemon %s.%s version correct' % (
843 daemon_type
, d
.daemon_id
))
846 self
.log
.debug('daemon %s.%s not correct (%s, %s, %s)' % (
847 daemon_type
, d
.daemon_id
,
848 d
.container_image_name
, d
.container_image_id
, d
.version
))
850 if daemon_type
== 'mgr' and \
851 d
.daemon_id
== self
.get_mgr_id():
852 self
.log
.info('Upgrade: Need to upgrade myself (mgr.%s)' %
854 need_upgrade_self
= True
857 # make sure host has latest container image
858 out
, err
, code
= self
._run
_cephadm
(
859 d
.hostname
, None, 'inspect-image', [],
860 image
=target_name
, no_fsid
=True, error_ok
=True)
861 if code
or json
.loads(''.join(out
)).get('image_id') != target_id
:
862 self
.log
.info('Upgrade: Pulling %s on %s' % (target_name
,
864 out
, err
, code
= self
._run
_cephadm
(
865 d
.hostname
, None, 'pull', [],
866 image
=target_name
, no_fsid
=True, error_ok
=True)
868 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
869 'severity': 'warning',
870 'summary': 'Upgrade: failed to pull target image',
873 'failed to pull %s on host %s' % (target_name
,
877 r
= json
.loads(''.join(out
))
878 if r
.get('image_id') != target_id
:
879 self
.log
.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name
, d
.hostname
, r
['image_id'], target_id
))
880 self
.upgrade_state
['target_id'] = r
['image_id']
881 self
._save
_upgrade
_state
()
884 self
._update
_upgrade
_progress
(done
/ len(daemons
))
886 if not d
.container_image_id
:
887 if d
.container_image_name
== target_name
:
888 self
.log
.debug('daemon %s has unknown container_image_id but has correct image name' % (d
.name()))
890 if not self
._wait
_for
_ok
_to
_stop
(d
):
892 self
.log
.info('Upgrade: Redeploying %s.%s' %
893 (d
.daemon_type
, d
.daemon_id
))
894 ret
, out
, err
= self
.mon_command({
895 'prefix': 'config set',
896 'name': 'container_image',
897 'value': target_name
,
898 'who': utils
.name_to_config_section(daemon_type
+ '.' + d
.daemon_id
),
908 if need_upgrade_self
:
909 mgr_map
= self
.get('mgr_map')
910 num
= len(mgr_map
.get('standbys'))
912 self
._fail
_upgrade
('UPGRADE_NO_STANDBY_MGR', {
913 'severity': 'warning',
914 'summary': 'Upgrade: Need standby mgr daemon',
917 'The upgrade process needs to upgrade the mgr, '
918 'but it needs at least one standby to proceed.',
923 self
.log
.info('Upgrade: there are %d other already-upgraded '
924 'standby mgrs, failing over' % num
)
926 self
._update
_upgrade
_progress
(done
/ len(daemons
))
929 ret
, out
, err
= self
.mon_command({
930 'prefix': 'mgr fail',
931 'who': self
.get_mgr_id(),
934 elif daemon_type
== 'mgr':
935 if 'UPGRADE_NO_STANDBY_MGR' in self
.health_checks
:
936 del self
.health_checks
['UPGRADE_NO_STANDBY_MGR']
937 self
.set_health_checks(self
.health_checks
)
939 # make sure 'ceph versions' agrees
940 ret
, out
, err
= self
.mon_command({
941 'prefix': 'versions',
944 for version
, count
in j
.get(daemon_type
, {}).items():
945 if version
!= target_version
:
947 'Upgrade: %d %s daemon(s) are %s != target %s' %
948 (count
, daemon_type
, version
, target_version
))
951 if image_settings
.get(daemon_type
) != target_name
:
952 self
.log
.info('Upgrade: Setting container_image for all %s...' %
954 ret
, out
, err
= self
.mon_command({
955 'prefix': 'config set',
956 'name': 'container_image',
957 'value': target_name
,
961 for section
in image_settings
.keys():
962 if section
.startswith(utils
.name_to_config_section(daemon_type
) + '.'):
963 to_clean
.append(section
)
965 self
.log
.debug('Upgrade: Cleaning up container_image for %s...' %
967 for section
in to_clean
:
968 ret
, image
, err
= self
.mon_command({
969 'prefix': 'config rm',
970 'name': 'container_image',
974 self
.log
.info('Upgrade: All %s daemons are up to date.' %
978 self
.log
.info('Upgrade: Finalizing container_image settings')
979 ret
, out
, err
= self
.mon_command({
980 'prefix': 'config set',
981 'name': 'container_image',
982 'value': target_name
,
985 for daemon_type
in CEPH_UPGRADE_ORDER
:
986 ret
, image
, err
= self
.mon_command({
987 'prefix': 'config rm',
988 'name': 'container_image',
989 'who': utils
.name_to_config_section(daemon_type
),
992 self
.log
.info('Upgrade: Complete!')
993 if 'progress_id' in self
.upgrade_state
:
994 self
.remote('progress', 'complete',
995 self
.upgrade_state
['progress_id'])
996 self
.upgrade_state
= None
997 self
._save
_upgrade
_state
()
1000 def _check_hosts(self
):
1001 self
.log
.debug('_check_hosts')
1003 hosts
= self
.inventory
.keys()
1005 if host
not in self
.inventory
:
1007 self
.log
.debug(' checking %s' % host
)
1009 out
, err
, code
= self
._run
_cephadm
(
1010 host
, 'client', 'check-host', [],
1011 error_ok
=True, no_fsid
=True)
1013 self
.log
.debug(' host %s failed check' % host
)
1014 if self
.warn_on_failed_host_check
:
1015 bad_hosts
.append('host %s failed check: %s' % (host
, err
))
1017 self
.log
.debug(' host %s ok' % host
)
1018 except Exception as e
:
1019 self
.log
.debug(' host %s failed check' % host
)
1020 bad_hosts
.append('host %s failed check: %s' % (host
, e
))
1021 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1022 del self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']
1024 self
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
1025 'severity': 'warning',
1026 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
1027 'count': len(bad_hosts
),
1028 'detail': bad_hosts
,
1030 self
.set_health_checks(self
.health_checks
)
1032 def _check_host(self
, host
):
1033 if host
not in self
.inventory
:
1035 self
.log
.debug(' checking %s' % host
)
1037 out
, err
, code
= self
._run
_cephadm
(
1038 host
, 'client', 'check-host', [],
1039 error_ok
=True, no_fsid
=True)
1040 self
.cache
.update_last_host_check(host
)
1041 self
.cache
.save_host(host
)
1043 self
.log
.debug(' host %s failed check' % host
)
1044 if self
.warn_on_failed_host_check
:
1045 return 'host %s failed check: %s' % (host
, err
)
1047 self
.log
.debug(' host %s ok' % host
)
1048 except Exception as e
:
1049 self
.log
.debug(' host %s failed check' % host
)
1050 return 'host %s failed check: %s' % (host
, e
)
1052 def _check_for_strays(self
):
1053 self
.log
.debug('_check_for_strays')
1054 for k
in ['CEPHADM_STRAY_HOST',
1055 'CEPHADM_STRAY_DAEMON']:
1056 if k
in self
.health_checks
:
1057 del self
.health_checks
[k
]
1058 if self
.warn_on_stray_hosts
or self
.warn_on_stray_daemons
:
1059 ls
= self
.list_servers()
1060 managed
= self
.cache
.get_daemon_names()
1061 host_detail
= [] # type: List[str]
1062 host_num_daemons
= 0
1063 daemon_detail
= [] # type: List[str]
1065 host
= item
.get('hostname')
1066 daemons
= item
.get('services') # misnomer!
1069 name
= '%s.%s' % (s
.get('type'), s
.get('id'))
1070 if host
not in self
.inventory
:
1071 missing_names
.append(name
)
1072 host_num_daemons
+= 1
1073 if name
not in managed
:
1074 daemon_detail
.append(
1075 'stray daemon %s on host %s not managed by cephadm' % (name
, host
))
1078 'stray host %s has %d stray daemons: %s' % (
1079 host
, len(missing_names
), missing_names
))
1081 self
.health_checks
['CEPHADM_STRAY_HOST'] = {
1082 'severity': 'warning',
1083 'summary': '%d stray host(s) with %s daemon(s) '
1084 'not managed by cephadm' % (
1085 len(host_detail
), host_num_daemons
),
1086 'count': len(host_detail
),
1087 'detail': host_detail
,
1090 self
.health_checks
['CEPHADM_STRAY_DAEMON'] = {
1091 'severity': 'warning',
1092 'summary': '%d stray daemons(s) not managed by cephadm' % (
1093 len(daemon_detail
)),
1094 'count': len(daemon_detail
),
1095 'detail': daemon_detail
,
1097 self
.set_health_checks(self
.health_checks
)
1099 def _serve_sleep(self
):
1100 sleep_interval
= 600
1101 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
1102 ret
= self
.event
.wait(sleep_interval
)
1107 self
.log
.debug("serve starting")
1111 self
.log
.debug('refreshing hosts')
1114 for host
in self
.cache
.get_hosts():
1115 if self
.cache
.host_needs_check(host
):
1116 r
= self
._check
_host
(host
)
1119 if self
.cache
.host_needs_daemon_refresh(host
):
1120 self
.log
.debug('refreshing %s daemons' % host
)
1121 r
= self
._refresh
_host
_daemons
(host
)
1124 if self
.cache
.host_needs_device_refresh(host
):
1125 self
.log
.debug('refreshing %s devices' % host
)
1126 r
= self
._refresh
_host
_devices
(host
)
1130 health_changed
= False
1131 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1132 del self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']
1133 health_changed
= True
1135 self
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
1136 'severity': 'warning',
1137 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
1138 'count': len(bad_hosts
),
1139 'detail': bad_hosts
,
1141 health_changed
= True
1143 self
.health_checks
['CEPHADM_REFRESH_FAILED'] = {
1144 'severity': 'warning',
1145 'summary': 'failed to probe daemons or devices',
1146 'count': len(failures
),
1149 health_changed
= True
1150 elif 'CEPHADM_REFRESH_FAILED' in self
.health_checks
:
1151 del self
.health_checks
['CEPHADM_REFRESH_FAILED']
1152 health_changed
= True
1154 self
.set_health_checks(self
.health_checks
)
1158 self
._check
_for
_strays
()
1161 self
.health_checks
['CEPHADM_PAUSED'] = {
1162 'severity': 'warning',
1163 'summary': 'cephadm background work is paused',
1165 'detail': ["'ceph orch resume' to resume"],
1167 self
.set_health_checks(self
.health_checks
)
1169 if 'CEPHADM_PAUSED' in self
.health_checks
:
1170 del self
.health_checks
['CEPHADM_PAUSED']
1171 self
.set_health_checks(self
.health_checks
)
1173 self
.rm_util
._remove
_osds
_bg
()
1175 if self
._apply
_all
_services
():
1176 continue # did something, refresh
1178 self
._check
_daemons
()
1180 if self
.upgrade_state
and not self
.upgrade_state
.get('paused'):
1185 self
.log
.debug("serve exit")
1187 def config_notify(self
):
1189 This method is called whenever one of our config options is changed.
1191 for opt
in self
.MODULE_OPTIONS
:
1193 opt
['name'], # type: ignore
1194 self
.get_module_option(opt
['name'])) # type: ignore
1195 self
.log
.debug(' mgr option %s = %s',
1196 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
1197 for opt
in self
.NATIVE_OPTIONS
:
1200 self
.get_ceph_option(opt
))
1201 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
1205 def notify(self
, notify_type
, notify_id
):
1210 self
.log
.info('Paused')
1211 self
.set_store('pause', 'true')
1213 # wake loop so we update the health status
1214 self
._kick
_serve
_loop
()
1218 self
.log
.info('Resumed')
1220 self
.set_store('pause', None)
1221 # unconditionally wake loop so that 'orch resume' can be used to kick
1223 self
._kick
_serve
_loop
()
1225 def get_unique_name(self
, daemon_type
, host
, existing
, prefix
=None,
1227 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
1229 Generate a unique random service name
1231 suffix
= daemon_type
not in [
1232 'mon', 'crash', 'nfs',
1233 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
1236 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
1237 raise orchestrator
.OrchestratorValidationError('name %s already in use', forcename
)
1241 host
= host
.split('.')[0]
1249 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
1251 if len([d
for d
in existing
if d
.daemon_id
== name
]):
1253 raise orchestrator
.OrchestratorValidationError('name %s already in use', name
)
1254 self
.log
.debug('name %s exists, trying again', name
)
1258 def get_service_name(self
, daemon_type
, daemon_id
, host
):
1259 # type: (str, str, str) -> (str)
1261 Returns the generic service name
1263 p
= re
.compile(r
'(.*)\.%s.*' % (host
))
1264 p
.sub(r
'\1', daemon_id
)
1265 return '%s.%s' % (daemon_type
, p
.sub(r
'\1', daemon_id
))
1267 def _save_inventory(self
):
1268 self
.set_store('inventory', json
.dumps(self
.inventory
))
1270 def _save_upgrade_state(self
):
1271 self
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
))
1273 def _reconfig_ssh(self
):
1274 temp_files
= [] # type: list
1275 ssh_options
= [] # type: List[str]
1278 ssh_config_fname
= self
.ssh_config_file
1279 ssh_config
= self
.get_store("ssh_config")
1280 if ssh_config
is not None or ssh_config_fname
is None:
1282 ssh_config
= DEFAULT_SSH_CONFIG
1283 f
= tempfile
.NamedTemporaryFile(prefix
='cephadm-conf-')
1284 os
.fchmod(f
.fileno(), 0o600)
1285 f
.write(ssh_config
.encode('utf-8'))
1286 f
.flush() # make visible to other processes
1288 ssh_config_fname
= f
.name
1289 if ssh_config_fname
:
1290 self
.validate_ssh_config_fname(ssh_config_fname
)
1291 ssh_options
+= ['-F', ssh_config_fname
]
1294 ssh_key
= self
.get_store("ssh_identity_key")
1295 ssh_pub
= self
.get_store("ssh_identity_pub")
1296 self
.ssh_pub
= ssh_pub
1297 self
.ssh_key
= ssh_key
1298 if ssh_key
and ssh_pub
:
1299 tkey
= tempfile
.NamedTemporaryFile(prefix
='cephadm-identity-')
1300 tkey
.write(ssh_key
.encode('utf-8'))
1301 os
.fchmod(tkey
.fileno(), 0o600)
1302 tkey
.flush() # make visible to other processes
1303 tpub
= open(tkey
.name
+ '.pub', 'w')
1304 os
.fchmod(tpub
.fileno(), 0o600)
1306 tpub
.flush() # make visible to other processes
1307 temp_files
+= [tkey
, tpub
]
1308 ssh_options
+= ['-i', tkey
.name
]
1310 self
._temp
_files
= temp_files
1312 self
._ssh
_options
= ' '.join(ssh_options
) # type: Optional[str]
1314 self
._ssh
_options
= None
1316 if self
.mode
== 'root':
1317 self
.ssh_user
= 'root'
1318 elif self
.mode
== 'cephadm-package':
1319 self
.ssh_user
= 'cephadm'
1323 def validate_ssh_config_fname(self
, ssh_config_fname
):
1324 if not os
.path
.isfile(ssh_config_fname
):
1325 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
1328 def _reset_con(self
, host
):
1329 conn
, r
= self
._cons
.get(host
, (None, None))
1331 self
.log
.debug('_reset_con close %s' % host
)
1333 del self
._cons
[host
]
1335 def _reset_cons(self
):
1336 for host
, conn_and_r
in self
._cons
.items():
1337 self
.log
.debug('_reset_cons close %s' % host
)
1338 conn
, r
= conn_and_r
1344 if remoto
is not None:
1347 return False, "loading remoto library:{}".format(
1348 remoto_import_error
)
1350 def available(self
):
1352 The cephadm orchestrator is always available.
1354 return self
.can_run()
1356 def process(self
, completions
):
1358 Does nothing, as completions are processed in another thread.
1361 self
.log
.debug("process: completions={0}".format(orchestrator
.pretty_print(completions
)))
1363 for p
in completions
:
1366 def _require_hosts(self
, hosts
):
1368 Raise an error if any of the given hosts are unregistered.
1370 if isinstance(hosts
, six
.string_types
):
1372 keys
= self
.inventory
.keys()
1373 unregistered_hosts
= set(hosts
) - keys
1374 if unregistered_hosts
:
1375 logger
.warning('keys = {}'.format(keys
))
1376 raise RuntimeError("Host(s) {} not registered".format(
1377 ", ".join(map(lambda h
: "'{}'".format(h
),
1378 unregistered_hosts
))))
1380 @orchestrator._cli
_write
_command
(
1381 prefix
='cephadm set-ssh-config',
1382 desc
='Set the ssh_config file (use -i <ssh_config>)')
1383 def _set_ssh_config(self
, inbuf
=None):
1385 Set an ssh_config file provided from stdin
1390 if inbuf
is None or len(inbuf
) == 0:
1391 return -errno
.EINVAL
, "", "empty ssh config provided"
1392 self
.set_store("ssh_config", inbuf
)
1393 self
.log
.info('Set ssh_config')
1396 @orchestrator._cli
_write
_command
(
1397 prefix
='cephadm clear-ssh-config',
1398 desc
='Clear the ssh_config file')
1399 def _clear_ssh_config(self
):
1401 Clear the ssh_config file provided from stdin
1403 self
.set_store("ssh_config", None)
1404 self
.ssh_config_tmp
= None
1405 self
.log
.info('Cleared ssh_config')
1408 @orchestrator._cli
_read
_command
(
1409 prefix
='cephadm get-ssh-config',
1410 desc
='Returns the ssh config as used by cephadm'
1412 def _get_ssh_config(self
):
1413 if self
.ssh_config_file
:
1414 self
.validate_ssh_config_fname(self
.ssh_config_file
)
1415 with
open(self
.ssh_config_file
) as f
:
1416 return HandleCommandResult(stdout
=f
.read())
1417 ssh_config
= self
.get_store("ssh_config")
1419 return HandleCommandResult(stdout
=ssh_config
)
1420 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
1423 @orchestrator._cli
_write
_command
(
1424 'cephadm generate-key',
1425 desc
='Generate a cluster SSH key (if not present)')
1426 def _generate_key(self
):
1427 if not self
.ssh_pub
or not self
.ssh_key
:
1428 self
.log
.info('Generating ssh key...')
1429 tmp_dir
= TemporaryDirectory()
1430 path
= tmp_dir
.name
+ '/key'
1433 '/usr/bin/ssh-keygen',
1434 '-C', 'ceph-%s' % self
._cluster
_fsid
,
1438 with
open(path
, 'r') as f
:
1440 with
open(path
+ '.pub', 'r') as f
:
1444 os
.unlink(path
+ '.pub')
1446 self
.set_store('ssh_identity_key', secret
)
1447 self
.set_store('ssh_identity_pub', pub
)
1448 self
._reconfig
_ssh
()
1451 @orchestrator._cli
_write
_command
(
1452 'cephadm clear-key',
1453 desc
='Clear cluster SSH key')
1454 def _clear_key(self
):
1455 self
.set_store('ssh_identity_key', None)
1456 self
.set_store('ssh_identity_pub', None)
1457 self
._reconfig
_ssh
()
1458 self
.log
.info('Cleared cluster SSH key')
1461 @orchestrator._cli
_read
_command
(
1462 'cephadm get-pub-key',
1463 desc
='Show SSH public key for connecting to cluster hosts')
1464 def _get_pub_key(self
):
1466 return 0, self
.ssh_pub
, ''
1468 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
1470 @orchestrator._cli
_read
_command
(
1472 desc
='Show user for SSHing to cluster hosts')
1473 def _get_user(self
):
1474 return 0, self
.ssh_user
, ''
1476 @orchestrator._cli
_read
_command
(
1477 'cephadm check-host',
1478 'name=host,type=CephString '
1479 'name=addr,type=CephString,req=false',
1480 'Check whether we can access and manage a remote host')
1481 def check_host(self
, host
, addr
=None):
1482 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'check-host',
1483 ['--expect-hostname', host
],
1485 error_ok
=True, no_fsid
=True)
1487 return 1, '', ('check-host failed:\n' + '\n'.join(err
))
1488 # if we have an outstanding health alert for this host, give the
1489 # serve thread a kick
1490 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1491 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
1492 if item
.startswith('host %s ' % host
):
1494 return 0, '%s (%s) ok' % (host
, addr
), err
1496 @orchestrator._cli
_read
_command
(
1497 'cephadm prepare-host',
1498 'name=host,type=CephString '
1499 'name=addr,type=CephString,req=false',
1500 'Prepare a remote host for use with cephadm')
1501 def _prepare_host(self
, host
, addr
=None):
1502 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'prepare-host',
1503 ['--expect-hostname', host
],
1505 error_ok
=True, no_fsid
=True)
1507 return 1, '', ('prepare-host failed:\n' + '\n'.join(err
))
1508 # if we have an outstanding health alert for this host, give the
1509 # serve thread a kick
1510 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1511 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
1512 if item
.startswith('host %s ' % host
):
1514 return 0, '%s (%s) ok' % (host
, addr
), err
1516 def _get_connection(self
, host
):
1518 Setup a connection for running commands on remote host.
1520 conn_and_r
= self
._cons
.get(host
)
1522 self
.log
.debug('Have connection to %s' % host
)
1524 n
= self
.ssh_user
+ '@' + host
1525 self
.log
.debug("Opening connection to {} with ssh options '{}'".format(
1526 n
, self
._ssh
_options
))
1527 child_logger
=self
.log
.getChild(n
)
1528 child_logger
.setLevel('WARNING')
1529 conn
= remoto
.Connection(
1531 logger
=child_logger
,
1532 ssh_options
=self
._ssh
_options
)
1534 r
= conn
.import_module(remotes
)
1535 self
._cons
[host
] = conn
, r
1539 def _executable_path(self
, conn
, executable
):
1541 Remote validator that accepts a connection object to ensure that a certain
1542 executable is available returning its full path if so.
1544 Otherwise an exception with thorough details will be raised, informing the
1545 user that the executable was not found.
1547 executable_path
= conn
.remote_module
.which(executable
)
1548 if not executable_path
:
1549 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1550 executable
, conn
.hostname
))
1551 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
1553 return executable_path
1555 def _run_cephadm(self
, host
, entity
, command
, args
,
1561 # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
1563 Run cephadm on the remote host with the given command + args
1565 if not addr
and host
in self
.inventory
:
1566 addr
= self
.inventory
[host
].get('addr', host
)
1569 conn
, connr
= self
._get
_connection
(addr
)
1571 assert image
or entity
1573 daemon_type
= entity
.split('.', 1)[0] # type: ignore
1574 if daemon_type
in CEPH_TYPES
:
1575 # get container image
1576 ret
, image
, err
= self
.mon_command({
1577 'prefix': 'config get',
1578 'who': utils
.name_to_config_section(entity
),
1579 'key': 'container_image',
1581 image
= image
.strip() # type: ignore
1582 self
.log
.debug('%s container image %s' % (entity
, image
))
1586 final_args
.extend(['--image', image
])
1587 final_args
.append(command
)
1590 final_args
+= ['--fsid', self
._cluster
_fsid
]
1593 if self
.mode
== 'root':
1594 self
.log
.debug('args: %s' % (' '.join(final_args
)))
1596 self
.log
.debug('stdin: %s' % stdin
)
1597 script
= 'injected_argv = ' + json
.dumps(final_args
) + '\n'
1599 script
+= 'injected_stdin = ' + json
.dumps(stdin
) + '\n'
1600 script
+= self
._cephadm
1601 python
= connr
.choose_python()
1604 'unable to find python on %s (tried %s in %s)' % (
1605 host
, remotes
.PYTHONS
, remotes
.PATH
))
1607 out
, err
, code
= remoto
.process
.check(
1610 stdin
=script
.encode('utf-8'))
1611 except RuntimeError as e
:
1612 self
._reset
_con
(host
)
1614 return [], [str(e
)], 1
1616 elif self
.mode
== 'cephadm-package':
1618 out
, err
, code
= remoto
.process
.check(
1620 ['sudo', '/usr/bin/cephadm'] + final_args
,
1622 except RuntimeError as e
:
1623 self
._reset
_con
(host
)
1625 return [], [str(e
)], 1
1628 assert False, 'unsupported mode'
1630 self
.log
.debug('code: %d' % code
)
1632 self
.log
.debug('out: %s' % '\n'.join(out
))
1634 self
.log
.debug('err: %s' % '\n'.join(err
))
1635 if code
and not error_ok
:
1637 'cephadm exited with an error code: %d, stderr:%s' % (
1638 code
, '\n'.join(err
)))
1639 return out
, err
, code
1641 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1642 # this is a misleading exception as it seems to be thrown for
1643 # any sort of connection failure, even those having nothing to
1644 # do with "host not found" (e.g., ssh key permission denied).
1645 user
= 'root' if self
.mode
== 'root' else 'cephadm'
1646 msg
= f
'Failed to connect to {host} ({addr}). ' \
1647 f
'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
1648 f
'you may want to run: \n' \
1649 f
'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
1650 raise OrchestratorError(msg
) from e
1651 except Exception as ex
:
1652 self
.log
.exception(ex
)
1655 def _get_hosts(self
, label
=None):
1656 # type: (Optional[str]) -> List[str]
1658 for h
, hostspec
in self
.inventory
.items():
1659 if not label
or label
in hostspec
.get('labels', []):
1664 def add_host(self
, spec
):
1665 # type: (HostSpec) -> str
1667 Add a host to be managed by the orchestrator.
1669 :param host: host name
1671 assert_valid_host(spec
.hostname
)
1672 out
, err
, code
= self
._run
_cephadm
(spec
.hostname
, 'client', 'check-host',
1673 ['--expect-hostname', spec
.hostname
],
1675 error_ok
=True, no_fsid
=True)
1677 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1678 spec
.hostname
, spec
.addr
, err
))
1680 self
.inventory
[spec
.hostname
] = spec
.to_json()
1681 self
._save
_inventory
()
1682 self
.cache
.prime_empty_host(spec
.hostname
)
1683 self
.event
.set() # refresh stray health check
1684 self
.log
.info('Added host %s' % spec
.hostname
)
1685 return "Added host '{}'".format(spec
.hostname
)
1688 def remove_host(self
, host
):
1689 # type: (str) -> str
1691 Remove a host from orchestrator management.
1693 :param host: host name
1695 del self
.inventory
[host
]
1696 self
._save
_inventory
()
1697 self
.cache
.rm_host(host
)
1698 self
._reset
_con
(host
)
1699 self
.event
.set() # refresh stray health check
1700 self
.log
.info('Removed host %s' % host
)
1701 return "Removed host '{}'".format(host
)
1704 def update_host_addr(self
, host
, addr
):
1705 if host
not in self
.inventory
:
1706 raise OrchestratorError('host %s not registered' % host
)
1707 self
.inventory
[host
]['addr'] = addr
1708 self
._save
_inventory
()
1709 self
._reset
_con
(host
)
1710 self
.event
.set() # refresh stray health check
1711 self
.log
.info('Set host %s addr to %s' % (host
, addr
))
1712 return "Updated host '{}' addr to '{}'".format(host
, addr
)
1715 def get_hosts(self
):
1716 # type: () -> List[orchestrator.HostSpec]
1718 Return a list of hosts managed by the orchestrator.
1721 - skip async: manager reads from cache.
1724 for hostname
, info
in self
.inventory
.items():
1725 r
.append(orchestrator
.HostSpec(
1727 addr
=info
.get('addr', hostname
),
1728 labels
=info
.get('labels', []),
1729 status
=info
.get('status', ''),
1734 def add_host_label(self
, host
, label
):
1735 if host
not in self
.inventory
:
1736 raise OrchestratorError('host %s does not exist' % host
)
1738 if 'labels' not in self
.inventory
[host
]:
1739 self
.inventory
[host
]['labels'] = list()
1740 if label
not in self
.inventory
[host
]['labels']:
1741 self
.inventory
[host
]['labels'].append(label
)
1742 self
._save
_inventory
()
1743 self
.log
.info('Added label %s to host %s' % (label
, host
))
1744 return 'Added label %s to host %s' % (label
, host
)
1747 def remove_host_label(self
, host
, label
):
1748 if host
not in self
.inventory
:
1749 raise OrchestratorError('host %s does not exist' % host
)
1751 if 'labels' not in self
.inventory
[host
]:
1752 self
.inventory
[host
]['labels'] = list()
1753 if label
in self
.inventory
[host
]['labels']:
1754 self
.inventory
[host
]['labels'].remove(label
)
1755 self
._save
_inventory
()
1756 self
.log
.info('Removed label %s to host %s' % (label
, host
))
1757 return 'Removed label %s from host %s' % (label
, host
)
1759 def _refresh_host_daemons(self
, host
):
1761 out
, err
, code
= self
._run
_cephadm
(
1762 host
, 'mon', 'ls', [], no_fsid
=True)
1764 return 'host %s cephadm ls returned %d: %s' % (
1766 except Exception as e
:
1767 return 'host %s scrape failed: %s' % (host
, e
)
1768 ls
= json
.loads(''.join(out
))
1771 if not d
['style'].startswith('cephadm'):
1773 if d
['fsid'] != self
._cluster
_fsid
:
1775 if '.' not in d
['name']:
1777 sd
= orchestrator
.DaemonDescription()
1778 sd
.last_refresh
= datetime
.datetime
.utcnow()
1779 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
1782 setattr(sd
, k
, datetime
.datetime
.strptime(d
[k
], DATEFMT
))
1783 sd
.daemon_type
= d
['name'].split('.')[0]
1784 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
1786 sd
.container_id
= d
.get('container_id')
1789 sd
.container_id
= sd
.container_id
[0:12]
1790 sd
.container_image_name
= d
.get('container_image_name')
1791 sd
.container_image_id
= d
.get('container_image_id')
1792 sd
.version
= d
.get('version')
1794 sd
.status_desc
= d
['state']
1802 sd
.status_desc
= 'unknown'
1805 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
1806 self
.cache
.update_host_daemons(host
, dm
)
1807 self
.cache
.save_host(host
)
1810 def _refresh_host_devices(self
, host
):
1812 out
, err
, code
= self
._run
_cephadm
(
1815 ['--', 'inventory', '--format=json'])
1817 return 'host %s ceph-volume inventory returned %d: %s' % (
1819 except Exception as e
:
1820 return 'host %s ceph-volume inventory failed: %s' % (host
, e
)
1821 devices
= json
.loads(''.join(out
))
1823 out
, err
, code
= self
._run
_cephadm
(
1829 return 'host %s list-networks returned %d: %s' % (
1831 except Exception as e
:
1832 return 'host %s list-networks failed: %s' % (host
, e
)
1833 networks
= json
.loads(''.join(out
))
1834 self
.log
.debug('Refreshed host %s devices (%d) networks (%s)' % (
1835 host
, len(devices
), len(networks
)))
1836 devices
= inventory
.Devices
.from_json(devices
)
1837 self
.cache
.update_host_devices_networks(host
, devices
.devices
, networks
)
1838 self
.cache
.save_host(host
)
1841 def _get_spec_size(self
, spec
):
1842 if spec
.placement
.count
:
1843 return spec
.placement
.count
1844 elif spec
.placement
.host_pattern
:
1845 return len(spec
.placement
.pattern_matches_hosts(self
.inventory
.keys()))
1846 elif spec
.placement
.label
:
1847 return len(self
._get
_hosts
(spec
.placement
.label
))
1848 elif spec
.placement
.hosts
:
1849 return len(spec
.placement
.hosts
)
1854 def describe_service(self
, service_type
=None, service_name
=None,
1857 # ugly sync path, FIXME someday perhaps?
1858 for host
, hi
in self
.inventory
.items():
1859 self
._refresh
_host
_daemons
(host
)
1861 sm
= {} # type: Dict[str, orchestrator.ServiceDescription]
1862 for h
, dm
in self
.cache
.daemons
.items():
1863 for name
, dd
in dm
.items():
1864 if service_type
and service_type
!= dd
.daemon_type
:
1866 n
: str = dd
.service_name()
1867 if service_name
and service_name
!= n
:
1869 if dd
.daemon_type
== 'osd':
1870 continue # ignore OSDs for now
1872 if dd
.service_name() in self
.spec_store
.specs
:
1873 spec
= self
.spec_store
.specs
[dd
.service_name()]
1875 sm
[n
] = orchestrator
.ServiceDescription(
1877 last_refresh
=dd
.last_refresh
,
1878 container_image_id
=dd
.container_image_id
,
1879 container_image_name
=dd
.container_image_name
,
1883 sm
[n
].size
= self
._get
_spec
_size
(spec
)
1884 sm
[n
].created
= self
.spec_store
.spec_created
[dd
.service_name()]
1889 if not sm
[n
].last_refresh
or not dd
.last_refresh
or dd
.last_refresh
< sm
[n
].last_refresh
: # type: ignore
1890 sm
[n
].last_refresh
= dd
.last_refresh
1891 if sm
[n
].container_image_id
!= dd
.container_image_id
:
1892 sm
[n
].container_image_id
= 'mix'
1893 if sm
[n
].container_image_name
!= dd
.container_image_name
:
1894 sm
[n
].container_image_name
= 'mix'
1895 for n
, spec
in self
.spec_store
.specs
.items():
1898 if service_type
is not None and service_type
!= spec
.service_type
:
1900 if service_name
is not None and service_name
!= n
:
1902 sm
[n
] = orchestrator
.ServiceDescription(
1905 size
=self
._get
_spec
_size
(spec
),
1908 return [s
for n
, s
in sm
.items()]
1911 def list_daemons(self
, service_name
=None, daemon_type
=None, daemon_id
=None,
1912 host
=None, refresh
=False):
1914 # ugly sync path, FIXME someday perhaps?
1916 self
._refresh
_host
_daemons
(host
)
1918 for hostname
, hi
in self
.inventory
.items():
1919 self
._refresh
_host
_daemons
(hostname
)
1921 for h
, dm
in self
.cache
.daemons
.items():
1922 if host
and h
!= host
:
1924 for name
, dd
in dm
.items():
1925 if daemon_type
is not None and daemon_type
!= dd
.daemon_type
:
1927 if daemon_id
is not None and daemon_id
!= dd
.daemon_id
:
1929 if service_name
is not None and service_name
!= dd
.service_name():
1934 def service_action(self
, action
, service_name
):
1936 for host
, dm
in self
.cache
.daemons
.items():
1937 for name
, d
in dm
.items():
1938 if d
.matches_service(service_name
):
1939 args
.append((d
.daemon_type
, d
.daemon_id
,
1940 d
.hostname
, action
))
1941 self
.log
.info('%s service %s' % (action
.capitalize(), service_name
))
1942 return self
._daemon
_actions
(args
)
1944 @async_map_completion
1945 def _daemon_actions(self
, daemon_type
, daemon_id
, host
, action
):
1946 return self
._daemon
_action
(daemon_type
, daemon_id
, host
, action
)
1948 def _daemon_action(self
, daemon_type
, daemon_id
, host
, action
):
1949 if action
== 'redeploy':
1950 # stop, recreate the container+unit, then restart
1951 return self
._create
_daemon
(daemon_type
, daemon_id
, host
)
1952 elif action
== 'reconfig':
1953 return self
._create
_daemon
(daemon_type
, daemon_id
, host
,
1957 'start': ['reset-failed', 'start'],
1959 'restart': ['reset-failed', 'restart'],
1961 name
= '%s.%s' % (daemon_type
, daemon_id
)
1962 for a
in actions
[action
]:
1963 out
, err
, code
= self
._run
_cephadm
(
1965 ['--name', name
, a
],
1967 self
.cache
.invalidate_host_daemons(host
)
1968 return "{} {} from host '{}'".format(action
, name
, host
)
1970 def daemon_action(self
, action
, daemon_type
, daemon_id
):
1972 for host
, dm
in self
.cache
.daemons
.items():
1973 for name
, d
in dm
.items():
1974 if d
.daemon_type
== daemon_type
and d
.daemon_id
== daemon_id
:
1975 args
.append((d
.daemon_type
, d
.daemon_id
,
1976 d
.hostname
, action
))
1978 raise orchestrator
.OrchestratorError(
1979 'Unable to find %s.%s daemon(s)' % (
1980 daemon_type
, daemon_id
))
1981 self
.log
.info('%s daemons %s' % (
1982 action
.capitalize(),
1983 ','.join(['%s.%s' % (a
[0], a
[1]) for a
in args
])))
1984 return self
._daemon
_actions
(args
)
1986 def remove_daemons(self
, names
):
1987 # type: (List[str]) -> orchestrator.Completion
1989 for host
, dm
in self
.cache
.daemons
.items():
1992 args
.append((name
, host
))
1994 raise OrchestratorError('Unable to find daemon(s) %s' % (names
))
1995 self
.log
.info('Remove daemons %s' % [a
[0] for a
in args
])
1996 return self
._remove
_daemons
(args
)
1999 def remove_service(self
, service_name
):
2000 self
.log
.info('Remove service %s' % service_name
)
2001 self
.spec_store
.rm(service_name
)
2002 self
._kick
_serve
_loop
()
2003 return ['Removed service %s' % service_name
]
2006 def get_inventory(self
, host_filter
=None, refresh
=False):
2008 Return the storage inventory of hosts matching the given filter.
2010 :param host_filter: host filter
2013 - add filtering by label
2016 # ugly sync path, FIXME someday perhaps?
2018 for host
in host_filter
.hosts
:
2019 self
._refresh
_host
_devices
(host
)
2021 for host
, hi
in self
.inventory
.items():
2022 self
._refresh
_host
_devices
(host
)
2025 for host
, dls
in self
.cache
.devices
.items():
2026 if host_filter
and host
not in host_filter
.hosts
:
2028 result
.append(orchestrator
.InventoryHost(host
,
2029 inventory
.Devices(dls
)))
2033 def zap_device(self
, host
, path
):
2034 self
.log
.info('Zap device %s:%s' % (host
, path
))
2035 out
, err
, code
= self
._run
_cephadm
(
2036 host
, 'osd', 'ceph-volume',
2037 ['--', 'lvm', 'zap', '--destroy', path
],
2039 self
.cache
.invalidate_host_devices(host
)
2041 raise OrchestratorError('Zap failed: %s' % '\n'.join(out
+ err
))
2042 return '\n'.join(out
+ err
)
2044 def blink_device_light(self
, ident_fault
, on
, locs
):
2045 @async_map_completion
2046 def blink(host
, dev
, path
):
2049 'local-disk-%s-led-%s' % (
2051 'on' if on
else 'off'),
2052 '--path', path
or dev
,
2054 out
, err
, code
= self
._run
_cephadm
(
2055 host
, 'osd', 'shell', ['--'] + cmd
,
2059 'Unable to affect %s light for %s:%s. Command: %s' % (
2060 ident_fault
, host
, dev
, ' '.join(cmd
)))
2061 self
.log
.info('Set %s light for %s:%s %s' % (
2062 ident_fault
, host
, dev
, 'on' if on
else 'off'))
2063 return "Set %s light for %s:%s %s" % (
2064 ident_fault
, host
, dev
, 'on' if on
else 'off')
2068 def get_osd_uuid_map(self
, only_up
=False):
2069 # type: (bool) -> Dict[str,str]
2070 osd_map
= self
.get('osd_map')
2072 for o
in osd_map
['osds']:
2073 # only include OSDs that have ever started in this map. this way
2074 # an interrupted osd create can be repeated and succeed the second
2076 if not only_up
or o
['up_from'] > 0:
2077 r
[str(o
['osd'])] = o
['uuid']
2081 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]):
2082 return [self
._apply
(spec
) for spec
in specs
]
2085 def create_osds(self
, drive_group
: DriveGroupSpec
):
2086 self
.log
.debug("Processing DriveGroup {}".format(drive_group
))
2087 # 1) use fn_filter to determine matching_hosts
2088 matching_hosts
= drive_group
.placement
.pattern_matches_hosts([x
for x
in self
.cache
.get_hosts()])
2089 # 2) Map the inventory to the InventoryHost object
2091 def _find_inv_for_host(hostname
: str, inventory_dict
: dict):
2092 # This is stupid and needs to be loaded with the host
2093 for _host
, _inventory
in inventory_dict
.items():
2094 if _host
== hostname
:
2096 raise OrchestratorError("No inventory found for host: {}".format(hostname
))
2099 # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
2100 self
.log
.debug(f
"Checking matching hosts -> {matching_hosts}")
2101 for host
in matching_hosts
:
2102 inventory_for_host
= _find_inv_for_host(host
, self
.cache
.devices
)
2103 self
.log
.debug(f
"Found inventory for host {inventory_for_host}")
2104 drive_selection
= selector
.DriveSelection(drive_group
, inventory_for_host
)
2105 self
.log
.debug(f
"Found drive selection {drive_selection}")
2106 cmd
= translate
.to_ceph_volume(drive_group
, drive_selection
).run()
2107 self
.log
.debug(f
"translated to cmd {cmd}")
2109 self
.log
.debug("No data_devices, skipping DriveGroup: {}".format(drive_group
.service_name()))
2111 self
.log
.info('Applying %s on host %s...' % (
2112 drive_group
.service_name(), host
))
2113 ret_msg
= self
._create
_osd
(host
, cmd
)
2115 return ", ".join(ret
)
2117 def _create_osd(self
, host
, cmd
):
2119 self
._require
_hosts
(host
)
2122 ret
, keyring
, err
= self
.mon_command({
2123 'prefix': 'auth get',
2124 'entity': 'client.bootstrap-osd',
2128 ret
, config
, err
= self
.mon_command({
2129 "prefix": "config generate-minimal-conf",
2137 before_osd_uuid_map
= self
.get_osd_uuid_map(only_up
=True)
2139 split_cmd
= cmd
.split(' ')
2140 _cmd
= ['--config-json', '-', '--']
2141 _cmd
.extend(split_cmd
)
2142 out
, err
, code
= self
._run
_cephadm
(
2143 host
, 'osd', 'ceph-volume',
2147 if code
== 1 and ', it is already prepared' in '\n'.join(err
):
2148 # HACK: when we create against an existing LV, ceph-volume
2149 # returns an error and the above message. To make this
2150 # command idempotent, tolerate this "error" and continue.
2151 self
.log
.debug('the device was already prepared; continuing')
2155 'cephadm exited with an error code: %d, stderr:%s' % (
2156 code
, '\n'.join(err
)))
2159 out
, err
, code
= self
._run
_cephadm
(
2160 host
, 'osd', 'ceph-volume',
2166 osds_elems
= json
.loads('\n'.join(out
))
2167 fsid
= self
._cluster
_fsid
2168 osd_uuid_map
= self
.get_osd_uuid_map()
2170 for osd_id
, osds
in osds_elems
.items():
2172 if osd
['tags']['ceph.cluster_fsid'] != fsid
:
2173 self
.log
.debug('mismatched fsid, skipping %s' % osd
)
2175 if osd_id
in before_osd_uuid_map
:
2176 # this osd existed before we ran prepare
2178 if osd_id
not in osd_uuid_map
:
2179 self
.log
.debug('osd id %d does not exist in cluster' % osd_id
)
2181 if osd_uuid_map
[osd_id
] != osd
['tags']['ceph.osd_fsid']:
2182 self
.log
.debug('mismatched osd uuid (cluster has %s, osd '
2184 osd_uuid_map
[osd_id
],
2185 osd
['tags']['ceph.osd_fsid']))
2188 created
.append(osd_id
)
2189 self
._create
_daemon
(
2190 'osd', osd_id
, host
,
2191 osd_uuid_map
=osd_uuid_map
)
2194 self
.cache
.invalidate_host_devices(host
)
2195 return "Created osd(s) %s on host '%s'" % (','.join(created
), host
)
2197 return "Created no osd(s) on host %s; already created?" % host
2199 def _calc_daemon_deps(self
, daemon_type
, daemon_id
):
2201 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
2202 'grafana': ['prometheus'],
2203 'alertmanager': ['mgr', 'alertmanager'],
2206 for dep_type
in need
.get(daemon_type
, []):
2207 for dd
in self
.cache
.get_daemons_by_service(dep_type
):
2208 deps
.append(dd
.name())
2211 def _get_config_and_keyring(self
, daemon_type
, daemon_id
,
2214 # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
2217 if daemon_type
== 'mon':
2220 ename
= utils
.name_to_config_section(daemon_type
+ '.' + daemon_id
)
2221 ret
, keyring
, err
= self
.mon_command({
2222 'prefix': 'auth get',
2227 ret
, config
, err
= self
.mon_command({
2228 "prefix": "config generate-minimal-conf",
2231 config
+= extra_config
2238 def _create_daemon(self
, daemon_type
, daemon_id
, host
,
2240 extra_args
=None, extra_config
=None,
2245 name
= '%s.%s' % (daemon_type
, daemon_id
)
2247 start_time
= datetime
.datetime
.utcnow()
2248 deps
= [] # type: List[str]
2249 cephadm_config
= {} # type: Dict[str, Any]
2250 if daemon_type
== 'prometheus':
2251 cephadm_config
, deps
= self
._generate
_prometheus
_config
()
2252 extra_args
.extend(['--config-json', '-'])
2253 elif daemon_type
== 'grafana':
2254 cephadm_config
, deps
= self
._generate
_grafana
_config
()
2255 extra_args
.extend(['--config-json', '-'])
2256 elif daemon_type
== 'nfs':
2257 cephadm_config
, deps
= \
2258 self
._generate
_nfs
_config
(daemon_type
, daemon_id
, host
)
2259 extra_args
.extend(['--config-json', '-'])
2260 elif daemon_type
== 'alertmanager':
2261 cephadm_config
, deps
= self
._generate
_alertmanager
_config
()
2262 extra_args
.extend(['--config-json', '-'])
2264 # Ceph.daemons (mon, mgr, mds, osd, etc)
2265 cephadm_config
= self
._get
_config
_and
_keyring
(
2266 daemon_type
, daemon_id
,
2268 extra_config
=extra_config
)
2269 extra_args
.extend(['--config-json', '-'])
2271 # osd deployments needs an --osd-uuid arg
2272 if daemon_type
== 'osd':
2273 if not osd_uuid_map
:
2274 osd_uuid_map
= self
.get_osd_uuid_map()
2275 osd_uuid
= osd_uuid_map
.get(daemon_id
, None)
2277 raise OrchestratorError('osd.%d not in osdmap' % daemon_id
)
2278 extra_args
.extend(['--osd-fsid', osd_uuid
])
2281 extra_args
.append('--reconfig')
2282 if self
.allow_ptrace
:
2283 extra_args
.append('--allow-ptrace')
2285 self
.log
.info('%s daemon %s on %s' % (
2286 'Reconfiguring' if reconfig
else 'Deploying',
2289 out
, err
, code
= self
._run
_cephadm
(
2290 host
, name
, 'deploy',
2294 stdin
=json
.dumps(cephadm_config
))
2295 if not code
and host
in self
.cache
.daemons
:
2296 # prime cached service state with what we (should have)
2298 sd
= orchestrator
.DaemonDescription()
2299 sd
.daemon_type
= daemon_type
2300 sd
.daemon_id
= daemon_id
2303 sd
.status_desc
= 'starting'
2304 self
.cache
.add_daemon(host
, sd
)
2305 self
.cache
.invalidate_host_daemons(host
)
2306 self
.cache
.update_daemon_config_deps(host
, name
, deps
, start_time
)
2307 self
.cache
.save_host(host
)
2308 return "{} {} on host '{}'".format(
2309 'Reconfigured' if reconfig
else 'Deployed', name
, host
)
@async_map_completion
def _remove_daemons(self, name, host):
    """Async fan-out wrapper: remove the named daemon from the given host.

    Delegates to _remove_daemon; the decorator turns the per-(name, host)
    calls into an orchestrator completion.
    """
    return self._remove_daemon(name, host)
def _remove_daemon(self, name, host):
    """Remove a single daemon (``<type>.<id>``) from a host.

    For mons, first verify removal is safe and take the mon out of the
    monmap (quorum) before destroying the daemon, then invoke cephadm
    ``rm-daemon`` on the host and update the daemon cache.
    """
    (daemon_type, daemon_id) = name.split('.', 1)
    if daemon_type == 'mon':
        self._check_safe_to_destroy_mon(daemon_id)

        # remove mon from quorum before we destroy the daemon
        self.log.info('Removing monitor %s from monmap...' % name)
        ret, out, err = self.mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })
        if ret:
            raise OrchestratorError('failed to remove mon %s from monmap' % (
                name))

    # --force so cephadm removes the daemon even if it is still running
    args = ['--name', name, '--force']
    self.log.info('Removing daemon %s from %s' % (name, host))
    out, err, code = self._run_cephadm(
        host, name, 'rm-daemon', args)
    if not code:
        # remove item from cache
        self.cache.rm_daemon(host, name)
    self.cache.invalidate_host_daemons(host)
    return "Removed {} from host '{}'".format(name, host)
2343 def _apply_service(self
, spec
):
2345 Schedule a service. Deploy new daemons or remove old ones, depending
2346 on the target label and count specified in the placement.
2348 daemon_type
= spec
.service_type
2349 service_name
= spec
.service_name()
2351 self
.log
.debug('Skipping unmanaged service %s spec' % service_name
)
2353 self
.log
.debug('Applying service %s spec' % service_name
)
2355 'mon': self
._create
_mon
,
2356 'mgr': self
._create
_mgr
,
2357 'osd': self
.create_osds
,
2358 'mds': self
._create
_mds
,
2359 'rgw': self
._create
_rgw
,
2360 'rbd-mirror': self
._create
_rbd
_mirror
,
2361 'nfs': self
._create
_nfs
,
2362 'grafana': self
._create
_grafana
,
2363 'alertmanager': self
._create
_alertmanager
,
2364 'prometheus': self
._create
_prometheus
,
2365 'node-exporter': self
._create
_node
_exporter
,
2366 'crash': self
._create
_crash
,
2369 'mds': self
._config
_mds
,
2370 'rgw': self
._config
_rgw
,
2371 'nfs': self
._config
_nfs
,
2373 create_func
= create_fns
.get(daemon_type
, None)
2375 self
.log
.debug('unrecognized service type %s' % daemon_type
)
2377 config_func
= config_fns
.get(daemon_type
, None)
2379 daemons
= self
.cache
.get_daemons_by_service(service_name
)
2381 public_network
= None
2382 if daemon_type
== 'mon':
2383 ret
, out
, err
= self
.mon_command({
2384 'prefix': 'config get',
2386 'key': 'public_network',
2389 public_network
= out
.strip()
2390 self
.log
.debug('mon public_network is %s' % public_network
)
2392 def matches_network(host
):
2393 # type: (str) -> bool
2394 if not public_network
:
2396 # make sure we have 1 or more IPs for that network on that
2398 return len(self
.cache
.networks
[host
].get(public_network
, [])) > 0
2400 hosts
= HostAssignment(
2402 get_hosts_func
=self
._get
_hosts
,
2403 get_daemons_func
=self
.cache
.get_daemons_by_service
,
2404 filter_new_host
=matches_network
if daemon_type
== 'mon' else None,
2409 if daemon_type
== 'osd':
2410 return False if create_func(spec
) else True # type: ignore
2413 if daemon_type
in ['mon', 'mgr'] and len(hosts
) < 1:
2414 self
.log
.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts
)
2419 hosts_with_daemons
= {d
.hostname
for d
in daemons
}
2420 self
.log
.debug('hosts with daemons: %s' % hosts_with_daemons
)
2421 for host
, network
, name
in hosts
:
2422 if host
not in hosts_with_daemons
:
2423 if not did_config
and config_func
:
2426 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
2427 spec
.service_id
, name
)
2428 self
.log
.debug('Placing %s.%s on host %s' % (
2429 daemon_type
, daemon_id
, host
))
2430 if daemon_type
== 'mon':
2431 create_func(daemon_id
, host
, network
) # type: ignore
2432 elif daemon_type
== 'nfs':
2433 create_func(daemon_id
, host
, spec
) # type: ignore
2435 create_func(daemon_id
, host
) # type: ignore
2437 # add to daemon list so next name(s) will also be unique
2438 sd
= orchestrator
.DaemonDescription(
2440 daemon_type
=daemon_type
,
2441 daemon_id
=daemon_id
,
2447 target_hosts
= [h
.hostname
for h
in hosts
]
2449 if d
.hostname
not in target_hosts
:
2450 # NOTE: we are passing the 'force' flag here, which means
2451 # we can delete a mon instances data.
2452 self
._remove
_daemon
(d
.name(), d
.hostname
)
2457 def _apply_all_services(self
):
2459 specs
= [] # type: List[ServiceSpec]
2460 for sn
, spec
in self
.spec_store
.specs
.items():
2464 if self
._apply
_service
(spec
):
2466 except Exception as e
:
2467 self
.log
.warning('Failed to apply %s spec %s: %s' % (
2468 spec
.service_name(), spec
, e
))
2471 def _check_daemons(self
):
2472 # get monmap mtime so we can refresh configs when mons change
2473 monmap
= self
.get('mon_map')
2474 last_monmap
: Optional
[datetime
.datetime
] = datetime
.datetime
.strptime(
2475 monmap
['modified'], CEPH_DATEFMT
)
2476 if last_monmap
and last_monmap
> datetime
.datetime
.utcnow():
2477 last_monmap
= None # just in case clocks are skewed
2479 daemons
= self
.cache
.get_daemons()
2480 grafanas
= [] # type: List[orchestrator.DaemonDescription]
2483 spec
= self
.spec_store
.specs
.get(dd
.service_name(), None)
2484 if not spec
and dd
.daemon_type
not in ['mon', 'mgr', 'osd']:
2485 # (mon and mgr specs should always exist; osds aren't matched
2486 # to a service spec)
2487 self
.log
.info('Removing orphan daemon %s...' % dd
.name())
2488 self
._remove
_daemon
(dd
.name(), dd
.hostname
)
2490 # ignore unmanaged services
2491 if not spec
or spec
.unmanaged
:
2495 if dd
.daemon_type
== 'grafana':
2496 # put running instances at the front of the list
2497 grafanas
.insert(0, dd
)
2498 deps
= self
._calc
_daemon
_deps
(dd
.daemon_type
, dd
.daemon_id
)
2499 last_deps
, last_config
= self
.cache
.get_daemon_last_config_deps(
2500 dd
.hostname
, dd
.name())
2501 if last_deps
is None:
2505 self
.log
.info('Reconfiguring %s (unknown last config time)...'% (
2508 elif last_deps
!= deps
:
2509 self
.log
.debug('%s deps %s -> %s' % (dd
.name(), last_deps
,
2511 self
.log
.info('Reconfiguring %s (dependencies changed)...' % (
2514 elif last_monmap
and \
2515 last_monmap
> last_config
and \
2516 dd
.daemon_type
in CEPH_TYPES
:
2517 self
.log
.info('Reconfiguring %s (monmap changed)...' % dd
.name())
2520 self
._create
_daemon
(dd
.daemon_type
, dd
.daemon_id
,
2521 dd
.hostname
, reconfig
=True)
2523 # make sure the dashboard [does not] references grafana
2525 current_url
= self
.get_module_option_ex('dashboard',
2528 host
= grafanas
[0].hostname
2529 url
= 'https://%s:3000' % (self
.inventory
[host
].get('addr',
2531 if current_url
!= url
:
2532 self
.log
.info('Setting dashboard grafana config to %s' % url
)
2533 self
.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
2535 # FIXME: is it a signed cert??
2536 except Exception as e
:
2537 self
.log
.debug('got exception fetching dashboard grafana state: %s',
def _add_daemon(self, daemon_type, spec,
                create_func, config_func=None):
    """
    Add (and place) a daemon. Require explicit host placement. Do not
    schedule, and do not apply the related scheduling limitations.
    """
    self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
    if not spec.placement.hosts:
        raise OrchestratorError('must specify host(s) to deploy on')
    # default the count to the number of explicitly listed hosts
    count = spec.placement.count or len(spec.placement.hosts)
    daemons = self.cache.get_daemons_by_service(spec.service_name())
    return self._create_daemons(daemon_type, spec, daemons,
                                spec.placement.hosts, count,
                                create_func, config_func)
2555 def _create_daemons(self
, daemon_type
, spec
, daemons
,
2557 create_func
, config_func
=None):
2558 if count
> len(hosts
):
2559 raise OrchestratorError('too few hosts: want %d, have %s' % (
2565 args
= [] # type: List[tuple]
2566 for host
, network
, name
in hosts
:
2567 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
2568 spec
.service_id
, name
)
2569 self
.log
.debug('Placing %s.%s on host %s' % (
2570 daemon_type
, daemon_id
, host
))
2571 if daemon_type
== 'mon':
2572 args
.append((daemon_id
, host
, network
)) # type: ignore
2573 elif daemon_type
== 'nfs':
2574 args
.append((daemon_id
, host
, spec
)) # type: ignore
2576 args
.append((daemon_id
, host
)) # type: ignore
2578 # add to daemon list so next name(s) will also be unique
2579 sd
= orchestrator
.DaemonDescription(
2581 daemon_type
=daemon_type
,
2582 daemon_id
=daemon_id
,
2586 @async_map_completion
2587 def create_func_map(*args
):
2588 return create_func(*args
)
2590 return create_func_map(args
)
def apply_mon(self, spec):
    """Apply (schedule) a mon service spec via the generic _apply path."""
    return self._apply(spec)
def _create_mon(self, name, host, network):
    """
    Create a new monitor on the given host.

    ``network`` may be a CIDR network, a ceph addrvec, or a plain IP; if
    empty, fall back to the cluster's ``public_network`` config option.
    """
    # get mon. key
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get',
        'entity': 'mon.',
    })

    extra_config = '[mon.%s]\n' % name
    if network:
        # infer whether this is a CIDR network, addrvec, or plain IP
        if '/' in network:
            extra_config += 'public network = %s\n' % network
        elif network.startswith('[v') and network.endswith(']'):
            extra_config += 'public addrv = %s\n' % network
        elif ':' not in network:
            extra_config += 'public addr = %s\n' % network
        else:
            raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
    else:
        # try to get the public_network from the config
        ret, network, err = self.mon_command({
            'prefix': 'config get',
            'who': 'mon',
            'key': 'public_network',
        })
        network = network.strip()  # type: ignore
        if ret:
            raise RuntimeError('Unable to fetch cluster_network config option')
        if not network:
            raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
        if '/' not in network:
            raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
        extra_config += 'public network = %s\n' % network

    return self._create_daemon('mon', name, host,
                               keyring=keyring,
                               extra_config=extra_config)
def add_mon(self, spec):
    # type: (ServiceSpec) -> orchestrator.Completion
    """Add mon daemon(s) on the explicit hosts given in the spec."""
    return self._add_daemon('mon', spec, self._create_mon)
def _create_mgr(self, mgr_id, host):
    """
    Create a new manager instance on a host.
    """
    # get or create the mgr's auth key
    # NOTE(review): caps continuation lines were elided in the source;
    # 'osd allow *' / 'mds allow *' reconstructed — confirm against upstream.
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'mgr.%s' % mgr_id,
        'caps': ['mon', 'profile mgr',
                 'osd', 'allow *',
                 'mds', 'allow *'],
    })
    return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
def add_mgr(self, spec):
    # type: (ServiceSpec) -> orchestrator.Completion
    """Add mgr daemon(s) on the explicit hosts given in the spec."""
    return self._add_daemon('mgr', spec, self._create_mgr)
def _apply(self, spec: ServiceSpec) -> str:
    """Save a service spec (with default placement if empty) and kick the
    serve loop so it gets scheduled.

    Raises OrchestratorError when trying to scale mon/mgr below 1, and
    validates placement via HostAssignment before saving.
    """
    if spec.placement.is_empty():
        # fill in default placement
        defaults = {
            'mon': PlacementSpec(count=5),
            'mgr': PlacementSpec(count=2),
            'mds': PlacementSpec(count=2),
            'rgw': PlacementSpec(count=2),
            'rbd-mirror': PlacementSpec(count=2),
            'nfs': PlacementSpec(count=1),
            'grafana': PlacementSpec(count=1),
            'alertmanager': PlacementSpec(count=1),
            'prometheus': PlacementSpec(count=1),
            'node-exporter': PlacementSpec(host_pattern='*'),
            'crash': PlacementSpec(host_pattern='*'),
        }
        spec.placement = defaults[spec.service_type]
    elif spec.service_type in ['mon', 'mgr'] and \
            spec.placement.count is not None and \
            spec.placement.count < 1:
        raise OrchestratorError('cannot scale %s service below 1' % (
            spec.service_type))

    # validate the placement against the current inventory before saving
    HostAssignment(
        spec=spec,
        get_hosts_func=self._get_hosts,
        get_daemons_func=self.cache.get_daemons_by_service,
    ).validate()

    self.log.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
    self._kick_serve_loop()
    return "Scheduled %s update..." % spec.service_type
def apply(self, specs: List[ServiceSpec]):
    """Apply each spec in turn; return the per-spec status strings."""
    return [self._apply(spec) for spec in specs]
def apply_mgr(self, spec):
    """Apply (schedule) a mgr service spec via the generic _apply path."""
    return self._apply(spec)
def add_mds(self, spec: ServiceSpec):
    """Add mds daemon(s); _config_mds runs once before daemon creation."""
    return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
def apply_mds(self, spec: ServiceSpec):
    """Apply (schedule) an mds service spec via the generic _apply path."""
    return self._apply(spec)
def _config_mds(self, spec):
    """Pre-creation config for mds: pin the daemons to their filesystem."""
    # ensure mds_join_fs is set for these daemons
    assert spec.service_id
    ret, out, err = self.mon_command({
        'prefix': 'config set',
        'who': 'mds.' + spec.service_id,
        'name': 'mds_join_fs',
        'value': spec.service_id,
    })
def _create_mds(self, mds_id, host):
    """Create an mds daemon on a host after minting its auth key."""
    # NOTE(review): caps continuation lines were elided in the source;
    # 'osd allow rwx' / 'mds allow' reconstructed — confirm against upstream.
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'mds.' + mds_id,
        'caps': ['mon', 'profile mds',
                 'osd', 'allow rwx',
                 'mds', 'allow'],
    })
    return self._create_daemon('mds', mds_id, host, keyring=keyring)
def add_rgw(self, spec):
    """Add rgw daemon(s); _config_rgw runs once before daemon creation."""
    return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
def _config_rgw(self, spec):
    """Pre-creation config for rgw: set zone, realm and frontends options."""
    # ensure rgw_realm and rgw_zone is set for these daemons
    ret, out, err = self.mon_command({
        'prefix': 'config set',
        'who': 'client.rgw.' + spec.service_id,
        'name': 'rgw_zone',
        'value': spec.rgw_zone,
    })
    ret, out, err = self.mon_command({
        'prefix': 'config set',
        'who': 'client.rgw.' + spec.rgw_realm,
        'name': 'rgw_realm',
        'value': spec.rgw_realm,
    })
    # NOTE(review): the ssl-vs-plain condition line was elided in the
    # source; presumably gated on spec.ssl — confirm against upstream.
    if spec.ssl:
        v = 'beast ssl_port=%d' % spec.get_port()
    else:
        v = 'beast port=%d' % spec.get_port()
    ret, out, err = self.mon_command({
        'prefix': 'config set',
        'who': 'client.rgw.' + spec.service_id,
        'name': 'rgw_frontends',
        'value': v,
    })
def _create_rgw(self, rgw_id, host):
    """Create an rgw daemon on a host after minting its auth key."""
    # NOTE(review): the 'mgr' caps line was elided in the source;
    # 'mgr allow rw' reconstructed — confirm against upstream.
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'client.rgw.' + rgw_id,
        'caps': ['mon', 'allow rw',
                 'mgr', 'allow rw',
                 'osd', 'allow rwx'],
    })
    return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
    """Apply (schedule) an rgw service spec via the generic _apply path."""
    return self._apply(spec)
def add_rbd_mirror(self, spec):
    """Add rbd-mirror daemon(s) on the explicit hosts given in the spec."""
    return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
def _create_rbd_mirror(self, daemon_id, host):
    """Create an rbd-mirror daemon on a host after minting its auth key."""
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'client.rbd-mirror.' + daemon_id,
        'caps': ['mon', 'profile rbd-mirror',
                 'osd', 'profile rbd'],
    })
    return self._create_daemon('rbd-mirror', daemon_id, host,
                               keyring=keyring)
def apply_rbd_mirror(self, spec):
    """Apply (schedule) an rbd-mirror spec via the generic _apply path."""
    return self._apply(spec)
def _generate_nfs_config(self, daemon_type, daemon_id, host):
    # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
    """Build the --config-json payload for an NFS (ganesha) daemon.

    Looks up the single matching NFSServiceSpec, prepares the keyring and
    rados config object via NFSGanesha, and merges in the minimal ceph
    config + keyring. Returns (cephadm_config, deps).
    """
    deps = []  # type: List[str]

    # find the matching NFSServiceSpec
    # TODO: find the spec and pass via _create_daemon instead ??
    service_name = self.get_service_name(daemon_type, daemon_id, host)
    specs = self.spec_store.find(service_name)
    if not specs:
        raise OrchestratorError('Cannot find service spec %s' % (service_name))
    elif len(specs) > 1:
        raise OrchestratorError('Found multiple service specs for %s' % (service_name))
    # cast to keep mypy happy
    spec = cast(NFSServiceSpec, specs[0])

    nfs = NFSGanesha(self, daemon_id, spec)

    # create the keyring
    entity = nfs.get_keyring_entity()
    keyring = nfs.get_or_create_keyring(entity=entity)

    # update the caps after get-or-create, the keyring might already exist!
    nfs.update_keyring_caps(entity=entity)

    # create the rados config object
    nfs.create_rados_config_obj()

    # generate the cephadm config
    cephadm_config = nfs.get_cephadm_config()
    cephadm_config.update(
        self._get_config_and_keyring(
            daemon_type, daemon_id,
            keyring=keyring))

    return cephadm_config, deps
def add_nfs(self, spec):
    """Add nfs daemon(s); _config_nfs runs once before daemon creation."""
    return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
def _config_nfs(self, spec):
    """Pre-creation config for nfs: persist the spec to the spec store."""
    logger.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
def _create_nfs(self, daemon_id, host, spec):
    """Create an nfs daemon on a host.

    NOTE(review): ``spec`` is accepted but unused here — the daemon config
    is presumably derived from the stored spec inside _create_daemon;
    confirm.
    """
    return self._create_daemon('nfs', daemon_id, host)
def apply_nfs(self, spec):
    """Apply (schedule) an nfs service spec via the generic _apply path."""
    return self._apply(spec)
2842 def _generate_prometheus_config(self
):
2843 # type: () -> Tuple[Dict[str, Any], List[str]]
2844 deps
= [] # type: List[str]
2847 mgr_scrape_list
= []
2848 mgr_map
= self
.get('mgr_map')
2850 t
= mgr_map
.get('services', {}).get('prometheus', None)
2853 mgr_scrape_list
.append(t
)
2856 port
= t
.split(':')[1]
2857 # scan all mgrs to generate deps and to get standbys too.
2858 # assume that they are all on the same port as the active mgr.
2859 for dd
in self
.cache
.get_daemons_by_service('mgr'):
2860 # we consider the mgr a dep even if the prometheus module is
2861 # disabled in order to be consistent with _calc_daemon_deps().
2862 deps
.append(dd
.name())
2865 if dd
.daemon_id
== self
.get_mgr_id():
2867 hi
= self
.inventory
.get(dd
.hostname
, {})
2868 addr
= hi
.get('addr', dd
.hostname
)
2869 mgr_scrape_list
.append(addr
.split(':')[0] + ':' + port
)
2871 # scrape node exporters
2873 for dd
in self
.cache
.get_daemons_by_service('node-exporter'):
2874 deps
.append(dd
.name())
2875 hi
= self
.inventory
.get(dd
.hostname
, {})
2876 addr
= hi
.get('addr', dd
.hostname
)
2877 if not node_configs
:
2882 node_configs
+= """ - targets: {}
2885 """.format([addr
.split(':')[0] + ':9100'],
2888 # scrape alert managers
2889 alertmgr_configs
= ""
2890 alertmgr_targets
= []
2891 for dd
in self
.cache
.get_daemons_by_service('alertmanager'):
2892 deps
.append(dd
.name())
2893 hi
= self
.inventory
.get(dd
.hostname
, {})
2894 addr
= hi
.get('addr', dd
.hostname
)
2895 alertmgr_targets
.append("'{}:9093'".format(addr
.split(':')[0]))
2896 if alertmgr_targets
:
2897 alertmgr_configs
= """alerting:
2900 path_prefix: /alertmanager
2903 """.format(", ".join(alertmgr_targets
))
2905 # generate the prometheus configuration
2908 'prometheus.yml': """# generated by cephadm
2911 evaluation_interval: 10s
2913 - /etc/prometheus/alerting/*
2918 - targets: {mgr_scrape_list}
2920 instance: 'ceph_cluster'
2923 mgr_scrape_list
=str(mgr_scrape_list
),
2924 node_configs
=str(node_configs
),
2925 alertmgr_configs
=str(alertmgr_configs
)
2930 # include alerts, if present in the container
2931 if os
.path
.exists(self
.prometheus_alerts_path
):
2932 with
open(self
.prometheus_alerts_path
, "r") as f
:
2934 r
['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
2936 return r
, sorted(deps
)
2938 def _generate_grafana_config(self
):
2939 # type: () -> Tuple[Dict[str, Any], List[str]]
2940 deps
= [] # type: List[str]
2941 def generate_grafana_ds_config(hosts
: List
[str]) -> str:
2942 config
= '''# generated by cephadm
2944 {delete_data_sources}
2949 delete_ds_template
= '''
2951 orgId: 1\n'''.lstrip('\n')
2957 url: 'http://{host}:9095'
2959 isDefault: {is_default}
2960 editable: false\n'''.lstrip('\n')
2962 delete_data_sources
= ''
2964 for i
, host
in enumerate(hosts
):
2965 name
= "Dashboard %d" % (i
+ 1)
2966 data_sources
+= ds_template
.format(
2969 is_default
=str(i
== 0).lower()
2971 delete_data_sources
+= delete_ds_template
.format(
2974 return config
.format(
2975 delete_data_sources
=delete_data_sources
,
2976 data_sources
=data_sources
,
2979 prom_services
= [] # type: List[str]
2980 for dd
in self
.cache
.get_daemons_by_service('prometheus'):
2981 prom_services
.append(dd
.hostname
)
2982 deps
.append(dd
.name())
2984 cert
= self
.get_store('grafana_crt')
2985 pkey
= self
.get_store('grafana_key')
2988 verify_tls(cert
, pkey
)
2989 except ServerConfigException
as e
:
2990 logger
.warning('Provided grafana TLS certificates invalid: %s', str(e
))
2991 cert
, pkey
= None, None
2992 if not (cert
and pkey
):
2993 cert
, pkey
= create_self_signed_cert('Ceph', 'cephadm')
2994 self
.set_store('grafana_crt', cert
)
2995 self
.set_store('grafana_key', pkey
)
2999 "grafana.ini": """# generated by cephadm
3001 default_theme = light
3004 org_name = 'Main Org.'
3007 domain = 'bootstrap.storage.lab'
3009 cert_file = /etc/grafana/certs/cert_file
3010 cert_key = /etc/grafana/certs/cert_key
3014 admin_password = admin
3015 allow_embedding = true
3017 'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services
),
3018 'certs/cert_file': '# generated by cephadm\n%s' % cert
,
3019 'certs/cert_key': '# generated by cephadm\n%s' % pkey
,
3022 return config_file
, sorted(deps
)
def _get_dashboard_url(self):
    # type: () -> str
    """Return the dashboard URL from the mgr map, or '' if not enabled."""
    return self.get('mgr_map').get('services', {}).get('dashboard', '')
3028 def _generate_alertmanager_config(self
):
3029 # type: () -> Tuple[Dict[str, Any], List[str]]
3030 deps
= [] # type: List[str]
3034 mgr_map
= self
.get('mgr_map')
3036 proto
= None # http: or https:
3037 url
= mgr_map
.get('services', {}).get('dashboard', None)
3039 dashboard_urls
.append(url
)
3040 proto
= url
.split('/')[0]
3041 port
= url
.split('/')[2].split(':')[1]
3042 # scan all mgrs to generate deps and to get standbys too.
3043 # assume that they are all on the same port as the active mgr.
3044 for dd
in self
.cache
.get_daemons_by_service('mgr'):
3045 # we consider mgr a dep even if the dashboard is disabled
3046 # in order to be consistent with _calc_daemon_deps().
3047 deps
.append(dd
.name())
3050 if dd
.daemon_id
== self
.get_mgr_id():
3052 hi
= self
.inventory
.get(dd
.hostname
, {})
3053 addr
= hi
.get('addr', dd
.hostname
)
3054 dashboard_urls
.append('%s//%s:%s/' % (proto
, addr
.split(':')[0],
3057 yml
= """# generated by cephadm
3058 # See https://prometheus.io/docs/alerting/configuration/ for documentation.
3064 group_by: ['alertname']
3068 receiver: 'ceph-dashboard'
3070 - name: 'ceph-dashboard'
3075 [" - url: '{}api/prometheus_receiver'".format(u
)
3076 for u
in dashboard_urls
]
3080 for dd
in self
.cache
.get_daemons_by_service('alertmanager'):
3081 deps
.append(dd
.name())
3082 hi
= self
.inventory
.get(dd
.hostname
, {})
3083 addr
= hi
.get('addr', dd
.hostname
)
3084 peers
.append(addr
.split(':')[0] + ':' + port
)
3087 "alertmanager.yml": yml
def add_prometheus(self, spec):
    """Add prometheus daemon(s) on the explicit hosts given in the spec."""
    return self._add_daemon('prometheus', spec, self._create_prometheus)
def _create_prometheus(self, daemon_id, host):
    """Create a prometheus daemon on a host (no keyring needed)."""
    return self._create_daemon('prometheus', daemon_id, host)
def apply_prometheus(self, spec):
    """Apply (schedule) a prometheus spec via the generic _apply path."""
    return self._apply(spec)
def add_node_exporter(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Add node-exporter daemon(s) on the explicit hosts in the spec."""
    return self._add_daemon('node-exporter', spec,
                            self._create_node_exporter)
def apply_node_exporter(self, spec):
    """Apply (schedule) a node-exporter spec via the generic _apply path."""
    return self._apply(spec)
def _create_node_exporter(self, daemon_id, host):
    """Create a node-exporter daemon on a host (no keyring needed)."""
    return self._create_daemon('node-exporter', daemon_id, host)
def add_crash(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Add crash-collector daemon(s) on the explicit hosts in the spec."""
    return self._add_daemon('crash', spec,
                            self._create_crash)
def apply_crash(self, spec):
    """Apply (schedule) a crash-collector spec via the generic _apply path."""
    return self._apply(spec)
def _create_crash(self, daemon_id, host):
    """Create a crash-collector daemon on a host after minting its key.

    Note the auth entity is keyed by host, not by daemon_id.
    """
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'client.crash.' + host,
        'caps': ['mon', 'profile crash',
                 'mgr', 'profile crash'],
    })
    return self._create_daemon('crash', daemon_id, host, keyring=keyring)
def add_grafana(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Add grafana daemon(s) on the explicit hosts given in the spec."""
    return self._add_daemon('grafana', spec, self._create_grafana)
def apply_grafana(self, spec: ServiceSpec):
    """Apply (schedule) a grafana spec via the generic _apply path."""
    return self._apply(spec)
def _create_grafana(self, daemon_id, host):
    # type: (str, str) -> str
    """Create a grafana daemon on a host (no keyring needed)."""
    return self._create_daemon('grafana', daemon_id, host)
def add_alertmanager(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Add alertmanager daemon(s) on the explicit hosts in the spec."""
    return self._add_daemon('alertmanager', spec, self._create_alertmanager)
def apply_alertmanager(self, spec: ServiceSpec):
    """Apply (schedule) an alertmanager spec via the generic _apply path."""
    return self._apply(spec)
def _create_alertmanager(self, daemon_id, host):
    """Create an alertmanager daemon on a host (no keyring needed)."""
    return self._create_daemon('alertmanager', daemon_id, host)
3156 def _get_container_image_id(self
, image_name
):
3157 # pick a random host...
3159 for host_name
, hi
in self
.inventory
.items():
3163 raise OrchestratorError('no hosts defined')
3164 out
, err
, code
= self
._run
_cephadm
(
3165 host
, None, 'pull', [],
3170 raise OrchestratorError('Failed to pull %s on %s: %s' % (
3171 image_name
, host
, '\n'.join(out
)))
3172 j
= json
.loads('\n'.join(out
))
3173 image_id
= j
.get('image_id')
3174 ceph_version
= j
.get('ceph_version')
3175 self
.log
.debug('image %s -> id %s version %s' %
3176 (image_name
, image_id
, ceph_version
))
3177 return image_id
, ceph_version
3180 def upgrade_check(self
, image
, version
):
3182 target_name
= self
.container_image_base
+ ':v' + version
3186 raise OrchestratorError('must specify either image or version')
3188 target_id
, target_version
= self
._get
_container
_image
_id
(target_name
)
3189 self
.log
.debug('Target image %s id %s version %s' % (
3190 target_name
, target_id
, target_version
))
3192 'target_name': target_name
,
3193 'target_id': target_id
,
3194 'target_version': target_version
,
3195 'needs_update': dict(),
3196 'up_to_date': list(),
3198 for host
, dm
in self
.cache
.daemons
.items():
3199 for name
, dd
in dm
.items():
3200 if target_id
== dd
.container_image_id
:
3201 r
['up_to_date'].append(dd
.name())
3203 r
['needs_update'][dd
.name()] = {
3204 'current_name': dd
.container_image_name
,
3205 'current_id': dd
.container_image_id
,
3206 'current_version': dd
.version
,
3208 return json
.dumps(r
, indent
=4, sort_keys
=True)
def upgrade_status(self):
    """Summarize the in-progress upgrade (target image, error/paused state)."""
    r = orchestrator.UpgradeStatusSpec()
    if self.upgrade_state:
        r.target_image = self.upgrade_state.get('target_name')
        r.in_progress = True
        if self.upgrade_state.get('error'):
            r.message = 'Error: ' + self.upgrade_state.get('error')
        elif self.upgrade_state.get('paused'):
            r.message = 'Upgrade paused'
    return r
3223 def upgrade_start(self
, image
, version
):
3224 if self
.mode
!= 'root':
3225 raise OrchestratorError('upgrade is not supported in %s mode' % (
3229 (major
, minor
, patch
) = version
.split('.')
3230 assert int(minor
) >= 0
3231 assert int(patch
) >= 0
3233 raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
3234 if int(major
) < 15 or (int(major
) == 15 and int(minor
) < 2):
3235 raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
3236 target_name
= self
.container_image_base
+ ':v' + version
3240 raise OrchestratorError('must specify either image or version')
3241 if self
.upgrade_state
:
3242 if self
.upgrade_state
.get('target_name') != target_name
:
3243 raise OrchestratorError(
3244 'Upgrade to %s (not %s) already in progress' %
3245 (self
.upgrade_state
.get('target_name'), target_name
))
3246 if self
.upgrade_state
.get('paused'):
3247 del self
.upgrade_state
['paused']
3248 self
._save
_upgrade
_state
()
3249 return 'Resumed upgrade to %s' % self
.upgrade_state
.get('target_name')
3250 return 'Upgrade to %s in progress' % self
.upgrade_state
.get('target_name')
3251 self
.upgrade_state
= {
3252 'target_name': target_name
,
3253 'progress_id': str(uuid
.uuid4()),
3255 self
._update
_upgrade
_progress
(0.0)
3256 self
._save
_upgrade
_state
()
3257 self
._clear
_upgrade
_health
_checks
()
3259 return 'Initiating upgrade to %s' % (target_name
)
def upgrade_pause(self):
    """Pause the current upgrade; no-op message if already paused."""
    if not self.upgrade_state:
        raise OrchestratorError('No upgrade in progress')
    if self.upgrade_state.get('paused'):
        return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
    self.upgrade_state['paused'] = True
    self._save_upgrade_state()
    return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
def upgrade_resume(self):
    """Resume a paused upgrade and kick the serve loop."""
    if not self.upgrade_state:
        raise OrchestratorError('No upgrade in progress')
    if not self.upgrade_state.get('paused'):
        return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
    del self.upgrade_state['paused']
    self._save_upgrade_state()
    self._kick_serve_loop()
    return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
def upgrade_stop(self):
    """Abort the current upgrade, clearing state, progress and health checks."""
    if not self.upgrade_state:
        return 'No upgrade in progress'
    target_name = self.upgrade_state.get('target_name')
    if 'progress_id' in self.upgrade_state:
        # drop the mgr progress-module event for this upgrade
        self.remote('progress', 'complete',
                    self.upgrade_state['progress_id'])
    self.upgrade_state = None
    self._save_upgrade_state()
    self._clear_upgrade_health_checks()
    self._kick_serve_loop()
    return 'Stopped upgrade to %s' % target_name
def remove_osds(self, osd_ids: List[str],
                replace: bool = False,
                force: bool = False):
    """
    Takes a list of OSDs and schedules them for removal.
    The actual removal is driven by the RemoveUtil queue, which the
    serve loop processes.
    """
    daemons = self.cache.get_daemons_by_service('osd')
    found: Set[OSDRemoval] = set()
    for daemon in daemons:
        if daemon.daemon_id not in osd_ids:
            continue
        found.add(OSDRemoval(daemon.daemon_id, replace, force,
                             daemon.hostname, daemon.name(),
                             datetime.datetime.utcnow(), -1))

    not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
    if not_found:
        raise OrchestratorError('Unable to find OSD: %s' % not_found)

    self.rm_util.queue_osds_for_removal(found)

    # trigger the serve loop to initiate the removal
    self._kick_serve_loop()
    return "Scheduled OSD(s) for removal"
def remove_osds_status(self):
    """
    The CLI call to retrieve an osd removal report
    """
    return self.rm_util.report
def list_specs(self, service_name=None):
    """
    Loads all entries from the service_spec mon_store root.
    """
    return self.spec_store.find(service_name=service_name)
class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a placement_spec

    `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
    """

    def __init__(self, placement_spec):
        # type: (PlacementSpec) -> None
        self.placement_spec = placement_spec

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        raise NotImplementedError
class SimpleScheduler(BaseScheduler):
    """
    The most simple way to pick/schedule a set of hosts.
    1) Shuffle the provided host_pool
    2) Select from list up to :count
    """
    def __init__(self, placement_spec):
        super(SimpleScheduler, self).__init__(placement_spec)

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        if not host_pool:
            return []
        # work on a copy so the caller's list is untouched
        host_pool = [x for x in host_pool]
        # shuffle for pseudo random selection
        random.shuffle(host_pool)
        return host_pool[:count]
class HostAssignment(object):
    """
    A class to detect if hosts are being passed imperative or declarative
    If the spec is populated via the `hosts/hosts` field it will not load
    any hosts into the list.
    If the spec isn't populated, i.e. when only num or label is present (declarative)
    it will use the provided `get_host_func` to load it from the inventory.

    Schedulers can be assigned to pick hosts from the pool.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 get_hosts_func,  # type: Callable[[Optional[str]],List[str]]
                 get_daemons_func,  # type: Callable[[str],List[orchestrator.DaemonDescription]]
                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 scheduler=None,  # type: Optional[BaseScheduler]
                 ):
        assert spec and get_hosts_func and get_daemons_func
        self.spec = spec  # type: ServiceSpec
        # Default to the shuffle-based scheduler when none is supplied.
        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
        self.get_hosts_func = get_hosts_func
        self.get_daemons_func = get_daemons_func
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()

    def validate(self):
        """Raise OrchestratorValidationError when the placement cannot be satisfied."""
        self.spec.validate()

        if self.spec.placement.hosts:
            # Every explicitly named host must exist in the inventory.
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hostnames = self.get_hosts_func(self.spec.placement.label)
            if not label_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place(self):
        # type: () -> List[HostPlacementSpec]
        """
        Load hosts into the spec.placement.hosts container.

        Resolution order: explicit count==0 -> empty; explicit host list;
        host_pattern; label; otherwise all inventory hosts. When a count is
        given, hosts already running this service's daemons are preferred.
        """
        self.validate()

        # count == 0: nothing to place
        if self.spec.placement.count == 0:
            return []

        # respect any explicit host list
        if self.spec.placement.hosts and not self.spec.placement.count:
            logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
            return self.spec.placement.hosts

        # respect host_pattern
        if self.spec.placement.host_pattern:
            candidates = [
                HostPlacementSpec(x, '', '')
                for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
            ]
            logger.debug('All hosts: {}'.format(candidates))
            return candidates

        count = 0
        if self.spec.placement.hosts and \
           self.spec.placement.count and \
           len(self.spec.placement.hosts) >= self.spec.placement.count:
            hosts = self.spec.placement.hosts
            # BUGFIX: assign count *before* logging it -- the debug line used
            # to print 0 every time because the assignment came afterwards.
            count = self.spec.placement.count
            logger.debug('place %d over provided host list: %s' % (
                count, hosts))
        elif self.spec.placement.label:
            hosts = [
                HostPlacementSpec(x, '', '')
                for x in self.get_hosts_func(self.spec.placement.label)
            ]
            if not self.spec.placement.count:
                logger.debug('Labeled hosts: {}'.format(hosts))
                return hosts
            count = self.spec.placement.count
            logger.debug('place %d over label %s: %s' % (
                count, self.spec.placement.label, hosts))
        else:
            hosts = [
                HostPlacementSpec(x, '', '')
                for x in self.get_hosts_func(None)
            ]
            if self.spec.placement.count:
                count = self.spec.placement.count
            else:
                # this should be a totally empty spec given all of the
                # alternative paths above.
                assert self.spec.placement.count is None
                assert not self.spec.placement.hosts
                assert not self.spec.placement.label
                count = 1
            logger.debug('place %d over all hosts: %s' % (count, hosts))

        # we need to select a subset of the candidates

        # if a partial host list is provided, always start with that
        if len(self.spec.placement.hosts) < count:
            chosen = self.spec.placement.hosts
        else:
            chosen = []

        # prefer hosts that already have services
        daemons = self.get_daemons_func(self.service_name)
        hosts_with_daemons = {d.hostname for d in daemons}
        # calc existing daemons (that aren't already in chosen)
        chosen_hosts = [hs.hostname for hs in chosen]
        existing = [hs for hs in hosts
                    if hs.hostname in hosts_with_daemons and
                    hs.hostname not in chosen_hosts]
        if len(chosen + existing) >= count:
            # enough hosts already carry daemons; top up 'chosen' from them
            chosen = chosen + self.scheduler.place(
                existing,
                count - len(chosen))
            logger.debug('Hosts with existing daemons: {}'.format(chosen))
            return chosen

        # trigger the scheduler to pick the remaining hosts from fresh ones
        need = count - len(existing + chosen)
        others = [hs for hs in hosts
                  if hs.hostname not in hosts_with_daemons]
        if self.filter_new_host:
            old = others
            others = [h for h in others if self.filter_new_host(h.hostname)]
            # BUGFIX: log the filtered result ('others'), not the unrelated
            # full candidate list ('hosts').
            logger.debug('filtered %s down to %s' % (old, others))
        chosen = chosen + self.scheduler.place(others, need)
        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, chosen))
        return existing + chosen