9 from typing
import TYPE_CHECKING
, Dict
, List
, Iterator
, Optional
, Any
, Tuple
, Set
, Mapping
, cast
, \
13 from ceph
.deployment
import inventory
14 from ceph
.deployment
.service_spec
import ServiceSpec
, PlacementSpec
, TunedProfileSpec
15 from ceph
.utils
import str_to_datetime
, datetime_to_str
, datetime_now
16 from orchestrator
import OrchestratorError
, HostSpec
, OrchestratorEvent
, service_to_daemon_types
17 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
19 from .utils
import resolve_ip
20 from .migrations
import queue_migrate_nfs_spec
23 from .module
import CephadmOrchestrator
26 logger
= logging
.getLogger(__name__
)
28 HOST_CACHE_PREFIX
= "host."
29 SPEC_STORE_PREFIX
= "spec."
30 AGENT_CACHE_PREFIX
= 'agent.'
33 class HostCacheStatus(enum
.Enum
):
41 The inventory stores a HostSpec for all hosts persistently.
44 def __init__(self
, mgr
: 'CephadmOrchestrator'):
46 adjusted_addrs
= False
48 def is_valid_ip(ip
: str) -> bool:
50 ipaddress
.ip_address(ip
)
56 i
= self
.mgr
.get_store('inventory')
58 self
._inventory
: Dict
[str, dict] = json
.loads(i
)
59 # handle old clusters missing 'hostname' key from hostspec
60 for k
, v
in self
._inventory
.items():
61 if 'hostname' not in v
:
64 # convert legacy non-IP addr?
65 if is_valid_ip(str(v
.get('addr'))):
67 if len(self
._inventory
) > 1:
68 if k
== socket
.gethostname():
69 # Never try to resolve our own host! This is
70 # fraught and can lead to either a loopback
71 # address (due to podman's futzing with
72 # /etc/hosts) or a private IP based on the CNI
73 # configuration. Instead, wait until the mgr
74 # fails over to another host and let them resolve
77 ip
= resolve_ip(cast(str, v
.get('addr')))
79 # we only have 1 node in the cluster, so we can't
80 # rely on another host doing the lookup. use the
81 # IP the mgr binds to.
82 ip
= self
.mgr
.get_mgr_ip()
83 if is_valid_ip(ip
) and not ip
.startswith('127.0.'):
85 f
"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
92 self
._inventory
= dict()
93 logger
.debug('Loaded inventory %s' % self
._inventory
)
def keys(self) -> List[str]:
    """Return the hostnames currently tracked in the inventory."""
    return [hostname for hostname in self._inventory]
def __contains__(self, host: str) -> bool:
    """True when *host* is a known inventory entry."""
    known = self._inventory
    return host in known
def assert_host(self, host: str) -> None:
    """Raise OrchestratorError unless *host* exists in the inventory."""
    if host in self._inventory:
        return
    raise OrchestratorError('host %s does not exist' % host)
105 def add_host(self
, spec
: HostSpec
) -> None:
106 if spec
.hostname
in self
._inventory
:
108 if self
.get_addr(spec
.hostname
) != spec
.addr
:
109 self
.set_addr(spec
.hostname
, spec
.addr
)
111 for label
in spec
.labels
:
112 self
.add_label(spec
.hostname
, label
)
114 self
._inventory
[spec
.hostname
] = spec
.to_json()
117 def rm_host(self
, host
: str) -> None:
118 self
.assert_host(host
)
119 del self
._inventory
[host
]
122 def set_addr(self
, host
: str, addr
: str) -> None:
123 self
.assert_host(host
)
124 self
._inventory
[host
]['addr'] = addr
127 def add_label(self
, host
: str, label
: str) -> None:
128 self
.assert_host(host
)
130 if 'labels' not in self
._inventory
[host
]:
131 self
._inventory
[host
]['labels'] = list()
132 if label
not in self
._inventory
[host
]['labels']:
133 self
._inventory
[host
]['labels'].append(label
)
136 def rm_label(self
, host
: str, label
: str) -> None:
137 self
.assert_host(host
)
139 if 'labels' not in self
._inventory
[host
]:
140 self
._inventory
[host
]['labels'] = list()
141 if label
in self
._inventory
[host
]['labels']:
142 self
._inventory
[host
]['labels'].remove(label
)
def has_label(self, host: str, label: str) -> bool:
    """Check whether *host* is known and carries *label*."""
    if host not in self._inventory:
        return False
    labels = self._inventory[host].get('labels', [])
    return label in labels
def get_addr(self, host: str) -> str:
    """Return the stored address for *host*, defaulting to the hostname.

    Raises (via assert_host) when the host is unknown.
    """
    self.assert_host(host)
    record = self._inventory[host]
    return record.get('addr', host)
155 def spec_from_dict(self
, info
: dict) -> HostSpec
:
156 hostname
= info
['hostname']
159 addr
=info
.get('addr', hostname
),
160 labels
=info
.get('labels', []),
161 status
='Offline' if hostname
in self
.mgr
.offline_hosts
else info
.get('status', ''),
def all_specs(self) -> List[HostSpec]:
    """Build a HostSpec for every inventory entry."""
    return [self.spec_from_dict(info) for info in self._inventory.values()]
def get_host_with_state(self, state: str = "") -> List[str]:
    """return a list of host names in a specific state"""
    matches = []
    for hostname, info in self._inventory.items():
        if info.get("status", "").lower() == state:
            matches.append(hostname)
    return matches
def save(self) -> None:
    """Persist the inventory dict into the mgr key/value store."""
    serialized = json.dumps(self._inventory)
    self.mgr.set_store('inventory', serialized)
175 class SpecDescription(NamedTuple
):
177 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]]
178 created
: datetime
.datetime
179 deleted
: Optional
[datetime
.datetime
]
183 def __init__(self
, mgr
):
184 # type: (CephadmOrchestrator) -> None
186 self
._specs
= {} # type: Dict[str, ServiceSpec]
187 # service_name -> rank -> gen -> daemon_id
188 self
._rank
_maps
= {} # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
189 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
190 self
.spec_deleted
= {} # type: Dict[str, datetime.datetime]
191 self
.spec_preview
= {} # type: Dict[str, ServiceSpec]
194 def all_specs(self
) -> Mapping
[str, ServiceSpec
]:
196 returns active and deleted specs. Returns read-only dict.
def __contains__(self, name: str) -> bool:
    """True when a spec named *name* is stored (active or deleted)."""
    stored = self._specs
    return name in stored
def __getitem__(self, name: str) -> SpecDescription:
    """Look up spec *name* together with its rank map and timestamps.

    Raises OrchestratorError when no such service spec is stored.
    """
    if name not in self._specs:
        raise OrchestratorError(f'Service {name} not found.')
    spec = self._specs[name]
    ranks = self._rank_maps.get(name)
    created = self.spec_created[name]
    deleted = self.spec_deleted.get(name, None)
    return SpecDescription(spec, ranks, created, deleted)
def active_specs(self) -> Mapping[str, ServiceSpec]:
    """Return only the specs that have not been marked deleted."""
    alive = {}
    for svc_name, svc_spec in self._specs.items():
        if svc_name not in self.spec_deleted:
            alive[svc_name] = svc_spec
    return alive
217 for k
, v
in self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
).items():
218 service_name
= k
[len(SPEC_STORE_PREFIX
):]
220 j
= cast(Dict
[str, dict], json
.loads(v
))
222 (self
.mgr
.migration_current
or 0) < 3
223 and j
['spec'].get('service_type') == 'nfs'
225 self
.mgr
.log
.debug(f
'found legacy nfs spec {j}')
226 queue_migrate_nfs_spec(self
.mgr
, j
)
227 spec
= ServiceSpec
.from_json(j
['spec'])
228 created
= str_to_datetime(cast(str, j
['created']))
229 self
._specs
[service_name
] = spec
230 self
.spec_created
[service_name
] = created
233 deleted
= str_to_datetime(cast(str, j
['deleted']))
234 self
.spec_deleted
[service_name
] = deleted
236 if 'rank_map' in j
and isinstance(j
['rank_map'], dict):
237 self
._rank
_maps
[service_name
] = {}
238 for rank_str
, m
in j
['rank_map'].items():
242 logger
.exception(f
"failed to parse rank in {j['rank_map']}")
244 if isinstance(m
, dict):
245 self
._rank
_maps
[service_name
][rank
] = {}
246 for gen_str
, name
in m
.items():
250 logger
.exception(f
"failed to parse gen in {j['rank_map']}")
252 if isinstance(name
, str) or m
is None:
253 self
._rank
_maps
[service_name
][rank
][gen
] = name
255 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
257 except Exception as e
:
258 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
265 update_create
: bool = True,
267 name
= spec
.service_name()
268 if spec
.preview_only
:
269 self
.spec_preview
[name
] = spec
271 self
._specs
[name
] = spec
274 self
.spec_created
[name
] = datetime_now()
277 def save_rank_map(self
,
279 rank_map
: Dict
[int, Dict
[int, Optional
[str]]]) -> None:
280 self
._rank
_maps
[name
] = rank_map
283 def _save(self
, name
: str) -> None:
284 data
: Dict
[str, Any
] = {
285 'spec': self
._specs
[name
].to_json(),
286 'created': datetime_to_str(self
.spec_created
[name
]),
288 if name
in self
._rank
_maps
:
289 data
['rank_map'] = self
._rank
_maps
[name
]
290 if name
in self
.spec_deleted
:
291 data
['deleted'] = datetime_to_str(self
.spec_deleted
[name
])
294 SPEC_STORE_PREFIX
+ name
,
295 json
.dumps(data
, sort_keys
=True),
297 self
.mgr
.events
.for_service(self
._specs
[name
],
298 OrchestratorEvent
.INFO
,
299 'service was created')
301 def rm(self
, service_name
: str) -> bool:
302 if service_name
not in self
._specs
:
305 if self
._specs
[service_name
].preview_only
:
306 self
.finally_rm(service_name
)
309 self
.spec_deleted
[service_name
] = datetime_now()
310 self
.save(self
._specs
[service_name
], update_create
=False)
313 def finally_rm(self
, service_name
):
314 # type: (str) -> bool
315 found
= service_name
in self
._specs
317 del self
._specs
[service_name
]
318 if service_name
in self
._rank
_maps
:
319 del self
._rank
_maps
[service_name
]
320 del self
.spec_created
[service_name
]
321 if service_name
in self
.spec_deleted
:
322 del self
.spec_deleted
[service_name
]
323 self
.mgr
.set_store(SPEC_STORE_PREFIX
+ service_name
, None)
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
    """Return when *spec*'s service was created, or None if unknown."""
    service = spec.service_name()
    return self.spec_created.get(service)
330 class ClientKeyringSpec(object):
332 A client keyring file that we should maintain
338 placement
: PlacementSpec
,
339 mode
: Optional
[int] = None,
340 uid
: Optional
[int] = None,
341 gid
: Optional
[int] = None,
344 self
.placement
= placement
345 self
.mode
= mode
or 0o600
349 def validate(self
) -> None:
352 def to_json(self
) -> Dict
[str, Any
]:
354 'entity': self
.entity
,
355 'placement': self
.placement
.to_json(),
def path(self) -> str:
    """Filesystem path under /etc/ceph where this keyring is maintained."""
    return '/etc/ceph/ceph.' + self.entity + '.keyring'
366 def from_json(cls
: Type
, data
: dict) -> 'ClientKeyringSpec':
369 c
['placement'] = PlacementSpec
.from_json(c
['placement'])
375 class ClientKeyringStore():
377 Track client keyring files that we are supposed to maintain
380 def __init__(self
, mgr
):
381 # type: (CephadmOrchestrator) -> None
382 self
.mgr
: CephadmOrchestrator
= mgr
384 self
.keys
: Dict
[str, ClientKeyringSpec
] = {}
386 def load(self
) -> None:
387 c
= self
.mgr
.get_store('client_keyrings') or b
'{}'
389 for e
, d
in j
.items():
390 self
.keys
[e
] = ClientKeyringSpec
.from_json(d
)
def save(self) -> None:
    """Serialize every tracked keyring spec into the mgr store."""
    serialized = {}
    for entity, spec in self.keys.items():
        serialized[entity] = spec.to_json()
    self.mgr.set_store('client_keyrings', json.dumps(serialized))
398 def update(self
, ks
: ClientKeyringSpec
) -> None:
399 self
.keys
[ks
.entity
] = ks
402 def rm(self
, entity
: str) -> None:
403 if entity
in self
.keys
:
404 del self
.keys
[entity
]
408 class TunedProfileStore():
410 Store for our tuned profile information
413 def __init__(self
, mgr
: "CephadmOrchestrator") -> None:
414 self
.mgr
: CephadmOrchestrator
= mgr
416 self
.profiles
: Dict
[str, TunedProfileSpec
] = {}
def __contains__(self, profile: str) -> bool:
    """True when a tuned profile with this name is tracked."""
    tracked = self.profiles
    return profile in tracked
421 def load(self
) -> None:
422 c
= self
.mgr
.get_store('tuned_profiles') or b
'{}'
424 for k
, v
in j
.items():
425 self
.profiles
[k
] = TunedProfileSpec
.from_json(v
)
426 self
.profiles
[k
]._last
_updated
= datetime_to_str(datetime_now())
def exists(self, profile_name: str) -> bool:
    """Report whether a profile named *profile_name* is stored."""
    tracked = self.profiles
    return profile_name in tracked
def save(self) -> None:
    """Persist every tuned profile (as JSON) into the mgr store."""
    as_json = {}
    for name, profile in self.profiles.items():
        as_json[name] = profile.to_json()
    self.mgr.set_store('tuned_profiles', json.dumps(as_json))
435 def add_setting(self
, profile
: str, setting
: str, value
: str) -> None:
436 if profile
in self
.profiles
:
437 self
.profiles
[profile
].settings
[setting
] = value
438 self
.profiles
[profile
]._last
_updated
= datetime_to_str(datetime_now())
442 f
'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')
444 def rm_setting(self
, profile
: str, setting
: str) -> None:
445 if profile
in self
.profiles
:
446 if setting
in self
.profiles
[profile
].settings
:
447 self
.profiles
[profile
].settings
.pop(setting
, '')
448 self
.profiles
[profile
]._last
_updated
= datetime_to_str(datetime_now())
452 f
'Attemped to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
455 f
'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')
457 def add_profile(self
, spec
: TunedProfileSpec
) -> None:
458 spec
._last
_updated
= datetime_to_str(datetime_now())
459 self
.profiles
[spec
.profile_name
] = spec
462 def rm_profile(self
, profile
: str) -> None:
463 if profile
in self
.profiles
:
464 self
.profiles
.pop(profile
, TunedProfileSpec(''))
466 logger
.error(f
'Attempted to remove nonexistent os tuning profile "{profile}"')
def last_updated(self, profile: str) -> Optional[datetime.datetime]:
    """Return the profile's last-update time, or None when absent/unset.

    NOTE(review): reconstructed from a mangled source; the early
    ``return None`` branch is inferred from the Optional return type.
    """
    if profile not in self.profiles:
        return None
    stamp = self.profiles[profile]._last_updated
    if not stamp:
        return None
    return str_to_datetime(stamp)
def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
    """Overwrite the stored last-update stamp for *profile* (no-op if absent)."""
    if profile not in self.profiles:
        return
    self.profiles[profile]._last_updated = datetime_to_str(new_datetime)
def list_profiles(self) -> List[TunedProfileSpec]:
    """Return all stored tuned profiles as a list.

    Uses ``list()`` directly instead of a pass-through comprehension
    (``[p for p in ...values()]``, ruff PERF402): identical result,
    clearer intent.
    """
    return list(self.profiles.values())
484 HostCache stores different things:
486 1. `daemons`: Deployed daemons O(daemons)
488 They're part of the configuration nowadays and need to be
489 persistent. The name "daemon cache" is unfortunately a bit misleading.
490 Like for example we really need to know where daemons are deployed on
491 hosts that are offline.
493 2. `devices`: ceph-volume inventory cache O(hosts)
495 As soon as this is populated, it becomes more or less read-only.
497 3. `networks`: network interfaces for each host. O(hosts)
499 This is needed in order to deploy MONs. As this is mostly read-only.
501 4. `last_client_files` O(hosts)
503 Stores the last digest and owner/mode for files we've pushed to /etc/ceph
504 (ceph.conf or client keyrings).
506 5. `scheduled_daemon_actions`: O(daemons)
508 Used to run daemon actions after deploying a daemon. We need to
509 store it persistently, in order to stay consistent across
513 def __init__(self
, mgr
):
514 # type: (CephadmOrchestrator) -> None
515 self
.mgr
: CephadmOrchestrator
= mgr
516 self
.daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
517 self
.last_daemon_update
= {} # type: Dict[str, datetime.datetime]
518 self
.devices
= {} # type: Dict[str, List[inventory.Device]]
519 self
.facts
= {} # type: Dict[str, Dict[str, Any]]
520 self
.last_facts_update
= {} # type: Dict[str, datetime.datetime]
521 self
.last_autotune
= {} # type: Dict[str, datetime.datetime]
522 self
.osdspec_previews
= {} # type: Dict[str, List[Dict[str, Any]]]
523 self
.osdspec_last_applied
= {} # type: Dict[str, Dict[str, datetime.datetime]]
524 self
.networks
= {} # type: Dict[str, Dict[str, Dict[str, List[str]]]]
525 self
.last_network_update
= {} # type: Dict[str, datetime.datetime]
526 self
.last_device_update
= {} # type: Dict[str, datetime.datetime]
527 self
.last_device_change
= {} # type: Dict[str, datetime.datetime]
528 self
.last_tuned_profile_update
= {} # type: Dict[str, datetime.datetime]
529 self
.daemon_refresh_queue
= [] # type: List[str]
530 self
.device_refresh_queue
= [] # type: List[str]
531 self
.network_refresh_queue
= [] # type: List[str]
532 self
.osdspec_previews_refresh_queue
= [] # type: List[str]
534 # host -> daemon name -> dict
535 self
.daemon_config_deps
= {} # type: Dict[str, Dict[str, Dict[str,Any]]]
536 self
.last_host_check
= {} # type: Dict[str, datetime.datetime]
537 self
.loading_osdspec_preview
= set() # type: Set[str]
538 self
.last_client_files
: Dict
[str, Dict
[str, Tuple
[str, int, int, int]]] = {}
539 self
.registry_login_queue
: Set
[str] = set()
541 self
.scheduled_daemon_actions
: Dict
[str, Dict
[str, str]] = {}
543 self
.metadata_up_to_date
= {} # type: Dict[str, bool]
547 for k
, v
in self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
).items():
548 host
= k
[len(HOST_CACHE_PREFIX
):]
549 if self
._get
_host
_cache
_entry
_status
(host
) != HostCacheStatus
.host
:
550 if self
._get
_host
_cache
_entry
_status
(host
) == HostCacheStatus
.devices
:
552 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
554 self
.mgr
.set_store(k
, None)
557 if 'last_device_update' in j
:
558 self
.last_device_update
[host
] = str_to_datetime(j
['last_device_update'])
560 self
.device_refresh_queue
.append(host
)
561 if 'last_device_change' in j
:
562 self
.last_device_change
[host
] = str_to_datetime(j
['last_device_change'])
563 # for services, we ignore the persisted last_*_update
564 # and always trigger a new scrape on mgr restart.
565 self
.daemon_refresh_queue
.append(host
)
566 self
.network_refresh_queue
.append(host
)
567 self
.daemons
[host
] = {}
568 self
.osdspec_previews
[host
] = []
569 self
.osdspec_last_applied
[host
] = {}
570 self
.networks
[host
] = {}
571 self
.daemon_config_deps
[host
] = {}
572 for name
, d
in j
.get('daemons', {}).items():
573 self
.daemons
[host
][name
] = \
574 orchestrator
.DaemonDescription
.from_json(d
)
575 self
.devices
[host
] = []
576 # still want to check old device location for upgrade scenarios
577 for d
in j
.get('devices', []):
578 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
579 self
.devices
[host
] += self
.load_host_devices(host
)
580 self
.networks
[host
] = j
.get('networks_and_interfaces', {})
581 self
.osdspec_previews
[host
] = j
.get('osdspec_previews', {})
582 self
.last_client_files
[host
] = j
.get('last_client_files', {})
583 for name
, ts
in j
.get('osdspec_last_applied', {}).items():
584 self
.osdspec_last_applied
[host
][name
] = str_to_datetime(ts
)
586 for name
, d
in j
.get('daemon_config_deps', {}).items():
587 self
.daemon_config_deps
[host
][name
] = {
588 'deps': d
.get('deps', []),
589 'last_config': str_to_datetime(d
['last_config']),
591 if 'last_host_check' in j
:
592 self
.last_host_check
[host
] = str_to_datetime(j
['last_host_check'])
593 if 'last_tuned_profile_update' in j
:
594 self
.last_tuned_profile_update
[host
] = str_to_datetime(
595 j
['last_tuned_profile_update'])
596 self
.registry_login_queue
.add(host
)
597 self
.scheduled_daemon_actions
[host
] = j
.get('scheduled_daemon_actions', {})
598 self
.metadata_up_to_date
[host
] = j
.get('metadata_up_to_date', False)
601 'HostCache.load: host %s has %d daemons, '
602 '%d devices, %d networks' % (
603 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
604 len(self
.networks
[host
])))
605 except Exception as e
:
606 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
610 def _get_host_cache_entry_status(self
, host
: str) -> HostCacheStatus
:
611 # return whether a host cache entry in the config-key
612 # store is for a host, a set of devices or is stray.
613 # for a host, the entry name will match a hostname in our
614 # inventory. For devices, it will be formatted
615 # <hostname>.devices.<integer> where <hostname> is
616 in our inventory. If neither case applies, it is stray
617 if host
in self
.mgr
.inventory
:
618 return HostCacheStatus
.host
620 # try stripping off the ".devices.<integer>" and see if we get
621 # a host name that matches our inventory
622 actual_host
= '.'.join(host
.split('.')[:-2])
623 return HostCacheStatus
.devices
if actual_host
in self
.mgr
.inventory
else HostCacheStatus
.stray
625 return HostCacheStatus
.stray
627 def update_host_daemons(self
, host
, dm
):
628 # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
629 self
.daemons
[host
] = dm
630 self
.last_daemon_update
[host
] = datetime_now()
632 def update_host_facts(self
, host
, facts
):
633 # type: (str, Dict[str, Dict[str, Any]]) -> None
634 self
.facts
[host
] = facts
635 self
.last_facts_update
[host
] = datetime_now()
637 def update_autotune(self
, host
: str) -> None:
638 self
.last_autotune
[host
] = datetime_now()
def invalidate_autotune(self, host: str) -> None:
    """Forget the last autotune timestamp for *host* so it runs again."""
    self.last_autotune.pop(host, None)
644 def devices_changed(self
, host
: str, b
: List
[inventory
.Device
]) -> bool:
645 old_devs
= inventory
.Devices(self
.devices
[host
])
646 new_devs
= inventory
.Devices(b
)
647 # relying on Devices class __eq__ function here
648 if old_devs
!= new_devs
:
649 self
.mgr
.log
.info("Detected new or changed devices on %s" % host
)
653 def update_host_devices(
656 dls
: List
[inventory
.Device
],
659 host
not in self
.devices
660 or host
not in self
.last_device_change
661 or self
.devices_changed(host
, dls
)
663 self
.last_device_change
[host
] = datetime_now()
664 self
.last_device_update
[host
] = datetime_now()
665 self
.devices
[host
] = dls
667 def update_host_networks(
670 nets
: Dict
[str, Dict
[str, List
[str]]]
672 self
.networks
[host
] = nets
673 self
.last_network_update
[host
] = datetime_now()
675 def update_daemon_config_deps(self
, host
: str, name
: str, deps
: List
[str], stamp
: datetime
.datetime
) -> None:
676 self
.daemon_config_deps
[host
][name
] = {
678 'last_config': stamp
,
681 def update_last_host_check(self
, host
):
682 # type: (str) -> None
683 self
.last_host_check
[host
] = datetime_now()
def update_osdspec_last_applied(self, host, service_name, ts):
    # type: (str, str, datetime.datetime) -> None
    """Record when *service_name*'s OSD spec was last applied on *host*."""
    per_host = self.osdspec_last_applied[host]
    per_host[service_name] = ts
689 def update_client_file(self
,
696 if host
not in self
.last_client_files
:
697 self
.last_client_files
[host
] = {}
698 self
.last_client_files
[host
][path
] = (digest
, mode
, uid
, gid
)
def removed_client_file(self, host: str, path: str) -> None:
    """Drop the cached digest entry for a client file removed from *host*."""
    per_host = self.last_client_files.get(host)
    if per_host is not None and path in per_host:
        del per_host[path]
def prime_empty_host(self, host):
    # type: (str) -> None
    """
    Install an empty entry for a host
    """
    self.daemons[host] = {}
    self.devices[host] = []
    self.networks[host] = {}
    self.osdspec_previews[host] = []
    self.osdspec_last_applied[host] = {}
    self.daemon_config_deps[host] = {}
    self.last_client_files[host] = {}
    for queue in (self.daemon_refresh_queue,
                  self.device_refresh_queue,
                  self.network_refresh_queue,
                  self.osdspec_previews_refresh_queue):
        queue.append(host)
    self.registry_login_queue.add(host)
def refresh_all_host_info(self, host):
    # type: (str) -> None
    """Queue a full re-scrape of *host* and clear its cached timestamps."""
    for stamps in (self.last_host_check,
                   self.last_facts_update,
                   self.last_autotune):
        stamps.pop(host, None)
    self.daemon_refresh_queue.append(host)
    self.device_refresh_queue.append(host)
    self.osdspec_previews_refresh_queue.append(host)
    self.registry_login_queue.add(host)
736 def invalidate_host_daemons(self
, host
):
737 # type: (str) -> None
738 self
.daemon_refresh_queue
.append(host
)
739 if host
in self
.last_daemon_update
:
740 del self
.last_daemon_update
[host
]
743 def invalidate_host_devices(self
, host
):
744 # type: (str) -> None
745 self
.device_refresh_queue
.append(host
)
746 if host
in self
.last_device_update
:
747 del self
.last_device_update
[host
]
750 def invalidate_host_networks(self
, host
):
751 # type: (str) -> None
752 self
.network_refresh_queue
.append(host
)
753 if host
in self
.last_network_update
:
754 del self
.last_network_update
[host
]
def distribute_new_registry_login_info(self) -> None:
    """Mark every known host as needing fresh registry login info."""
    all_hosts = self.mgr.inventory.keys()
    self.registry_login_queue = set(all_hosts)
760 def save_host(self
, host
: str) -> None:
761 j
: Dict
[str, Any
] = {
764 'osdspec_previews': [],
765 'osdspec_last_applied': {},
766 'daemon_config_deps': {},
768 if host
in self
.last_daemon_update
:
769 j
['last_daemon_update'] = datetime_to_str(self
.last_daemon_update
[host
])
770 if host
in self
.last_device_update
:
771 j
['last_device_update'] = datetime_to_str(self
.last_device_update
[host
])
772 if host
in self
.last_network_update
:
773 j
['last_network_update'] = datetime_to_str(self
.last_network_update
[host
])
774 if host
in self
.last_device_change
:
775 j
['last_device_change'] = datetime_to_str(self
.last_device_change
[host
])
776 if host
in self
.last_tuned_profile_update
:
777 j
['last_tuned_profile_update'] = datetime_to_str(self
.last_tuned_profile_update
[host
])
778 if host
in self
.daemons
:
779 for name
, dd
in self
.daemons
[host
].items():
780 j
['daemons'][name
] = dd
.to_json()
781 if host
in self
.networks
:
782 j
['networks_and_interfaces'] = self
.networks
[host
]
783 if host
in self
.daemon_config_deps
:
784 for name
, depi
in self
.daemon_config_deps
[host
].items():
785 j
['daemon_config_deps'][name
] = {
786 'deps': depi
.get('deps', []),
787 'last_config': datetime_to_str(depi
['last_config']),
789 if host
in self
.osdspec_previews
and self
.osdspec_previews
[host
]:
790 j
['osdspec_previews'] = self
.osdspec_previews
[host
]
791 if host
in self
.osdspec_last_applied
:
792 for name
, ts
in self
.osdspec_last_applied
[host
].items():
793 j
['osdspec_last_applied'][name
] = datetime_to_str(ts
)
795 if host
in self
.last_host_check
:
796 j
['last_host_check'] = datetime_to_str(self
.last_host_check
[host
])
798 if host
in self
.last_client_files
:
799 j
['last_client_files'] = self
.last_client_files
[host
]
800 if host
in self
.scheduled_daemon_actions
:
801 j
['scheduled_daemon_actions'] = self
.scheduled_daemon_actions
[host
]
802 if host
in self
.metadata_up_to_date
:
803 j
['metadata_up_to_date'] = self
.metadata_up_to_date
[host
]
804 if host
in self
.devices
:
805 self
.save_host_devices(host
)
807 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
809 def save_host_devices(self
, host
: str) -> None:
810 if host
not in self
.devices
or not self
.devices
[host
]:
811 logger
.debug(f
'Host {host} has no devices to save')
814 devs
: List
[Dict
[str, Any
]] = []
815 for d
in self
.devices
[host
]:
816 devs
.append(d
.to_json())
818 def byte_len(s
: str) -> int:
819 return len(s
.encode('utf-8'))
821 dev_cache_counter
: int = 0
822 cache_size
: int = self
.mgr
.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
823 if cache_size
is not None and cache_size
!= 0 and byte_len(json
.dumps(devs
)) > cache_size
- 1024:
824 # no guarantee all device entries take up the same amount of space
825 # splitting it up so there's one more entry than we need should be fairly
826 # safe and save a lot of extra logic checking sizes
827 cache_entries_needed
= math
.ceil(byte_len(json
.dumps(devs
)) / cache_size
) + 1
828 dev_sublist_size
= math
.ceil(len(devs
) / cache_entries_needed
)
829 dev_lists
: List
[List
[Dict
[str, Any
]]] = [devs
[i
:i
+ dev_sublist_size
]
830 for i
in range(0, len(devs
), dev_sublist_size
)]
831 for dev_list
in dev_lists
:
832 dev_dict
: Dict
[str, Any
] = {'devices': dev_list
}
833 if dev_cache_counter
== 0:
834 dev_dict
.update({'entries': len(dev_lists
)})
835 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
+ '.devices.'
836 + str(dev_cache_counter
), json
.dumps(dev_dict
))
837 dev_cache_counter
+= 1
839 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
+ '.devices.'
840 + str(dev_cache_counter
), json
.dumps({'devices': devs
, 'entries': 1}))
842 def load_host_devices(self
, host
: str) -> List
[inventory
.Device
]:
843 dev_cache_counter
: int = 0
844 devs
: List
[Dict
[str, Any
]] = []
847 # number of entries for the host's devices should be in
848 # the "entries" field of the first entry
849 dev_entries
= json
.loads(self
.mgr
.get_store(
850 HOST_CACHE_PREFIX
+ host
+ '.devices.0')).get('entries')
852 logger
.debug(f
'No device entries found for host {host}')
853 for i
in range(dev_entries
):
855 new_devs
= json
.loads(self
.mgr
.get_store(
856 HOST_CACHE_PREFIX
+ host
+ '.devices.' + str(i
))).get('devices', [])
857 if len(new_devs
) > 0:
858 # verify list contains actual device objects by trying to load one from json
859 inventory
.Device
.from_json(new_devs
[0])
860 # if we didn't throw an Exception on above line, we can add the devices
861 devs
= devs
+ new_devs
862 dev_cache_counter
+= 1
863 except Exception as e
:
864 logger
.error(('Hit exception trying to load devices from '
865 + f
'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
867 return [inventory
.Device
.from_json(d
) for d
in devs
]
def rm_host(self, host):
    # type: (str) -> None
    """Drop every cached record for *host* and clear its persisted entry."""
    # All of these are per-host dicts; pop() with a default replaces the
    # original chain of "if host in X: del X[host]" guards.
    per_host_maps = (
        self.daemons,
        self.devices,
        self.facts,
        self.last_facts_update,
        self.last_autotune,
        self.osdspec_previews,
        self.osdspec_last_applied,
        self.networks,
        self.last_daemon_update,
        self.last_device_update,
        self.last_network_update,
        self.last_device_change,
        self.last_tuned_profile_update,
        self.daemon_config_deps,
        self.scheduled_daemon_actions,
        self.last_client_files,
    )
    for mapping in per_host_maps:
        mapping.pop(host, None)
    self.loading_osdspec_preview.discard(host)
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
908 # type: () -> List[str]
909 return list(self
.daemons
)
911 def get_schedulable_hosts(self
) -> List
[HostSpec
]:
913 Returns all usable hosts that went through _refresh_host_daemons().
915 This mitigates a potential race, where new host was added *after*
916 ``_refresh_host_daemons()`` was called, but *before*
917 ``_apply_all_specs()`` was called. thus we end up with a hosts
918 where daemons might be running, but we have not yet detected them.
921 h
for h
in self
.mgr
.inventory
.all_specs()
923 self
.host_had_daemon_refresh(h
.hostname
)
924 and '_no_schedule' not in h
.labels
928 def get_non_draining_hosts(self
) -> List
[HostSpec
]:
930 Returns all hosts that do not have _no_schedule label.
932 Useful for the agent who needs this specific list rather than the
933 schedulable_hosts since the agent needs to be deployed on hosts with
937 h
for h
in self
.mgr
.inventory
.all_specs() if '_no_schedule' not in h
.labels
940 def get_draining_hosts(self
) -> List
[HostSpec
]:
942 Returns all hosts that have _no_schedule label and therefore should have
943 no daemons placed on them, but are potentially still reachable
946 h
for h
in self
.mgr
.inventory
.all_specs() if '_no_schedule' in h
.labels
949 def get_unreachable_hosts(self
) -> List
[HostSpec
]:
951 Return all hosts that are offline or in maintenance mode.
953 The idea is we should not touch the daemons on these hosts (since
954 in theory the hosts are inaccessible so we CAN'T touch them) but
955 we still want to count daemons that exist on these hosts toward the
956 placement so daemons on these hosts aren't just moved elsewhere
959 h
for h
in self
.mgr
.inventory
.all_specs()
961 h
.status
.lower() in ['maintenance', 'offline']
962 or h
.hostname
in self
.mgr
.offline_hosts
def get_facts(self, host: str) -> Dict[str, Any]:
    """Return the cached gather-facts output for *host* ({} when absent)."""
    try:
        return self.facts[host]
    except KeyError:
        return {}
def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
    """Yield every cached daemon across all hosts.

    Iterates over a shallow copy of the host map so concurrent updates
    to ``self.daemons`` do not break iteration.
    """
    snapshot = self.daemons.copy()
    for per_host in snapshot.values():
        yield from per_host.values()
def get_daemons(self):
    # type: () -> List[orchestrator.DaemonDescription]
    """Return all cached daemons as a flat list."""
    all_daemons = self._get_daemons()
    return list(all_daemons)
977 def get_error_daemons(self
) -> List
[orchestrator
.DaemonDescription
]:
979 for dd
in self
._get
_daemons
():
980 if dd
.status
is not None and dd
.status
== orchestrator
.DaemonDescriptionStatus
.error
:
def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
    """Return the daemons cached for a single *host* ([] for unknown hosts)."""
    per_host = self.daemons.get(host, {})
    return list(per_host.values())
def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
    """Find a daemon by name, optionally restricted to one host.

    Raises orchestrator.OrchestratorError when no match exists.
    """
    assert not daemon_name.startswith('ha-rgw.')
    if host:
        candidates = self.get_daemons_by_host(host)
    else:
        candidates = self._get_daemons()
    for dd in candidates:
        if dd.name() == daemon_name:
            return dd
    raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
    """Return True if ``daemon_name`` is cached (optionally on ``host``)."""
    try:
        self.get_daemon(daemon_name, host)
    except orchestrator.OrchestratorError:
        return False
    return True
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
    """Yield (host, {daemon name: description}) with statuses adjusted for
    state the daemon cache itself does not track (offline hosts, maintenance
    mode) and with per-daemon events attached."""
    def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
        # NOTE(review): the copy of dd_orig was not visible in this chunk and
        # is reconstructed (the cached description must not be mutated in
        # place) -- confirm against upstream.
        dd = copy(dd_orig)
        if host in self.mgr.offline_hosts:
            dd.status = orchestrator.DaemonDescriptionStatus.error
            dd.status_desc = 'host is offline'
        elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
            # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
            # could be wrong. We must assume maintenance is working and daemons are stopped
            dd.status = orchestrator.DaemonDescriptionStatus.stopped
        dd.events = self.mgr.events.get_for_daemon(dd.name())
        return dd

    for host, per_host in self.daemons.copy().items():
        yield host, {name: alter(host, dd) for name, dd in per_host.items()}
def get_daemons_by_service(self, service_name):
    # type: (str) -> List[orchestrator.DaemonDescription]
    """Return all cached daemons whose ``service_name()`` matches.

    keepalived/haproxy are sub-daemons of the ingress service; callers must
    query by the ingress service name instead, hence the asserts.
    """
    assert not service_name.startswith('keepalived.')
    assert not service_name.startswith('haproxy.')

    # list comprehension instead of list(generator) -- same result, clearer
    # and marginally faster (flake8-comprehensions C400).
    return [dd for dd in self._get_daemons() if dd.service_name() == service_name]
def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
    """Return cached daemons whose daemon_type belongs to ``service_type``,
    optionally restricted to a single host."""
    assert service_type not in ['keepalived', 'haproxy']

    wanted = service_to_daemon_types(service_type)
    source = self.daemons[host].values() if host else self._get_daemons()
    return [dd for dd in source if dd.daemon_type in wanted]
def get_daemon_types(self, hostname: str) -> Set[str]:
    """Provide a list of the types of daemons on the host"""
    # cast keeps mypy happy about the set-comprehension element type
    return cast(Set[str], {dd.daemon_type for dd in self.daemons[hostname].values()})
def get_daemon_names(self):
    # type: () -> List[str]
    """Return the full names of every cached daemon."""
    names = []
    for dd in self._get_daemons():
        names.append(dd.name())
    return names
def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
    """Return the (deps, last_config time) recorded for daemon ``name`` on
    ``host``, or (None, None) if nothing is recorded.
    """
    # Flattened the original nested "host in dict"/"name in dict[host]"
    # membership tests into a single .get() chain (EAFP-ish, fewer lookups).
    entry = self.daemon_config_deps.get(host, {}).get(name)
    if entry is None:
        return None, None
    return entry.get('deps', []), entry.get('last_config', None)
def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
    """Return the cached client-file metadata for ``host`` ({} if unknown)."""
    return self.last_client_files.get(host, {})
def host_needs_daemon_refresh(self, host):
    # type: (str) -> bool
    """Return True when the cached daemon list for ``host`` should be refreshed.

    NOTE(review): the early return statements were not visible in this chunk
    and are reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
        return False
    if host in self.daemon_refresh_queue:
        # explicit one-shot refresh request for this host
        self.daemon_refresh_queue.remove(host)
        return True
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.daemon_cache_timeout)
    if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
        return True
    if not self.mgr.cache.host_metadata_up_to_date(host):
        return True
    return False
def host_needs_facts_refresh(self, host):
    # type: (str) -> bool
    """Return True when gather-facts data for ``host`` should be refreshed.

    NOTE(review): the early return statements were not visible in this chunk
    and are reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
        return False
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.facts_cache_timeout)
    if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
        return True
    if not self.mgr.cache.host_metadata_up_to_date(host):
        return True
    return False
def host_needs_autotune_memory(self, host):
    # type: (str) -> bool
    """Return True when memory autotuning is due on ``host``.

    NOTE(review): the return statements were not visible in this chunk and are
    reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
        return False
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.autotune_interval)
    return host not in self.last_autotune or self.last_autotune[host] < cutoff
def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
    """Return True when ``profile`` should be (re)applied on ``host``.

    NOTE(review): the return statements were not visible in this chunk and are
    reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
        return False
    if profile not in self.mgr.tuned_profiles:
        logger.debug(
            f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
        return False
    if host not in self.last_tuned_profile_update:
        # never applied anything on this host yet
        return True
    last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
    if last_profile_update is None:
        # no timestamp recorded for the profile yet; record one now so the
        # comparison below works on subsequent calls
        self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
        return True
    if self.last_tuned_profile_update[host] < last_profile_update:
        return True
    return False
def host_had_daemon_refresh(self, host: str) -> bool:
    """Return True if the daemon list for ``host`` was fetched at least once
    (either a recorded update timestamp or a non-empty daemon map)."""
    if host in self.last_daemon_update:
        return True
    if host not in self.daemons:
        return False
    return bool(self.daemons[host])
def host_needs_device_refresh(self, host):
    # type: (str) -> bool
    """Return True when the device inventory for ``host`` should be refreshed.

    NOTE(review): the early return statements were not visible in this chunk
    and are reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
        return False
    if host in self.device_refresh_queue:
        # explicit one-shot refresh request for this host
        self.device_refresh_queue.remove(host)
        return True
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.device_cache_timeout)
    if host not in self.last_device_update or self.last_device_update[host] < cutoff:
        return True
    if not self.mgr.cache.host_metadata_up_to_date(host):
        return True
    return False
def host_needs_network_refresh(self, host):
    # type: (str) -> bool
    """Return True when the network info for ``host`` should be refreshed.

    NOTE(review): the early return statements were not visible in this chunk
    and are reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
        return False
    if host in self.network_refresh_queue:
        # explicit one-shot refresh request for this host
        self.network_refresh_queue.remove(host)
        return True
    # networks share the device cache timeout
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.device_cache_timeout)
    if host not in self.last_network_update or self.last_network_update[host] < cutoff:
        return True
    if not self.mgr.cache.host_metadata_up_to_date(host):
        return True
    return False
def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
    """Return True when an osdspec preview refresh was queued for ``host``."""
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
        return False
    if host in self.osdspec_previews_refresh_queue:
        self.osdspec_previews_refresh_queue.remove(host)
        return True
    # Since this is dependent on other factors (device and spec) this does not need
    # to be updated periodically.
    return False
def host_needs_check(self, host):
    # type: (str) -> bool
    """Return True when the periodic host check is due for ``host``."""
    stale_before = datetime_now() - datetime.timedelta(
        seconds=self.mgr.host_check_interval)
    last = self.last_host_check.get(host)
    return last is None or last < stale_before
def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
    """Decide whether the OSD spec should be (re)applied on ``host``.

    NOTE(review): the return statements were not visible in this chunk and are
    reconstructed from the control flow -- confirm against upstream.
    """
    if (
        host not in self.devices
        or host not in self.last_device_change
        or host not in self.last_device_update
        or host not in self.osdspec_last_applied
        or spec.service_name() not in self.osdspec_last_applied[host]
    ):
        # missing state: we have never applied (or never scanned) here
        return True
    created = self.mgr.spec_store.get_created(spec)
    if not created or created > self.last_device_change[host]:
        # spec is newer than the last device change; nothing to re-evaluate
        return False
    # re-apply only if devices changed after the last application
    return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]
def host_needs_registry_login(self, host: str) -> bool:
    """Return True when a registry login was queued for ``host``.

    NOTE(review): the return statements were not visible in this chunk and are
    reconstructed from the control flow -- confirm against upstream.
    """
    if host in self.mgr.offline_hosts:
        return False
    if host in self.registry_login_queue:
        self.registry_login_queue.remove(host)
        return True
    return False
def host_metadata_up_to_date(self, host: str) -> bool:
    """Return True if up-to-date metadata is recorded for ``host``."""
    # Single lookup with a default instead of the original
    # "host not in dict or not dict[host]" double lookup.
    return bool(self.metadata_up_to_date.get(host, False))
def all_host_metadata_up_to_date(self) -> bool:
    """Return True if every reachable host has up-to-date metadata."""
    # this function is primarily for telling if it's safe to try and apply a service
    # spec. Since offline/maintenance hosts aren't considered in that process anyway
    # we don't want to return False if the host without up-to-date metadata is in one
    # of those two categories.
    unreachable = {h.hostname for h in self.get_unreachable_hosts()}
    for host in self.get_hosts():
        if host in unreachable:
            continue
        if not self.host_metadata_up_to_date(host):
            return False
    return True
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Record daemon ``dd`` under ``host``; the host entry must already exist."""
    assert host in self.daemons
    self.daemons[host][dd.name()] = dd
def rm_daemon(self, host: str, name: str) -> None:
    """Forget daemon ``name`` on ``host``; a no-op if it is not cached."""
    assert not name.startswith('ha-rgw.')

    per_host = self.daemons.get(host)
    if per_host is not None and name in per_host:
        del per_host[name]
def daemon_cache_filled(self) -> bool:
    """
    i.e. we have checked the daemons for each hosts at least once.
    excluding offline hosts.

    We're not checking for `host_needs_daemon_refresh`, as this might never be
    False for all hosts.
    """
    for h in self.get_hosts():
        if not self.host_had_daemon_refresh(h) and h not in self.mgr.offline_hosts:
            return False
    return True
def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
    """Queue ``action`` for ``daemon_name`` on ``host``, unless a
    higher-priority action is already scheduled for that daemon."""
    assert not daemon_name.startswith('ha-rgw.')

    # NOTE(review): this priority table was not visible in this chunk and is
    # reconstructed -- confirm the values against upstream.
    priorities = {
        'start': 1,
        'restart': 2,
        'reconfig': 3,
        'redeploy': 4,
        'stop': 5,
        'rotate-key': 6,
    }
    existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
    if existing_action and priorities[existing_action] > priorities[action]:
        logger.debug(
            f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
        return

    if host not in self.scheduled_daemon_actions:
        self.scheduled_daemon_actions[host] = {}
    self.scheduled_daemon_actions[host][daemon_name] = action
def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
    """Remove a scheduled action for ``daemon_name`` on ``host``.

    Returns True if an action was found and removed. Empty per-host maps are
    dropped so the outer dict stays clean.
    """
    found = False
    if host in self.scheduled_daemon_actions:
        if daemon_name in self.scheduled_daemon_actions[host]:
            del self.scheduled_daemon_actions[host][daemon_name]
            found = True
        if not self.scheduled_daemon_actions[host]:
            del self.scheduled_daemon_actions[host]
    return found
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
    """Return the action scheduled for ``daemon`` on ``host``, if any."""
    assert not daemon.startswith('ha-rgw.')

    per_host = self.scheduled_daemon_actions.get(host, {})
    return per_host.get(daemon)
1267 AgentCache is used for storing metadata about agent daemons that must be kept
1268 through MGR failovers
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Set up empty per-host agent state maps; load() fills them from the store."""
    self.mgr: CephadmOrchestrator = mgr
    # last deps/config-time delivered to each host's agent
    self.agent_config_deps = {}   # type: Dict[str, Dict[str, Any]]
    # per-host message counter
    self.agent_counter = {}       # type: Dict[str, int]
    # last time we heard from / configured each agent
    self.agent_timestamp = {}     # type: Dict[str, datetime.datetime]
    # per-host agent auth keys
    self.agent_keys = {}          # type: Dict[str, str]
    # port each agent listens on
    self.agent_ports = {}         # type: Dict[str, int]
    # whether a message to the agent is currently in flight
    self.sending_agent_message = {}  # type: Dict[str, bool]
def load(self):
    # type: () -> None
    """Repopulate per-host agent state from the mgr key/value store.

    NOTE(review): the ``def load`` header and the try/except + JSON-parsing
    scaffolding were not visible in this chunk and are reconstructed --
    confirm against upstream.
    """
    for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
        host = k[len(AGENT_CACHE_PREFIX):]
        if host not in self.mgr.inventory:
            self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                host))
            self.mgr.set_store(k, None)
        try:
            j = json.loads(v)
            self.agent_config_deps[host] = {}
            conf_deps = j.get('agent_config_deps', {})
            if conf_deps:
                conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
            self.agent_config_deps[host] = conf_deps
            self.agent_counter[host] = int(j.get('agent_counter', 1))
            self.agent_timestamp[host] = str_to_datetime(
                j.get('agent_timestamp', datetime_to_str(datetime_now())))
            self.agent_keys[host] = str(j.get('agent_keys', ''))
            agent_port = int(j.get('agent_ports', 0))
            if agent_port:
                self.agent_ports[host] = agent_port
        except Exception as e:
            # best-effort: a corrupt record for one host must not break loading
            self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                host, e))
def save_agent(self, host: str) -> None:
    """Persist this host's agent state to the mgr key/value store.

    Only the fields that are actually recorded for ``host`` are serialized.
    """
    j: Dict[str, Any] = {}
    if host in self.agent_config_deps:
        j['agent_config_deps'] = {
            'deps': self.agent_config_deps[host].get('deps', []),
            'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
        }
    if host in self.agent_counter:
        j['agent_counter'] = self.agent_counter[host]
    if host in self.agent_keys:
        j['agent_keys'] = self.agent_keys[host]
    if host in self.agent_ports:
        j['agent_ports'] = self.agent_ports[host]
    if host in self.agent_timestamp:
        j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

    self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
    """Record the dependency list and config timestamp for the agent on ``host``."""
    self.agent_config_deps[host] = {
        'deps': deps,
        'last_config': stamp,
    }
def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
    """Return the (deps, last_config time) recorded for the agent on ``host``,
    or (None, None) if nothing is recorded.
    """
    # Single .get() lookup instead of the original membership test; mirrors
    # HostCache.get_daemon_last_config_deps for consistency.
    entry = self.agent_config_deps.get(host)
    if entry is None:
        return None, None
    return entry.get('deps', []), entry.get('last_config', None)
def messaging_agent(self, host: str) -> bool:
    """Return True if a message to the agent on ``host`` is currently in flight."""
    # Single lookup with a default instead of the original
    # "not in dict or not dict[host]" double lookup.
    return bool(self.sending_agent_message.get(host, False))
def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
    """Record that the agent acknowledged a new config: refresh deps,
    timestamp and counter, then persist the updated state."""
    # agent successfully received new config. Update config/deps
    assert daemon_spec.service_name == 'agent'
    target = daemon_spec.host
    self.update_agent_config_deps(target, daemon_spec.deps, datetime_now())
    self.agent_timestamp[target] = datetime_now()
    self.agent_counter[target] = 1
    self.save_agent(target)
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Create an empty event store bound to the orchestrator module."""
    self.mgr: CephadmOrchestrator = mgr
    # 'kind:subject' -> list of events, newest last
    self.events = {}  # type: Dict[str, List[OrchestratorEvent]]
def add(self, event: OrchestratorEvent) -> None:
    """Store ``event`` under its kind:subject key, deduplicating by message
    and keeping only the most recent entries.

    NOTE(review): the else-branch structure was partly invisible in this
    chunk and is reconstructed -- confirm against upstream.
    """
    key = event.kind_subject()
    if key not in self.events:
        self.events[key] = [event]
    else:
        # drop exact duplicates (same message) silently
        for existing in self.events[key]:
            if existing.message == event.message:
                return
        self.events[key].append(event)

    # limit to five events for now.
    self.events[key] = self.events[key][-5:]
def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
    """Record an event for the service described by ``spec``."""
    event = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
    self.add(event)
def from_orch_error(self, e: OrchestratorError) -> None:
    """Record an ERROR event from an OrchestratorError that carries a subject.

    NOTE(review): the OrchestratorEvent constructor arguments were not visible
    in this chunk and are reconstructed -- confirm against upstream.
    """
    if e.event_subject is not None:
        self.add(OrchestratorEvent(
            datetime_now(),
            e.event_subject[0],
            e.event_subject[1],
            "ERROR",
            str(e)
        ))
def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
    """Record an event for the daemon ``daemon_name``."""
    event = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
    self.add(event)
def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
    """Record an ERROR event for ``daemon_name`` from an exception.

    NOTE(review): the body was not visible in this chunk and is reconstructed
    as a delegation to for_daemon -- confirm against upstream.
    """
    self.for_daemon(
        daemon_name,
        "ERROR",
        str(e)
    )
def cleanup(self) -> None:
    """Drop events whose subject (service or daemon) no longer exists."""
    # Needs to be properly done, in case events are persistently stored.
    unknowns: List[str] = []
    daemons = self.mgr.cache.get_daemon_names()
    specs = self.mgr.spec_store.all_specs.keys()
    # iterate keys directly -- the values were unused in the original
    # .items() loop (Perflint PERF102)
    for k_s in self.events:
        kind, subject = k_s.split(':')
        if kind == 'service' and subject not in specs:
            unknowns.append(k_s)
        elif kind == 'daemon' and subject not in daemons:
            unknowns.append(k_s)

    # delete after iterating: mutating self.events inside the loop would
    # invalidate the iterator
    for k_s in unknowns:
        del self.events[k_s]
def get_for_service(self, name: str) -> List[OrchestratorEvent]:
    """Return the stored events for service ``name`` ([] if none)."""
    key = 'service:' + name
    return self.events.get(key, [])
def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
    """Return the stored events for daemon ``name`` ([] if none)."""
    key = 'daemon:' + name
    return self.events.get(key, [])