6 from threading
import Event
7 from functools
import wraps
9 from mgr_util
import create_self_signed_cert
, verify_tls
, ServerConfigException
13 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, Type
, \
14 Any
, NamedTuple
, Iterator
, Set
, Sequence
15 from typing
import TYPE_CHECKING
, cast
17 TYPE_CHECKING
= False # just for type checking
25 import multiprocessing
.pool
31 from ceph
.deployment
import inventory
, translate
32 from ceph
.deployment
.drive_group
import DriveGroupSpec
33 from ceph
.deployment
.drive_selection
.selector
import DriveSelection
34 from ceph
.deployment
.service_spec
import \
35 HostPlacementSpec
, NFSServiceSpec
, ServiceSpec
, PlacementSpec
, assert_valid_host
37 from mgr_module
import MgrModule
, HandleCommandResult
39 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
44 from .nfs
import NFSGanesha
45 from .osd
import RemoveUtil
, OSDRemoval
51 import execnet
.gateway_bootstrap
52 except ImportError as e
:
54 remoto_import_error
= str(e
)
57 from typing
import List
# Module-level logger for this file.
logger = logging.getLogger(__name__)
63 DEFAULT_SSH_CONFIG
= """
66 StrictHostKeyChecking no
67 UserKnownHostsFile /dev/null
# Timestamp formats used when (de)serializing cached state.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

# KV-store key prefixes for per-host cache entries and service specs.
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."

# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
85 from tempfile
import TemporaryDirectory
# py3
87 # define a minimal (but sufficient) equivalent for <= py 3.2
88 class TemporaryDirectory(object): # type: ignore
90 self
.name
= tempfile
.mkdtemp()
94 self
.name
= tempfile
.mkdtemp()
98 shutil
.rmtree(self
.name
)
100 def __exit__(self
, exc_type
, exc_value
, traceback
):
105 def __init__(self
, mgr
):
106 # type: (CephadmOrchestrator) -> None
108 self
.specs
= {} # type: Dict[str, ServiceSpec]
109 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
113 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
)):
114 service_name
= k
[len(SPEC_STORE_PREFIX
):]
117 spec
= ServiceSpec
.from_json(v
['spec'])
118 created
= datetime
.datetime
.strptime(v
['created'], DATEFMT
)
119 self
.specs
[service_name
] = spec
120 self
.spec_created
[service_name
] = created
121 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
123 except Exception as e
:
124 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
128 def save(self
, spec
):
129 # type: (ServiceSpec) -> None
130 self
.specs
[spec
.service_name()] = spec
131 self
.spec_created
[spec
.service_name()] = datetime
.datetime
.utcnow()
133 SPEC_STORE_PREFIX
+ spec
.service_name(),
135 'spec': spec
.to_json(),
136 'created': self
.spec_created
[spec
.service_name()].strftime(DATEFMT
),
140 def rm(self
, service_name
):
141 # type: (str) -> bool
142 found
= service_name
in self
.specs
144 del self
.specs
[service_name
]
145 del self
.spec_created
[service_name
]
146 self
.mgr
.set_store(SPEC_STORE_PREFIX
+ service_name
, None)
149 def find(self
, service_name
: Optional
[str] = None) -> List
[ServiceSpec
]:
151 for sn
, spec
in self
.specs
.items():
152 if not service_name
or \
153 sn
== service_name
or \
154 sn
.startswith(service_name
+ '.'):
156 self
.mgr
.log
.debug('SpecStore: find spec for %s returned: %s' % (
157 service_name
, specs
))
161 def __init__(self
, mgr
):
162 # type: (CephadmOrchestrator) -> None
163 self
.mgr
: CephadmOrchestrator
= mgr
164 self
.daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
165 self
.last_daemon_update
= {} # type: Dict[str, datetime.datetime]
166 self
.devices
= {} # type: Dict[str, List[inventory.Device]]
167 self
.networks
= {} # type: Dict[str, Dict[str, List[str]]]
168 self
.last_device_update
= {} # type: Dict[str, datetime.datetime]
169 self
.daemon_refresh_queue
= [] # type: List[str]
170 self
.device_refresh_queue
= [] # type: List[str]
171 self
.daemon_config_deps
= {} # type: Dict[str, Dict[str, Dict[str,Any]]]
172 self
.last_host_check
= {} # type: Dict[str, datetime.datetime]
176 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
)):
177 host
= k
[len(HOST_CACHE_PREFIX
):]
178 if host
not in self
.mgr
.inventory
:
179 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
181 self
.mgr
.set_store(k
, None)
184 if 'last_device_update' in j
:
185 self
.last_device_update
[host
] = datetime
.datetime
.strptime(
186 j
['last_device_update'], DATEFMT
)
188 self
.device_refresh_queue
.append(host
)
189 # for services, we ignore the persisted last_*_update
190 # and always trigger a new scrape on mgr restart.
191 self
.daemon_refresh_queue
.append(host
)
192 self
.daemons
[host
] = {}
193 self
.devices
[host
] = []
194 self
.networks
[host
] = {}
195 self
.daemon_config_deps
[host
] = {}
196 for name
, d
in j
.get('daemons', {}).items():
197 self
.daemons
[host
][name
] = \
198 orchestrator
.DaemonDescription
.from_json(d
)
199 for d
in j
.get('devices', []):
200 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
201 self
.networks
[host
] = j
.get('networks', {})
202 for name
, d
in j
.get('daemon_config_deps', {}).items():
203 self
.daemon_config_deps
[host
][name
] = {
204 'deps': d
.get('deps', []),
205 'last_config': datetime
.datetime
.strptime(
206 d
['last_config'], DATEFMT
),
208 if 'last_host_check' in j
:
209 self
.last_host_check
[host
] = datetime
.datetime
.strptime(
210 j
['last_host_check'], DATEFMT
)
212 'HostCache.load: host %s has %d daemons, '
213 '%d devices, %d networks' % (
214 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
215 len(self
.networks
[host
])))
216 except Exception as e
:
217 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemon map for *host* and stamp the refresh time."""
    now = datetime.datetime.utcnow()
    self.daemons[host] = dm
    self.last_daemon_update[host] = now
def update_host_devices_networks(self, host, dls, nets):
    # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
    """Replace the cached device list and network map for *host*,
    recording when the device inventory was last refreshed."""
    now = datetime.datetime.utcnow()
    self.devices[host] = dls
    self.networks[host] = nets
    self.last_device_update[host] = now
232 def update_daemon_config_deps(self
, host
, name
, deps
, stamp
):
233 self
.daemon_config_deps
[host
][name
] = {
235 'last_config': stamp
,
def update_last_host_check(self, host):
    # type: (str) -> None
    """Record that a host check just completed for *host*."""
    self.last_host_check[host] = datetime.datetime.utcnow()
def prime_empty_host(self, host):
    # type: (str) -> None
    """
    Install an empty entry for a host
    """
    # Queue both refresh kinds so the serve loop scrapes the host soon.
    for queue in (self.daemon_refresh_queue, self.device_refresh_queue):
        queue.append(host)
    self.daemons[host] = {}
    self.devices[host] = []
    self.networks[host] = {}
    self.daemon_config_deps[host] = {}
254 def invalidate_host_daemons(self
, host
):
255 # type: (str) -> None
256 self
.daemon_refresh_queue
.append(host
)
257 if host
in self
.last_daemon_update
:
258 del self
.last_daemon_update
[host
]
261 def invalidate_host_devices(self
, host
):
262 # type: (str) -> None
263 self
.device_refresh_queue
.append(host
)
264 if host
in self
.last_device_update
:
265 del self
.last_device_update
[host
]
268 def save_host(self
, host
):
269 # type: (str) -> None
273 'daemon_config_deps': {},
275 if host
in self
.last_daemon_update
:
276 j
['last_daemon_update'] = self
.last_daemon_update
[host
].strftime(DATEFMT
) # type: ignore
277 if host
in self
.last_device_update
:
278 j
['last_device_update'] = self
.last_device_update
[host
].strftime(DATEFMT
) # type: ignore
279 for name
, dd
in self
.daemons
[host
].items():
280 j
['daemons'][name
] = dd
.to_json() # type: ignore
281 for d
in self
.devices
[host
]:
282 j
['devices'].append(d
.to_json()) # type: ignore
283 j
['networks'] = self
.networks
[host
]
284 for name
, depi
in self
.daemon_config_deps
[host
].items():
285 j
['daemon_config_deps'][name
] = { # type: ignore
286 'deps': depi
.get('deps', []),
287 'last_config': depi
['last_config'].strftime(DATEFMT
),
289 if host
in self
.last_host_check
:
290 j
['last_host_check']= self
.last_host_check
[host
].strftime(DATEFMT
)
291 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
def rm_host(self, host):
    # type: (str) -> None
    """Forget every in-memory cache entry for *host* and erase its
    persisted record from the mgr KV store."""
    for per_host_map in (self.daemons,
                         self.devices,
                         self.networks,
                         self.last_daemon_update,
                         self.last_device_update,
                         self.daemon_config_deps):
        per_host_map.pop(host, None)
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
310 # type: () -> List[str]
312 for host
, di
in self
.daemons
.items():
316 def get_daemons(self
):
317 # type: () -> List[orchestrator.DaemonDescription]
319 for host
, dm
in self
.daemons
.items():
320 for name
, dd
in dm
.items():
324 def get_daemons_with_volatile_status(self
) -> Iterator
[Tuple
[str, Dict
[str, orchestrator
.DaemonDescription
]]]:
325 for host
, dm
in self
.daemons
.items():
326 if host
in self
.mgr
.offline_hosts
:
327 def set_offline(dd
: orchestrator
.DaemonDescription
) -> orchestrator
.DaemonDescription
:
330 ret
.status_desc
= 'host is offline'
332 yield host
, {name
: set_offline(d
) for name
, d
in dm
.items()}
336 def get_daemons_by_service(self
, service_name
):
337 # type: (str) -> List[orchestrator.DaemonDescription]
338 result
= [] # type: List[orchestrator.DaemonDescription]
339 for host
, dm
in self
.daemons
.items():
340 for name
, d
in dm
.items():
341 if name
.startswith(service_name
+ '.'):
345 def get_daemon_names(self
):
346 # type: () -> List[str]
348 for host
, dm
in self
.daemons
.items():
349 for name
, dd
in dm
.items():
353 def get_daemon_last_config_deps(self
, host
, name
):
354 if host
in self
.daemon_config_deps
:
355 if name
in self
.daemon_config_deps
[host
]:
356 return self
.daemon_config_deps
[host
][name
].get('deps', []), \
357 self
.daemon_config_deps
[host
][name
].get('last_config', None)
360 def host_needs_daemon_refresh(self
, host
):
361 # type: (str) -> bool
362 if host
in self
.mgr
.offline_hosts
:
363 logger
.debug(f
'Host "{host}" marked as offline. Skipping daemon refresh')
365 if host
in self
.daemon_refresh_queue
:
366 self
.daemon_refresh_queue
.remove(host
)
368 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
369 seconds
=self
.mgr
.daemon_cache_timeout
)
370 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
374 def host_needs_device_refresh(self
, host
):
375 # type: (str) -> bool
376 if host
in self
.mgr
.offline_hosts
:
377 logger
.debug(f
'Host "{host}" marked as offline. Skipping device refresh')
379 if host
in self
.device_refresh_queue
:
380 self
.device_refresh_queue
.remove(host
)
382 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
383 seconds
=self
.mgr
.device_cache_timeout
)
384 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
def host_needs_check(self, host):
    # type: (str) -> bool
    """Return True when *host* has never been checked, or its last
    check is older than the configured check interval."""
    stale_before = datetime.datetime.utcnow() - datetime.timedelta(
        seconds=self.mgr.host_check_interval)
    last = self.last_host_check.get(host)
    return last is None or last < stale_before
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Register daemon description *dd* under an already-cached host."""
    # The host entry must have been primed/loaded beforehand.
    assert host in self.daemons
    self.daemons[host][dd.name()] = dd
def rm_daemon(self, host, name):
    """Remove one cached daemon record; unknown hosts/daemons are a no-op."""
    host_daemons = self.daemons.get(host)
    if host_daemons is not None and name in host_daemons:
        del host_daemons[name]
405 class AsyncCompletion(orchestrator
.Completion
):
407 _first_promise
=None, # type: Optional[orchestrator.Completion]
408 value
=orchestrator
._Promise
.NO_RESULT
, # type: Any
409 on_complete
=None, # type: Optional[Callable]
410 name
=None, # type: Optional[str]
411 many
=False, # type: bool
412 update_progress
=False, # type: bool
415 assert CephadmOrchestrator
.instance
is not None
417 self
.update_progress
= update_progress
418 if name
is None and on_complete
is not None:
419 name
= getattr(on_complete
, '__name__', None)
420 super(AsyncCompletion
, self
).__init
__(_first_promise
, value
, on_complete
, name
)
423 def _progress_reference(self
):
424 # type: () -> Optional[orchestrator.ProgressReference]
425 if hasattr(self
._on
_complete
_, 'progress_id'): # type: ignore
426 return self
._on
_complete
_ # type: ignore
430 def _on_complete(self
):
431 # type: () -> Optional[Callable]
432 if self
._on
_complete
_ is None:
435 def callback(result
):
437 if self
.update_progress
:
438 assert self
.progress_reference
439 self
.progress_reference
.progress
= 1.0
440 self
._on
_complete
_ = None
441 self
._finalize
(result
)
442 except Exception as e
:
446 logger
.exception(f
'failed to fail AsyncCompletion: >{repr(self)}<')
447 if 'UNITTEST' in os
.environ
:
450 def error_callback(e
):
454 def do_work(*args
, **kwargs
):
455 assert self
._on
_complete
_ is not None
457 res
= self
._on
_complete
_(*args
, **kwargs
)
458 if self
.update_progress
and self
.many
:
459 assert self
.progress_reference
460 self
.progress_reference
.progress
+= 1.0 / len(value
)
462 except Exception as e
:
466 assert CephadmOrchestrator
.instance
469 logger
.info('calling map_async without values')
472 CephadmOrchestrator
.instance
._worker
_pool
.map_async(do_work
, value
,
474 error_callback
=error_callback
)
476 CephadmOrchestrator
.instance
._worker
_pool
.map_async(do_work
, value
,
480 CephadmOrchestrator
.instance
._worker
_pool
.apply_async(do_work
, (value
,),
481 callback
=callback
, error_callback
=error_callback
)
483 CephadmOrchestrator
.instance
._worker
_pool
.apply_async(do_work
, (value
,),
485 return self
.ASYNC_RESULT
490 def _on_complete(self
, inner
):
491 # type: (Callable) -> None
492 self
._on
_complete
_ = inner
495 def ssh_completion(cls
=AsyncCompletion
, **c_kwargs
):
496 # type: (Type[orchestrator.Completion], Any) -> Callable
498 See ./HACKING.rst for a how-to
505 many
= c_kwargs
.get('many', False)
507 # Some weired logic to make calling functions with multiple arguments work.
510 if many
and value
and isinstance(value
[0], tuple):
511 return cls(on_complete
=lambda x
: f(*x
), value
=value
, name
=name
, **c_kwargs
)
513 return cls(on_complete
=f
, value
=value
, name
=name
, **c_kwargs
)
518 def call_self(inner_args
):
519 if not isinstance(inner_args
, tuple):
520 inner_args
= (inner_args
, )
521 return f(self
, *inner_args
)
523 return cls(on_complete
=call_self
, value
=value
, name
=name
, **c_kwargs
)
525 return cls(on_complete
=lambda x
: f(*x
), value
=args
, name
=name
, **c_kwargs
)
531 def async_completion(f
):
532 # type: (Callable) -> Callable[..., AsyncCompletion]
534 See ./HACKING.rst for a how-to
536 :param f: wrapped function
538 return ssh_completion()(f
)
541 def async_map_completion(f
):
542 # type: (Callable) -> Callable[..., AsyncCompletion]
544 See ./HACKING.rst for a how-to
546 :param f: wrapped function
551 ... return lambda x: map(f, x)
554 return ssh_completion(many
=True)(f
)
557 def trivial_completion(f
):
558 # type: (Callable) -> Callable[..., orchestrator.Completion]
560 def wrapper(*args
, **kwargs
):
561 return AsyncCompletion(value
=f(*args
, **kwargs
), name
=f
.__name
__)
565 @six.add_metaclass(CLICommandMeta
)
566 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
):
568 _STORE_HOST_PREFIX
= "host"
571 NATIVE_OPTIONS
= [] # type: List[Any]
574 'name': 'ssh_config_file',
577 'desc': 'customized SSH config file to connect to managed hosts',
580 'name': 'device_cache_timeout',
583 'desc': 'seconds to cache device inventory',
586 'name': 'daemon_cache_timeout',
589 'desc': 'seconds to cache service (daemon) inventory',
592 'name': 'host_check_interval',
595 'desc': 'how frequently to perform a host check',
600 'enum_allowed': ['root', 'cephadm-package'],
602 'desc': 'mode for remote execution of cephadm',
605 'name': 'container_image_base',
606 'default': 'docker.io/ceph/ceph',
607 'desc': 'Container image name, without the tag',
611 'name': 'warn_on_stray_hosts',
614 'desc': 'raise a health warning if daemons are detected on a host '
615 'that is not managed by cephadm',
618 'name': 'warn_on_stray_daemons',
621 'desc': 'raise a health warning if daemons are detected '
622 'that are not managed by cephadm',
625 'name': 'warn_on_failed_host_check',
628 'desc': 'raise a health warning if the host check fails',
631 'name': 'log_to_cluster',
634 'desc': 'log to the "cephadm" cluster log channel"',
637 'name': 'allow_ptrace',
640 'desc': 'allow SYS_PTRACE capability on ceph containers',
641 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
642 'process with gdb or strace. Enabling this options '
643 'can allow debugging daemons that encounter problems '
647 'name': 'prometheus_alerts_path',
649 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
650 'desc': 'location of alerts to include in prometheus deployments',
654 def __init__(self
, *args
, **kwargs
):
655 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
656 self
._cluster
_fsid
= self
.get('mon_map')['fsid']
662 if self
.get_store('pause'):
667 # for mypy which does not run the code
669 self
.ssh_config_file
= None # type: Optional[str]
670 self
.device_cache_timeout
= 0
671 self
.daemon_cache_timeout
= 0
672 self
.host_check_interval
= 0
674 self
.container_image_base
= ''
675 self
.warn_on_stray_hosts
= True
676 self
.warn_on_stray_daemons
= True
677 self
.warn_on_failed_host_check
= True
678 self
.allow_ptrace
= False
679 self
.prometheus_alerts_path
= ''
681 self
._cons
= {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
685 path
= self
.get_ceph_option('cephadm_path')
687 with
open(path
, 'r') as f
:
688 self
._cephadm
= f
.read()
689 except (IOError, TypeError) as e
:
690 raise RuntimeError("unable to read cephadm at '%s': %s" % (
693 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
697 CephadmOrchestrator
.instance
= self
699 t
= self
.get_store('upgrade_state')
701 self
.upgrade_state
= json
.loads(t
)
703 self
.upgrade_state
= None
705 self
.health_checks
= {}
707 self
.all_progress_references
= list() # type: List[orchestrator.ProgressReference]
710 i
= self
.get_store('inventory')
712 self
.inventory
: Dict
[str, dict] = json
.loads(i
)
714 self
.inventory
= dict()
715 self
.log
.debug('Loaded inventory %s' % self
.inventory
)
717 self
.cache
= HostCache(self
)
719 self
.rm_util
= RemoveUtil(self
)
721 self
.spec_store
= SpecStore(self
)
722 self
.spec_store
.load()
724 # ensure the host lists are in sync
725 for h
in self
.inventory
.keys():
726 if h
not in self
.cache
.daemons
:
727 self
.cache
.prime_empty_host(h
)
728 for h
in self
.cache
.get_hosts():
729 if h
not in self
.inventory
:
730 self
.cache
.rm_host(h
)
733 self
.offline_hosts
: Set
[str] = set()
736 self
.log
.debug('shutdown')
737 self
._worker
_pool
.close()
738 self
._worker
_pool
.join()
742 def _kick_serve_loop(self
):
743 self
.log
.debug('_kick_serve_loop')
746 def _check_safe_to_destroy_mon(self
, mon_id
):
747 # type: (str) -> None
748 ret
, out
, err
= self
.mon_command({
749 'prefix': 'quorum_status',
752 raise OrchestratorError('failed to check mon quorum status')
755 except Exception as e
:
756 raise OrchestratorError('failed to parse quorum status')
758 mons
= [m
['name'] for m
in j
['monmap']['mons']]
759 if mon_id
not in mons
:
760 self
.log
.info('Safe to remove mon.%s: not in monmap (%s)' % (
763 new_mons
= [m
for m
in mons
if m
!= mon_id
]
764 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
765 if len(new_quorum
) > len(new_mons
) / 2:
766 self
.log
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id
, new_quorum
, new_mons
))
768 raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
770 def _wait_for_ok_to_stop(self
, s
):
771 # only wait a little bit; the service might go away for something
774 if s
.daemon_type
not in ['mon', 'osd', 'mds']:
775 self
.log
.info('Upgrade: It is presumed safe to stop %s.%s' %
776 (s
.daemon_type
, s
.daemon_id
))
778 ret
, out
, err
= self
.mon_command({
779 'prefix': '%s ok-to-stop' % s
.daemon_type
,
780 'ids': [s
.daemon_id
],
782 if not self
.upgrade_state
or self
.upgrade_state
.get('paused'):
785 self
.log
.info('Upgrade: It is NOT safe to stop %s.%s' %
786 (s
.daemon_type
, s
.daemon_id
))
790 self
.log
.info('Upgrade: It is safe to stop %s.%s' %
791 (s
.daemon_type
, s
.daemon_id
))
795 def _clear_upgrade_health_checks(self
):
796 for k
in ['UPGRADE_NO_STANDBY_MGR',
797 'UPGRADE_FAILED_PULL']:
798 if k
in self
.health_checks
:
799 del self
.health_checks
[k
]
800 self
.set_health_checks(self
.health_checks
)
802 def _fail_upgrade(self
, alert_id
, alert
):
803 self
.log
.error('Upgrade: Paused due to %s: %s' % (alert_id
,
805 self
.upgrade_state
['error'] = alert_id
+ ': ' + alert
['summary']
806 self
.upgrade_state
['paused'] = True
807 self
._save
_upgrade
_state
()
808 self
.health_checks
[alert_id
] = alert
809 self
.set_health_checks(self
.health_checks
)
811 def _update_upgrade_progress(self
, progress
):
812 if 'progress_id' not in self
.upgrade_state
:
813 self
.upgrade_state
['progress_id'] = str(uuid
.uuid4())
814 self
._save
_upgrade
_state
()
815 self
.remote('progress', 'update', self
.upgrade_state
['progress_id'],
816 ev_msg
='Upgrade to %s' % self
.upgrade_state
['target_name'],
817 ev_progress
=progress
)
819 def _do_upgrade(self
):
821 if not self
.upgrade_state
:
822 self
.log
.debug('_do_upgrade no state, exiting')
825 target_name
= self
.upgrade_state
.get('target_name')
826 target_id
= self
.upgrade_state
.get('target_id', None)
828 # need to learn the container hash
829 self
.log
.info('Upgrade: First pull of %s' % target_name
)
831 target_id
, target_version
= self
._get
_container
_image
_id
(target_name
)
832 except OrchestratorError
as e
:
833 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
834 'severity': 'warning',
835 'summary': 'Upgrade: failed to pull target image',
840 self
.upgrade_state
['target_id'] = target_id
841 self
.upgrade_state
['target_version'] = target_version
842 self
._save
_upgrade
_state
()
843 target_version
= self
.upgrade_state
.get('target_version')
844 self
.log
.info('Upgrade: Target is %s with id %s' % (target_name
,
847 # get all distinct container_image settings
849 ret
, out
, err
= self
.mon_command({
850 'prefix': 'config dump',
853 config
= json
.loads(out
)
855 if opt
['name'] == 'container_image':
856 image_settings
[opt
['section']] = opt
['value']
858 daemons
= self
.cache
.get_daemons()
860 for daemon_type
in CEPH_UPGRADE_ORDER
:
861 self
.log
.info('Upgrade: Checking %s daemons...' % daemon_type
)
862 need_upgrade_self
= False
864 if d
.daemon_type
!= daemon_type
:
866 if d
.container_image_id
== target_id
:
867 self
.log
.debug('daemon %s.%s version correct' % (
868 daemon_type
, d
.daemon_id
))
871 self
.log
.debug('daemon %s.%s not correct (%s, %s, %s)' % (
872 daemon_type
, d
.daemon_id
,
873 d
.container_image_name
, d
.container_image_id
, d
.version
))
875 if daemon_type
== 'mgr' and \
876 d
.daemon_id
== self
.get_mgr_id():
877 self
.log
.info('Upgrade: Need to upgrade myself (mgr.%s)' %
879 need_upgrade_self
= True
882 # make sure host has latest container image
883 out
, err
, code
= self
._run
_cephadm
(
884 d
.hostname
, None, 'inspect-image', [],
885 image
=target_name
, no_fsid
=True, error_ok
=True)
886 if code
or json
.loads(''.join(out
)).get('image_id') != target_id
:
887 self
.log
.info('Upgrade: Pulling %s on %s' % (target_name
,
889 out
, err
, code
= self
._run
_cephadm
(
890 d
.hostname
, None, 'pull', [],
891 image
=target_name
, no_fsid
=True, error_ok
=True)
893 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
894 'severity': 'warning',
895 'summary': 'Upgrade: failed to pull target image',
898 'failed to pull %s on host %s' % (target_name
,
902 r
= json
.loads(''.join(out
))
903 if r
.get('image_id') != target_id
:
904 self
.log
.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name
, d
.hostname
, r
['image_id'], target_id
))
905 self
.upgrade_state
['target_id'] = r
['image_id']
906 self
._save
_upgrade
_state
()
909 self
._update
_upgrade
_progress
(done
/ len(daemons
))
911 if not d
.container_image_id
:
912 if d
.container_image_name
== target_name
:
913 self
.log
.debug('daemon %s has unknown container_image_id but has correct image name' % (d
.name()))
915 if not self
._wait
_for
_ok
_to
_stop
(d
):
917 self
.log
.info('Upgrade: Redeploying %s.%s' %
918 (d
.daemon_type
, d
.daemon_id
))
919 ret
, out
, err
= self
.mon_command({
920 'prefix': 'config set',
921 'name': 'container_image',
922 'value': target_name
,
923 'who': utils
.name_to_config_section(daemon_type
+ '.' + d
.daemon_id
),
933 if need_upgrade_self
:
934 mgr_map
= self
.get('mgr_map')
935 num
= len(mgr_map
.get('standbys'))
937 self
._fail
_upgrade
('UPGRADE_NO_STANDBY_MGR', {
938 'severity': 'warning',
939 'summary': 'Upgrade: Need standby mgr daemon',
942 'The upgrade process needs to upgrade the mgr, '
943 'but it needs at least one standby to proceed.',
948 self
.log
.info('Upgrade: there are %d other already-upgraded '
949 'standby mgrs, failing over' % num
)
951 self
._update
_upgrade
_progress
(done
/ len(daemons
))
954 ret
, out
, err
= self
.mon_command({
955 'prefix': 'mgr fail',
956 'who': self
.get_mgr_id(),
959 elif daemon_type
== 'mgr':
960 if 'UPGRADE_NO_STANDBY_MGR' in self
.health_checks
:
961 del self
.health_checks
['UPGRADE_NO_STANDBY_MGR']
962 self
.set_health_checks(self
.health_checks
)
964 # make sure 'ceph versions' agrees
965 ret
, out
, err
= self
.mon_command({
966 'prefix': 'versions',
969 for version
, count
in j
.get(daemon_type
, {}).items():
970 if version
!= target_version
:
972 'Upgrade: %d %s daemon(s) are %s != target %s' %
973 (count
, daemon_type
, version
, target_version
))
976 if image_settings
.get(daemon_type
) != target_name
:
977 self
.log
.info('Upgrade: Setting container_image for all %s...' %
979 ret
, out
, err
= self
.mon_command({
980 'prefix': 'config set',
981 'name': 'container_image',
982 'value': target_name
,
986 for section
in image_settings
.keys():
987 if section
.startswith(utils
.name_to_config_section(daemon_type
) + '.'):
988 to_clean
.append(section
)
990 self
.log
.debug('Upgrade: Cleaning up container_image for %s...' %
992 for section
in to_clean
:
993 ret
, image
, err
= self
.mon_command({
994 'prefix': 'config rm',
995 'name': 'container_image',
999 self
.log
.info('Upgrade: All %s daemons are up to date.' %
1003 self
.log
.info('Upgrade: Finalizing container_image settings')
1004 ret
, out
, err
= self
.mon_command({
1005 'prefix': 'config set',
1006 'name': 'container_image',
1007 'value': target_name
,
1010 for daemon_type
in CEPH_UPGRADE_ORDER
:
1011 ret
, image
, err
= self
.mon_command({
1012 'prefix': 'config rm',
1013 'name': 'container_image',
1014 'who': utils
.name_to_config_section(daemon_type
),
1017 self
.log
.info('Upgrade: Complete!')
1018 if 'progress_id' in self
.upgrade_state
:
1019 self
.remote('progress', 'complete',
1020 self
.upgrade_state
['progress_id'])
1021 self
.upgrade_state
= None
1022 self
._save
_upgrade
_state
()
1025 def _check_host(self
, host
):
1026 if host
not in self
.inventory
:
1028 self
.log
.debug(' checking %s' % host
)
1030 out
, err
, code
= self
._run
_cephadm
(
1031 host
, 'client', 'check-host', [],
1032 error_ok
=True, no_fsid
=True)
1033 self
.cache
.update_last_host_check(host
)
1034 self
.cache
.save_host(host
)
1036 self
.log
.debug(' host %s failed check' % host
)
1037 if self
.warn_on_failed_host_check
:
1038 return 'host %s failed check: %s' % (host
, err
)
1040 self
.log
.debug(' host %s ok' % host
)
1041 except Exception as e
:
1042 self
.log
.debug(' host %s failed check' % host
)
1043 return 'host %s failed check: %s' % (host
, e
)
1045 def _check_for_strays(self
):
1046 self
.log
.debug('_check_for_strays')
1047 for k
in ['CEPHADM_STRAY_HOST',
1048 'CEPHADM_STRAY_DAEMON']:
1049 if k
in self
.health_checks
:
1050 del self
.health_checks
[k
]
1051 if self
.warn_on_stray_hosts
or self
.warn_on_stray_daemons
:
1052 ls
= self
.list_servers()
1053 managed
= self
.cache
.get_daemon_names()
1054 host_detail
= [] # type: List[str]
1055 host_num_daemons
= 0
1056 daemon_detail
= [] # type: List[str]
1058 host
= item
.get('hostname')
1059 daemons
= item
.get('services') # misnomer!
1062 name
= '%s.%s' % (s
.get('type'), s
.get('id'))
1063 if host
not in self
.inventory
:
1064 missing_names
.append(name
)
1065 host_num_daemons
+= 1
1066 if name
not in managed
:
1067 daemon_detail
.append(
1068 'stray daemon %s on host %s not managed by cephadm' % (name
, host
))
1071 'stray host %s has %d stray daemons: %s' % (
1072 host
, len(missing_names
), missing_names
))
1073 if self
.warn_on_stray_hosts
and host_detail
:
1074 self
.health_checks
['CEPHADM_STRAY_HOST'] = {
1075 'severity': 'warning',
1076 'summary': '%d stray host(s) with %s daemon(s) '
1077 'not managed by cephadm' % (
1078 len(host_detail
), host_num_daemons
),
1079 'count': len(host_detail
),
1080 'detail': host_detail
,
1082 if self
.warn_on_stray_daemons
and daemon_detail
:
1083 self
.health_checks
['CEPHADM_STRAY_DAEMON'] = {
1084 'severity': 'warning',
1085 'summary': '%d stray daemons(s) not managed by cephadm' % (
1086 len(daemon_detail
)),
1087 'count': len(daemon_detail
),
1088 'detail': daemon_detail
,
1090 self
.set_health_checks(self
.health_checks
)
1092 def _serve_sleep(self
):
1093 sleep_interval
= 600
1094 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
1095 ret
= self
.event
.wait(sleep_interval
)
1100 self
.log
.debug("serve starting")
1104 self
.log
.debug('refreshing hosts')
1107 for host
in self
.cache
.get_hosts():
1108 if self
.cache
.host_needs_check(host
):
1109 r
= self
._check
_host
(host
)
1112 if self
.cache
.host_needs_daemon_refresh(host
):
1113 self
.log
.debug('refreshing %s daemons' % host
)
1114 r
= self
._refresh
_host
_daemons
(host
)
1117 if self
.cache
.host_needs_device_refresh(host
):
1118 self
.log
.debug('refreshing %s devices' % host
)
1119 r
= self
._refresh
_host
_devices
(host
)
1123 health_changed
= False
1124 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1125 del self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']
1126 health_changed
= True
1128 self
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
1129 'severity': 'warning',
1130 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
1131 'count': len(bad_hosts
),
1132 'detail': bad_hosts
,
1134 health_changed
= True
1136 self
.health_checks
['CEPHADM_REFRESH_FAILED'] = {
1137 'severity': 'warning',
1138 'summary': 'failed to probe daemons or devices',
1139 'count': len(failures
),
1142 health_changed
= True
1143 elif 'CEPHADM_REFRESH_FAILED' in self
.health_checks
:
1144 del self
.health_checks
['CEPHADM_REFRESH_FAILED']
1145 health_changed
= True
1147 self
.set_health_checks(self
.health_checks
)
1151 self
._check
_for
_strays
()
1154 self
.health_checks
['CEPHADM_PAUSED'] = {
1155 'severity': 'warning',
1156 'summary': 'cephadm background work is paused',
1158 'detail': ["'ceph orch resume' to resume"],
1160 self
.set_health_checks(self
.health_checks
)
1162 if 'CEPHADM_PAUSED' in self
.health_checks
:
1163 del self
.health_checks
['CEPHADM_PAUSED']
1164 self
.set_health_checks(self
.health_checks
)
1166 self
.rm_util
._remove
_osds
_bg
()
1168 if self
._apply
_all
_services
():
1169 continue # did something, refresh
1171 self
._check
_daemons
()
1173 if self
.upgrade_state
and not self
.upgrade_state
.get('paused'):
1178 self
.log
.debug("serve exit")
1180 def config_notify(self
):
1182 This method is called whenever one of our config options is changed.
1184 for opt
in self
.MODULE_OPTIONS
:
1186 opt
['name'], # type: ignore
1187 self
.get_module_option(opt
['name'])) # type: ignore
1188 self
.log
.debug(' mgr option %s = %s',
1189 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
1190 for opt
in self
.NATIVE_OPTIONS
:
1193 self
.get_ceph_option(opt
))
1194 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
1198 def notify(self
, notify_type
, notify_id
):
1203 self
.log
.info('Paused')
1204 self
.set_store('pause', 'true')
1206 # wake loop so we update the health status
1207 self
._kick
_serve
_loop
()
1211 self
.log
.info('Resumed')
1213 self
.set_store('pause', None)
1214 # unconditionally wake loop so that 'orch resume' can be used to kick
1216 self
._kick
_serve
_loop
()
1218 def get_unique_name(self
, daemon_type
, host
, existing
, prefix
=None,
1220 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
1222 Generate a unique random service name
1224 suffix
= daemon_type
not in [
1225 'mon', 'crash', 'nfs',
1226 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
1229 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
1230 raise orchestrator
.OrchestratorValidationError('name %s already in use', forcename
)
1234 host
= host
.split('.')[0]
1242 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
1244 if len([d
for d
in existing
if d
.daemon_id
== name
]):
1246 raise orchestrator
.OrchestratorValidationError('name %s already in use', name
)
1247 self
.log
.debug('name %s exists, trying again', name
)
def get_service_name(self, daemon_type, daemon_id, host):
    # type: (str, str, str) -> str
    """
    Returns the generic service name.

    Strips the host-specific suffix from *daemon_id* and joins the
    result with *daemon_type*, e.g. ('mds', 'a.host1.xyz', 'host1')
    -> 'mds.a'.

    :param daemon_type: daemon type, e.g. 'mds' or 'rgw'
    :param daemon_id: full daemon id, e.g. 'a.host1.xyz'
    :param host: hostname whose suffix should be stripped
    """
    # BUGFIX: the hostname was interpolated into the regex unescaped, so a
    # host containing regex metacharacters (e.g. '+') would break the
    # pattern; re.escape makes the hostname match literally.
    p = re.compile(r'(.*)\.%s.*' % re.escape(host))
    return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
1259 def _save_inventory(self
):
1260 self
.set_store('inventory', json
.dumps(self
.inventory
))
1262 def _save_upgrade_state(self
):
1263 self
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
))
1265 def _reconfig_ssh(self
):
1266 temp_files
= [] # type: list
1267 ssh_options
= [] # type: List[str]
1270 ssh_config_fname
= self
.ssh_config_file
1271 ssh_config
= self
.get_store("ssh_config")
1272 if ssh_config
is not None or ssh_config_fname
is None:
1274 ssh_config
= DEFAULT_SSH_CONFIG
1275 f
= tempfile
.NamedTemporaryFile(prefix
='cephadm-conf-')
1276 os
.fchmod(f
.fileno(), 0o600)
1277 f
.write(ssh_config
.encode('utf-8'))
1278 f
.flush() # make visible to other processes
1280 ssh_config_fname
= f
.name
1281 if ssh_config_fname
:
1282 self
.validate_ssh_config_fname(ssh_config_fname
)
1283 ssh_options
+= ['-F', ssh_config_fname
]
1286 ssh_key
= self
.get_store("ssh_identity_key")
1287 ssh_pub
= self
.get_store("ssh_identity_pub")
1288 self
.ssh_pub
= ssh_pub
1289 self
.ssh_key
= ssh_key
1290 if ssh_key
and ssh_pub
:
1291 tkey
= tempfile
.NamedTemporaryFile(prefix
='cephadm-identity-')
1292 tkey
.write(ssh_key
.encode('utf-8'))
1293 os
.fchmod(tkey
.fileno(), 0o600)
1294 tkey
.flush() # make visible to other processes
1295 tpub
= open(tkey
.name
+ '.pub', 'w')
1296 os
.fchmod(tpub
.fileno(), 0o600)
1298 tpub
.flush() # make visible to other processes
1299 temp_files
+= [tkey
, tpub
]
1300 ssh_options
+= ['-i', tkey
.name
]
1302 self
._temp
_files
= temp_files
1304 self
._ssh
_options
= ' '.join(ssh_options
) # type: Optional[str]
1306 self
._ssh
_options
= None
1308 if self
.mode
== 'root':
1309 self
.ssh_user
= 'root'
1310 elif self
.mode
== 'cephadm-package':
1311 self
.ssh_user
= 'cephadm'
def validate_ssh_config_fname(self, ssh_config_fname):
    """Raise OrchestratorValidationError unless *ssh_config_fname* is an existing file."""
    if os.path.isfile(ssh_config_fname):
        return
    raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
        ssh_config_fname))
1320 def _reset_con(self
, host
):
1321 conn
, r
= self
._cons
.get(host
, (None, None))
1323 self
.log
.debug('_reset_con close %s' % host
)
1325 del self
._cons
[host
]
1327 def _reset_cons(self
):
1328 for host
, conn_and_r
in self
._cons
.items():
1329 self
.log
.debug('_reset_cons close %s' % host
)
1330 conn
, r
= conn_and_r
def offline_hosts_remove(self, host):
    """Drop *host* from the offline-host set; silently a no-op when absent."""
    # set.discard is the idiomatic no-raise removal; it replaces the
    # membership test + remove() two-step of the original.
    self.offline_hosts.discard(host)
1341 if remoto
is not None:
1344 return False, "loading remoto library:{}".format(
1345 remoto_import_error
)
def available(self):
    """
    The cephadm orchestrator is always available.
    """
    # Availability is delegated entirely to the remoto-import check.
    runnable = self.can_run()
    return runnable
def process(self, completions):
    """
    Does nothing, as completions are processed in another thread.
    """
    if not completions:
        return
    self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))

    for p in completions:
        p.finalize()
def _require_hosts(self, hosts):
    """
    Raise an error if any of the given hosts are unregistered.
    """
    if isinstance(hosts, six.string_types):
        hosts = [hosts]
    keys = self.inventory.keys()
    unregistered_hosts = set(hosts) - keys
    if unregistered_hosts:
        logger.warning('keys = {}'.format(keys))
        raise RuntimeError("Host(s) {} not registered".format(
            ", ".join(map(lambda h: "'{}'".format(h),
                          unregistered_hosts))))
1377 @orchestrator._cli
_write
_command
(
1378 prefix
='cephadm set-ssh-config',
1379 desc
='Set the ssh_config file (use -i <ssh_config>)')
1380 def _set_ssh_config(self
, inbuf
=None):
1382 Set an ssh_config file provided from stdin
1387 if inbuf
is None or len(inbuf
) == 0:
1388 return -errno
.EINVAL
, "", "empty ssh config provided"
1389 self
.set_store("ssh_config", inbuf
)
1390 self
.log
.info('Set ssh_config')
1393 @orchestrator._cli
_write
_command
(
1394 prefix
='cephadm clear-ssh-config',
1395 desc
='Clear the ssh_config file')
1396 def _clear_ssh_config(self
):
1398 Clear the ssh_config file provided from stdin
1400 self
.set_store("ssh_config", None)
1401 self
.ssh_config_tmp
= None
1402 self
.log
.info('Cleared ssh_config')
1405 @orchestrator._cli
_read
_command
(
1406 prefix
='cephadm get-ssh-config',
1407 desc
='Returns the ssh config as used by cephadm'
1409 def _get_ssh_config(self
):
1410 if self
.ssh_config_file
:
1411 self
.validate_ssh_config_fname(self
.ssh_config_file
)
1412 with
open(self
.ssh_config_file
) as f
:
1413 return HandleCommandResult(stdout
=f
.read())
1414 ssh_config
= self
.get_store("ssh_config")
1416 return HandleCommandResult(stdout
=ssh_config
)
1417 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
1420 @orchestrator._cli
_write
_command
(
1421 'cephadm generate-key',
1422 desc
='Generate a cluster SSH key (if not present)')
1423 def _generate_key(self
):
1424 if not self
.ssh_pub
or not self
.ssh_key
:
1425 self
.log
.info('Generating ssh key...')
1426 tmp_dir
= TemporaryDirectory()
1427 path
= tmp_dir
.name
+ '/key'
1429 subprocess
.check_call([
1430 '/usr/bin/ssh-keygen',
1431 '-C', 'ceph-%s' % self
._cluster
_fsid
,
1435 with
open(path
, 'r') as f
:
1437 with
open(path
+ '.pub', 'r') as f
:
1441 os
.unlink(path
+ '.pub')
1443 self
.set_store('ssh_identity_key', secret
)
1444 self
.set_store('ssh_identity_pub', pub
)
1445 self
._reconfig
_ssh
()
1448 @orchestrator._cli
_write
_command
(
1449 'cephadm clear-key',
1450 desc
='Clear cluster SSH key')
1451 def _clear_key(self
):
1452 self
.set_store('ssh_identity_key', None)
1453 self
.set_store('ssh_identity_pub', None)
1454 self
._reconfig
_ssh
()
1455 self
.log
.info('Cleared cluster SSH key')
1458 @orchestrator._cli
_read
_command
(
1459 'cephadm get-pub-key',
1460 desc
='Show SSH public key for connecting to cluster hosts')
1461 def _get_pub_key(self
):
1463 return 0, self
.ssh_pub
, ''
1465 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
1467 @orchestrator._cli
_read
_command
(
1469 desc
='Show user for SSHing to cluster hosts')
1470 def _get_user(self
):
1471 return 0, self
.ssh_user
, ''
1473 @orchestrator._cli
_read
_command
(
1474 'cephadm check-host',
1475 'name=host,type=CephString '
1476 'name=addr,type=CephString,req=false',
1477 'Check whether we can access and manage a remote host')
1478 def check_host(self
, host
, addr
=None):
1479 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'check-host',
1480 ['--expect-hostname', host
],
1482 error_ok
=True, no_fsid
=True)
1484 return 1, '', ('check-host failed:\n' + '\n'.join(err
))
1485 # if we have an outstanding health alert for this host, give the
1486 # serve thread a kick
1487 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1488 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
1489 if item
.startswith('host %s ' % host
):
1491 return 0, '%s (%s) ok' % (host
, addr
), err
1493 @orchestrator._cli
_read
_command
(
1494 'cephadm prepare-host',
1495 'name=host,type=CephString '
1496 'name=addr,type=CephString,req=false',
1497 'Prepare a remote host for use with cephadm')
1498 def _prepare_host(self
, host
, addr
=None):
1499 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'prepare-host',
1500 ['--expect-hostname', host
],
1502 error_ok
=True, no_fsid
=True)
1504 return 1, '', ('prepare-host failed:\n' + '\n'.join(err
))
1505 # if we have an outstanding health alert for this host, give the
1506 # serve thread a kick
1507 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1508 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
1509 if item
.startswith('host %s ' % host
):
1511 return 0, '%s (%s) ok' % (host
, addr
), err
1513 def _get_connection(self
, host
):
1515 Setup a connection for running commands on remote host.
1517 conn_and_r
= self
._cons
.get(host
)
1519 self
.log
.debug('Have connection to %s' % host
)
1521 n
= self
.ssh_user
+ '@' + host
1522 self
.log
.debug("Opening connection to {} with ssh options '{}'".format(
1523 n
, self
._ssh
_options
))
1524 child_logger
=self
.log
.getChild(n
)
1525 child_logger
.setLevel('WARNING')
1526 conn
= remoto
.Connection(
1528 logger
=child_logger
,
1529 ssh_options
=self
._ssh
_options
)
1531 r
= conn
.import_module(remotes
)
1532 self
._cons
[host
] = conn
, r
1536 def _executable_path(self
, conn
, executable
):
1538 Remote validator that accepts a connection object to ensure that a certain
1539 executable is available returning its full path if so.
1541 Otherwise an exception with thorough details will be raised, informing the
1542 user that the executable was not found.
1544 executable_path
= conn
.remote_module
.which(executable
)
1545 if not executable_path
:
1546 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1547 executable
, conn
.hostname
))
1548 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
1550 return executable_path
1552 def _run_cephadm(self
, host
, entity
, command
, args
,
1558 # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
1560 Run cephadm on the remote host with the given command + args
1562 if not addr
and host
in self
.inventory
:
1563 addr
= self
.inventory
[host
].get('addr', host
)
1565 self
.offline_hosts_remove(host
)
1569 conn
, connr
= self
._get
_connection
(addr
)
1570 except IOError as e
:
1572 self
.log
.exception('failed to establish ssh connection')
1573 return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
1576 assert image
or entity
1578 daemon_type
= entity
.split('.', 1)[0] # type: ignore
1579 if daemon_type
in CEPH_TYPES
or \
1580 daemon_type
== 'nfs':
1581 # get container image
1582 ret
, image
, err
= self
.mon_command({
1583 'prefix': 'config get',
1584 'who': utils
.name_to_config_section(entity
),
1585 'key': 'container_image',
1587 image
= image
.strip() # type: ignore
1588 self
.log
.debug('%s container image %s' % (entity
, image
))
1592 final_args
.extend(['--image', image
])
1593 final_args
.append(command
)
1596 final_args
+= ['--fsid', self
._cluster
_fsid
]
1599 if self
.mode
== 'root':
1600 self
.log
.debug('args: %s' % (' '.join(final_args
)))
1602 self
.log
.debug('stdin: %s' % stdin
)
1603 script
= 'injected_argv = ' + json
.dumps(final_args
) + '\n'
1605 script
+= 'injected_stdin = ' + json
.dumps(stdin
) + '\n'
1606 script
+= self
._cephadm
1607 python
= connr
.choose_python()
1610 'unable to find python on %s (tried %s in %s)' % (
1611 host
, remotes
.PYTHONS
, remotes
.PATH
))
1613 out
, err
, code
= remoto
.process
.check(
1616 stdin
=script
.encode('utf-8'))
1617 except RuntimeError as e
:
1618 self
._reset
_con
(host
)
1620 return [], [str(e
)], 1
1622 elif self
.mode
== 'cephadm-package':
1624 out
, err
, code
= remoto
.process
.check(
1626 ['sudo', '/usr/bin/cephadm'] + final_args
,
1628 except RuntimeError as e
:
1629 self
._reset
_con
(host
)
1631 return [], [str(e
)], 1
1634 assert False, 'unsupported mode'
1636 self
.log
.debug('code: %d' % code
)
1638 self
.log
.debug('out: %s' % '\n'.join(out
))
1640 self
.log
.debug('err: %s' % '\n'.join(err
))
1641 if code
and not error_ok
:
1643 'cephadm exited with an error code: %d, stderr:%s' % (
1644 code
, '\n'.join(err
)))
1645 return out
, err
, code
1647 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1648 # this is a misleading exception as it seems to be thrown for
1649 # any sort of connection failure, even those having nothing to
1650 # do with "host not found" (e.g., ssh key permission denied).
1651 self
.offline_hosts
.add(host
)
1652 user
= 'root' if self
.mode
== 'root' else 'cephadm'
1653 msg
= f
'Failed to connect to {host} ({addr}). ' \
1654 f
'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
1655 f
'you may want to run: \n' \
1656 f
'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
1657 raise OrchestratorError(msg
) from e
1658 except Exception as ex
:
1659 self
.log
.exception(ex
)
1662 def _get_hosts(self
, label
=None):
1663 # type: (Optional[str]) -> List[str]
1665 for h
, hostspec
in self
.inventory
.items():
1666 if not label
or label
in hostspec
.get('labels', []):
def add_host(self, spec):
    # type: (HostSpec) -> str
    """
    Add a host to be managed by the orchestrator.

    :param host: host name
    """
    assert_valid_host(spec.hostname)
    # Run a remote check-host first so a bad host never enters the inventory.
    out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
                                       ['--expect-hostname', spec.hostname],
                                       addr=spec.addr,
                                       error_ok=True, no_fsid=True)
    if code:
        raise OrchestratorError('New host %s (%s) failed check: %s' % (
            spec.hostname, spec.addr, err))

    self.inventory[spec.hostname] = spec.to_json()
    self._save_inventory()
    self.cache.prime_empty_host(spec.hostname)
    self.offline_hosts_remove(spec.hostname)
    self.event.set()  # refresh stray health check
    self.log.info('Added host %s' % spec.hostname)
    return "Added host '{}'".format(spec.hostname)
def remove_host(self, host):
    # type: (str) -> str
    """
    Remove a host from orchestrator management.

    :param host: host name
    """
    del self.inventory[host]
    self._save_inventory()
    self.cache.rm_host(host)
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Removed host %s' % host)
    return "Removed host '{}'".format(host)
def update_host_addr(self, host, addr):
    """Record a new address for *host*, drop any stale connection, and wake serve."""
    if host not in self.inventory:
        raise OrchestratorError('host %s not registered' % host)
    self.inventory[host]['addr'] = addr
    self._save_inventory()
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Set host %s addr to %s' % (host, addr))
    return "Updated host '{}' addr to '{}'".format(host, addr)
def get_hosts(self):
    # type: () -> List[orchestrator.HostSpec]
    """
    Return a list of hosts managed by the orchestrator.

    Notes:
      - skip async: manager reads from cache.
    """
    hosts = []
    for hostname, info in self.inventory.items():
        hosts.append(orchestrator.HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
        ))
    return hosts
def add_host_label(self, host, label):
    """Attach *label* to *host*'s inventory entry; idempotent for existing labels."""
    if host not in self.inventory:
        raise OrchestratorError('host %s does not exist' % host)

    # setdefault collapses the "create list if missing" step of the original.
    labels = self.inventory[host].setdefault('labels', list())
    if label not in labels:
        labels.append(label)
    self._save_inventory()
    self.log.info('Added label %s to host %s' % (label, host))
    return 'Added label %s to host %s' % (label, host)
def remove_host_label(self, host, label):
    """Detach *label* from *host*'s inventory entry; idempotent when absent."""
    if host not in self.inventory:
        raise OrchestratorError('host %s does not exist' % host)

    labels = self.inventory[host].setdefault('labels', list())
    if label in labels:
        labels.remove(label)
    self._save_inventory()
    # BUGFIX: the log message previously said 'Removed label ... to host ...'
    # while the return value correctly said 'from'; make them consistent.
    self.log.info('Removed label %s from host %s' % (label, host))
    return 'Removed label %s from host %s' % (label, host)
1767 def _refresh_host_daemons(self
, host
):
1769 out
, err
, code
= self
._run
_cephadm
(
1770 host
, 'mon', 'ls', [], no_fsid
=True)
1772 return 'host %s cephadm ls returned %d: %s' % (
1774 except Exception as e
:
1775 return 'host %s scrape failed: %s' % (host
, e
)
1776 ls
= json
.loads(''.join(out
))
1779 if not d
['style'].startswith('cephadm'):
1781 if d
['fsid'] != self
._cluster
_fsid
:
1783 if '.' not in d
['name']:
1785 sd
= orchestrator
.DaemonDescription()
1786 sd
.last_refresh
= datetime
.datetime
.utcnow()
1787 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
1790 setattr(sd
, k
, datetime
.datetime
.strptime(d
[k
], DATEFMT
))
1791 sd
.daemon_type
= d
['name'].split('.')[0]
1792 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
1794 sd
.container_id
= d
.get('container_id')
1797 sd
.container_id
= sd
.container_id
[0:12]
1798 sd
.container_image_name
= d
.get('container_image_name')
1799 sd
.container_image_id
= d
.get('container_image_id')
1800 sd
.version
= d
.get('version')
1802 sd
.status_desc
= d
['state']
1810 sd
.status_desc
= 'unknown'
1813 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
1814 self
.cache
.update_host_daemons(host
, dm
)
1815 self
.cache
.save_host(host
)
1818 def _refresh_host_devices(self
, host
):
1820 out
, err
, code
= self
._run
_cephadm
(
1823 ['--', 'inventory', '--format=json'])
1825 return 'host %s ceph-volume inventory returned %d: %s' % (
1827 except Exception as e
:
1828 return 'host %s ceph-volume inventory failed: %s' % (host
, e
)
1829 devices
= json
.loads(''.join(out
))
1831 out
, err
, code
= self
._run
_cephadm
(
1837 return 'host %s list-networks returned %d: %s' % (
1839 except Exception as e
:
1840 return 'host %s list-networks failed: %s' % (host
, e
)
1841 networks
= json
.loads(''.join(out
))
1842 self
.log
.debug('Refreshed host %s devices (%d) networks (%s)' % (
1843 host
, len(devices
), len(networks
)))
1844 devices
= inventory
.Devices
.from_json(devices
)
1845 self
.cache
.update_host_devices_networks(host
, devices
.devices
, networks
)
1846 self
.cache
.save_host(host
)
1849 def _get_spec_size(self
, spec
):
1850 if spec
.placement
.count
:
1851 return spec
.placement
.count
1852 elif spec
.placement
.host_pattern
:
1853 return len(spec
.placement
.pattern_matches_hosts(self
.inventory
.keys()))
1854 elif spec
.placement
.label
:
1855 return len(self
._get
_hosts
(spec
.placement
.label
))
1856 elif spec
.placement
.hosts
:
1857 return len(spec
.placement
.hosts
)
1862 def describe_service(self
, service_type
=None, service_name
=None,
1865 # ugly sync path, FIXME someday perhaps?
1866 for host
, hi
in self
.inventory
.items():
1867 self
._refresh
_host
_daemons
(host
)
1869 sm
= {} # type: Dict[str, orchestrator.ServiceDescription]
1870 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1871 for name
, dd
in dm
.items():
1872 if service_type
and service_type
!= dd
.daemon_type
:
1874 n
: str = dd
.service_name()
1875 if service_name
and service_name
!= n
:
1877 if dd
.daemon_type
== 'osd':
1878 continue # ignore OSDs for now
1879 if dd
.service_name() in self
.spec_store
.specs
:
1880 spec
= self
.spec_store
.specs
[dd
.service_name()]
1884 service_type
=dd
.daemon_type
,
1885 service_id
=dd
.service_id(),
1886 placement
=PlacementSpec(
1891 sm
[n
] = orchestrator
.ServiceDescription(
1892 last_refresh
=dd
.last_refresh
,
1893 container_image_id
=dd
.container_image_id
,
1894 container_image_name
=dd
.container_image_name
,
1897 if dd
.service_name() in self
.spec_store
.specs
:
1898 sm
[n
].size
= self
._get
_spec
_size
(spec
)
1899 sm
[n
].created
= self
.spec_store
.spec_created
[dd
.service_name()]
1900 if service_type
== 'nfs':
1901 spec
= cast(NFSServiceSpec
, spec
)
1902 sm
[n
].rados_config_location
= spec
.rados_config_location()
1907 if not sm
[n
].last_refresh
or not dd
.last_refresh
or dd
.last_refresh
< sm
[n
].last_refresh
: # type: ignore
1908 sm
[n
].last_refresh
= dd
.last_refresh
1909 if sm
[n
].container_image_id
!= dd
.container_image_id
:
1910 sm
[n
].container_image_id
= 'mix'
1911 if sm
[n
].container_image_name
!= dd
.container_image_name
:
1912 sm
[n
].container_image_name
= 'mix'
1913 for n
, spec
in self
.spec_store
.specs
.items():
1916 if service_type
is not None and service_type
!= spec
.service_type
:
1918 if service_name
is not None and service_name
!= n
:
1920 sm
[n
] = orchestrator
.ServiceDescription(
1922 size
=self
._get
_spec
_size
(spec
),
1925 if service_type
== 'nfs':
1926 spec
= cast(NFSServiceSpec
, spec
)
1927 sm
[n
].rados_config_location
= spec
.rados_config_location()
1928 return list(sm
.values())
1931 def list_daemons(self
, service_name
=None, daemon_type
=None, daemon_id
=None,
1932 host
=None, refresh
=False):
1934 # ugly sync path, FIXME someday perhaps?
1936 self
._refresh
_host
_daemons
(host
)
1938 for hostname
, hi
in self
.inventory
.items():
1939 self
._refresh
_host
_daemons
(hostname
)
1941 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1942 if host
and h
!= host
:
1944 for name
, dd
in dm
.items():
1945 if daemon_type
is not None and daemon_type
!= dd
.daemon_type
:
1947 if daemon_id
is not None and daemon_id
!= dd
.daemon_id
:
1949 if service_name
is not None and service_name
!= dd
.service_name():
def service_action(self, action, service_name):
    """Queue *action* for every daemon belonging to *service_name*."""
    args = [
        (d.daemon_type, d.daemon_id, d.hostname, action)
        for host, dm in self.cache.daemons.items()
        for name, d in dm.items()
        if d.matches_service(service_name)
    ]
    self.log.info('%s service %s' % (action.capitalize(), service_name))
    return self._daemon_actions(args)
@async_map_completion
def _daemon_actions(self, daemon_type, daemon_id, host, action):
    """Fan _daemon_action out over (daemon_type, daemon_id, host, action) tuples."""
    result = self._daemon_action(daemon_type, daemon_id, host, action)
    return result
1968 def _daemon_action(self
, daemon_type
, daemon_id
, host
, action
):
1969 if action
== 'redeploy':
1970 # stop, recreate the container+unit, then restart
1971 return self
._create
_daemon
(daemon_type
, daemon_id
, host
)
1972 elif action
== 'reconfig':
1973 return self
._create
_daemon
(daemon_type
, daemon_id
, host
,
1977 'start': ['reset-failed', 'start'],
1979 'restart': ['reset-failed', 'restart'],
1981 name
= '%s.%s' % (daemon_type
, daemon_id
)
1982 for a
in actions
[action
]:
1983 out
, err
, code
= self
._run
_cephadm
(
1985 ['--name', name
, a
],
1987 self
.cache
.invalidate_host_daemons(host
)
1988 return "{} {} from host '{}'".format(action
, name
, host
)
def daemon_action(self, action, daemon_type, daemon_id):
    """Queue *action* for the daemon(s) matching *daemon_type*/*daemon_id*."""
    args = [
        (d.daemon_type, d.daemon_id, d.hostname, action)
        for host, dm in self.cache.daemons.items()
        for name, d in dm.items()
        if d.daemon_type == daemon_type and d.daemon_id == daemon_id
    ]
    if not args:
        raise orchestrator.OrchestratorError(
            'Unable to find %s.%s daemon(s)' % (
                daemon_type, daemon_id))
    self.log.info('%s daemons %s' % (
        action.capitalize(),
        ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
    return self._daemon_actions(args)
2006 def remove_daemons(self
, names
):
2007 # type: (List[str]) -> orchestrator.Completion
2009 for host
, dm
in self
.cache
.daemons
.items():
2012 args
.append((name
, host
))
2014 raise OrchestratorError('Unable to find daemon(s) %s' % (names
))
2015 self
.log
.info('Remove daemons %s' % [a
[0] for a
in args
])
2016 return self
._remove
_daemons
(args
)
def remove_service(self, service_name):
    """Remove the stored service spec; removal of a missing spec still succeeds."""
    self.log.info('Remove service %s' % service_name)
    found = self.spec_store.rm(service_name)
    if not found:
        # must be idempotent: still a success.
        return [f'Failed to remove service. <{service_name}> was not found.']
    self._kick_serve_loop()
    return ['Removed service %s' % service_name]
2030 def get_inventory(self
, host_filter
=None, refresh
=False):
2032 Return the storage inventory of hosts matching the given filter.
2034 :param host_filter: host filter
2037 - add filtering by label
2040 # ugly sync path, FIXME someday perhaps?
2042 for host
in host_filter
.hosts
:
2043 self
._refresh
_host
_devices
(host
)
2045 for host
, hi
in self
.inventory
.items():
2046 self
._refresh
_host
_devices
(host
)
2049 for host
, dls
in self
.cache
.devices
.items():
2050 if host_filter
and host
not in host_filter
.hosts
:
2052 result
.append(orchestrator
.InventoryHost(host
,
2053 inventory
.Devices(dls
)))
def zap_device(self, host, path):
    """Destroy (zap) the LVM state of *path* on *host* via ceph-volume."""
    self.log.info('Zap device %s:%s' % (host, path))
    out, err, code = self._run_cephadm(
        host, 'osd', 'ceph-volume',
        ['--', 'lvm', 'zap', '--destroy', path],
        error_ok=True)
    # The device list for this host is stale either way.
    self.cache.invalidate_host_devices(host)
    if code:
        raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
    return '\n'.join(out + err)
2068 def blink_device_light(self
, ident_fault
, on
, locs
):
2069 @async_map_completion
2070 def blink(host
, dev
, path
):
2073 'local-disk-%s-led-%s' % (
2075 'on' if on
else 'off'),
2076 '--path', path
or dev
,
2078 out
, err
, code
= self
._run
_cephadm
(
2079 host
, 'osd', 'shell', ['--'] + cmd
,
2083 'Unable to affect %s light for %s:%s. Command: %s' % (
2084 ident_fault
, host
, dev
, ' '.join(cmd
)))
2085 self
.log
.info('Set %s light for %s:%s %s' % (
2086 ident_fault
, host
, dev
, 'on' if on
else 'off'))
2087 return "Set %s light for %s:%s %s" % (
2088 ident_fault
, host
, dev
, 'on' if on
else 'off')
def get_osd_uuid_map(self, only_up=False):
    # type: (bool) -> Dict[str, str]
    """Map osd id (as str) -> osd uuid from the cluster's current osd_map."""
    result = {}  # type: Dict[str, str]
    for o in self.get('osd_map')['osds']:
        # only include OSDs that have ever started in this map. this way
        # an interrupted osd create can be repeated and succeed the second
        # time around.
        osd_id = o.get('osd')
        if osd_id is None:
            raise OrchestratorError("Could not retrieve osd_id from osd_map")
        if not only_up or (o['up_from'] > 0):
            result[str(osd_id)] = o.get('uuid', '')
    return result
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
    """Schedule an _apply for every drive group spec, returning the completions."""
    results = []
    for spec in specs:
        results.append(self._apply(spec))
    return results
def find_destroyed_osds(self) -> Dict[str, List[str]]:
    """Return {hostname: [osd ids]} for OSDs currently marked 'destroyed'."""
    osd_host_map: Dict[str, List[str]] = dict()
    ret, out, err = self.mon_command({
        'prefix': 'osd tree',
        'states': ['destroyed'],
        'format': 'json',
    })
    if ret != 0:
        raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
    try:
        tree = json.loads(out)
    except json.decoder.JSONDecodeError:
        self.log.error(f"Could not decode json -> {out}")
        raise OrchestratorError(f"Could not decode json -> {out}")

    for node in tree.get('nodes', {}):
        if node.get('type') == 'host':
            osd_host_map.update(
                {node.get('name'): [str(_id) for _id in node.get('children', list())]}
            )
    return osd_host_map
2135 def create_osds(self
, drive_group
: DriveGroupSpec
):
2136 self
.log
.debug(f
"Processing DriveGroup {drive_group}")
2138 drive_group
.osd_id_claims
= self
.find_destroyed_osds()
2139 self
.log
.info(f
"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
2140 for host
, drive_selection
in self
.prepare_drivegroup(drive_group
):
2141 self
.log
.info('Applying %s on host %s...' % (drive_group
.service_id
, host
))
2142 cmd
= self
.driveselection_to_ceph_volume(drive_group
, drive_selection
,
2143 drive_group
.osd_id_claims
.get(host
, []))
2145 self
.log
.debug("No data_devices, skipping DriveGroup: {}".format(drive_group
.service_id
))
2147 ret_msg
= self
._create
_osd
(host
, cmd
,
2148 replace_osd_ids
=drive_group
.osd_id_claims
.get(host
, []))
2150 return ", ".join(ret
)
def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
    """Match the drive group against known hosts and build one DriveSelection per host."""
    # 1) use fn_filter to determine matching_hosts
    matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
    # 2) Map the inventory to the InventoryHost object
    host_ds_map = []

    def _find_inv_for_host(hostname: str, inventory_dict: dict):
        # This is stupid and needs to be loaded with the host
        for _host, _inventory in inventory_dict.items():
            if _host == hostname:
                return _inventory
        raise OrchestratorError("No inventory found for host: {}".format(hostname))

    # 3) iterate over matching_host and call DriveSelection
    self.log.debug(f"Checking matching hosts -> {matching_hosts}")
    for host in matching_hosts:
        inventory_for_host = _find_inv_for_host(host, self.cache.devices)
        self.log.debug(f"Found inventory for host {inventory_for_host}")
        drive_selection = DriveSelection(drive_group, inventory_for_host)
        self.log.debug(f"Found drive selection {drive_selection}")
        host_ds_map.append((host, drive_selection))
    return host_ds_map
def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
                                  drive_selection: DriveSelection,
                                  osd_id_claims: Optional[List[str]] = None,
                                  preview: bool = False) -> Optional[str]:
    """Translate a drive group + selection into the ceph-volume command string (or None)."""
    self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
    cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
                                                  osd_id_claims, preview=preview).run()
    self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
    return cmd
2186 def preview_drivegroups(self
, drive_group_name
: Optional
[str] = None,
2187 dg_specs
: Optional
[List
[DriveGroupSpec
]] = None) -> List
[Dict
[str, Dict
[Any
, Any
]]]:
2189 if drive_group_name
:
2190 drive_groups
= cast(List
[DriveGroupSpec
],
2191 self
.spec_store
.find(service_name
=drive_group_name
))
2193 drive_groups
= dg_specs
2197 for drive_group
in drive_groups
:
2198 drive_group
.osd_id_claims
= self
.find_destroyed_osds()
2199 self
.log
.info(f
"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
2200 # prepare driveselection
2201 for host
, ds
in self
.prepare_drivegroup(drive_group
):
2202 cmd
= self
.driveselection_to_ceph_volume(drive_group
, ds
,
2203 drive_group
.osd_id_claims
.get(host
, []), preview
=True)
2205 self
.log
.debug("No data_devices, skipping DriveGroup: {}".format(drive_group
.service_name()))
2207 out
, err
, code
= self
._run
_ceph
_volume
_command
(host
, cmd
)
2209 concat_out
= json
.loads(" ".join(out
))
2210 ret_all
.append({'data': concat_out
, 'drivegroup': drive_group
.service_id
, 'host': host
})
2213 def _run_ceph_volume_command(self
, host
: str, cmd
: str) -> Tuple
[List
[str], List
[str], int]:
2214 self
._require
_hosts
(host
)
2217 ret
, keyring
, err
= self
.mon_command({
2218 'prefix': 'auth get',
2219 'entity': 'client.bootstrap-osd',
2223 ret
, config
, err
= self
.mon_command({
2224 "prefix": "config generate-minimal-conf",
2232 split_cmd
= cmd
.split(' ')
2233 _cmd
= ['--config-json', '-', '--']
2234 _cmd
.extend(split_cmd
)
2235 out
, err
, code
= self
._run
_cephadm
(
2236 host
, 'osd', 'ceph-volume',
2240 return out
, err
, code
2242 def _create_osd(self
, host
, cmd
, replace_osd_ids
=None):
2243 out
, err
, code
= self
._run
_ceph
_volume
_command
(host
, cmd
)
2245 if code
== 1 and ', it is already prepared' in '\n'.join(err
):
2246 # HACK: when we create against an existing LV, ceph-volume
2247 # returns an error and the above message. To make this
2248 # command idempotent, tolerate this "error" and continue.
2249 self
.log
.debug('the device was already prepared; continuing')
2253 'cephadm exited with an error code: %d, stderr:%s' % (
2254 code
, '\n'.join(err
)))
2257 out
, err
, code
= self
._run
_cephadm
(
2258 host
, 'osd', 'ceph-volume',
2264 before_osd_uuid_map
= self
.get_osd_uuid_map(only_up
=True)
2265 osds_elems
= json
.loads('\n'.join(out
))
2266 fsid
= self
._cluster
_fsid
2267 osd_uuid_map
= self
.get_osd_uuid_map()
2269 for osd_id
, osds
in osds_elems
.items():
2271 if osd
['tags']['ceph.cluster_fsid'] != fsid
:
2272 self
.log
.debug('mismatched fsid, skipping %s' % osd
)
2274 if osd_id
in before_osd_uuid_map
and osd_id
not in replace_osd_ids
:
2275 # if it exists but is part of the replacement operation, don't skip
2277 if osd_id
not in osd_uuid_map
:
2278 self
.log
.debug('osd id {} does not exist in cluster'.format(osd_id
))
2280 if osd_uuid_map
.get(osd_id
) != osd
['tags']['ceph.osd_fsid']:
2281 self
.log
.debug('mismatched osd uuid (cluster has %s, osd '
2283 osd_uuid_map
.get(osd_id
),
2284 osd
['tags']['ceph.osd_fsid']))
2287 created
.append(osd_id
)
2288 self
._create
_daemon
(
2289 'osd', osd_id
, host
,
2290 osd_uuid_map
=osd_uuid_map
)
2293 self
.cache
.invalidate_host_devices(host
)
2294 return "Created osd(s) %s on host '%s'" % (','.join(created
), host
)
2296 return "Created no osd(s) on host %s; already created?" % host
2298 def _calc_daemon_deps(self
, daemon_type
, daemon_id
):
2300 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
2301 'grafana': ['prometheus'],
2302 'alertmanager': ['mgr', 'alertmanager'],
2305 for dep_type
in need
.get(daemon_type
, []):
2306 for dd
in self
.cache
.get_daemons_by_service(dep_type
):
2307 deps
.append(dd
.name())
2310 def _get_config_and_keyring(self
, daemon_type
, daemon_id
,
2312 extra_ceph_config
=None):
2313 # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
2316 if daemon_type
== 'mon':
2319 ename
= utils
.name_to_config_section(daemon_type
+ '.' + daemon_id
)
2320 ret
, keyring
, err
= self
.mon_command({
2321 'prefix': 'auth get',
2326 ret
, config
, err
= self
.mon_command({
2327 "prefix": "config generate-minimal-conf",
2329 if extra_ceph_config
:
2330 config
+= extra_ceph_config
2337 def _create_daemon(self
, daemon_type
, daemon_id
, host
,
2339 extra_args
=None, extra_config
=None,
2344 if not extra_config
:
2346 name
= '%s.%s' % (daemon_type
, daemon_id
)
2348 start_time
= datetime
.datetime
.utcnow()
2349 deps
= [] # type: List[str]
2350 cephadm_config
= {} # type: Dict[str, Any]
2351 if daemon_type
== 'prometheus':
2352 cephadm_config
, deps
= self
._generate
_prometheus
_config
()
2353 extra_args
.extend(['--config-json', '-'])
2354 elif daemon_type
== 'grafana':
2355 cephadm_config
, deps
= self
._generate
_grafana
_config
()
2356 extra_args
.extend(['--config-json', '-'])
2357 elif daemon_type
== 'nfs':
2358 cephadm_config
, deps
= \
2359 self
._generate
_nfs
_config
(daemon_type
, daemon_id
, host
)
2360 extra_args
.extend(['--config-json', '-'])
2361 elif daemon_type
== 'alertmanager':
2362 cephadm_config
, deps
= self
._generate
_alertmanager
_config
()
2363 extra_args
.extend(['--config-json', '-'])
2365 # Ceph.daemons (mon, mgr, mds, osd, etc)
2366 cephadm_config
= self
._get
_config
_and
_keyring
(
2367 daemon_type
, daemon_id
,
2369 extra_ceph_config
=extra_config
.pop('config', ''))
2371 cephadm_config
.update({'files': extra_config
})
2372 extra_args
.extend(['--config-json', '-'])
2374 # osd deployments needs an --osd-uuid arg
2375 if daemon_type
== 'osd':
2376 if not osd_uuid_map
:
2377 osd_uuid_map
= self
.get_osd_uuid_map()
2378 osd_uuid
= osd_uuid_map
.get(daemon_id
)
2380 raise OrchestratorError('osd.%d not in osdmap' % daemon_id
)
2381 extra_args
.extend(['--osd-fsid', osd_uuid
])
2384 extra_args
.append('--reconfig')
2385 if self
.allow_ptrace
:
2386 extra_args
.append('--allow-ptrace')
2388 self
.log
.info('%s daemon %s on %s' % (
2389 'Reconfiguring' if reconfig
else 'Deploying',
2392 out
, err
, code
= self
._run
_cephadm
(
2393 host
, name
, 'deploy',
2397 stdin
=json
.dumps(cephadm_config
))
2398 if not code
and host
in self
.cache
.daemons
:
2399 # prime cached service state with what we (should have)
2401 sd
= orchestrator
.DaemonDescription()
2402 sd
.daemon_type
= daemon_type
2403 sd
.daemon_id
= daemon_id
2406 sd
.status_desc
= 'starting'
2407 self
.cache
.add_daemon(host
, sd
)
2408 self
.cache
.invalidate_host_daemons(host
)
2409 self
.cache
.update_daemon_config_deps(host
, name
, deps
, start_time
)
2410 self
.cache
.save_host(host
)
2411 return "{} {} on host '{}'".format(
2412 'Reconfigured' if reconfig
else 'Deployed', name
, host
)
2414 @async_map_completion
2415 def _remove_daemons(self
, name
, host
):
2416 return self
._remove
_daemon
(name
, host
)
2418 def _remove_daemon(self
, name
, host
):
2422 (daemon_type
, daemon_id
) = name
.split('.', 1)
2423 if daemon_type
== 'mon':
2424 self
._check
_safe
_to
_destroy
_mon
(daemon_id
)
2426 # remove mon from quorum before we destroy the daemon
2427 self
.log
.info('Removing monitor %s from monmap...' % name
)
2428 ret
, out
, err
= self
.mon_command({
2433 raise OrchestratorError('failed to remove mon %s from monmap' % (
2436 args
= ['--name', name
, '--force']
2437 self
.log
.info('Removing daemon %s from %s' % (name
, host
))
2438 out
, err
, code
= self
._run
_cephadm
(
2439 host
, name
, 'rm-daemon', args
)
2441 # remove item from cache
2442 self
.cache
.rm_daemon(host
, name
)
2443 self
.cache
.invalidate_host_daemons(host
)
2444 return "Removed {} from host '{}'".format(name
, host
)
2446 def _apply_service(self
, spec
):
2448 Schedule a service. Deploy new daemons or remove old ones, depending
2449 on the target label and count specified in the placement.
2451 daemon_type
= spec
.service_type
2452 service_name
= spec
.service_name()
2454 self
.log
.debug('Skipping unmanaged service %s spec' % service_name
)
2456 self
.log
.debug('Applying service %s spec' % service_name
)
2458 'mon': self
._create
_mon
,
2459 'mgr': self
._create
_mgr
,
2460 'osd': self
.create_osds
,
2461 'mds': self
._create
_mds
,
2462 'rgw': self
._create
_rgw
,
2463 'rbd-mirror': self
._create
_rbd
_mirror
,
2464 'nfs': self
._create
_nfs
,
2465 'grafana': self
._create
_grafana
,
2466 'alertmanager': self
._create
_alertmanager
,
2467 'prometheus': self
._create
_prometheus
,
2468 'node-exporter': self
._create
_node
_exporter
,
2469 'crash': self
._create
_crash
,
2470 'iscsi': self
._create
_iscsi
,
2473 'mds': self
._config
_mds
,
2474 'rgw': self
._config
_rgw
,
2475 'nfs': self
._config
_nfs
,
2476 'iscsi': self
._config
_iscsi
,
2478 create_func
= create_fns
.get(daemon_type
, None)
2480 self
.log
.debug('unrecognized service type %s' % daemon_type
)
2482 config_func
= config_fns
.get(daemon_type
, None)
2484 daemons
= self
.cache
.get_daemons_by_service(service_name
)
2486 public_network
= None
2487 if daemon_type
== 'mon':
2488 ret
, out
, err
= self
.mon_command({
2489 'prefix': 'config get',
2491 'key': 'public_network',
2494 public_network
= out
.strip()
2495 self
.log
.debug('mon public_network is %s' % public_network
)
2497 def matches_network(host
):
2498 # type: (str) -> bool
2499 if not public_network
:
2501 # make sure we have 1 or more IPs for that network on that
2503 return len(self
.cache
.networks
[host
].get(public_network
, [])) > 0
2505 hosts
= HostAssignment(
2507 get_hosts_func
=self
._get
_hosts
,
2508 get_daemons_func
=self
.cache
.get_daemons_by_service
,
2509 filter_new_host
=matches_network
if daemon_type
== 'mon' else None,
2514 if daemon_type
== 'osd':
2515 return False if create_func(spec
) else True # type: ignore
2518 if daemon_type
in ['mon', 'mgr'] and len(hosts
) < 1:
2519 self
.log
.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts
)
2524 hosts_with_daemons
= {d
.hostname
for d
in daemons
}
2525 self
.log
.debug('hosts with daemons: %s' % hosts_with_daemons
)
2526 for host
, network
, name
in hosts
:
2527 if host
not in hosts_with_daemons
:
2528 if not did_config
and config_func
:
2531 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
2532 spec
.service_id
, name
)
2533 self
.log
.debug('Placing %s.%s on host %s' % (
2534 daemon_type
, daemon_id
, host
))
2535 if daemon_type
== 'mon':
2536 create_func(daemon_id
, host
, network
) # type: ignore
2537 elif daemon_type
== 'nfs':
2538 create_func(daemon_id
, host
, spec
) # type: ignore
2540 create_func(daemon_id
, host
) # type: ignore
2542 # add to daemon list so next name(s) will also be unique
2543 sd
= orchestrator
.DaemonDescription(
2545 daemon_type
=daemon_type
,
2546 daemon_id
=daemon_id
,
2552 target_hosts
= [h
.hostname
for h
in hosts
]
2554 if d
.hostname
not in target_hosts
:
2555 # NOTE: we are passing the 'force' flag here, which means
2556 # we can delete a mon instances data.
2557 self
._remove
_daemon
(d
.name(), d
.hostname
)
2562 def _apply_all_services(self
):
2564 specs
= [] # type: List[ServiceSpec]
2565 for sn
, spec
in self
.spec_store
.specs
.items():
2569 if self
._apply
_service
(spec
):
2571 except Exception as e
:
2572 self
.log
.warning('Failed to apply %s spec %s: %s' % (
2573 spec
.service_name(), spec
, e
))
2576 def _check_daemons(self
):
2577 # get monmap mtime so we can refresh configs when mons change
2578 monmap
= self
.get('mon_map')
2579 last_monmap
: Optional
[datetime
.datetime
] = datetime
.datetime
.strptime(
2580 monmap
['modified'], CEPH_DATEFMT
)
2581 if last_monmap
and last_monmap
> datetime
.datetime
.utcnow():
2582 last_monmap
= None # just in case clocks are skewed
2584 daemons
= self
.cache
.get_daemons()
2585 grafanas
= [] # type: List[orchestrator.DaemonDescription]
2588 spec
= self
.spec_store
.specs
.get(dd
.service_name(), None)
2589 if not spec
and dd
.daemon_type
not in ['mon', 'mgr', 'osd']:
2590 # (mon and mgr specs should always exist; osds aren't matched
2591 # to a service spec)
2592 self
.log
.info('Removing orphan daemon %s...' % dd
.name())
2593 self
._remove
_daemon
(dd
.name(), dd
.hostname
)
2595 # ignore unmanaged services
2596 if not spec
or spec
.unmanaged
:
2600 if dd
.daemon_type
== 'grafana':
2601 # put running instances at the front of the list
2602 grafanas
.insert(0, dd
)
2603 deps
= self
._calc
_daemon
_deps
(dd
.daemon_type
, dd
.daemon_id
)
2604 last_deps
, last_config
= self
.cache
.get_daemon_last_config_deps(
2605 dd
.hostname
, dd
.name())
2606 if last_deps
is None:
2610 self
.log
.info('Reconfiguring %s (unknown last config time)...'% (
2613 elif last_deps
!= deps
:
2614 self
.log
.debug('%s deps %s -> %s' % (dd
.name(), last_deps
,
2616 self
.log
.info('Reconfiguring %s (dependencies changed)...' % (
2619 elif last_monmap
and \
2620 last_monmap
> last_config
and \
2621 dd
.daemon_type
in CEPH_TYPES
:
2622 self
.log
.info('Reconfiguring %s (monmap changed)...' % dd
.name())
2625 self
._create
_daemon
(dd
.daemon_type
, dd
.daemon_id
,
2626 dd
.hostname
, reconfig
=True)
2628 # make sure the dashboard [does not] references grafana
2630 current_url
= self
.get_module_option_ex('dashboard',
2633 host
= grafanas
[0].hostname
2634 url
= 'https://%s:3000' % (self
.inventory
[host
].get('addr',
2636 if current_url
!= url
:
2637 self
.log
.info('Setting dashboard grafana config to %s' % url
)
2638 self
.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
2640 # FIXME: is it a signed cert??
2641 except Exception as e
:
2642 self
.log
.debug('got exception fetching dashboard grafana state: %s',
2645 def _add_daemon(self
, daemon_type
, spec
,
2646 create_func
, config_func
=None):
2648 Add (and place) a daemon. Require explicit host placement. Do not
2649 schedule, and do not apply the related scheduling limitations.
2651 self
.log
.debug('_add_daemon %s spec %s' % (daemon_type
, spec
.placement
))
2652 if not spec
.placement
.hosts
:
2653 raise OrchestratorError('must specify host(s) to deploy on')
2654 count
= spec
.placement
.count
or len(spec
.placement
.hosts
)
2655 daemons
= self
.cache
.get_daemons_by_service(spec
.service_name())
2656 return self
._create
_daemons
(daemon_type
, spec
, daemons
,
2657 spec
.placement
.hosts
, count
,
2658 create_func
, config_func
)
2660 def _create_daemons(self
, daemon_type
, spec
, daemons
,
2662 create_func
, config_func
=None):
2663 if count
> len(hosts
):
2664 raise OrchestratorError('too few hosts: want %d, have %s' % (
2670 args
= [] # type: List[tuple]
2671 for host
, network
, name
in hosts
:
2672 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
2673 spec
.service_id
, name
)
2674 self
.log
.debug('Placing %s.%s on host %s' % (
2675 daemon_type
, daemon_id
, host
))
2676 if daemon_type
== 'mon':
2677 args
.append((daemon_id
, host
, network
)) # type: ignore
2678 elif daemon_type
== 'nfs':
2679 args
.append((daemon_id
, host
, spec
)) # type: ignore
2680 elif daemon_type
== 'iscsi':
2681 args
.append((daemon_id
, host
, spec
)) # type: ignore
2683 args
.append((daemon_id
, host
)) # type: ignore
2685 # add to daemon list so next name(s) will also be unique
2686 sd
= orchestrator
.DaemonDescription(
2688 daemon_type
=daemon_type
,
2689 daemon_id
=daemon_id
,
2693 @async_map_completion
2694 def create_func_map(*args
):
2695 return create_func(*args
)
2697 return create_func_map(args
)
def apply_mon(self, spec):
    """Schedule mon daemons according to the given service spec."""
    return self._apply(spec)
2703 def _create_mon(self
, name
, host
, network
):
2705 Create a new monitor on the given host.
2708 ret
, keyring
, err
= self
.mon_command({
2709 'prefix': 'auth get',
2713 extra_config
= '[mon.%s]\n' % name
2715 # infer whether this is a CIDR network, addrvec, or plain IP
2717 extra_config
+= 'public network = %s\n' % network
2718 elif network
.startswith('[v') and network
.endswith(']'):
2719 extra_config
+= 'public addrv = %s\n' % network
2720 elif ':' not in network
:
2721 extra_config
+= 'public addr = %s\n' % network
2723 raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network
)
2725 # try to get the public_network from the config
2726 ret
, network
, err
= self
.mon_command({
2727 'prefix': 'config get',
2729 'key': 'public_network',
2731 network
= network
.strip() # type: ignore
2733 raise RuntimeError('Unable to fetch cluster_network config option')
2735 raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
2736 if '/' not in network
:
2737 raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network
)
2738 extra_config
+= 'public network = %s\n' % network
2740 return self
._create
_daemon
('mon', name
, host
,
2742 extra_config
={'config': extra_config
})
def add_mon(self, spec):
    # type: (ServiceSpec) -> orchestrator.Completion
    """Explicitly place new mon daemon(s) per the spec's host list."""
    create = self._create_mon
    return self._add_daemon('mon', spec, create)
2748 def _create_mgr(self
, mgr_id
, host
):
2750 Create a new manager instance on a host.
2753 ret
, keyring
, err
= self
.mon_command({
2754 'prefix': 'auth get-or-create',
2755 'entity': 'mgr.%s' % mgr_id
,
2756 'caps': ['mon', 'profile mgr',
2761 return self
._create
_daemon
('mgr', mgr_id
, host
, keyring
=keyring
)
def add_mgr(self, spec):
    # type: (ServiceSpec) -> orchestrator.Completion
    """Explicitly place new mgr daemon(s) per the spec's host list."""
    return self._add_daemon('mgr', spec, self._create_mgr)
2767 def _apply(self
, spec
: ServiceSpec
) -> str:
2768 if spec
.placement
.is_empty():
2769 # fill in default placement
2771 'mon': PlacementSpec(count
=5),
2772 'mgr': PlacementSpec(count
=2),
2773 'mds': PlacementSpec(count
=2),
2774 'rgw': PlacementSpec(count
=2),
2775 'iscsi': PlacementSpec(count
=1),
2776 'rbd-mirror': PlacementSpec(count
=2),
2777 'nfs': PlacementSpec(count
=1),
2778 'grafana': PlacementSpec(count
=1),
2779 'alertmanager': PlacementSpec(count
=1),
2780 'prometheus': PlacementSpec(count
=1),
2781 'node-exporter': PlacementSpec(host_pattern
='*'),
2782 'crash': PlacementSpec(host_pattern
='*'),
2784 spec
.placement
= defaults
[spec
.service_type
]
2785 elif spec
.service_type
in ['mon', 'mgr'] and \
2786 spec
.placement
.count
is not None and \
2787 spec
.placement
.count
< 1:
2788 raise OrchestratorError('cannot scale %s service below 1' % (
2793 get_hosts_func
=self
._get
_hosts
,
2794 get_daemons_func
=self
.cache
.get_daemons_by_service
,
2797 self
.log
.info('Saving service %s spec with placement %s' % (
2798 spec
.service_name(), spec
.placement
.pretty_str()))
2799 self
.spec_store
.save(spec
)
2800 self
._kick
_serve
_loop
()
2801 return "Scheduled %s update..." % spec
.service_name()
2804 def apply(self
, specs
: List
[ServiceSpec
]):
2805 return [self
._apply
(spec
) for spec
in specs
]
def apply_mgr(self, spec):
    """Schedule mgr daemons according to the given service spec."""
    return self._apply(spec)
2811 def add_mds(self
, spec
: ServiceSpec
):
2812 return self
._add
_daemon
('mds', spec
, self
._create
_mds
, self
._config
_mds
)
2815 def apply_mds(self
, spec
: ServiceSpec
):
2816 return self
._apply
(spec
)
2818 def _config_mds(self
, spec
):
2819 # ensure mds_join_fs is set for these daemons
2820 assert spec
.service_id
2821 ret
, out
, err
= self
.mon_command({
2822 'prefix': 'config set',
2823 'who': 'mds.' + spec
.service_id
,
2824 'name': 'mds_join_fs',
2825 'value': spec
.service_id
,
2828 def _create_mds(self
, mds_id
, host
):
2830 ret
, keyring
, err
= self
.mon_command({
2831 'prefix': 'auth get-or-create',
2832 'entity': 'mds.' + mds_id
,
2833 'caps': ['mon', 'profile mds',
2837 return self
._create
_daemon
('mds', mds_id
, host
, keyring
=keyring
)
def add_rgw(self, spec):
    """Explicitly place new rgw daemon(s), configuring the service first."""
    return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
2842 def _config_rgw(self
, spec
):
2843 # ensure rgw_realm and rgw_zone is set for these daemons
2844 ret
, out
, err
= self
.mon_command({
2845 'prefix': 'config set',
2846 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
2848 'value': spec
.rgw_zone
,
2850 ret
, out
, err
= self
.mon_command({
2851 'prefix': 'config set',
2852 'who': f
"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
2853 'name': 'rgw_realm',
2854 'value': spec
.rgw_realm
,
2856 ret
, out
, err
= self
.mon_command({
2857 'prefix': 'config set',
2858 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
2859 'name': 'rgw_frontends',
2860 'value': spec
.rgw_frontends_config_value(),
2863 if spec
.rgw_frontend_ssl_certificate
:
2864 if isinstance(spec
.rgw_frontend_ssl_certificate
, list):
2865 cert_data
= '\n'.join(spec
.rgw_frontend_ssl_certificate
)
2867 cert_data
= spec
.rgw_frontend_ssl_certificate
2868 ret
, out
, err
= self
.mon_command({
2869 'prefix': 'config-key set',
2870 'key': f
'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
2874 if spec
.rgw_frontend_ssl_key
:
2875 if isinstance(spec
.rgw_frontend_ssl_key
, list):
2876 key_data
= '\n'.join(spec
.rgw_frontend_ssl_key
)
2878 key_data
= spec
.rgw_frontend_ssl_key
2879 ret
, out
, err
= self
.mon_command({
2880 'prefix': 'config-key set',
2881 'key': f
'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
2885 logger
.info('Saving service %s spec with placement %s' % (
2886 spec
.service_name(), spec
.placement
.pretty_str()))
2887 self
.spec_store
.save(spec
)
def _create_rgw(self, rgw_id, host):
    """Create an auth key for the new rgw and deploy the daemon on host."""
    ret, keyring, err = self.mon_command({
        'prefix': 'auth get-or-create',
        'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
        # NOTE(review): the middle cap entry was lost in extraction;
        # reconstructed from the usual rgw bootstrap caps — confirm.
        'caps': ['mon', 'allow *',
                 'mgr', 'allow rw',
                 'osd', 'allow rwx'],
    })
    return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
def apply_rgw(self, spec):
    """Schedule rgw daemons according to the given service spec."""
    return self._apply(spec)
def add_iscsi(self, spec):
    # type: (ServiceSpec) -> orchestrator.Completion
    """Explicitly place new iscsi gateway(s), configuring the service first."""
    return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
def _config_iscsi(self, spec):
    """Persist the iscsi service spec before its daemons are created."""
    logger.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
2912 def _create_iscsi(self
, igw_id
, host
, spec
):
2913 ret
, keyring
, err
= self
.mon_command({
2914 'prefix': 'auth get-or-create',
2915 'entity': utils
.name_to_config_section('iscsi') + '.' + igw_id
,
2916 'caps': ['mon', 'allow rw',
2917 'osd', f
'allow rwx pool={spec.pool}'],
2920 api_secure
= 'false' if spec
.api_secure
is None else spec
.api_secure
2922 # generated by cephadm
2924 cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
2926 trusted_ip_list = {spec.trusted_ip_list or ''}
2927 minimum_gateways = 1
2928 fqdn_enabled = {spec.fqdn_enabled or ''}
2929 api_port = {spec.api_port or ''}
2930 api_user = {spec.api_user or ''}
2931 api_password = {spec.api_password or ''}
2932 api_secure = {api_secure}
2934 extra_config
= {'iscsi-gateway.cfg': igw_conf
}
2935 return self
._create
_daemon
('iscsi', igw_id
, host
, keyring
=keyring
,
2936 extra_config
=extra_config
)
def apply_iscsi(self, spec):
    """Schedule iscsi gateways according to the given service spec."""
    return self._apply(spec)
def add_rbd_mirror(self, spec):
    """Explicitly place new rbd-mirror daemon(s) per the spec's host list."""
    return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
2945 def _create_rbd_mirror(self
, daemon_id
, host
):
2946 ret
, keyring
, err
= self
.mon_command({
2947 'prefix': 'auth get-or-create',
2948 'entity': 'client.rbd-mirror.' + daemon_id
,
2949 'caps': ['mon', 'profile rbd-mirror',
2950 'osd', 'profile rbd'],
2952 return self
._create
_daemon
('rbd-mirror', daemon_id
, host
,
def apply_rbd_mirror(self, spec):
    """Schedule rbd-mirror daemons according to the given service spec."""
    return self._apply(spec)
2959 def _generate_nfs_config(self
, daemon_type
, daemon_id
, host
):
2960 # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
2961 deps
= [] # type: List[str]
2963 # find the matching NFSServiceSpec
2964 # TODO: find the spec and pass via _create_daemon instead ??
2965 service_name
= self
.get_service_name(daemon_type
, daemon_id
, host
)
2966 specs
= self
.spec_store
.find(service_name
)
2968 raise OrchestratorError('Cannot find service spec %s' % (service_name
))
2969 elif len(specs
) > 1:
2970 raise OrchestratorError('Found multiple service specs for %s' % (service_name
))
2972 # cast to keep mypy happy
2973 spec
= cast(NFSServiceSpec
, specs
[0])
2975 nfs
= NFSGanesha(self
, daemon_id
, spec
)
2977 # create the keyring
2978 entity
= nfs
.get_keyring_entity()
2979 keyring
= nfs
.get_or_create_keyring(entity
=entity
)
2981 # update the caps after get-or-create, the keyring might already exist!
2982 nfs
.update_keyring_caps(entity
=entity
)
2984 # create the rados config object
2985 nfs
.create_rados_config_obj()
2987 # generate the cephadm config
2988 cephadm_config
= nfs
.get_cephadm_config()
2989 cephadm_config
.update(
2990 self
._get
_config
_and
_keyring
(
2991 daemon_type
, daemon_id
,
2994 return cephadm_config
, deps
def add_nfs(self, spec):
    """Explicitly place new nfs daemon(s), configuring the service first."""
    return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
def _config_nfs(self, spec):
    """Persist the nfs service spec before its daemons are created."""
    logger.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
3004 def _create_nfs(self
, daemon_id
, host
, spec
):
3005 return self
._create
_daemon
('nfs', daemon_id
, host
)
def apply_nfs(self, spec):
    """Schedule nfs daemons according to the given service spec."""
    return self._apply(spec)
3011 def _generate_prometheus_config(self
):
3012 # type: () -> Tuple[Dict[str, Any], List[str]]
3013 deps
= [] # type: List[str]
3016 mgr_scrape_list
= []
3017 mgr_map
= self
.get('mgr_map')
3019 t
= mgr_map
.get('services', {}).get('prometheus', None)
3022 mgr_scrape_list
.append(t
)
3025 port
= t
.split(':')[1]
3026 # scan all mgrs to generate deps and to get standbys too.
3027 # assume that they are all on the same port as the active mgr.
3028 for dd
in self
.cache
.get_daemons_by_service('mgr'):
3029 # we consider the mgr a dep even if the prometheus module is
3030 # disabled in order to be consistent with _calc_daemon_deps().
3031 deps
.append(dd
.name())
3034 if dd
.daemon_id
== self
.get_mgr_id():
3036 hi
= self
.inventory
.get(dd
.hostname
, {})
3037 addr
= hi
.get('addr', dd
.hostname
)
3038 mgr_scrape_list
.append(addr
.split(':')[0] + ':' + port
)
3040 # scrape node exporters
3042 for dd
in self
.cache
.get_daemons_by_service('node-exporter'):
3043 deps
.append(dd
.name())
3044 hi
= self
.inventory
.get(dd
.hostname
, {})
3045 addr
= hi
.get('addr', dd
.hostname
)
3046 if not node_configs
:
3051 node_configs
+= """ - targets: {}
3054 """.format([addr
.split(':')[0] + ':9100'],
3057 # scrape alert managers
3058 alertmgr_configs
= ""
3059 alertmgr_targets
= []
3060 for dd
in self
.cache
.get_daemons_by_service('alertmanager'):
3061 deps
.append(dd
.name())
3062 hi
= self
.inventory
.get(dd
.hostname
, {})
3063 addr
= hi
.get('addr', dd
.hostname
)
3064 alertmgr_targets
.append("'{}:9093'".format(addr
.split(':')[0]))
3065 if alertmgr_targets
:
3066 alertmgr_configs
= """alerting:
3069 path_prefix: /alertmanager
3072 """.format(", ".join(alertmgr_targets
))
3074 # generate the prometheus configuration
3077 'prometheus.yml': """# generated by cephadm
3080 evaluation_interval: 10s
3082 - /etc/prometheus/alerting/*
3087 - targets: {mgr_scrape_list}
3089 instance: 'ceph_cluster'
3092 mgr_scrape_list
=str(mgr_scrape_list
),
3093 node_configs
=str(node_configs
),
3094 alertmgr_configs
=str(alertmgr_configs
)
3099 # include alerts, if present in the container
3100 if os
.path
.exists(self
.prometheus_alerts_path
):
3101 with
open(self
.prometheus_alerts_path
, "r") as f
:
3103 r
['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
3105 return r
, sorted(deps
)
3107 def _generate_grafana_config(self
):
3108 # type: () -> Tuple[Dict[str, Any], List[str]]
3109 deps
= [] # type: List[str]
3110 def generate_grafana_ds_config(hosts
: List
[str]) -> str:
3111 config
= '''# generated by cephadm
3113 {delete_data_sources}
3118 delete_ds_template
= '''
3120 orgId: 1\n'''.lstrip('\n')
3126 url: 'http://{host}:9095'
3128 isDefault: {is_default}
3129 editable: false\n'''.lstrip('\n')
3131 delete_data_sources
= ''
3133 for i
, host
in enumerate(hosts
):
3134 name
= "Dashboard %d" % (i
+ 1)
3135 data_sources
+= ds_template
.format(
3138 is_default
=str(i
== 0).lower()
3140 delete_data_sources
+= delete_ds_template
.format(
3143 return config
.format(
3144 delete_data_sources
=delete_data_sources
,
3145 data_sources
=data_sources
,
3148 prom_services
= [] # type: List[str]
3149 for dd
in self
.cache
.get_daemons_by_service('prometheus'):
3150 prom_services
.append(dd
.hostname
)
3151 deps
.append(dd
.name())
3153 cert
= self
.get_store('grafana_crt')
3154 pkey
= self
.get_store('grafana_key')
3157 verify_tls(cert
, pkey
)
3158 except ServerConfigException
as e
:
3159 logger
.warning('Provided grafana TLS certificates invalid: %s', str(e
))
3160 cert
, pkey
= None, None
3161 if not (cert
and pkey
):
3162 cert
, pkey
= create_self_signed_cert('Ceph', 'cephadm')
3163 self
.set_store('grafana_crt', cert
)
3164 self
.set_store('grafana_key', pkey
)
3166 'prefix': 'dashboard set-grafana-api-ssl-verify',
3174 "grafana.ini": """# generated by cephadm
3176 default_theme = light
3179 org_name = 'Main Org.'
3182 domain = 'bootstrap.storage.lab'
3184 cert_file = /etc/grafana/certs/cert_file
3185 cert_key = /etc/grafana/certs/cert_key
3189 admin_password = admin
3190 allow_embedding = true
3192 'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services
),
3193 'certs/cert_file': '# generated by cephadm\n%s' % cert
,
3194 'certs/cert_key': '# generated by cephadm\n%s' % pkey
,
3197 return config_file
, sorted(deps
)
3199 def _get_dashboard_url(self
):
3201 return self
.get('mgr_map').get('services', {}).get('dashboard', '')
3203 def _generate_alertmanager_config(self
):
3204 # type: () -> Tuple[Dict[str, Any], List[str]]
3205 deps
= [] # type: List[str]
3209 mgr_map
= self
.get('mgr_map')
3211 proto
= None # http: or https:
3212 url
= mgr_map
.get('services', {}).get('dashboard', None)
3214 dashboard_urls
.append(url
)
3215 proto
= url
.split('/')[0]
3216 port
= url
.split('/')[2].split(':')[1]
3217 # scan all mgrs to generate deps and to get standbys too.
3218 # assume that they are all on the same port as the active mgr.
3219 for dd
in self
.cache
.get_daemons_by_service('mgr'):
3220 # we consider mgr a dep even if the dashboard is disabled
3221 # in order to be consistent with _calc_daemon_deps().
3222 deps
.append(dd
.name())
3225 if dd
.daemon_id
== self
.get_mgr_id():
3227 hi
= self
.inventory
.get(dd
.hostname
, {})
3228 addr
= hi
.get('addr', dd
.hostname
)
3229 dashboard_urls
.append('%s//%s:%s/' % (proto
, addr
.split(':')[0],
3232 yml
= """# generated by cephadm
3233 # See https://prometheus.io/docs/alerting/configuration/ for documentation.
3239 group_by: ['alertname']
3243 receiver: 'ceph-dashboard'
3245 - name: 'ceph-dashboard'
3250 [" - url: '{}api/prometheus_receiver'".format(u
)
3251 for u
in dashboard_urls
]
3255 for dd
in self
.cache
.get_daemons_by_service('alertmanager'):
3256 deps
.append(dd
.name())
3257 hi
= self
.inventory
.get(dd
.hostname
, {})
3258 addr
= hi
.get('addr', dd
.hostname
)
3259 peers
.append(addr
.split(':')[0] + ':' + port
)
3262 "alertmanager.yml": yml
def add_prometheus(self, spec):
    """Explicitly place new prometheus daemon(s) per the spec's host list."""
    return self._add_daemon('prometheus', spec, self._create_prometheus)
3270 def _create_prometheus(self
, daemon_id
, host
):
3271 return self
._create
_daemon
('prometheus', daemon_id
, host
)
def apply_prometheus(self, spec):
    """Schedule prometheus daemons according to the given service spec."""
    return self._apply(spec)
def add_node_exporter(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Explicitly place new node-exporter daemon(s) per the spec's host list."""
    return self._add_daemon('node-exporter', spec,
                            self._create_node_exporter)
def apply_node_exporter(self, spec):
    """Schedule node-exporter daemons according to the given service spec."""
    return self._apply(spec)
3286 def _create_node_exporter(self
, daemon_id
, host
):
3287 return self
._create
_daemon
('node-exporter', daemon_id
, host
)
def add_crash(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Explicitly place new crash daemon(s) per the spec's host list."""
    # NOTE(review): the create-callback argument was lost in extraction;
    # reconstructed as self._create_crash by analogy with the siblings — confirm.
    return self._add_daemon('crash', spec,
                            self._create_crash)
def apply_crash(self, spec):
    """Schedule crash daemons according to the given service spec."""
    return self._apply(spec)
3298 def _create_crash(self
, daemon_id
, host
):
3299 ret
, keyring
, err
= self
.mon_command({
3300 'prefix': 'auth get-or-create',
3301 'entity': 'client.crash.' + host
,
3302 'caps': ['mon', 'profile crash',
3303 'mgr', 'profile crash'],
3305 return self
._create
_daemon
('crash', daemon_id
, host
, keyring
=keyring
)
def add_grafana(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Explicitly place new grafana daemon(s) per the spec's host list."""
    return self._add_daemon('grafana', spec, self._create_grafana)
3312 def apply_grafana(self
, spec
: ServiceSpec
):
3313 return self
._apply
(spec
)
3315 def _create_grafana(self
, daemon_id
, host
):
3316 # type: (str, str) -> str
3317 return self
._create
_daemon
('grafana', daemon_id
, host
)
def add_alertmanager(self, spec):
    # type: (ServiceSpec) -> AsyncCompletion
    """Explicitly place new alertmanager daemon(s) per the spec's host list."""
    return self._add_daemon('alertmanager', spec, self._create_alertmanager)
3324 def apply_alertmanager(self
, spec
: ServiceSpec
):
3325 return self
._apply
(spec
)
3327 def _create_alertmanager(self
, daemon_id
, host
):
3328 return self
._create
_daemon
('alertmanager', daemon_id
, host
)
3331 def _get_container_image_id(self
, image_name
):
3332 # pick a random host...
3334 for host_name
, hi
in self
.inventory
.items():
3338 raise OrchestratorError('no hosts defined')
3339 out
, err
, code
= self
._run
_cephadm
(
3340 host
, None, 'pull', [],
3345 raise OrchestratorError('Failed to pull %s on %s: %s' % (
3346 image_name
, host
, '\n'.join(out
)))
3347 j
= json
.loads('\n'.join(out
))
3348 image_id
= j
.get('image_id')
3349 ceph_version
= j
.get('ceph_version')
3350 self
.log
.debug('image %s -> id %s version %s' %
3351 (image_name
, image_id
, ceph_version
))
3352 return image_id
, ceph_version
3355 def upgrade_check(self
, image
, version
):
3357 target_name
= self
.container_image_base
+ ':v' + version
3361 raise OrchestratorError('must specify either image or version')
3363 target_id
, target_version
= self
._get
_container
_image
_id
(target_name
)
3364 self
.log
.debug('Target image %s id %s version %s' % (
3365 target_name
, target_id
, target_version
))
3367 'target_name': target_name
,
3368 'target_id': target_id
,
3369 'target_version': target_version
,
3370 'needs_update': dict(),
3371 'up_to_date': list(),
3373 for host
, dm
in self
.cache
.daemons
.items():
3374 for name
, dd
in dm
.items():
3375 if target_id
== dd
.container_image_id
:
3376 r
['up_to_date'].append(dd
.name())
3378 r
['needs_update'][dd
.name()] = {
3379 'current_name': dd
.container_image_name
,
3380 'current_id': dd
.container_image_id
,
3381 'current_version': dd
.version
,
3383 return json
.dumps(r
, indent
=4, sort_keys
=True)
3386 def upgrade_status(self
):
3387 r
= orchestrator
.UpgradeStatusSpec()
3388 if self
.upgrade_state
:
3389 r
.target_image
= self
.upgrade_state
.get('target_name')
3390 r
.in_progress
= True
3391 if self
.upgrade_state
.get('error'):
3392 r
.message
= 'Error: ' + self
.upgrade_state
.get('error')
3393 elif self
.upgrade_state
.get('paused'):
3394 r
.message
= 'Upgrade paused'
3398 def upgrade_start(self
, image
, version
):
3399 if self
.mode
!= 'root':
3400 raise OrchestratorError('upgrade is not supported in %s mode' % (
3404 (major
, minor
, patch
) = version
.split('.')
3405 assert int(minor
) >= 0
3406 assert int(patch
) >= 0
3408 raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
3409 if int(major
) < 15 or (int(major
) == 15 and int(minor
) < 2):
3410 raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
3411 target_name
= self
.container_image_base
+ ':v' + version
3415 raise OrchestratorError('must specify either image or version')
3416 if self
.upgrade_state
:
3417 if self
.upgrade_state
.get('target_name') != target_name
:
3418 raise OrchestratorError(
3419 'Upgrade to %s (not %s) already in progress' %
3420 (self
.upgrade_state
.get('target_name'), target_name
))
3421 if self
.upgrade_state
.get('paused'):
3422 del self
.upgrade_state
['paused']
3423 self
._save
_upgrade
_state
()
3424 return 'Resumed upgrade to %s' % self
.upgrade_state
.get('target_name')
3425 return 'Upgrade to %s in progress' % self
.upgrade_state
.get('target_name')
3426 self
.upgrade_state
= {
3427 'target_name': target_name
,
3428 'progress_id': str(uuid
.uuid4()),
3430 self
._update
_upgrade
_progress
(0.0)
3431 self
._save
_upgrade
_state
()
3432 self
._clear
_upgrade
_health
_checks
()
3434 return 'Initiating upgrade to %s' % (target_name
)
    def upgrade_pause(self):
        """Pause the in-flight upgrade; raises OrchestratorError when none is running."""
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        # already paused: idempotent, just report it
        if self.upgrade_state.get('paused'):
            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
3447 def upgrade_resume(self
):
3448 if not self
.upgrade_state
:
3449 raise OrchestratorError('No upgrade in progress')
3450 if not self
.upgrade_state
.get('paused'):
3451 return 'Upgrade to %s not paused' % self
.upgrade_state
.get('target_name')
3452 del self
.upgrade_state
['paused']
3453 self
._save
_upgrade
_state
()
3455 return 'Resumed upgrade to %s' % self
.upgrade_state
.get('target_name')
3458 def upgrade_stop(self
):
3459 if not self
.upgrade_state
:
3460 return 'No upgrade in progress'
3461 target_name
= self
.upgrade_state
.get('target_name')
3462 if 'progress_id' in self
.upgrade_state
:
3463 self
.remote('progress', 'complete',
3464 self
.upgrade_state
['progress_id'])
3465 self
.upgrade_state
= None
3466 self
._save
_upgrade
_state
()
3467 self
._clear
_upgrade
_health
_checks
()
3469 return 'Stopped upgrade to %s' % target_name
3472 def remove_osds(self
, osd_ids
: List
[str],
3473 replace
: bool = False,
3474 force
: bool = False):
3476 Takes a list of OSDs and schedules them for removal.
3477 The function that takes care of the actual removal is
3481 daemons
= self
.cache
.get_daemons_by_service('osd')
3482 found
: Set
[OSDRemoval
] = set()
3483 for daemon
in daemons
:
3484 if daemon
.daemon_id
not in osd_ids
:
3486 found
.add(OSDRemoval(daemon
.daemon_id
, replace
, force
,
3487 daemon
.hostname
, daemon
.name(),
3488 datetime
.datetime
.utcnow(), -1))
3490 not_found
= {osd_id
for osd_id
in osd_ids
if osd_id
not in [x
.osd_id
for x
in found
]}
3492 raise OrchestratorError('Unable to find OSD: %s' % not_found
)
3494 self
.rm_util
.queue_osds_for_removal(found
)
3496 # trigger the serve loop to initiate the removal
3497 self
._kick
_serve
_loop
()
3498 return "Scheduled OSD(s) for removal"
    def remove_osds_status(self):
        """
        The CLI call to retrieve an osd removal report
        """
        return self.rm_util.report
class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a placement_spec

    `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
    """

    def __init__(self, placement_spec):
        # type: (PlacementSpec) -> None
        self.placement_spec = placement_spec

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        raise NotImplementedError


class SimpleScheduler(BaseScheduler):
    """
    The most simple way to pick/schedule a set of hosts.
    1) Shuffle the provided host_pool
    2) Select from list up to :count
    """
    def __init__(self, placement_spec):
        super(SimpleScheduler, self).__init__(placement_spec)

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        if not host_pool:
            return []
        # shuffle a copy for pseudo random selection, leaving the
        # caller's list untouched
        shuffled = list(host_pool)
        random.shuffle(shuffled)
        return shuffled[:count]
3545 class HostAssignment(object):
3547 A class to detect if hosts are being passed imperative or declarative
3548 If the spec is populated via the `hosts/hosts` field it will not load
3549 any hosts into the list.
3550 If the spec isn't populated, i.e. when only num or label is present (declarative)
3551 it will use the provided `get_host_func` to load it from the inventory.
3553 Schedulers can be assigned to pick hosts from the pool.
3557 spec
, # type: ServiceSpec
3558 get_hosts_func
, # type: Callable[[Optional[str]],List[str]]
3559 get_daemons_func
, # type: Callable[[str],List[orchestrator.DaemonDescription]]
3561 filter_new_host
=None, # type: Optional[Callable[[str],bool]]
3562 scheduler
=None, # type: Optional[BaseScheduler]
3564 assert spec
and get_hosts_func
and get_daemons_func
3565 self
.spec
= spec
# type: ServiceSpec
3566 self
.scheduler
= scheduler
if scheduler
else SimpleScheduler(self
.spec
.placement
)
3567 self
.get_hosts_func
= get_hosts_func
3568 self
.get_daemons_func
= get_daemons_func
3569 self
.filter_new_host
= filter_new_host
3570 self
.service_name
= spec
.service_name()
3574 self
.spec
.validate()
3576 if self
.spec
.placement
.hosts
:
3577 explicit_hostnames
= {h
.hostname
for h
in self
.spec
.placement
.hosts
}
3578 unknown_hosts
= explicit_hostnames
.difference(set(self
.get_hosts_func(None)))
3580 raise OrchestratorValidationError(
3581 f
'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
3583 if self
.spec
.placement
.host_pattern
:
3584 pattern_hostnames
= self
.spec
.placement
.pattern_matches_hosts(self
.get_hosts_func(None))
3585 if not pattern_hostnames
:
3586 raise OrchestratorValidationError(
3587 f
'Cannot place {self.spec.one_line_str()}: No matching hosts')
3589 if self
.spec
.placement
.label
:
3590 label_hostnames
= self
.get_hosts_func(self
.spec
.placement
.label
)
3591 if not label_hostnames
:
3592 raise OrchestratorValidationError(
3593 f
'Cannot place {self.spec.one_line_str()}: No matching '
3594 f
'hosts for label {self.spec.placement.label}')
3597 # type: () -> List[HostPlacementSpec]
3599 Load hosts into the spec.placement.hosts container.
3605 if self
.spec
.placement
.count
== 0:
3608 # respect any explicit host list
3609 if self
.spec
.placement
.hosts
and not self
.spec
.placement
.count
:
3610 logger
.debug('Provided hosts: %s' % self
.spec
.placement
.hosts
)
3611 return self
.spec
.placement
.hosts
3613 # respect host_pattern
3614 if self
.spec
.placement
.host_pattern
:
3616 HostPlacementSpec(x
, '', '')
3617 for x
in self
.spec
.placement
.pattern_matches_hosts(self
.get_hosts_func(None))
3619 logger
.debug('All hosts: {}'.format(candidates
))
3623 if self
.spec
.placement
.hosts
and \
3624 self
.spec
.placement
.count
and \
3625 len(self
.spec
.placement
.hosts
) >= self
.spec
.placement
.count
:
3626 hosts
= self
.spec
.placement
.hosts
3627 logger
.debug('place %d over provided host list: %s' % (
3629 count
= self
.spec
.placement
.count
3630 elif self
.spec
.placement
.label
:
3632 HostPlacementSpec(x
, '', '')
3633 for x
in self
.get_hosts_func(self
.spec
.placement
.label
)
3635 if not self
.spec
.placement
.count
:
3636 logger
.debug('Labeled hosts: {}'.format(hosts
))
3638 count
= self
.spec
.placement
.count
3639 logger
.debug('place %d over label %s: %s' % (
3640 count
, self
.spec
.placement
.label
, hosts
))
3643 HostPlacementSpec(x
, '', '')
3644 for x
in self
.get_hosts_func(None)
3646 if self
.spec
.placement
.count
:
3647 count
= self
.spec
.placement
.count
3649 # this should be a totally empty spec given all of the
3650 # alternative paths above.
3651 assert self
.spec
.placement
.count
is None
3652 assert not self
.spec
.placement
.hosts
3653 assert not self
.spec
.placement
.label
3655 logger
.debug('place %d over all hosts: %s' % (count
, hosts
))
3657 # we need to select a subset of the candidates
3659 # if a partial host list is provided, always start with that
3660 if len(self
.spec
.placement
.hosts
) < count
:
3661 chosen
= self
.spec
.placement
.hosts
3665 # prefer hosts that already have services
3666 daemons
= self
.get_daemons_func(self
.service_name
)
3667 hosts_with_daemons
= {d
.hostname
for d
in daemons
}
3668 # calc existing daemons (that aren't already in chosen)
3669 chosen_hosts
= [hs
.hostname
for hs
in chosen
]
3670 existing
= [hs
for hs
in hosts
3671 if hs
.hostname
in hosts_with_daemons
and \
3672 hs
.hostname
not in chosen_hosts
]
3673 if len(chosen
+ existing
) >= count
:
3674 chosen
= chosen
+ self
.scheduler
.place(
3676 count
- len(chosen
))
3677 logger
.debug('Hosts with existing daemons: {}'.format(chosen
))
3680 need
= count
- len(existing
+ chosen
)
3681 others
= [hs
for hs
in hosts
3682 if hs
.hostname
not in hosts_with_daemons
]
3683 if self
.filter_new_host
:
3685 others
= [h
for h
in others
if self
.filter_new_host(h
.hostname
)]
3686 logger
.debug('filtered %s down to %s' % (old
, hosts
))
3687 chosen
= chosen
+ self
.scheduler
.place(others
, need
)
3688 logger
.debug('Combine hosts with existing daemons %s + new hosts %s' % (
3690 return existing
+ chosen