10 from typing
import TYPE_CHECKING
, Dict
, List
, Iterator
, Optional
, Any
, Tuple
, Set
, Mapping
, cast
, \
14 from ceph
.deployment
import inventory
15 from ceph
.deployment
.service_spec
import ServiceSpec
, PlacementSpec
, TunedProfileSpec
, IngressSpec
16 from ceph
.utils
import str_to_datetime
, datetime_to_str
, datetime_now
17 from orchestrator
import OrchestratorError
, HostSpec
, OrchestratorEvent
, service_to_daemon_types
18 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
20 from .utils
import resolve_ip
, SpecialHostLabels
21 from .migrations
import queue_migrate_nfs_spec
, queue_migrate_rgw_spec
24 from .module
import CephadmOrchestrator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Key prefix of the per-host cache entries in the mgr store; used below with
# get_store_prefix() when loading and with set_store() when saving.
HOST_CACHE_PREFIX = "host."
# Key prefix of the persisted service specs in the mgr store.
SPEC_STORE_PREFIX = "spec."
# NOTE(review): not referenced in the visible part of this file; presumably
# the key prefix of agent cache entries -- confirm against its users.
AGENT_CACHE_PREFIX = 'agent.'
34 class HostCacheStatus(enum
.Enum
):
42 The inventory stores a HostSpec for all hosts persistently.
45 def __init__(self
, mgr
: 'CephadmOrchestrator'):
47 adjusted_addrs
= False
49 def is_valid_ip(ip
: str) -> bool:
51 ipaddress
.ip_address(ip
)
57 i
= self
.mgr
.get_store('inventory')
59 self
._inventory
: Dict
[str, dict] = json
.loads(i
)
60 # handle old clusters missing 'hostname' key from hostspec
61 for k
, v
in self
._inventory
.items():
62 if 'hostname' not in v
:
65 # convert legacy non-IP addr?
66 if is_valid_ip(str(v
.get('addr'))):
68 if len(self
._inventory
) > 1:
69 if k
== socket
.gethostname():
70 # Never try to resolve our own host! This is
71 # fraught and can lead to either a loopback
72 # address (due to podman's futzing with
73 # /etc/hosts) or a private IP based on the CNI
74 # configuration. Instead, wait until the mgr
75 # fails over to another host and let them resolve
78 ip
= resolve_ip(cast(str, v
.get('addr')))
80 # we only have 1 node in the cluster, so we can't
81 # rely on another host doing the lookup. use the
82 # IP the mgr binds to.
83 ip
= self
.mgr
.get_mgr_ip()
84 if is_valid_ip(ip
) and not ip
.startswith('127.0.'):
86 f
"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
93 self
._inventory
= dict()
94 self
._all
_known
_names
: Dict
[str, List
[str]] = {}
95 logger
.debug('Loaded inventory %s' % self
._inventory
)
def keys(self) -> List[str]:
    """Return the names of all hosts stored in the inventory.

    The result is a fresh list, so callers may modify it without touching
    the inventory itself.
    """
    # Iterating a dict yields its keys, in insertion order.
    return list(self._inventory)
def __contains__(self, host: str) -> bool:
    """Report whether ``host`` is known to the inventory.

    A host matches when it is a key of the stored inventory, or when it
    appears in any of the name lists kept in ``_all_known_names``.
    """
    if host in self._inventory:
        return True
    # Fall back to the alternative names recorded per stored host.
    return any(host in names for names in self._all_known_names.values())
103 def _get_stored_name(self
, host
: str) -> str:
104 self
.assert_host(host
)
105 if host
in self
._inventory
:
107 for stored_name
, all_names
in self
._all
_known
_names
.items():
108 if host
in all_names
:
112 def update_known_hostnames(self
, hostname
: str, shortname
: str, fqdn
: str) -> None:
113 for hname
in [hostname
, shortname
, fqdn
]:
114 # if we know the host by any of the names, store the full set of names
115 # in order to be able to check against those names for matching a host
116 if hname
in self
._inventory
:
117 self
._all
_known
_names
[hname
] = [hostname
, shortname
, fqdn
]
119 logger
.debug(f
'got hostname set from gather-facts for unknown host: {[hostname, shortname, fqdn]}')
121 def assert_host(self
, host
: str) -> None:
123 raise OrchestratorError('host %s does not exist' % host
)
125 def add_host(self
, spec
: HostSpec
) -> None:
126 if spec
.hostname
in self
:
128 if self
.get_addr(spec
.hostname
) != spec
.addr
:
129 self
.set_addr(spec
.hostname
, spec
.addr
)
131 for label
in spec
.labels
:
132 self
.add_label(spec
.hostname
, label
)
134 self
._inventory
[spec
.hostname
] = spec
.to_json()
137 def rm_host(self
, host
: str) -> None:
138 host
= self
._get
_stored
_name
(host
)
139 del self
._inventory
[host
]
140 self
._all
_known
_names
.pop(host
, [])
143 def set_addr(self
, host
: str, addr
: str) -> None:
144 host
= self
._get
_stored
_name
(host
)
145 self
._inventory
[host
]['addr'] = addr
148 def add_label(self
, host
: str, label
: str) -> None:
149 host
= self
._get
_stored
_name
(host
)
151 if 'labels' not in self
._inventory
[host
]:
152 self
._inventory
[host
]['labels'] = list()
153 if label
not in self
._inventory
[host
]['labels']:
154 self
._inventory
[host
]['labels'].append(label
)
157 def rm_label(self
, host
: str, label
: str) -> None:
158 host
= self
._get
_stored
_name
(host
)
160 if 'labels' not in self
._inventory
[host
]:
161 self
._inventory
[host
]['labels'] = list()
162 if label
in self
._inventory
[host
]['labels']:
163 self
._inventory
[host
]['labels'].remove(label
)
166 def has_label(self
, host
: str, label
: str) -> bool:
167 host
= self
._get
_stored
_name
(host
)
169 host
in self
._inventory
170 and label
in self
._inventory
[host
].get('labels', [])
def get_addr(self, host: str) -> str:
    """Return the address recorded for ``host``.

    ``host`` is first mapped to the name it is stored under by
    ``_get_stored_name``. When the stored entry has no ``'addr'`` key, the
    stored name itself is returned.
    """
    stored_name = self._get_stored_name(host)
    entry = self._inventory[stored_name]
    return entry.get('addr', stored_name)
177 def spec_from_dict(self
, info
: dict) -> HostSpec
:
178 hostname
= info
['hostname']
179 hostname
= self
._get
_stored
_name
(hostname
)
182 addr
=info
.get('addr', hostname
),
183 labels
=info
.get('labels', []),
184 status
='Offline' if hostname
in self
.mgr
.offline_hosts
else info
.get('status', ''),
def all_specs(self) -> List[HostSpec]:
    """Return one ``HostSpec`` per inventory entry.

    Every stored entry is converted with ``spec_from_dict``.
    """
    return [self.spec_from_dict(info) for info in self._inventory.values()]
def get_host_with_state(self, state: str = "") -> List[str]:
    """Return the names of all hosts whose status matches ``state``.

    The comparison is case-insensitive: both the stored status and the
    requested ``state`` are lower-cased before comparing. Hosts without a
    ``'status'`` key, or whose status is ``None``, are treated as having
    the empty status ``""``.
    """
    # Normalise once; the stored status is lower-cased too, so a mixed-case
    # ``state`` would otherwise never match anything.
    wanted = state.lower()
    return [
        hostname
        for hostname, info in self._inventory.items()
        if (info.get("status") or "").lower() == wanted
    ]
def save(self) -> None:
    """Persist the inventory.

    The inventory dict is serialized to JSON and written to the mgr store
    under the ``'inventory'`` key.
    """
    serialized = json.dumps(self._inventory)
    self.mgr.set_store('inventory', serialized)
198 class SpecDescription(NamedTuple
):
200 rank_map
: Optional
[Dict
[int, Dict
[int, Optional
[str]]]]
201 created
: datetime
.datetime
202 deleted
: Optional
[datetime
.datetime
]
206 def __init__(self
, mgr
):
207 # type: (CephadmOrchestrator) -> None
209 self
._specs
= {} # type: Dict[str, ServiceSpec]
210 # service_name -> rank -> gen -> daemon_id
211 self
._rank
_maps
= {} # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
212 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
213 self
.spec_deleted
= {} # type: Dict[str, datetime.datetime]
214 self
.spec_preview
= {} # type: Dict[str, ServiceSpec]
215 self
._needs
_configuration
: Dict
[str, bool] = {}
218 def all_specs(self
) -> Mapping
[str, ServiceSpec
]:
220 returns active and deleted specs. Returns read-only dict.
def __contains__(self, name: str) -> bool:
    """Report whether a spec is stored under the service name ``name``."""
    stored_specs = self._specs
    return name in stored_specs
def __getitem__(self, name: str) -> SpecDescription:
    """Return the ``SpecDescription`` for the service called ``name``.

    The description bundles the stored spec, its rank map (``None`` when
    no rank map is recorded), its creation timestamp and its deletion
    timestamp (``None`` when the service is not marked deleted).

    Raises ``OrchestratorError`` when no spec is stored under ``name``.
    """
    specs = self._specs
    if name not in specs:
        raise OrchestratorError(f'Service {name} not found.')

    spec = specs[name]
    rank_map = self._rank_maps.get(name)
    created = self.spec_created[name]
    deleted = self.spec_deleted.get(name)
    return SpecDescription(spec, rank_map, created, deleted)
def active_specs(self) -> Mapping[str, ServiceSpec]:
    """Return the stored specs that are not marked as deleted.

    A spec counts as deleted when its service name has an entry in
    ``spec_deleted``. The result is a newly built dict.
    """
    deleted = self.spec_deleted
    active: Dict[str, ServiceSpec] = {}
    for service_name, spec in self._specs.items():
        if service_name in deleted:
            continue
        active[service_name] = spec
    return active
241 for k
, v
in self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
).items():
242 service_name
= k
[len(SPEC_STORE_PREFIX
):]
244 j
= cast(Dict
[str, dict], json
.loads(v
))
246 (self
.mgr
.migration_current
or 0) < 3
247 and j
['spec'].get('service_type') == 'nfs'
249 self
.mgr
.log
.debug(f
'found legacy nfs spec {j}')
250 queue_migrate_nfs_spec(self
.mgr
, j
)
253 (self
.mgr
.migration_current
or 0) < 6
254 and j
['spec'].get('service_type') == 'rgw'
256 queue_migrate_rgw_spec(self
.mgr
, j
)
258 spec
= ServiceSpec
.from_json(j
['spec'])
259 created
= str_to_datetime(cast(str, j
['created']))
260 self
._specs
[service_name
] = spec
261 self
.spec_created
[service_name
] = created
264 deleted
= str_to_datetime(cast(str, j
['deleted']))
265 self
.spec_deleted
[service_name
] = deleted
267 if 'needs_configuration' in j
:
268 self
._needs
_configuration
[service_name
] = cast(bool, j
['needs_configuration'])
270 if 'rank_map' in j
and isinstance(j
['rank_map'], dict):
271 self
._rank
_maps
[service_name
] = {}
272 for rank_str
, m
in j
['rank_map'].items():
276 logger
.exception(f
"failed to parse rank in {j['rank_map']}")
278 if isinstance(m
, dict):
279 self
._rank
_maps
[service_name
][rank
] = {}
280 for gen_str
, name
in m
.items():
284 logger
.exception(f
"failed to parse gen in {j['rank_map']}")
286 if isinstance(name
, str) or m
is None:
287 self
._rank
_maps
[service_name
][rank
][gen
] = name
289 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
291 except Exception as e
:
292 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
299 update_create
: bool = True,
301 name
= spec
.service_name()
302 if spec
.preview_only
:
303 self
.spec_preview
[name
] = spec
305 self
._specs
[name
] = spec
306 self
._needs
_configuration
[name
] = True
309 self
.spec_created
[name
] = datetime_now()
312 def save_rank_map(self
,
314 rank_map
: Dict
[int, Dict
[int, Optional
[str]]]) -> None:
315 self
._rank
_maps
[name
] = rank_map
318 def _save(self
, name
: str) -> None:
319 data
: Dict
[str, Any
] = {
320 'spec': self
._specs
[name
].to_json(),
322 if name
in self
.spec_created
:
323 data
['created'] = datetime_to_str(self
.spec_created
[name
])
324 if name
in self
._rank
_maps
:
325 data
['rank_map'] = self
._rank
_maps
[name
]
326 if name
in self
.spec_deleted
:
327 data
['deleted'] = datetime_to_str(self
.spec_deleted
[name
])
328 if name
in self
._needs
_configuration
:
329 data
['needs_configuration'] = self
._needs
_configuration
[name
]
332 SPEC_STORE_PREFIX
+ name
,
333 json
.dumps(data
, sort_keys
=True),
335 self
.mgr
.events
.for_service(self
._specs
[name
],
336 OrchestratorEvent
.INFO
,
337 'service was created')
339 def rm(self
, service_name
: str) -> bool:
340 if service_name
not in self
._specs
:
343 if self
._specs
[service_name
].preview_only
:
344 self
.finally_rm(service_name
)
347 self
.spec_deleted
[service_name
] = datetime_now()
348 self
.save(self
._specs
[service_name
], update_create
=False)
351 def finally_rm(self
, service_name
):
352 # type: (str) -> bool
353 found
= service_name
in self
._specs
355 del self
._specs
[service_name
]
356 if service_name
in self
._rank
_maps
:
357 del self
._rank
_maps
[service_name
]
358 del self
.spec_created
[service_name
]
359 if service_name
in self
.spec_deleted
:
360 del self
.spec_deleted
[service_name
]
361 if service_name
in self
._needs
_configuration
:
362 del self
._needs
_configuration
[service_name
]
363 self
.mgr
.set_store(SPEC_STORE_PREFIX
+ service_name
, None)
def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
    """Return the creation timestamp recorded for ``spec``.

    The lookup key is ``spec.service_name()``; ``None`` is returned when
    no timestamp is recorded for that name.
    """
    service_name = spec.service_name()
    return self.spec_created.get(service_name)
def set_unmanaged(self, service_name: str, value: bool) -> str:
    """Set the ``unmanaged`` flag of a stored service spec.

    Returns a human-readable message describing the outcome. Nothing is
    changed or saved when the service is unknown or when its flag already
    equals ``value``; otherwise the flag is updated and the spec is
    persisted through ``self.save``.
    """
    if service_name not in self._specs:
        return f'No service of name {service_name} found. Check "ceph orch ls" for all known services'

    spec = self._specs[service_name]
    if spec.unmanaged == value:
        # Already in the requested state: report it and leave the spec alone.
        return f'Service {service_name}{" already " if value else " not "}marked unmanaged. No action taken.'

    spec.unmanaged = value
    self.save(spec)
    return f'Set unmanaged to {str(value)} for service {service_name}'
def needs_configuration(self, name: str) -> bool:
    """Return the needs-configuration flag recorded for ``name``.

    Services without a recorded flag are reported as ``False``.
    """
    flags = self._needs_configuration
    return flags[name] if name in flags else False
381 def mark_needs_configuration(self
, name
: str) -> None:
382 if name
in self
._specs
:
383 self
._needs
_configuration
[name
] = True
386 self
.mgr
.log
.warning(f
'Attempted to mark unknown service "{name}" as needing configuration')
388 def mark_configured(self
, name
: str) -> None:
389 if name
in self
._specs
:
390 self
._needs
_configuration
[name
] = False
393 self
.mgr
.log
.warning(f
'Attempted to mark unknown service "{name}" as having been configured')
396 class ClientKeyringSpec(object):
398 A client keyring file that we should maintain
404 placement
: PlacementSpec
,
405 mode
: Optional
[int] = None,
406 uid
: Optional
[int] = None,
407 gid
: Optional
[int] = None,
410 self
.placement
= placement
411 self
.mode
= mode
or 0o600
415 def validate(self
) -> None:
418 def to_json(self
) -> Dict
[str, Any
]:
420 'entity': self
.entity
,
421 'placement': self
.placement
.to_json(),
def path(self) -> str:
    """Return the keyring file path under ``/etc/ceph`` built from ``self.entity``."""
    keyring_path = f'/etc/ceph/ceph.{self.entity}.keyring'
    return keyring_path
432 def from_json(cls
: Type
, data
: dict) -> 'ClientKeyringSpec':
435 c
['placement'] = PlacementSpec
.from_json(c
['placement'])
441 class ClientKeyringStore():
443 Track client keyring files that we are supposed to maintain
446 def __init__(self
, mgr
):
447 # type: (CephadmOrchestrator) -> None
448 self
.mgr
: CephadmOrchestrator
= mgr
450 self
.keys
: Dict
[str, ClientKeyringSpec
] = {}
452 def load(self
) -> None:
453 c
= self
.mgr
.get_store('client_keyrings') or b
'{}'
455 for e
, d
in j
.items():
456 self
.keys
[e
] = ClientKeyringSpec
.from_json(d
)
458 def save(self
) -> None:
460 k
: v
.to_json() for k
, v
in self
.keys
.items()
462 self
.mgr
.set_store('client_keyrings', json
.dumps(data
))
464 def update(self
, ks
: ClientKeyringSpec
) -> None:
465 self
.keys
[ks
.entity
] = ks
468 def rm(self
, entity
: str) -> None:
469 if entity
in self
.keys
:
470 del self
.keys
[entity
]
474 class TunedProfileStore():
476 Store for out tuned profile information
479 def __init__(self
, mgr
: "CephadmOrchestrator") -> None:
480 self
.mgr
: CephadmOrchestrator
= mgr
482 self
.profiles
: Dict
[str, TunedProfileSpec
] = {}
def __contains__(self, profile: str) -> bool:
    """Report whether a tuned profile named ``profile`` is stored."""
    stored_profiles = self.profiles
    return profile in stored_profiles
487 def load(self
) -> None:
488 c
= self
.mgr
.get_store('tuned_profiles') or b
'{}'
490 for k
, v
in j
.items():
491 self
.profiles
[k
] = TunedProfileSpec
.from_json(v
)
492 self
.profiles
[k
]._last
_updated
= datetime_to_str(datetime_now())
def exists(self, profile_name: str) -> bool:
    """Report whether a tuned profile named ``profile_name`` is stored."""
    stored_profiles = self.profiles
    return profile_name in stored_profiles
def save(self) -> None:
    """Persist all tuned profiles.

    Each profile is converted with its ``to_json`` method; the resulting
    name -> JSON mapping is dumped to a JSON string and written to the mgr
    store under the ``'tuned_profiles'`` key.
    """
    serialized = {}
    for profile_name, spec in self.profiles.items():
        serialized[profile_name] = spec.to_json()
    self.mgr.set_store('tuned_profiles', json.dumps(serialized))
501 def add_setting(self
, profile
: str, setting
: str, value
: str) -> None:
502 if profile
in self
.profiles
:
503 self
.profiles
[profile
].settings
[setting
] = value
504 self
.profiles
[profile
]._last
_updated
= datetime_to_str(datetime_now())
508 f
'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')
510 def rm_setting(self
, profile
: str, setting
: str) -> None:
511 if profile
in self
.profiles
:
512 if setting
in self
.profiles
[profile
].settings
:
513 self
.profiles
[profile
].settings
.pop(setting
, '')
514 self
.profiles
[profile
]._last
_updated
= datetime_to_str(datetime_now())
518 f
'Attemped to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
521 f
'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')
523 def add_profile(self
, spec
: TunedProfileSpec
) -> None:
524 spec
._last
_updated
= datetime_to_str(datetime_now())
525 self
.profiles
[spec
.profile_name
] = spec
528 def rm_profile(self
, profile
: str) -> None:
529 if profile
in self
.profiles
:
530 self
.profiles
.pop(profile
, TunedProfileSpec(''))
532 logger
.error(f
'Attempted to remove nonexistent os tuning profile "{profile}"')
535 def last_updated(self
, profile
: str) -> Optional
[datetime
.datetime
]:
536 if profile
not in self
.profiles
or not self
.profiles
[profile
]._last
_updated
:
538 return str_to_datetime(self
.profiles
[profile
]._last
_updated
)
def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
    """Record ``new_datetime`` as the last-updated time of ``profile``.

    The timestamp is stored in its string form (``datetime_to_str``).
    Unknown profile names are ignored.
    """
    if profile not in self.profiles:
        return
    self.profiles[profile]._last_updated = datetime_to_str(new_datetime)
def list_profiles(self) -> List[TunedProfileSpec]:
    """Return all stored tuned profile specs as a new list."""
    return list(self.profiles.values())
550 HostCache stores different things:
552 1. `daemons`: Deployed daemons O(daemons)
554 They're part of the configuration nowadays and need to be
555 persistent. The name "daemon cache" is unfortunately a bit misleading.
556 Like for example we really need to know where daemons are deployed on
557 hosts that are offline.
559 2. `devices`: ceph-volume inventory cache O(hosts)
561 As soon as this is populated, it becomes more or less read-only.
563 3. `networks`: network interfaces for each host. O(hosts)
565 This is needed in order to deploy MONs. As this is mostly read-only.
567 4. `last_client_files` O(hosts)
569 Stores the last digest and owner/mode for files we've pushed to /etc/ceph
570 (ceph.conf or client keyrings).
572 5. `scheduled_daemon_actions`: O(daemons)
574 Used to run daemon actions after deploying a daemon. We need to
575 store it persistently, in order to stay consistent across
579 def __init__(self
, mgr
):
580 # type: (CephadmOrchestrator) -> None
581 self
.mgr
: CephadmOrchestrator
= mgr
582 self
.daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
583 self
._tmp
_daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
584 self
.last_daemon_update
= {} # type: Dict[str, datetime.datetime]
585 self
.devices
= {} # type: Dict[str, List[inventory.Device]]
586 self
.facts
= {} # type: Dict[str, Dict[str, Any]]
587 self
.last_facts_update
= {} # type: Dict[str, datetime.datetime]
588 self
.last_autotune
= {} # type: Dict[str, datetime.datetime]
589 self
.osdspec_previews
= {} # type: Dict[str, List[Dict[str, Any]]]
590 self
.osdspec_last_applied
= {} # type: Dict[str, Dict[str, datetime.datetime]]
591 self
.networks
= {} # type: Dict[str, Dict[str, Dict[str, List[str]]]]
592 self
.last_network_update
= {} # type: Dict[str, datetime.datetime]
593 self
.last_device_update
= {} # type: Dict[str, datetime.datetime]
594 self
.last_device_change
= {} # type: Dict[str, datetime.datetime]
595 self
.last_tuned_profile_update
= {} # type: Dict[str, datetime.datetime]
596 self
.daemon_refresh_queue
= [] # type: List[str]
597 self
.device_refresh_queue
= [] # type: List[str]
598 self
.network_refresh_queue
= [] # type: List[str]
599 self
.osdspec_previews_refresh_queue
= [] # type: List[str]
601 # host -> daemon name -> dict
602 self
.daemon_config_deps
= {} # type: Dict[str, Dict[str, Dict[str,Any]]]
603 self
.last_host_check
= {} # type: Dict[str, datetime.datetime]
604 self
.loading_osdspec_preview
= set() # type: Set[str]
605 self
.last_client_files
: Dict
[str, Dict
[str, Tuple
[str, int, int, int]]] = {}
606 self
.registry_login_queue
: Set
[str] = set()
608 self
.scheduled_daemon_actions
: Dict
[str, Dict
[str, str]] = {}
610 self
.metadata_up_to_date
= {} # type: Dict[str, bool]
614 for k
, v
in self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
).items():
615 host
= k
[len(HOST_CACHE_PREFIX
):]
616 if self
._get
_host
_cache
_entry
_status
(host
) != HostCacheStatus
.host
:
617 if self
._get
_host
_cache
_entry
_status
(host
) == HostCacheStatus
.devices
:
619 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
621 self
.mgr
.set_store(k
, None)
624 if 'last_device_update' in j
:
625 self
.last_device_update
[host
] = str_to_datetime(j
['last_device_update'])
627 self
.device_refresh_queue
.append(host
)
628 if 'last_device_change' in j
:
629 self
.last_device_change
[host
] = str_to_datetime(j
['last_device_change'])
630 # for services, we ignore the persisted last_*_update
631 # and always trigger a new scrape on mgr restart.
632 self
.daemon_refresh_queue
.append(host
)
633 self
.network_refresh_queue
.append(host
)
634 self
.daemons
[host
] = {}
635 self
.osdspec_previews
[host
] = []
636 self
.osdspec_last_applied
[host
] = {}
637 self
.networks
[host
] = {}
638 self
.daemon_config_deps
[host
] = {}
639 for name
, d
in j
.get('daemons', {}).items():
640 self
.daemons
[host
][name
] = \
641 orchestrator
.DaemonDescription
.from_json(d
)
642 self
.devices
[host
] = []
643 # still want to check old device location for upgrade scenarios
644 for d
in j
.get('devices', []):
645 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
646 self
.devices
[host
] += self
.load_host_devices(host
)
647 self
.networks
[host
] = j
.get('networks_and_interfaces', {})
648 self
.osdspec_previews
[host
] = j
.get('osdspec_previews', {})
649 self
.last_client_files
[host
] = j
.get('last_client_files', {})
650 for name
, ts
in j
.get('osdspec_last_applied', {}).items():
651 self
.osdspec_last_applied
[host
][name
] = str_to_datetime(ts
)
653 for name
, d
in j
.get('daemon_config_deps', {}).items():
654 self
.daemon_config_deps
[host
][name
] = {
655 'deps': d
.get('deps', []),
656 'last_config': str_to_datetime(d
['last_config']),
658 if 'last_host_check' in j
:
659 self
.last_host_check
[host
] = str_to_datetime(j
['last_host_check'])
660 if 'last_tuned_profile_update' in j
:
661 self
.last_tuned_profile_update
[host
] = str_to_datetime(
662 j
['last_tuned_profile_update'])
663 self
.registry_login_queue
.add(host
)
664 self
.scheduled_daemon_actions
[host
] = j
.get('scheduled_daemon_actions', {})
665 self
.metadata_up_to_date
[host
] = j
.get('metadata_up_to_date', False)
668 'HostCache.load: host %s has %d daemons, '
669 '%d devices, %d networks' % (
670 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
671 len(self
.networks
[host
])))
672 except Exception as e
:
673 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
677 def _get_host_cache_entry_status(self
, host
: str) -> HostCacheStatus
:
678 # return whether a host cache entry in the config-key
679 # store is for a host, a set of devices or is stray.
680 # for a host, the entry name will match a hostname in our
681 # inventory. For devices, it will be formatted
682 # <hostname>.devices.<integer> where <hostname> is
683 # in out inventory. If neither case applies, it is stray
684 if host
in self
.mgr
.inventory
:
685 return HostCacheStatus
.host
687 # try stripping off the ".devices.<integer>" and see if we get
688 # a host name that matches our inventory
689 actual_host
= '.'.join(host
.split('.')[:-2])
690 return HostCacheStatus
.devices
if actual_host
in self
.mgr
.inventory
else HostCacheStatus
.stray
692 return HostCacheStatus
.stray
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemons of ``host`` with ``dm``.

    Any temporary daemon entries kept for the host are dropped, and the
    time of this update is recorded in ``last_daemon_update``.
    """
    self.daemons[host] = dm
    # A real update supersedes the temporary entries for this host.
    if host in self._tmp_daemons:
        del self._tmp_daemons[host]
    self.last_daemon_update[host] = datetime_now()
700 def append_tmp_daemon(self
, host
: str, dd
: orchestrator
.DaemonDescription
) -> None:
701 # for storing empty daemon descriptions representing daemons we have
702 # just deployed but not yet had the chance to pick up in a daemon refresh
703 # _tmp_daemons is cleared for a host upon receiving a real update of the
705 if host
not in self
._tmp
_daemons
:
706 self
._tmp
_daemons
[host
] = {}
707 self
._tmp
_daemons
[host
][dd
.name()] = dd
709 def update_host_facts(self
, host
, facts
):
710 # type: (str, Dict[str, Dict[str, Any]]) -> None
711 self
.facts
[host
] = facts
712 hostnames
: List
[str] = []
713 for k
in ['hostname', 'shortname', 'fqdn']:
715 hostnames
.append(v
if isinstance(v
, str) else '')
716 self
.mgr
.inventory
.update_known_hostnames(hostnames
[0], hostnames
[1], hostnames
[2])
717 self
.last_facts_update
[host
] = datetime_now()
def update_autotune(self, host: str) -> None:
    """Record the current time as the last autotune time of ``host``."""
    timestamp = datetime_now()
    self.last_autotune[host] = timestamp
def invalidate_autotune(self, host: str) -> None:
    """Forget the last autotune time recorded for ``host``, if any."""
    self.last_autotune.pop(host, None)
726 def devices_changed(self
, host
: str, b
: List
[inventory
.Device
]) -> bool:
727 old_devs
= inventory
.Devices(self
.devices
[host
])
728 new_devs
= inventory
.Devices(b
)
729 # relying on Devices class __eq__ function here
730 if old_devs
!= new_devs
:
731 self
.mgr
.log
.info("Detected new or changed devices on %s" % host
)
735 def update_host_devices(
738 dls
: List
[inventory
.Device
],
741 host
not in self
.devices
742 or host
not in self
.last_device_change
743 or self
.devices_changed(host
, dls
)
745 self
.last_device_change
[host
] = datetime_now()
746 self
.last_device_update
[host
] = datetime_now()
747 self
.devices
[host
] = dls
749 def update_host_networks(
752 nets
: Dict
[str, Dict
[str, List
[str]]]
754 self
.networks
[host
] = nets
755 self
.last_network_update
[host
] = datetime_now()
757 def update_daemon_config_deps(self
, host
: str, name
: str, deps
: List
[str], stamp
: datetime
.datetime
) -> None:
758 self
.daemon_config_deps
[host
][name
] = {
760 'last_config': stamp
,
def update_last_host_check(self, host):
    # type: (str) -> None
    """Record the current time as the last host check time of ``host``."""
    timestamp = datetime_now()
    self.last_host_check[host] = timestamp
def update_osdspec_last_applied(self, host, service_name, ts):
    # type: (str, str, datetime.datetime) -> None
    """Record ``ts`` as the last-applied time of ``service_name`` on ``host``.

    ``host`` must already have an entry in ``osdspec_last_applied``;
    otherwise the lookup raises ``KeyError``.
    """
    applied_on_host = self.osdspec_last_applied[host]
    applied_on_host[service_name] = ts
771 def update_client_file(self
,
778 if host
not in self
.last_client_files
:
779 self
.last_client_files
[host
] = {}
780 self
.last_client_files
[host
][path
] = (digest
, mode
, uid
, gid
)
782 def removed_client_file(self
, host
: str, path
: str) -> None:
784 host
in self
.last_client_files
785 and path
in self
.last_client_files
[host
]
787 del self
.last_client_files
[host
][path
]
789 def prime_empty_host(self
, host
):
790 # type: (str) -> None
792 Install an empty entry for a host
794 self
.daemons
[host
] = {}
795 self
.devices
[host
] = []
796 self
.networks
[host
] = {}
797 self
.osdspec_previews
[host
] = []
798 self
.osdspec_last_applied
[host
] = {}
799 self
.daemon_config_deps
[host
] = {}
800 self
.daemon_refresh_queue
.append(host
)
801 self
.device_refresh_queue
.append(host
)
802 self
.network_refresh_queue
.append(host
)
803 self
.osdspec_previews_refresh_queue
.append(host
)
804 self
.registry_login_queue
.add(host
)
805 self
.last_client_files
[host
] = {}
807 def refresh_all_host_info(self
, host
):
808 # type: (str) -> None
810 self
.last_host_check
.pop(host
, None)
811 self
.daemon_refresh_queue
.append(host
)
812 self
.registry_login_queue
.add(host
)
813 self
.device_refresh_queue
.append(host
)
814 self
.last_facts_update
.pop(host
, None)
815 self
.osdspec_previews_refresh_queue
.append(host
)
816 self
.last_autotune
.pop(host
, None)
818 def invalidate_host_daemons(self
, host
):
819 # type: (str) -> None
820 self
.daemon_refresh_queue
.append(host
)
821 if host
in self
.last_daemon_update
:
822 del self
.last_daemon_update
[host
]
825 def invalidate_host_devices(self
, host
):
826 # type: (str) -> None
827 self
.device_refresh_queue
.append(host
)
828 if host
in self
.last_device_update
:
829 del self
.last_device_update
[host
]
832 def invalidate_host_networks(self
, host
):
833 # type: (str) -> None
834 self
.network_refresh_queue
.append(host
)
835 if host
in self
.last_network_update
:
836 del self
.last_network_update
[host
]
def distribute_new_registry_login_info(self) -> None:
    """Reset ``registry_login_queue`` to the set of all inventory host names."""
    all_hosts = self.mgr.inventory.keys()
    self.registry_login_queue = set(all_hosts)
842 def save_host(self
, host
: str) -> None:
843 j
: Dict
[str, Any
] = {
846 'osdspec_previews': [],
847 'osdspec_last_applied': {},
848 'daemon_config_deps': {},
850 if host
in self
.last_daemon_update
:
851 j
['last_daemon_update'] = datetime_to_str(self
.last_daemon_update
[host
])
852 if host
in self
.last_device_update
:
853 j
['last_device_update'] = datetime_to_str(self
.last_device_update
[host
])
854 if host
in self
.last_network_update
:
855 j
['last_network_update'] = datetime_to_str(self
.last_network_update
[host
])
856 if host
in self
.last_device_change
:
857 j
['last_device_change'] = datetime_to_str(self
.last_device_change
[host
])
858 if host
in self
.last_tuned_profile_update
:
859 j
['last_tuned_profile_update'] = datetime_to_str(self
.last_tuned_profile_update
[host
])
860 if host
in self
.daemons
:
861 for name
, dd
in self
.daemons
[host
].items():
862 j
['daemons'][name
] = dd
.to_json()
863 if host
in self
.networks
:
864 j
['networks_and_interfaces'] = self
.networks
[host
]
865 if host
in self
.daemon_config_deps
:
866 for name
, depi
in self
.daemon_config_deps
[host
].items():
867 j
['daemon_config_deps'][name
] = {
868 'deps': depi
.get('deps', []),
869 'last_config': datetime_to_str(depi
['last_config']),
871 if host
in self
.osdspec_previews
and self
.osdspec_previews
[host
]:
872 j
['osdspec_previews'] = self
.osdspec_previews
[host
]
873 if host
in self
.osdspec_last_applied
:
874 for name
, ts
in self
.osdspec_last_applied
[host
].items():
875 j
['osdspec_last_applied'][name
] = datetime_to_str(ts
)
877 if host
in self
.last_host_check
:
878 j
['last_host_check'] = datetime_to_str(self
.last_host_check
[host
])
880 if host
in self
.last_client_files
:
881 j
['last_client_files'] = self
.last_client_files
[host
]
882 if host
in self
.scheduled_daemon_actions
:
883 j
['scheduled_daemon_actions'] = self
.scheduled_daemon_actions
[host
]
884 if host
in self
.metadata_up_to_date
:
885 j
['metadata_up_to_date'] = self
.metadata_up_to_date
[host
]
886 if host
in self
.devices
:
887 self
.save_host_devices(host
)
889 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
891 def save_host_devices(self
, host
: str) -> None:
892 if host
not in self
.devices
or not self
.devices
[host
]:
893 logger
.debug(f
'Host {host} has no devices to save')
896 devs
: List
[Dict
[str, Any
]] = []
897 for d
in self
.devices
[host
]:
898 devs
.append(d
.to_json())
900 def byte_len(s
: str) -> int:
901 return len(s
.encode('utf-8'))
903 dev_cache_counter
: int = 0
904 cache_size
: int = self
.mgr
.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
905 if cache_size
is not None and cache_size
!= 0 and byte_len(json
.dumps(devs
)) > cache_size
- 1024:
906 # no guarantee all device entries take up the same amount of space
907 # splitting it up so there's one more entry than we need should be fairly
908 # safe and save a lot of extra logic checking sizes
909 cache_entries_needed
= math
.ceil(byte_len(json
.dumps(devs
)) / cache_size
) + 1
910 dev_sublist_size
= math
.ceil(len(devs
) / cache_entries_needed
)
911 dev_lists
: List
[List
[Dict
[str, Any
]]] = [devs
[i
:i
+ dev_sublist_size
]
912 for i
in range(0, len(devs
), dev_sublist_size
)]
913 for dev_list
in dev_lists
:
914 dev_dict
: Dict
[str, Any
] = {'devices': dev_list
}
915 if dev_cache_counter
== 0:
916 dev_dict
.update({'entries': len(dev_lists
)})
917 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
+ '.devices.'
918 + str(dev_cache_counter
), json
.dumps(dev_dict
))
919 dev_cache_counter
+= 1
921 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
+ '.devices.'
922 + str(dev_cache_counter
), json
.dumps({'devices': devs
, 'entries': 1}))
924 def load_host_devices(self
, host
: str) -> List
[inventory
.Device
]:
925 dev_cache_counter
: int = 0
926 devs
: List
[Dict
[str, Any
]] = []
929 # number of entries for the host's devices should be in
930 # the "entries" field of the first entry
931 dev_entries
= json
.loads(self
.mgr
.get_store(
932 HOST_CACHE_PREFIX
+ host
+ '.devices.0')).get('entries')
934 logger
.debug(f
'No device entries found for host {host}')
935 for i
in range(dev_entries
):
937 new_devs
= json
.loads(self
.mgr
.get_store(
938 HOST_CACHE_PREFIX
+ host
+ '.devices.' + str(i
))).get('devices', [])
939 if len(new_devs
) > 0:
940 # verify list contains actual device objects by trying to load one from json
941 inventory
.Device
.from_json(new_devs
[0])
942 # if we didn't throw an Exception on above line, we can add the devices
943 devs
= devs
+ new_devs
944 dev_cache_counter
+= 1
945 except Exception as e
:
946 logger
.error(('Hit exception trying to load devices from '
947 + f
'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
949 return [inventory
.Device
.from_json(d
) for d
in devs
]
951 def rm_host(self
, host
):
952 # type: (str) -> None
953 if host
in self
.daemons
:
954 del self
.daemons
[host
]
955 if host
in self
.devices
:
956 del self
.devices
[host
]
957 if host
in self
.facts
:
959 if host
in self
.last_facts_update
:
960 del self
.last_facts_update
[host
]
961 if host
in self
.last_autotune
:
962 del self
.last_autotune
[host
]
963 if host
in self
.osdspec_previews
:
964 del self
.osdspec_previews
[host
]
965 if host
in self
.osdspec_last_applied
:
966 del self
.osdspec_last_applied
[host
]
967 if host
in self
.loading_osdspec_preview
:
968 self
.loading_osdspec_preview
.remove(host
)
969 if host
in self
.networks
:
970 del self
.networks
[host
]
971 if host
in self
.last_daemon_update
:
972 del self
.last_daemon_update
[host
]
973 if host
in self
.last_device_update
:
974 del self
.last_device_update
[host
]
975 if host
in self
.last_network_update
:
976 del self
.last_network_update
[host
]
977 if host
in self
.last_device_change
:
978 del self
.last_device_change
[host
]
979 if host
in self
.last_tuned_profile_update
:
980 del self
.last_tuned_profile_update
[host
]
981 if host
in self
.daemon_config_deps
:
982 del self
.daemon_config_deps
[host
]
983 if host
in self
.scheduled_daemon_actions
:
984 del self
.scheduled_daemon_actions
[host
]
985 if host
in self
.last_client_files
:
986 del self
.last_client_files
[host
]
987 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, None)
990 # type: () -> List[str]
991 return list(self
.daemons
)
993 def get_schedulable_hosts(self
) -> List
[HostSpec
]:
995 Returns all usable hosts that went through _refresh_host_daemons().
997 This mitigates a potential race, where new host was added *after*
998 ``_refresh_host_daemons()`` was called, but *before*
999 ``_apply_all_specs()`` was called. thus we end up with a hosts
1000 where daemons might be running, but we have not yet detected them.
1003 h
for h
in self
.mgr
.inventory
.all_specs()
1005 self
.host_had_daemon_refresh(h
.hostname
)
1006 and SpecialHostLabels
.DRAIN_DAEMONS
not in h
.labels
1010 def get_conf_keyring_available_hosts(self
) -> List
[HostSpec
]:
1012 Returns all hosts without the drain conf and keyrings
1013 label (SpecialHostLabels.DRAIN_CONF_KEYRING) that have
1014 had a refresh. That is equivalent to all hosts we
1015 consider eligible for deployment of conf and keyring files
1017 Any host without that label is considered fair game for
1018 a client keyring spec to match. However, we want to still
1019 wait for refresh here so that we know what keyrings we've
1020 already deployed here
1023 h
for h
in self
.mgr
.inventory
.all_specs()
1025 self
.host_had_daemon_refresh(h
.hostname
)
1026 and SpecialHostLabels
.DRAIN_CONF_KEYRING
not in h
.labels
1030 def get_non_draining_hosts(self
) -> List
[HostSpec
]:
1032 Returns all hosts that do not have drain daemon label
1033 (SpecialHostLabels.DRAIN_DAEMONS).
1035 Useful for the agent who needs this specific list rather than the
1036 schedulable_hosts since the agent needs to be deployed on hosts with
1040 h
for h
in self
.mgr
.inventory
.all_specs() if SpecialHostLabels
.DRAIN_DAEMONS
not in h
.labels
1043 def get_draining_hosts(self
) -> List
[HostSpec
]:
1045 Returns all hosts that have the drain daemons label (SpecialHostLabels.DRAIN_DAEMONS)
1046 and therefore should have no daemons placed on them, but are potentially still reachable
1049 h
for h
in self
.mgr
.inventory
.all_specs() if SpecialHostLabels
.DRAIN_DAEMONS
in h
.labels
1052 def get_conf_keyring_draining_hosts(self
) -> List
[HostSpec
]:
1054 Returns all hosts that have drain conf and keyrings label (SpecialHostLabels.DRAIN_CONF_KEYRING)
1055 and therefore should have no config files or client keyring placed on them, but are
1056 potentially still reachable
1059 h
for h
in self
.mgr
.inventory
.all_specs() if SpecialHostLabels
.DRAIN_CONF_KEYRING
in h
.labels
1062 def get_unreachable_hosts(self
) -> List
[HostSpec
]:
1064 Return all hosts that are offline or in maintenance mode.
1066 The idea is we should not touch the daemons on these hosts (since
1067 in theory the hosts are inaccessible so we CAN'T touch them) but
1068 we still want to count daemons that exist on these hosts toward the
1069 placement so daemons on these hosts aren't just moved elsewhere
1072 h
for h
in self
.mgr
.inventory
.all_specs()
1074 h
.status
.lower() in ['maintenance', 'offline']
1075 or h
.hostname
in self
.mgr
.offline_hosts
def is_host_unreachable(self, hostname: str) -> bool:
    """
    Tell whether ``hostname`` is the hostname of one of the hosts returned
    by ``get_unreachable_hosts()``.
    """
    return any(spec.hostname == hostname for spec in self.get_unreachable_hosts())
def is_host_schedulable(self, hostname: str) -> bool:
    """
    Tell whether ``hostname`` is the hostname of one of the hosts returned
    by ``get_schedulable_hosts()``.
    """
    return any(spec.hostname == hostname for spec in self.get_schedulable_hosts())
def is_host_draining(self, hostname: str) -> bool:
    """
    Tell whether ``hostname`` is the hostname of one of the hosts returned
    by ``get_draining_hosts()``.
    """
    return any(spec.hostname == hostname for spec in self.get_draining_hosts())
def get_facts(self, host: str) -> Dict[str, Any]:
    """
    Return the entry stored for ``host`` in ``self.facts``, or a fresh empty
    dict when the host has no entry.
    """
    no_facts: Dict[str, Any] = {}
    return self.facts.get(host, no_facts)
def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
    """
    Yield every daemon description held in ``self.daemons``, host by host.

    The outer mapping is shallow-copied before it is walked, so the walk
    runs over that copy rather than over ``self.daemons`` itself.
    """
    snapshot = self.daemons.copy()
    for host_daemons in snapshot.values():
        for daemon in host_daemons.values():
            yield daemon
def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
    """
    Yield every daemon description held in ``self._tmp_daemons``, host by host.

    The outer mapping is shallow-copied before it is walked, so the walk
    runs over that copy rather than over ``self._tmp_daemons`` itself.
    """
    snapshot = self._tmp_daemons.copy()
    for host_daemons in snapshot.values():
        for daemon in host_daemons.values():
            yield daemon
def get_daemons(self):
    # type: () -> List[orchestrator.DaemonDescription]
    """Collect everything ``_get_daemons()`` yields into a new list."""
    return [*self._get_daemons()]
1106 def get_error_daemons(self
) -> List
[orchestrator
.DaemonDescription
]:
1108 for dd
in self
._get
_daemons
():
1109 if dd
.status
is not None and dd
.status
== orchestrator
.DaemonDescriptionStatus
.error
:
def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
    """
    Return the daemon descriptions cached for ``host`` as a new list.

    A host without an entry in ``self.daemons`` yields an empty list.
    """
    host_daemons = self.daemons.get(host, {})
    return [*host_daemons.values()]
1116 def get_daemon(self
, daemon_name
: str, host
: Optional
[str] = None) -> orchestrator
.DaemonDescription
:
1117 assert not daemon_name
.startswith('ha-rgw.')
1118 dds
= self
.get_daemons_by_host(host
) if host
else self
._get
_daemons
()
1120 if dd
.name() == daemon_name
:
1123 raise orchestrator
.OrchestratorError(f
'Unable to find {daemon_name} daemon(s)')
1125 def has_daemon(self
, daemon_name
: str, host
: Optional
[str] = None) -> bool:
1127 self
.get_daemon(daemon_name
, host
)
1128 except orchestrator
.OrchestratorError
:
1132 def get_daemons_with_volatile_status(self
) -> Iterator
[Tuple
[str, Dict
[str, orchestrator
.DaemonDescription
]]]:
1133 def alter(host
: str, dd_orig
: orchestrator
.DaemonDescription
) -> orchestrator
.DaemonDescription
:
1135 if host
in self
.mgr
.offline_hosts
:
1136 dd
.status
= orchestrator
.DaemonDescriptionStatus
.error
1137 dd
.status_desc
= 'host is offline'
1138 elif self
.mgr
.inventory
._inventory
[host
].get("status", "").lower() == "maintenance":
1139 # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
1140 # could be wrong. We must assume maintenance is working and daemons are stopped
1141 dd
.status
= orchestrator
.DaemonDescriptionStatus
.stopped
1142 dd
.events
= self
.mgr
.events
.get_for_daemon(dd
.name())
1145 for host
, dm
in self
.daemons
.copy().items():
1146 yield host
, {name
: alter(host
, d
) for name
, d
in dm
.items()}
def get_daemons_by_service(self, service_name):
    # type: (str) -> List[orchestrator.DaemonDescription]
    """
    Return the cached daemons whose ``service_name()`` equals ``service_name``.

    Names starting with 'keepalived.' or 'haproxy.' are rejected by assertion.
    """
    assert not service_name.startswith(('keepalived.', 'haproxy.'))

    return [daemon for daemon in self._get_daemons()
            if daemon.service_name() == service_name]
1155 def get_related_service_daemons(self
, service_spec
: ServiceSpec
) -> Optional
[List
[orchestrator
.DaemonDescription
]]:
1156 if service_spec
.service_type
== 'ingress':
1157 dds
= list(dd
for dd
in self
._get
_daemons
() if dd
.service_name() == cast(IngressSpec
, service_spec
).backend_service
)
1158 dds
+= list(dd
for dd
in self
._get
_tmp
_daemons
() if dd
.service_name() == cast(IngressSpec
, service_spec
).backend_service
)
1159 logger
.debug(f
'Found related daemons {dds} for service {service_spec.service_name()}')
1162 for ingress_spec
in [cast(IngressSpec
, s
) for s
in self
.mgr
.spec_store
.active_specs
.values() if s
.service_type
== 'ingress']:
1163 if ingress_spec
.backend_service
== service_spec
.service_name():
1164 dds
= list(dd
for dd
in self
._get
_daemons
() if dd
.service_name() == ingress_spec
.service_name())
1165 dds
+= list(dd
for dd
in self
._get
_tmp
_daemons
() if dd
.service_name() == ingress_spec
.service_name())
1166 logger
.debug(f
'Found related daemons {dds} for service {service_spec.service_name()}')
def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
    """
    Return the cached daemons whose ``daemon_type`` is one of
    ``service_to_daemon_types(service_type)``.

    :param service_type: service type to look up; 'keepalived' and 'haproxy'
        are rejected by assertion.
    :param host: when non-empty, only daemons cached for that host are
        considered; otherwise every cached daemon is.
    :return: list of matching daemon descriptions. A host that has no entry
        in the daemon cache yields an empty list.
    """
    assert service_type not in ['keepalived', 'haproxy']

    if host:
        # Mirror get_daemons_by_host(): a host with no cache entry simply has
        # no daemons, rather than raising KeyError.
        daemons = self.daemons.get(host, {}).values()
    else:
        daemons = self._get_daemons()

    return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
def get_daemon_types(self, hostname: str) -> Set[str]:
    """
    Provide the set of daemon types of the daemons cached for the host.

    A host that has no entry in the daemon cache yields an empty set, the
    same way ``get_daemons_by_host()`` yields an empty list for it.
    """
    host_daemons = self.daemons.get(hostname, {})
    return cast(Set[str], {d.daemon_type for d in host_daemons.values()})
def get_daemon_names(self):
    # type: () -> List[str]
    """Return ``name()`` of every daemon yielded by ``_get_daemons()``."""
    names = []
    for daemon in self._get_daemons():
        names.append(daemon.name())
    return names
1185 def get_daemon_last_config_deps(self
, host
: str, name
: str) -> Tuple
[Optional
[List
[str]], Optional
[datetime
.datetime
]]:
1186 if host
in self
.daemon_config_deps
:
1187 if name
in self
.daemon_config_deps
[host
]:
1188 return self
.daemon_config_deps
[host
][name
].get('deps', []), \
1189 self
.daemon_config_deps
[host
][name
].get('last_config', None)
def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
    """
    Return the entry stored for ``host`` in ``self.last_client_files``, or a
    fresh empty dict when the host has no entry.
    """
    nothing_recorded: Dict[str, Tuple[str, int, int, int]] = {}
    return self.last_client_files.get(host, nothing_recorded)
1195 def host_needs_daemon_refresh(self
, host
):
1196 # type: (str) -> bool
1197 if host
in self
.mgr
.offline_hosts
:
1198 logger
.debug(f
'Host "{host}" marked as offline. Skipping daemon refresh')
1200 if host
in self
.daemon_refresh_queue
:
1201 self
.daemon_refresh_queue
.remove(host
)
1203 cutoff
= datetime_now() - datetime
.timedelta(
1204 seconds
=self
.mgr
.daemon_cache_timeout
)
1205 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
1207 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
1211 def host_needs_facts_refresh(self
, host
):
1212 # type: (str) -> bool
1213 if host
in self
.mgr
.offline_hosts
:
1214 logger
.debug(f
'Host "{host}" marked as offline. Skipping gather facts refresh')
1216 cutoff
= datetime_now() - datetime
.timedelta(
1217 seconds
=self
.mgr
.facts_cache_timeout
)
1218 if host
not in self
.last_facts_update
or self
.last_facts_update
[host
] < cutoff
:
1220 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
1224 def host_needs_autotune_memory(self
, host
):
1225 # type: (str) -> bool
1226 if host
in self
.mgr
.offline_hosts
:
1227 logger
.debug(f
'Host "{host}" marked as offline. Skipping autotune')
1229 cutoff
= datetime_now() - datetime
.timedelta(
1230 seconds
=self
.mgr
.autotune_interval
)
1231 if host
not in self
.last_autotune
or self
.last_autotune
[host
] < cutoff
:
1235 def host_needs_tuned_profile_update(self
, host
: str, profile
: str) -> bool:
1236 if host
in self
.mgr
.offline_hosts
:
1237 logger
.debug(f
'Host "{host}" marked as offline. Cannot apply tuned profile')
1239 if profile
not in self
.mgr
.tuned_profiles
:
1241 f
'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
1243 if host
not in self
.last_tuned_profile_update
:
1245 last_profile_update
= self
.mgr
.tuned_profiles
.last_updated(profile
)
1246 if last_profile_update
is None:
1247 self
.mgr
.tuned_profiles
.set_last_updated(profile
, datetime_now())
1249 if self
.last_tuned_profile_update
[host
] < last_profile_update
:
1253 def host_had_daemon_refresh(self
, host
: str) -> bool:
1257 if host
in self
.last_daemon_update
:
1259 if host
not in self
.daemons
:
1261 return bool(self
.daemons
[host
])
1263 def host_needs_device_refresh(self
, host
):
1264 # type: (str) -> bool
1265 if host
in self
.mgr
.offline_hosts
:
1266 logger
.debug(f
'Host "{host}" marked as offline. Skipping device refresh')
1268 if host
in self
.device_refresh_queue
:
1269 self
.device_refresh_queue
.remove(host
)
1271 cutoff
= datetime_now() - datetime
.timedelta(
1272 seconds
=self
.mgr
.device_cache_timeout
)
1273 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
1275 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
1279 def host_needs_network_refresh(self
, host
):
1280 # type: (str) -> bool
1281 if host
in self
.mgr
.offline_hosts
:
1282 logger
.debug(f
'Host "{host}" marked as offline. Skipping network refresh')
1284 if host
in self
.network_refresh_queue
:
1285 self
.network_refresh_queue
.remove(host
)
1287 cutoff
= datetime_now() - datetime
.timedelta(
1288 seconds
=self
.mgr
.device_cache_timeout
)
1289 if host
not in self
.last_network_update
or self
.last_network_update
[host
] < cutoff
:
1291 if not self
.mgr
.cache
.host_metadata_up_to_date(host
):
1295 def host_needs_osdspec_preview_refresh(self
, host
: str) -> bool:
1296 if host
in self
.mgr
.offline_hosts
:
1297 logger
.debug(f
'Host "{host}" marked as offline. Skipping osdspec preview refresh')
1299 if host
in self
.osdspec_previews_refresh_queue
:
1300 self
.osdspec_previews_refresh_queue
.remove(host
)
1302 # Since this is dependent on other factors (device and spec) this does not need
1303 # to be updated periodically.
def host_needs_check(self, host):
    # type: (str) -> bool
    """
    Tell whether ``host`` is due for a host check.

    That is the case when no check is recorded for it in
    ``self.last_host_check``, or when the recorded time is more than
    ``self.mgr.host_check_interval`` seconds before now.
    """
    now = datetime_now()
    cutoff = now - datetime.timedelta(seconds=self.mgr.host_check_interval)
    if host not in self.last_host_check:
        return True
    return self.last_host_check[host] < cutoff
1312 def osdspec_needs_apply(self
, host
: str, spec
: ServiceSpec
) -> bool:
1314 host
not in self
.devices
1315 or host
not in self
.last_device_change
1316 or host
not in self
.last_device_update
1317 or host
not in self
.osdspec_last_applied
1318 or spec
.service_name() not in self
.osdspec_last_applied
[host
]
1321 created
= self
.mgr
.spec_store
.get_created(spec
)
1322 if not created
or created
> self
.last_device_change
[host
]:
1324 return self
.osdspec_last_applied
[host
][spec
.service_name()] < self
.last_device_change
[host
]
1326 def host_needs_registry_login(self
, host
: str) -> bool:
1327 if host
in self
.mgr
.offline_hosts
:
1329 if host
in self
.registry_login_queue
:
1330 self
.registry_login_queue
.remove(host
)
1334 def host_metadata_up_to_date(self
, host
: str) -> bool:
1335 if host
not in self
.metadata_up_to_date
or not self
.metadata_up_to_date
[host
]:
1339 def all_host_metadata_up_to_date(self
) -> bool:
1340 if [h
for h
in self
.get_hosts() if (not self
.host_metadata_up_to_date(h
) and not self
.is_host_unreachable(h
))]:
1341 # this function is primarily for telling if it's safe to try and apply a service
1342 # spec. Since offline/maintenance hosts aren't considered in that process anyway
1343 # we don't want to return False if the host without up-to-date metadata is in one
1344 # of those two categories.
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """
    Store ``dd`` under ``dd.name()`` in the daemon map of ``host``.

    ``host`` must already have an entry in ``self.daemons``; this is
    asserted.
    """
    assert host in self.daemons
    host_daemons = self.daemons[host]
    host_daemons[dd.name()] = dd
def rm_daemon(self, host: str, name: str) -> None:
    """
    Remove daemon ``name`` from the daemon map of ``host``.

    Nothing happens when the host or the daemon is not cached. Names
    starting with 'ha-rgw.' are rejected by assertion.
    """
    assert not name.startswith('ha-rgw.')

    if host not in self.daemons:
        return
    self.daemons[host].pop(name, None)
def daemon_cache_filled(self) -> bool:
    """
    Report whether the daemons of every host have been checked at least
    once, leaving offline hosts out of the picture.

    `host_needs_daemon_refresh` is not consulted here, as it might never
    be False for all hosts.
    """
    for h in self.get_hosts():
        if not self.host_had_daemon_refresh(h) and h not in self.mgr.offline_hosts:
            return False
    return True
1371 def schedule_daemon_action(self
, host
: str, daemon_name
: str, action
: str) -> None:
1372 assert not daemon_name
.startswith('ha-rgw.')
1382 existing_action
= self
.scheduled_daemon_actions
.get(host
, {}).get(daemon_name
, None)
1383 if existing_action
and priorities
[existing_action
] > priorities
[action
]:
1385 f
'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
1388 if host
not in self
.scheduled_daemon_actions
:
1389 self
.scheduled_daemon_actions
[host
] = {}
1390 self
.scheduled_daemon_actions
[host
][daemon_name
] = action
1392 def rm_scheduled_daemon_action(self
, host
: str, daemon_name
: str) -> bool:
1394 if host
in self
.scheduled_daemon_actions
:
1395 if daemon_name
in self
.scheduled_daemon_actions
[host
]:
1396 del self
.scheduled_daemon_actions
[host
][daemon_name
]
1398 if not self
.scheduled_daemon_actions
[host
]:
1399 del self
.scheduled_daemon_actions
[host
]
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
    """
    Return the action stored for ``daemon`` on ``host`` in
    ``self.scheduled_daemon_actions``, or None when there is none.

    Daemon names starting with 'ha-rgw.' are rejected by assertion.
    """
    assert not daemon.startswith('ha-rgw.')

    actions_on_host = self.scheduled_daemon_actions.get(host, {})
    return actions_on_host.get(daemon)
1410 AgentCache is used for storing metadata about agent daemons that must be kept
1411 through MGR failovers
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Keep a reference to the mgr module and start with empty per-host maps."""
    self.mgr: CephadmOrchestrator = mgr
    # Every map below is keyed by host name.
    self.agent_config_deps: Dict[str, Dict[str, Any]] = {}
    self.agent_counter: Dict[str, int] = {}
    self.agent_timestamp: Dict[str, datetime.datetime] = {}
    self.agent_keys: Dict[str, str] = {}
    self.agent_ports: Dict[str, int] = {}
    self.sending_agent_message: Dict[str, bool] = {}
1426 for k
, v
in self
.mgr
.get_store_prefix(AGENT_CACHE_PREFIX
).items():
1427 host
= k
[len(AGENT_CACHE_PREFIX
):]
1428 if host
not in self
.mgr
.inventory
:
1429 self
.mgr
.log
.warning('removing stray AgentCache record for agent on %s' % (
1431 self
.mgr
.set_store(k
, None)
1434 self
.agent_config_deps
[host
] = {}
1435 conf_deps
= j
.get('agent_config_deps', {})
1437 conf_deps
['last_config'] = str_to_datetime(conf_deps
['last_config'])
1438 self
.agent_config_deps
[host
] = conf_deps
1439 self
.agent_counter
[host
] = int(j
.get('agent_counter', 1))
1440 self
.agent_timestamp
[host
] = str_to_datetime(
1441 j
.get('agent_timestamp', datetime_to_str(datetime_now())))
1442 self
.agent_keys
[host
] = str(j
.get('agent_keys', ''))
1443 agent_port
= int(j
.get('agent_ports', 0))
1445 self
.agent_ports
[host
] = agent_port
1447 except Exception as e
:
1448 self
.mgr
.log
.warning('unable to load cached state for agent on host %s: %s' % (
def save_agent(self, host: str) -> None:
    """
    Persist the cached agent state of ``host`` in the mgr key/value store.

    The record is stored as JSON under ``AGENT_CACHE_PREFIX + host``. Each of
    'agent_config_deps', 'agent_counter', 'agent_keys', 'agent_ports' and
    'agent_timestamp' is only written when the host has an entry in the
    matching map.

    'agent_config_deps' is additionally skipped while the host's entry
    carries no 'last_config' stamp. The loader can leave an empty dict there
    for a host whose stored record had no config deps, and indexing
    'last_config' on it would raise KeyError and prevent anything from being
    saved for that host.
    """
    j: Dict[str, Any] = {}
    conf_deps = self.agent_config_deps.get(host)
    if conf_deps and 'last_config' in conf_deps:
        j['agent_config_deps'] = {
            'deps': conf_deps.get('deps', []),
            'last_config': datetime_to_str(conf_deps['last_config']),
        }
    if host in self.agent_counter:
        j['agent_counter'] = self.agent_counter[host]
    if host in self.agent_keys:
        j['agent_keys'] = self.agent_keys[host]
    if host in self.agent_ports:
        j['agent_ports'] = self.agent_ports[host]
    if host in self.agent_timestamp:
        j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

    self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
1470 def update_agent_config_deps(self
, host
: str, deps
: List
[str], stamp
: datetime
.datetime
) -> None:
1471 self
.agent_config_deps
[host
] = {
1473 'last_config': stamp
,
1476 def get_agent_last_config_deps(self
, host
: str) -> Tuple
[Optional
[List
[str]], Optional
[datetime
.datetime
]]:
1477 if host
in self
.agent_config_deps
:
1478 return self
.agent_config_deps
[host
].get('deps', []), \
1479 self
.agent_config_deps
[host
].get('last_config', None)
1482 def messaging_agent(self
, host
: str) -> bool:
1483 if host
not in self
.sending_agent_message
or not self
.sending_agent_message
[host
]:
def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
    """
    Record that the agent on ``daemon_spec.host`` received a new config.

    Updates the host's config deps and timestamp to the current time, sets
    its counter to 1 and saves the host's agent record. The spec's
    ``service_name`` must be 'agent'; this is asserted.
    """
    assert daemon_spec.service_name == 'agent'
    host = daemon_spec.host
    self.update_agent_config_deps(host, daemon_spec.deps, datetime_now())
    self.agent_timestamp[host] = datetime_now()
    self.agent_counter[host] = 1
    self.save_agent(host)
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Keep a reference to the mgr module and start with no recorded events."""
    self.mgr: CephadmOrchestrator = mgr
    # Lists of events, looked up by keys of the form 'service:<name>' or
    # 'daemon:<name>'.
    self.events: Dict[str, List[OrchestratorEvent]] = {}
1503 def add(self
, event
: OrchestratorEvent
) -> None:
1505 if event
.kind_subject() not in self
.events
:
1506 self
.events
[event
.kind_subject()] = [event
]
1508 for e
in self
.events
[event
.kind_subject()]:
1509 if e
.message
== event
.message
:
1512 self
.events
[event
.kind_subject()].append(event
)
1514 # limit to five events for now.
1515 self
.events
[event
.kind_subject()] = self
.events
[event
.kind_subject()][-5:]
1517 def for_service(self
, spec
: ServiceSpec
, level
: str, message
: str) -> None:
1518 e
= OrchestratorEvent(datetime_now(), 'service',
1519 spec
.service_name(), level
, message
)
1522 def from_orch_error(self
, e
: OrchestratorError
) -> None:
1523 if e
.event_subject
is not None:
1524 self
.add(OrchestratorEvent(
1532 def for_daemon(self
, daemon_name
: str, level
: str, message
: str) -> None:
1533 e
= OrchestratorEvent(datetime_now(), 'daemon', daemon_name
, level
, message
)
1536 def for_daemon_from_exception(self
, daemon_name
: str, e
: Exception) -> None:
def cleanup(self) -> None:
    """
    Drop the events of services and daemons that are no longer known.

    An event key has the form '<kind>:<subject>'. A 'service' key is dropped
    when its subject is not a key of ``self.mgr.spec_store.all_specs``; a
    'daemon' key is dropped when its subject is not among
    ``self.mgr.cache.get_daemon_names()``. Keys of any other kind are kept.
    """
    # Needs to be properly done, in case events are persistently stored.

    # A set gives constant-time membership tests inside the loop.
    daemons = set(self.mgr.cache.get_daemon_names())
    specs = self.mgr.spec_store.all_specs.keys()

    unknowns: List[str] = []
    for k_s in self.events:
        # Cut at the first ':' only, so a subject that itself contains ':'
        # (or a key without any ':') cannot break the unpacking.
        kind, _, subject = k_s.partition(':')
        if kind == 'service':
            if subject not in specs:
                unknowns.append(k_s)
        elif kind == 'daemon':
            if subject not in daemons:
                unknowns.append(k_s)

    # Delete after the walk: the dict must not change size while iterated.
    for k_s in unknowns:
        del self.events[k_s]
def get_for_service(self, name: str) -> List[OrchestratorEvent]:
    """
    Return the events stored under the key 'service:<name>', or a fresh
    empty list when nothing is stored for that service.
    """
    key = 'service:' + name
    no_events: List[OrchestratorEvent] = []
    return self.events.get(key, no_events)
def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
    """
    Return the events stored under the key 'daemon:<name>', or a fresh
    empty list when nothing is stored for that daemon.
    """
    key = 'daemon:' + name
    no_events: List[OrchestratorEvent] = []
    return self.events.get(key, no_events)