+ def _enough_mons_for_ok_to_stop(self) -> bool:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'quorum_status',
+ })
+ try:
+ j = json.loads(out)
+ except Exception:
+ raise OrchestratorError('failed to parse quorum status')
+
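+ # a mon can only be stopped if the remaining mons keep quorum, so
+ # require more than two in the monmap before taking one down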
+ mons = [m['name'] for m in j['monmap']['mons']]
+ return len(mons) > 2
+
+ def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
+
+ # find fs this mds daemon belongs to
+ fsmap = self.mgr.get("fs_map")
+ for fs in fsmap.get('filesystems', []):
+ mdsmap = fs["mdsmap"]
+ fs_name = mdsmap["fs_name"]
+
+ assert mds_daemon.daemon_id
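+ # mds service names look like "mds.<fs_name>"; match on the fs part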
+ if fs_name != mds_daemon.service_name().split('.', 1)[1]:
+ # wrong fs for this mds daemon
+ continue
+
+ # get number of mds daemons for this fs
+ mds_count = len(self.mgr.cache.get_daemons_by_service(mds_daemon.service_name()))
+
+ # more mds daemons than max_mds means a standby can take over the rank
+ if mdsmap["max_mds"] < mds_count:
+ return True
+ return False
+
+ return True # an mds that belongs to no fs passes ok-to-stop
+
+ def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
+ # Walk the given daemons and compare each one's container image digests
+ # against the target digests to decide which are already upgraded. A
+ # daemon is done only when its image digest matches a target AND it was
+ # deployed by a mgr that itself runs a target digest. Daemons with a
+ # correct digest but a stale deployer land in need_upgrade_deployer
+ # (flagged True: redeploy only, no image change); the rest land in
+ # need_upgrade (flagged False).
+ need_upgrade_self = False
+ need_upgrade: List[Tuple[DaemonDescription, bool]] = []
+ need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
+ done = 0
+ if target_digests is None:
+ target_digests = []
+ for d in daemons:
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
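+ # when the agent is in use, a host's metadata (and thus its digest
+ # info) may be stale; skip such daemons until it is refreshed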
+ if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
+ continue
+ correct_digest = False
+ if (any(d in target_digests for d in (d.container_image_digests or []))
+ or d.daemon_type in MONITORING_STACK_TYPES):
+ logger.debug('daemon %s.%s container digest correct' % (
+ d.daemon_type, d.daemon_id))
+ correct_digest = True
+ if any(d in target_digests for d in (d.deployed_by or [])):
+ logger.debug('daemon %s.%s deployed by correct version' % (
+ d.daemon_type, d.daemon_id))
+ done += 1
+ continue
+
+ if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
+ logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+ self.mgr.get_mgr_id())
+ need_upgrade_self = True
+ continue
+
+ if correct_digest:
+ logger.debug('daemon %s.%s not deployed by correct version' % (
+ d.daemon_type, d.daemon_id))
+ need_upgrade_deployer.append((d, True))
+ else:
+ logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
+ d.daemon_type, d.daemon_id,
+ d.container_image_name, d.container_image_digests, d.version))
+ need_upgrade.append((d, False))
+
+ return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
+
+ def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
+ to_upgrade: List[Tuple[DaemonDescription, bool]] = []
+ known_ok_to_stop: List[str] = []
+ for d_entry in need_upgrade:
+ d = d_entry[0]
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+
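+ # no container_image_id recorded yet; if the image name already
+ # matches the target, treat the daemon as freshly deployed and skip it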
+ if not d.container_image_id:
+ if d.container_image_name == target_image:
+ logger.debug(
+ 'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+ continue
+
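+ # one ok-to-stop call can clear several daemons at once; reuse that
+ # answer instead of asking the mons again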
+ if known_ok_to_stop:
+ if d.name() in known_ok_to_stop:
+ logger.info(f'Upgrade: {d.name()} is also safe to restart')
+ to_upgrade.append(d_entry)
+ continue
+
+ if d.daemon_type == 'osd':
+ # NOTE: known_ok_to_stop is an output argument for
+ # _wait_for_ok_to_stop
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ to_upgrade.append(d_entry)
+
+ # if we don't have a list of others to consider, stop now
+ if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+ break
+ return True, to_upgrade
+
+ def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
+ assert self.upgrade_state is not None
+ num = 1
+ if target_digests is None:
+ target_digests = []
+ for d_entry in to_upgrade:
+ if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
+ self.mgr.log.info(
+ f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
+ return
+ d = d_entry[0]
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+
+ # make sure host has latest container image
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'inspect-image', [],
+ image=target_image, no_fsid=True, error_ok=True))
+ if code or not any(digest in target_digests for digest in json.loads(''.join(out)).get('repo_digests', [])):
+ logger.info('Upgrade: Pulling %s on %s' % (target_image,
+ d.hostname))
+ self.upgrade_info_str = 'Pulling %s image on host %s' % (
+ target_image, d.hostname)
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'pull', [],
+ image=target_image, no_fsid=True, error_ok=True))
+ if code:
+ self._fail_upgrade('UPGRADE_FAILED_PULL', {
+ 'severity': 'warning',
+ 'summary': 'Upgrade: failed to pull target image',
+ 'count': 1,
+ 'detail': [
+ 'failed to pull %s on host %s' % (target_image,
+ d.hostname)],
+ })
+ return
+ r = json.loads(''.join(out))
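+ # a moving tag may now resolve to a different image; if the pulled
+ # digests are not the targets, adopt them and restart this pass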
+ if not any(digest in target_digests for digest in r.get('repo_digests', [])):
+ logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
+ target_image, d.hostname, r['repo_digests'], target_digests))
+ self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
+ target_image, d.hostname, r['repo_digests'], target_digests)
+ self.upgrade_state.target_digests = r['repo_digests']
+ self._save_upgrade_state()
+ return
+
+ self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
+
+ if len(to_upgrade) > 1:
+ logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
+ self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
+ else:
+ logger.info('Upgrade: Updating %s.%s' %
+ (d.daemon_type, d.daemon_id))
+ action = 'Upgrading' if not d_entry[1] else 'Redeploying'
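+ # entries flagged True already run a target digest and only need to be
+ # redeployed by the new mgr, so leave their image unchanged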
+ try:
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
+ self.mgr._daemon_action(
+ daemon_spec,
+ 'redeploy',
+ image=target_image if not d_entry[1] else None
+ )
+ self.mgr.cache.metadata_up_to_date[d.hostname] = False
+ except Exception as e:
+ self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
+ 'severity': 'warning',
+ 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
+ 'count': 1,
+ 'detail': [
+ f'Upgrade daemon: {d.name()}: {e}'
+ ],
+ })
+ return
+ num += 1
+ if self.upgrade_state.remaining_count is not None and not d_entry[1]:
+ self.upgrade_state.remaining_count -= 1
+ self._save_upgrade_state()
+
+ def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
+ if need_upgrade_self:
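+ # a mgr cannot redeploy itself; fail over so the standby that takes
+ # over can upgrade this daemon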
+ try:
+ self.mgr.mgr_service.fail_over()
+ except OrchestratorError as e:
+ self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
+ 'severity': 'warning',
+ 'summary': f'Upgrade: {e}',
+ 'count': 1,
+ 'detail': [
+ 'The upgrade process needs to upgrade the mgr, '
+ 'but it needs at least one standby to proceed.',
+ ],
+ })
+ return
+
+ return # unreachable code, as fail_over never returns
+ elif upgrading_mgrs:
+ if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
+ del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
+ self.mgr.set_health_checks(self.mgr.health_checks)
+
+ def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
+ # push down configs
+ daemon_type_section = name_to_config_section(daemon_type)
+ if image_settings.get(daemon_type_section) != target_image:
+ logger.info('Upgrade: Setting container_image for all %s' %
+ daemon_type)
+ self.mgr.set_container_image(daemon_type_section, target_image)
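+ # drop any per-daemon container_image overrides so the section-wide
+ # setting above takes effect everywhere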
+ to_clean = []
+ for section in image_settings.keys():
+ if section.startswith(daemon_type_section + '.'):
+ to_clean.append(section)
+ if to_clean:
+ logger.debug('Upgrade: Cleaning up container_image for %s' %
+ to_clean)
+ for section in to_clean:
+ ret, image, err = self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'name': 'container_image',
+ 'who': section,
+ })
+
+ def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
+ osdmap = self.mgr.get("osd_map")
+ osd_min_name = osdmap.get("require_osd_release", "argonaut")
+ osd_min = ceph_release_to_major(osd_min_name)
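+ # raise require_osd_release once all OSDs run the target major, so
+ # older OSDs can no longer join and new-release features can be used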
+ if osd_min < int(target_major):
+ logger.info(
+ f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'osd require-osd-release',
+ 'release': target_major_name,
+ })
+
+ def _complete_mds_upgrade(self) -> None:
+ assert self.upgrade_state is not None
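+ # the upgrade scaled each fs down to a single active mds and disabled
+ # standby-replay; restore the settings recorded before it started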
+ if self.upgrade_state.fs_original_max_mds:
+ for fs in self.mgr.get("fs_map")['filesystems']:
+ fscid = fs["id"]
+ fs_name = fs['mdsmap']['fs_name']
+ new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
+ if new_max > 1:
+ self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
+ fs_name, new_max
+ ))
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'max_mds',
+ 'val': str(new_max),
+ })
+
+ self.upgrade_state.fs_original_max_mds = {}
+ self._save_upgrade_state()
+ if self.upgrade_state.fs_original_allow_standby_replay:
+ for fs in self.mgr.get("fs_map")['filesystems']:
+ fscid = fs["id"]
+ fs_name = fs['mdsmap']['fs_name']
+ asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
+ if asr:
+ self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
+ fs_name
+ ))
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'allow_standby_replay',
+ 'val': '1'
+ })
+
+ self.upgrade_state.fs_original_allow_standby_replay = {}
+ self._save_upgrade_state()
+
+ def _mark_upgrade_complete(self) -> None:
+ if not self.upgrade_state:
+ logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
+ return
+ logger.info('Upgrade: Complete!')
+ if self.upgrade_state.progress_id:
+ self.mgr.remote('progress', 'complete',
+ self.upgrade_state.progress_id)
+ self.upgrade_state = None
+ self._save_upgrade_state()
+