import datetime
from copy import copy
import ipaddress
import json
import logging
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'
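
# The stores below persist their state in the mgr's key/value store, one JSON
# blob per entry, using the prefixes above. Illustrative key names
# (hypothetical host/service names):
#   'host.node1'  -> HostCache entry for host "node1"
#   'spec.mon'    -> SpecStore entry for the "mon" service spec
#   'agent.node1' -> AgentCache entry for the agent on "node1"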


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k
                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup.  use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self._inventory:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))
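
# Illustrative usage sketch (hypothetical host and label names): the
# orchestrator keeps a single Inventory instance and mutates it through the
# helpers above, e.g.
#   inv = Inventory(mgr)
#   inv.add_host(HostSpec('node1', addr='10.0.0.1', labels=['mon']))
#   if inv.has_label('node1', 'mon'):
#       addr = inv.get_addr('node1')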


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]
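
        # Illustrative shape of _rank_maps (hypothetical service and daemon
        # ids): {'nfs.foo': {0: {0: 'foo.0.0.node1.xyz', 1: None}}}
        # i.e. for service 'nfs.foo', rank 0, generation 0 is daemon id
        # 'foo.0.0.node1.xyz'; generation 1 currently has no daemon.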

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        Returns both active and deleted specs, as a read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                        (self.mgr.migration_current or 0) < 3
                        and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                # note: a generation may legitimately map to
                                # None (no live daemon for that gen)
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass
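
    # Illustrative persisted blob for key 'spec.<service_name>' as read back
    # by load() above and written by _save() below (hypothetical values):
    #   {"spec": {"service_type": "mon", "placement": {"count": 3}},
    #    "created": "2021-03-01T00:00:00.000000Z",
    #    "rank_map": {"0": {"0": "mon.a"}}}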

    def save(
            self,
            spec: ServiceSpec,
            update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())
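
# Spec deletion above is two-phase: SpecStore.rm() only marks a spec deleted
# (so its daemons can be drained first), while finally_rm() removes it from
# the store. Illustrative sequence (hypothetical service name):
#   mgr.spec_store.rm('mds.foo')          # sets 'deleted', spec still loads
#   ...cephadm removes the daemons...
#   mgr.spec_store.finally_rm('mds.foo')  # drops the 'spec.mds.foo' key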


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
            self,
            entity: str,
            placement: PlacementSpec,
            mode: Optional[int] = None,
            uid: Optional[int] = None,
            gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls
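
# Illustrative use (hypothetical entity and label): maintain a keyring for
# client.foo on every host labelled 'foo', root-owned and mode 0600 by default:
#   ks = ClientKeyringSpec('client.foo', PlacementSpec(label='foo'))
#   ks.path  # -> '/etc/ceph/ceph.client.foo.keyring'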


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading:
    for example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        a = self.devices[host]
        if len(a) != len(b):
            return True
        aj = {d.path: d.to_json() for d in a}
        bj = {d.path: d.to_json() for d in b}
        if aj != bj:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
            self,
            host: str,
            dls: List[inventory.Device],
    ) -> None:
        if (
                host not in self.devices
                or host not in self.last_device_change
                or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
            self,
            host: str,
            nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]
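
    # Illustrative last_client_files entry (hypothetical digest): each pushed
    # path maps to a (digest, mode, uid, gid) tuple, e.g.
    #   last_client_files['node1']['/etc/ceph/ceph.conf'] ==
    #       ('811...c6e', 0o644, 0, 0)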

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
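
    # Illustrative 'host.<hostname>' blob written above (hypothetical values):
    #   {"daemons": {"mon.node1": {...}}, "devices": [...],
    #    "networks_and_interfaces": {...}, "last_client_files": {...},
    #    "last_daemon_update": "2021-03-01T00:00:00.000000Z", ...}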

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. We would thus end up with hosts
        where daemons might be running, but we have not yet detected them.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the _no_schedule label.

        Useful for the agent, which needs this specific list rather than the
        schedulable hosts, since the agent needs to be deployed on hosts with
        no daemon refresh.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible, so we CAN'T touch them) but
        we still want to count daemons that exist on these hosts toward the
        placement, so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]
, host
: str) -> Dict
[str, Any
]:
790 return self
.facts
.get(host
, {})
792 def _get_daemons(self
) -> Iterator
[orchestrator
.DaemonDescription
]:
793 for dm
in self
.daemons
.copy().values():
794 yield from dm
.values()
796 def get_daemons(self
):
797 # type: () -> List[orchestrator.DaemonDescription]
798 return list(self
._get
_daemons
())
800 def get_error_daemons(self
) -> List
[orchestrator
.DaemonDescription
]:
802 for dd
in self
._get
_daemons
():
803 if dd
.status
is not None and dd
.status
== orchestrator
.DaemonDescriptionStatus
.error
:
807 def get_daemons_by_host(self
, host
: str) -> List
[orchestrator
.DaemonDescription
]:
808 return list(self
.daemons
.get(host
, {}).values())
810 def get_daemon(self
, daemon_name
: str, host
: Optional
[str] = None) -> orchestrator
.DaemonDescription
:
811 assert not daemon_name
.startswith('ha-rgw.')
812 dds
= self
.get_daemons_by_host(host
) if host
else self
._get
_daemons
()
814 if dd
.name() == daemon_name
:
817 raise orchestrator
.OrchestratorError(f
'Unable to find {daemon_name} daemon(s)')
819 def has_daemon(self
, daemon_name
: str, host
: Optional
[str] = None) -> bool:
821 self
.get_daemon(daemon_name
, host
)
822 except orchestrator
.OrchestratorError
:

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide the set of daemon types present on the given host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})
, host
):
875 # type: (str) -> bool
876 if host
in self
.mgr
.offline_hosts
:
877 logger
.debug(f
'Host "{host}" marked as offline. Skipping daemon refresh')
879 if host
in self
.daemon_refresh_queue
:
880 self
.daemon_refresh_queue
.remove(host
)
882 cutoff
= datetime_now() - datetime
.timedelta(
883 seconds
=self
.mgr
.daemon_cache_timeout
)
884 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
886 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
890 def host_needs_facts_refresh(self
, host
):
891 # type: (str) -> bool
892 if host
in self
.mgr
.offline_hosts
:
893 logger
.debug(f
'Host "{host}" marked as offline. Skipping gather facts refresh')
895 cutoff
= datetime_now() - datetime
.timedelta(
896 seconds
=self
.mgr
.facts_cache_timeout
)
897 if host
not in self
.last_facts_update
or self
.last_facts_update
[host
] < cutoff
:
899 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
903 def host_needs_autotune_memory(self
, host
):
904 # type: (str) -> bool
905 if host
in self
.mgr
.offline_hosts
:
906 logger
.debug(f
'Host "{host}" marked as offline. Skipping autotune')
908 cutoff
= datetime_now() - datetime
.timedelta(
909 seconds
=self
.mgr
.autotune_interval
)
910 if host
not in self
.last_autotune
or self
.last_autotune
[host
] < cutoff
:
914 def host_had_daemon_refresh(self
, host
: str) -> bool:
918 if host
in self
.last_daemon_update
:
920 if host
not in self
.daemons
:
922 return bool(self
.daemons
[host
])
924 def host_needs_device_refresh(self
, host
):
925 # type: (str) -> bool
926 if host
in self
.mgr
.offline_hosts
:
927 logger
.debug(f
'Host "{host}" marked as offline. Skipping device refresh')
929 if host
in self
.device_refresh_queue
:
930 self
.device_refresh_queue
.remove(host
)
932 cutoff
= datetime_now() - datetime
.timedelta(
933 seconds
=self
.mgr
.device_cache_timeout
)
934 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
936 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
940 def host_needs_network_refresh(self
, host
):
941 # type: (str) -> bool
942 if host
in self
.mgr
.offline_hosts
:
943 logger
.debug(f
'Host "{host}" marked as offline. Skipping network refresh')
945 if host
in self
.network_refresh_queue
:
946 self
.network_refresh_queue
.remove(host
)
948 cutoff
= datetime_now() - datetime
.timedelta(
949 seconds
=self
.mgr
.device_cache_timeout
)
950 if host
not in self
.last_network_update
or self
.last_network_update
[host
] < cutoff
:
952 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
956 def host_needs_osdspec_preview_refresh(self
, host
: str) -> bool:
957 if host
in self
.mgr
.offline_hosts
:
958 logger
.debug(f
'Host "{host}" marked as offline. Skipping osdspec preview refresh')
960 if host
in self
.osdspec_previews_refresh_queue
:
961 self
.osdspec_previews_refresh_queue
.remove(host
)
963 # Since this is dependent on other factors (device and spec) this does not need
964 # to be updated periodically.
967 def host_needs_check(self
, host
):
968 # type: (str) -> bool
969 cutoff
= datetime_now() - datetime
.timedelta(
970 seconds
=self
.mgr
.host_check_interval
)
971 return host
not in self
.last_host_check
or self
.last_host_check
[host
] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway,
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)
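
# Scheduled daemon actions survive mgr failover because they ride along in the
# 'host.<hostname>' blob written by save_host(). Illustrative flow
# (hypothetical names):
#   cache.schedule_daemon_action('node1', 'mon.node1', 'restart')
#   cache.save_host('node1')                                  # persist queue
#   cache.get_scheduled_daemon_action('node1', 'mon.node1')   # -> 'restart'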


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str,Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass
, host
: str) -> None:
1114 j
: Dict
[str, Any
] = {}
1115 if host
in self
.agent_config_deps
:
1116 j
['agent_config_deps'] = {
1117 'deps': self
.agent_config_deps
[host
].get('deps', []),
1118 'last_config': datetime_to_str(self
.agent_config_deps
[host
]['last_config']),
1120 if host
in self
.agent_counter
:
1121 j
['agent_counter'] = self
.agent_counter
[host
]
1122 if host
in self
.agent_keys
:
1123 j
['agent_keys'] = self
.agent_keys
[host
]
1124 if host
in self
.agent_ports
:
1125 j
['agent_ports'] = self
.agent_ports
[host
]
1126 if host
in self
.agent_timestamp
:
1127 j
['agent_timestamp'] = datetime_to_str(self
.agent_timestamp
[host
])
1129 self
.mgr
.set_store(AGENT_CACHE_PREFIX
+ host
, json
.dumps(j
))
1131 def update_agent_config_deps(self
, host
: str, deps
: List
[str], stamp
: datetime
.datetime
) -> None:
1132 self
.agent_config_deps
[host
] = {
1134 'last_config': stamp
,
1137 def get_agent_last_config_deps(self
, host
: str) -> Tuple
[Optional
[List
[str]], Optional
[datetime
.datetime
]]:
1138 if host
in self
.agent_config_deps
:
1139 return self
.agent_config_deps
[host
].get('deps', []), \
1140 self
.agent_config_deps
[host
].get('last_config', None)
1143 def messaging_agent(self
, host
: str) -> bool:
1144 if host
not in self
.sending_agent_message
or not self
.sending_agent_message
[host
]:
1148 def agent_config_successfully_delivered(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> None:
1149 # agent successfully received new config. Update config/deps
1150 assert daemon_spec
.service_name
== 'agent'
1151 self
.update_agent_config_deps(
1152 daemon_spec
.host
, daemon_spec
.deps
, datetime_now())
1153 self
.agent_timestamp
[daemon_spec
.host
] = datetime_now()
1154 self
.agent_counter
[daemon_spec
.host
] = 1
1155 self
.save_agent(daemon_spec
.host
)
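
# Illustrative 'agent.<hostname>' blob written by save_agent() above
# (hypothetical values):
#   {"agent_counter": 3, "agent_keys": "<listener keyring>", "agent_ports": 4721,
#    "agent_timestamp": "2021-03-01T00:00:00.000000Z",
#    "agent_config_deps": {"deps": [...], "last_config": "..."}}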


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])
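
# Events above are keyed by '<kind>:<subject>' (which is what cleanup()
# splits on), e.g. 'service:mon' or 'daemon:mon.node1' (hypothetical names).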