import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any
import orchestrator
+from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
-from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
-from orchestrator import OrchestratorError, DaemonDescription, daemon_type_to_service
+from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
+ MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
+from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service
if TYPE_CHECKING:
from .module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+# from ceph_fs.h
+CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
+
def normalize_image_digest(digest: str, default_registry: str) -> str:
- # normal case:
- # ceph/ceph -> docker.io/ceph/ceph
- # edge cases that shouldn't ever come up:
- # ubuntu -> docker.io/ubuntu (ubuntu alias for library/ubuntu)
- # no change:
- # quay.ceph.io/ceph/ceph -> ceph
- # docker.io/ubuntu -> no change
- bits = digest.split('/')
- if '.' not in bits[0] or len(bits) < 3:
- digest = 'docker.io/' + digest
+ """
+ Normal case:
+ >>> normalize_image_digest('ceph/ceph', 'docker.io')
+ 'docker.io/ceph/ceph'
+
+ No change:
+ >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
+ 'quay.ceph.io/ceph/ceph'
+
+ >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
+ 'docker.io/ubuntu'
+
+ >>> normalize_image_digest('localhost/ceph', 'docker.io')
+ 'localhost/ceph'
+ """
+ known_shortnames = [
+ 'ceph/ceph',
+ 'ceph/daemon',
+ 'ceph/daemon-base',
+ ]
+ for image in known_shortnames:
+ if digest.startswith(image):
+ return f'{default_registry}/{digest}'
return digest
error: Optional[str] = None,
paused: Optional[bool] = None,
fs_original_max_mds: Optional[Dict[str, int]] = None,
+ fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
+ daemon_types: Optional[List[str]] = None,
+ hosts: Optional[List[str]] = None,
+ services: Optional[List[str]] = None,
+ total_count: Optional[int] = None,
+ remaining_count: Optional[int] = None,
):
self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
self.error: Optional[str] = error
self.paused: bool = paused or False
self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
+ self.fs_original_allow_standby_replay: Optional[Dict[str,
+ bool]] = fs_original_allow_standby_replay
+ self.daemon_types = daemon_types
+ self.hosts = hosts
+ self.services = services
+ self.total_count = total_count
+ self.remaining_count = remaining_count
def to_json(self) -> dict:
return {
'target_digests': self.target_digests,
'target_version': self.target_version,
'fs_original_max_mds': self.fs_original_max_mds,
+ 'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
'error': self.error,
'paused': self.paused,
+ 'daemon_types': self.daemon_types,
+ 'hosts': self.hosts,
+ 'services': self.services,
+ 'total_count': self.total_count,
+ 'remaining_count': self.remaining_count,
}
@classmethod
def from_json(cls, data: dict) -> Optional['UpgradeState']:
+ valid_params = UpgradeState.__init__.__code__.co_varnames
if data:
- c = {k: v for k, v in data.items()}
+ c = {k: v for k, v in data.items() if k in valid_params}
if 'repo_digest' in c:
c['target_digests'] = [c.pop('repo_digest')]
return cls(**c)
r.target_image = self.target_image
r.in_progress = True
r.progress, r.services_complete = self._get_upgrade_info()
+ r.is_paused = self.upgrade_state.paused
+
+ if self.upgrade_state.daemon_types is not None:
+ which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
+ if self.upgrade_state.hosts is not None:
+ which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+ elif self.upgrade_state.services is not None:
+ which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
+ if self.upgrade_state.hosts is not None:
+ which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
+ elif self.upgrade_state.hosts is not None:
+ which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
+ else:
+ which_str = 'Upgrading all daemon types on all hosts'
+ if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
+ which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
+ r.which = which_str
+
# accessing self.upgrade_info_str will throw an exception if it
# has not been set in _do_upgrade yet
try:
if not self.upgrade_state or not self.upgrade_state.target_digests:
return '', []
- daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
+ daemons = self._get_filtered_daemons()
if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
return '', []
completed_types = list(set([completion[0] for completion in completed_daemons if all(
c[1] for c in completed_daemons if c[0] == completion[0])]))
- return '%s/%s ceph daemons upgraded' % (done, len(daemons)), completed_types
+ return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
+
+ def _get_filtered_daemons(self) -> List[DaemonDescription]:
+ # Return the set of daemons set to be upgraded with out current
+ # filtering parameters (or all daemons in upgrade order if no filtering
+ # parameter are set).
+ assert self.upgrade_state is not None
+ if self.upgrade_state.daemon_types is not None:
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in self.upgrade_state.daemon_types]
+ elif self.upgrade_state.services is not None:
+ daemons = []
+ for service in self.upgrade_state.services:
+ daemons += self.mgr.cache.get_daemons_by_service(service)
+ else:
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in CEPH_UPGRADE_ORDER]
+ if self.upgrade_state.hosts is not None:
+ daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
+ return daemons
+
+ def _get_current_version(self) -> Tuple[int, int, str]:
+ current_version = self.mgr.version.split('ceph version ')[1]
+ (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
+ return (int(current_major), int(current_minor), current_version)
def _check_target_version(self, version: str) -> Optional[str]:
try:
- (major, minor, _) = version.split('.', 2)
- assert int(minor) >= 0
+ v = version.split('.', 2)
+ (major, minor) = (int(v[0]), int(v[1]))
+ assert minor >= 0
# patch might be a number or {number}-g{sha1}
except ValueError:
return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
- if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
+ if major < 15 or (major == 15 and minor < 2):
return 'cephadm only supports octopus (15.2.0) or later'
# to far a jump?
- current_version = self.mgr.version.split('ceph version ')[1]
- (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
- if int(current_major) < int(major) - 2:
+ (current_major, current_minor, current_version) = self._get_current_version()
+ if current_major < major - 2:
return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
- if int(current_major) > int(major):
+ if current_major > major:
return f'ceph cannot downgrade major versions (from {current_version} to {version})'
- if int(current_major) == int(major):
- if int(current_minor) > int(minor):
- return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'
+ if current_major == major:
+ if current_minor > minor:
+ return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'
# check mon min
monmap = self.mgr.get("mon_map")
mon_min = monmap.get("min_mon_release", 0)
- if mon_min < int(major) - 2:
+ if mon_min < major - 2:
return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'
# check osd min
osdmap = self.mgr.get("osd_map")
osd_min_name = osdmap.get("require_osd_release", "argonaut")
osd_min = ceph_release_to_major(osd_min_name)
- if osd_min < int(major) - 2:
+ if osd_min < major - 2:
return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'
return None
- def upgrade_start(self, image: str, version: str) -> str:
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
+ if not image:
+ image = self.mgr.container_image_base
+ reg_name, bare_image = image.split('/', 1)
+ reg = Registry(reg_name)
+ (current_major, current_minor, _) = self._get_current_version()
+ versions = []
+ r: Dict[Any, Any] = {
+ "image": image,
+ "registry": reg_name,
+ "bare_image": bare_image,
+ }
+
+ try:
+ ls = reg.get_tags(bare_image)
+ except ValueError as e:
+ raise OrchestratorError(f'{e}')
+ if not tags:
+ for t in ls:
+ if t[0] != 'v':
+ continue
+ v = t[1:].split('.')
+ if len(v) != 3:
+ continue
+ if '-' in v[2]:
+ continue
+ v_major = int(v[0])
+ v_minor = int(v[1])
+ candidate_version = (v_major > current_major
+ or (v_major == current_major and v_minor >= current_minor))
+ if show_all_versions or candidate_version:
+ versions.append('.'.join(v))
+ r["versions"] = sorted(
+ versions,
+ key=lambda k: list(map(int, k.split('.'))),
+ reverse=True
+ )
+ else:
+ r["tags"] = sorted(ls)
+ return r
+
+ def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
+ hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
if self.mgr.mode != 'root':
raise OrchestratorError('upgrade is not supported in %s mode' % (
self.mgr.mode))
target_name = normalize_image_digest(image, self.mgr.default_registry)
else:
raise OrchestratorError('must specify either image or version')
+
+ if daemon_types is not None or services is not None or hosts is not None:
+ self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
+
if self.upgrade_state:
if self.upgrade_state._target_name != target_name:
raise OrchestratorError(
self._save_upgrade_state()
return 'Resumed upgrade to %s' % self.target_image
return 'Upgrade to %s in progress' % self.target_image
+
+ running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
+ 'mgr') if daemon.status == DaemonDescriptionStatus.running])
+
+ if running_mgr_count < 2:
+ raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')
+
self.mgr.log.info('Upgrade: Started with target %s' % target_name)
self.upgrade_state = UpgradeState(
target_name=target_name,
- progress_id=str(uuid.uuid4())
+ progress_id=str(uuid.uuid4()),
+ daemon_types=daemon_types,
+ hosts=hosts,
+ services=services,
+ total_count=limit,
+ remaining_count=limit,
)
self._update_upgrade_progress(0.0)
self._save_upgrade_state()
self.mgr.event.set()
return 'Initiating upgrade to %s' % (target_name)
+ def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
+ def _latest_type(dtypes: List[str]) -> str:
+ # [::-1] gives the list in reverse
+ for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
+ if daemon_type in dtypes:
+ return daemon_type
+ return ''
+
+ def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
+ # this function takes a list of daemon types and first finds the daemon
+ # type from that list that is latest in our upgrade order. Then, from
+ # that latest type, it filters the list of candidate daemons received
+ # for daemons with types earlier in the upgrade order than the latest
+ # type found earlier. That filtered list of daemons is returned. The
+ # purpose of this function is to help in finding daemons that must have
+ # already been upgraded for the given filtering parameters (--daemon-types,
+ # --services, --hosts) to be valid.
+ latest = _latest_type(dtypes)
+ if not latest:
+ return []
+ earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
+ earlier_types = [t for t in earlier_types if t not in dtypes]
+ return [d for d in candidates if d.daemon_type in earlier_types]
+
+ if self.upgrade_state:
+ raise OrchestratorError(
+ 'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
+ try:
+ target_id, target_version, target_digests = self.mgr.wait_async(
+ CephadmServe(self.mgr)._get_container_image_info(target_name))
+ except OrchestratorError as e:
+ raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
+ # what we need to do here is build a list of daemons that must already be upgraded
+ # in order for the user's selection of daemons to upgrade to be valid. for example,
+ # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type not in MONITORING_STACK_TYPES]
+ err_msg_base = 'Cannot start upgrade. '
+ # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
+ dtypes = []
+ if daemon_types is not None:
+ dtypes = daemon_types
+ if hosts is not None:
+ dtypes = [_latest_type(dtypes)]
+ other_host_daemons = [
+ d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+ else:
+ daemons = _get_earlier_daemons(dtypes, daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
+ elif services is not None:
+ # for our purposes here we can effectively convert our list of services into the
+ # set of daemon types the services contain. This works because we don't allow --services
+ # and --daemon-types at the same time and we only allow services of the same type
+ sspecs = [
+ self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
+ stypes = list(set([s.service_type for s in sspecs]))
+ if len(stypes) != 1:
+ raise OrchestratorError('Doing upgrade by service only support services of one type at '
+ f'a time. Found service types: {stypes}')
+ for stype in stypes:
+ dtypes += orchestrator.service_to_daemon_types(stype)
+ dtypes = list(set(dtypes))
+ if hosts is not None:
+ other_host_daemons = [
+ d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons(dtypes, other_host_daemons)
+ else:
+ daemons = _get_earlier_daemons(dtypes, daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
+ elif hosts is not None:
+ # hosts must be handled a bit differently. For this, we really need to find all the daemon types
+ # that reside on hosts in the list of hosts we will upgrade. Then take the type from
+ # that list that is latest in the upgrade order and check if any daemons on hosts not in the
+ # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
+ dtypes = list(
+ set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
+ other_hosts_daemons = [
+ d for d in daemons if d.hostname is not None and d.hostname not in hosts]
+ daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
+ err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
+ need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
+ if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
+ # also report active mgr as needing to be upgraded. It is not included in the resulting list
+ # by default as it is treated special and handled via the need_upgrade_self bool
+ n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
+ self.mgr.cache.get_daemons_by_type('mgr')), True))
+ if n1 or n2:
+ raise OrchestratorError(f'{err_msg_base}Please first upgrade '
+ f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
+ f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')
+
def upgrade_pause(self) -> str:
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
if not self.upgrade_state.paused:
return 'Upgrade to %s not paused' % self.target_image
self.upgrade_state.paused = False
+ self.upgrade_state.error = ''
self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
self._save_upgrade_state()
self.mgr.event.set()
target_major: str,
need_upgrade: List[DaemonDescription]
) -> bool:
- # are any daemons running a different major version?
- scale_down = False
- for name, info in self.mgr.get("mds_metadata").items():
- version = info.get("ceph_version_short")
- major_version = None
- if version:
- major_version = version.split('.')[0]
- if not major_version:
- self.mgr.log.info('Upgrade: mds.%s version is not known, will retry' % name)
- time.sleep(5)
- return False
- if int(major_version) < int(target_major):
- scale_down = True
-
- if not scale_down:
- self.mgr.log.debug('Upgrade: All MDS daemons run same major version')
- return True
-
# scale down all filesystems to 1 MDS
assert self.upgrade_state
if not self.upgrade_state.fs_original_max_mds:
self.upgrade_state.fs_original_max_mds = {}
+ if not self.upgrade_state.fs_original_allow_standby_replay:
+ self.upgrade_state.fs_original_allow_standby_replay = {}
fsmap = self.mgr.get("fs_map")
continue_upgrade = True
- for i in fsmap.get('filesystems', []):
- fs = i["mdsmap"]
- fs_id = i["id"]
- fs_name = fs["fs_name"]
+ for fs in fsmap.get('filesystems', []):
+ fscid = fs["id"]
+ mdsmap = fs["mdsmap"]
+ fs_name = mdsmap["fs_name"]
+
+ # disable allow_standby_replay?
+ if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
+ self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
+ fs_name
+ ))
+ if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
+ self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
+ self._save_upgrade_state()
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'allow_standby_replay',
+ 'val': '0',
+ })
+ continue_upgrade = False
+ continue
# scale down this filesystem?
- if fs["max_mds"] > 1:
+ if mdsmap["max_mds"] > 1:
self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
fs_name
))
- if fs_id not in self.upgrade_state.fs_original_max_mds:
- self.upgrade_state.fs_original_max_mds[fs_id] = fs['max_mds']
+ if fscid not in self.upgrade_state.fs_original_max_mds:
+ self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
self._save_upgrade_state()
ret, out, err = self.mgr.check_mon_command({
'prefix': 'fs set',
continue_upgrade = False
continue
- if len(fs['info']) > 1:
- self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to 1 MDS' % (fs_name))
+ if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
+ self.mgr.log.info(
+ 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
time.sleep(10)
continue_upgrade = False
continue
- lone_mds = list(fs['info'].values())[0]
- if lone_mds['state'] != 'up:active':
- self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
- lone_mds['name'],
- lone_mds['state'],
- ))
- time.sleep(10)
- continue_upgrade = False
- continue
+ if len(mdsmap['up']) == 0:
+ self.mgr.log.warning(
+ "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
+ # This can happen because the current version MDS have
+ # incompatible compatsets; the mons will not do any promotions.
+ # We must upgrade to continue.
+ elif len(mdsmap['up']) > 0:
+ mdss = list(mdsmap['info'].values())
+ assert len(mdss) == 1
+ lone_mds = mdss[0]
+ if lone_mds['state'] != 'up:active':
+ self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
+ lone_mds['name'],
+ lone_mds['state'],
+ ))
+ time.sleep(10)
+ continue_upgrade = False
+ continue
+ else:
+ assert False
return continue_upgrade
+ def _enough_mons_for_ok_to_stop(self) -> bool:
+ # type () -> bool
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'quorum_status',
+ })
+ try:
+ j = json.loads(out)
+ except Exception:
+ raise OrchestratorError('failed to parse quorum status')
+
+ mons = [m['name'] for m in j['monmap']['mons']]
+ return len(mons) > 2
+
+ def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
+ # type (DaemonDescription) -> bool
+
+ # find fs this mds daemon belongs to
+ fsmap = self.mgr.get("fs_map")
+ for fs in fsmap.get('filesystems', []):
+ mdsmap = fs["mdsmap"]
+ fs_name = mdsmap["fs_name"]
+
+ assert mds_daemon.daemon_id
+ if fs_name != mds_daemon.service_name().split('.', 1)[1]:
+ # wrong fs for this mds daemon
+ continue
+
+ # get number of mds daemons for this fs
+ mds_count = len(
+ [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])
+
+ # standby mds daemons for this fs?
+ if mdsmap["max_mds"] < mds_count:
+ return True
+ return False
+
+ return True # if mds has no fs it should pass ok-to-stop
+
+ def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
+ # this function takes a list of daemons and container digests. The purpose
+ # is to go through each daemon and check if the current container digests
+ # for that daemon match the target digests. The purpose being that we determine
+ # if a daemon is upgraded to a certain container image or not based on what
+ # container digests it has. By checking the current digests against the
+ # targets we can determine which daemons still need to be upgraded
+ need_upgrade_self = False
+ need_upgrade: List[Tuple[DaemonDescription, bool]] = []
+ need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
+ done = 0
+ if target_digests is None:
+ target_digests = []
+ for d in daemons:
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+ if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
+ continue
+ correct_digest = False
+ if (any(d in target_digests for d in (d.container_image_digests or []))
+ or d.daemon_type in MONITORING_STACK_TYPES):
+ logger.debug('daemon %s.%s container digest correct' % (
+ d.daemon_type, d.daemon_id))
+ correct_digest = True
+ if any(d in target_digests for d in (d.deployed_by or [])):
+ logger.debug('daemon %s.%s deployed by correct version' % (
+ d.daemon_type, d.daemon_id))
+ done += 1
+ continue
+
+ if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
+ logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
+ self.mgr.get_mgr_id())
+ need_upgrade_self = True
+ continue
+
+ if correct_digest:
+ logger.debug('daemon %s.%s not deployed by correct version' % (
+ d.daemon_type, d.daemon_id))
+ need_upgrade_deployer.append((d, True))
+ else:
+ logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
+ d.daemon_type, d.daemon_id,
+ d.container_image_name, d.container_image_digests, d.version))
+ need_upgrade.append((d, False))
+
+ return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
+
+ def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
+ to_upgrade: List[Tuple[DaemonDescription, bool]] = []
+ known_ok_to_stop: List[str] = []
+ for d_entry in need_upgrade:
+ d = d_entry[0]
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+
+ if not d.container_image_id:
+ if d.container_image_name == target_image:
+ logger.debug(
+ 'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+ continue
+
+ if known_ok_to_stop:
+ if d.name() in known_ok_to_stop:
+ logger.info(f'Upgrade: {d.name()} is also safe to restart')
+ to_upgrade.append(d_entry)
+ continue
+
+ if d.daemon_type == 'osd':
+ # NOTE: known_ok_to_stop is an output argument for
+ # _wait_for_ok_to_stop
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+
+ to_upgrade.append(d_entry)
+
+ # if we don't have a list of others to consider, stop now
+ if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+ break
+ return True, to_upgrade
+
+ def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
+ assert self.upgrade_state is not None
+ num = 1
+ if target_digests is None:
+ target_digests = []
+ for d_entry in to_upgrade:
+ if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
+ self.mgr.log.info(
+ f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
+ return
+ d = d_entry[0]
+ assert d.daemon_type is not None
+ assert d.daemon_id is not None
+ assert d.hostname is not None
+
+ # make sure host has latest container image
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'inspect-image', [],
+ image=target_image, no_fsid=True, error_ok=True))
+ if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
+ logger.info('Upgrade: Pulling %s on %s' % (target_image,
+ d.hostname))
+ self.upgrade_info_str = 'Pulling %s image on host %s' % (
+ target_image, d.hostname)
+ out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
+ d.hostname, '', 'pull', [],
+ image=target_image, no_fsid=True, error_ok=True))
+ if code:
+ self._fail_upgrade('UPGRADE_FAILED_PULL', {
+ 'severity': 'warning',
+ 'summary': 'Upgrade: failed to pull target image',
+ 'count': 1,
+ 'detail': [
+ 'failed to pull %s on host %s' % (target_image,
+ d.hostname)],
+ })
+ return
+ r = json.loads(''.join(out))
+ if not any(d in target_digests for d in r.get('repo_digests', [])):
+ logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
+ target_image, d.hostname, r['repo_digests'], target_digests))
+ self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
+ target_image, d.hostname, r['repo_digests'], target_digests)
+ self.upgrade_state.target_digests = r['repo_digests']
+ self._save_upgrade_state()
+ return
+
+ self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
+
+ if len(to_upgrade) > 1:
+ logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
+ self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
+ else:
+ logger.info('Upgrade: Updating %s.%s' %
+ (d.daemon_type, d.daemon_id))
+ action = 'Upgrading' if not d_entry[1] else 'Redeploying'
+ try:
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
+ self.mgr._daemon_action(
+ daemon_spec,
+ 'redeploy',
+ image=target_image if not d_entry[1] else None
+ )
+ self.mgr.cache.metadata_up_to_date[d.hostname] = False
+ except Exception as e:
+ self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
+ 'severity': 'warning',
+ 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
+ 'count': 1,
+ 'detail': [
+ f'Upgrade daemon: {d.name()}: {e}'
+ ],
+ })
+ return
+ num += 1
+ if self.upgrade_state.remaining_count is not None and not d_entry[1]:
+ self.upgrade_state.remaining_count -= 1
+ self._save_upgrade_state()
+
+ def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
+ if need_upgrade_self:
+ try:
+ self.mgr.mgr_service.fail_over()
+ except OrchestratorError as e:
+ self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
+ 'severity': 'warning',
+ 'summary': f'Upgrade: {e}',
+ 'count': 1,
+ 'detail': [
+ 'The upgrade process needs to upgrade the mgr, '
+ 'but it needs at least one standby to proceed.',
+ ],
+ })
+ return
+
+ return # unreachable code, as fail_over never returns
+ elif upgrading_mgrs:
+ if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
+ del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
+ self.mgr.set_health_checks(self.mgr.health_checks)
+
+ def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
+ # push down configs
+ daemon_type_section = name_to_config_section(daemon_type)
+ if image_settings.get(daemon_type_section) != target_image:
+ logger.info('Upgrade: Setting container_image for all %s' %
+ daemon_type)
+ self.mgr.set_container_image(daemon_type_section, target_image)
+ to_clean = []
+ for section in image_settings.keys():
+ if section.startswith(name_to_config_section(daemon_type) + '.'):
+ to_clean.append(section)
+ if to_clean:
+ logger.debug('Upgrade: Cleaning up container_image for %s' %
+ to_clean)
+ for section in to_clean:
+ ret, image, err = self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'name': 'container_image',
+ 'who': section,
+ })
+
+ def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
+ osdmap = self.mgr.get("osd_map")
+ osd_min_name = osdmap.get("require_osd_release", "argonaut")
+ osd_min = ceph_release_to_major(osd_min_name)
+ if osd_min < int(target_major):
+ logger.info(
+ f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'osd require-osd-release',
+ 'release': target_major_name,
+ })
+
+ def _complete_mds_upgrade(self) -> None:
+ assert self.upgrade_state is not None
+ if self.upgrade_state.fs_original_max_mds:
+ for fs in self.mgr.get("fs_map")['filesystems']:
+ fscid = fs["id"]
+ fs_name = fs['mdsmap']['fs_name']
+ new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
+ if new_max > 1:
+ self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
+ fs_name, new_max
+ ))
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'max_mds',
+ 'val': str(new_max),
+ })
+
+ self.upgrade_state.fs_original_max_mds = {}
+ self._save_upgrade_state()
+ if self.upgrade_state.fs_original_allow_standby_replay:
+ for fs in self.mgr.get("fs_map")['filesystems']:
+ fscid = fs["id"]
+ fs_name = fs['mdsmap']['fs_name']
+ asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
+ if asr:
+ self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
+ fs_name
+ ))
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'allow_standby_replay',
+ 'val': '1'
+ })
+
+ self.upgrade_state.fs_original_allow_standby_replay = {}
+ self._save_upgrade_state()
+
+ def _mark_upgrade_complete(self) -> None:
+ if not self.upgrade_state:
+ logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
+ return
+ logger.info('Upgrade: Complete!')
+ if self.upgrade_state.progress_id:
+ self.mgr.remote('progress', 'complete',
+ self.upgrade_state.progress_id)
+ self.upgrade_state = None
+ self._save_upgrade_state()
+
def _do_upgrade(self):
# type: () -> None
if not self.upgrade_state:
logger.info('Upgrade: First pull of %s' % target_image)
self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
try:
- target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
- target_image)
+ target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
+ target_image))
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',
image_settings = self.get_distinct_container_image_settings()
- daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
- done = 0
- for daemon_type in CEPH_UPGRADE_ORDER:
- logger.debug('Upgrade: Checking %s daemons' % daemon_type)
+ # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
+ # MDSMap::compat for all fs. This is no longer the case beginning in
+ # v16.2.5. We must disable the sanity checks during upgrade.
+ # N.B.: we don't bother confirming the operator has not already
+ # disabled this or saving the config value.
+ self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'mon_mds_skip_sanity',
+ 'value': '1',
+ 'who': 'mon',
+ })
- need_upgrade_self = False
- need_upgrade: List[Tuple[DaemonDescription, bool]] = []
- need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
- for d in daemons:
- if d.daemon_type != daemon_type:
- continue
- assert d.daemon_type is not None
- assert d.daemon_id is not None
- correct_digest = False
- if (any(d in target_digests for d in (d.container_image_digests or []))
- or d.daemon_type in MONITORING_STACK_TYPES):
- logger.debug('daemon %s.%s container digest correct' % (
- daemon_type, d.daemon_id))
- correct_digest = True
- if any(d in target_digests for d in (d.deployed_by or [])):
- logger.debug('daemon %s.%s deployed by correct version' % (
- d.daemon_type, d.daemon_id))
- done += 1
+ if self.upgrade_state.daemon_types is not None:
+ logger.debug(
+ f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in self.upgrade_state.daemon_types]
+ elif self.upgrade_state.services is not None:
+ logger.debug(
+ f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
+ daemons = []
+ for service in self.upgrade_state.services:
+ daemons += self.mgr.cache.get_daemons_by_service(service)
+ else:
+ daemons = [d for d in self.mgr.cache.get_daemons(
+ ) if d.daemon_type in CEPH_UPGRADE_ORDER]
+ if self.upgrade_state.hosts is not None:
+ logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
+ daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
+ upgraded_daemon_count: int = 0
+ for daemon_type in CEPH_UPGRADE_ORDER:
+ if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
+ # we hit our limit and should end the upgrade
+ # except for cases where we only need to redeploy, but not actually upgrade
+ # the image (which we don't count towards our limit). This case only occurs with mgr
+ # and monitoring stack daemons. Additionally, this case is only valid if
+ # the active mgr is already upgraded.
+ if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+ if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
continue
-
- if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
- logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
- self.mgr.get_mgr_id())
- need_upgrade_self = True
- continue
-
- if correct_digest:
- logger.debug('daemon %s.%s not deployed by correct version' % (
- d.daemon_type, d.daemon_id))
- need_upgrade_deployer.append((d, True))
else:
- logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
- daemon_type, d.daemon_id,
- d.container_image_name, d.container_image_digests, d.version))
- need_upgrade.append((d, False))
+ self._mark_upgrade_complete()
+ return
+ logger.debug('Upgrade: Checking %s daemons' % daemon_type)
+ daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
+
+ need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
+ daemons_of_type, target_digests)
+ upgraded_daemon_count += done
+ self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
+
+ # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
+ if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
+ if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
+ need_upgrade_names = [d[0].name() for d in need_upgrade] + \
+ [d[0].name() for d in need_upgrade_deployer]
+ dds = [d for d in self.mgr.cache.get_daemons_by_type(
+ daemon_type) if d.name() not in need_upgrade_names]
+ need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
+ if not n1:
+ if not need_upgrade_self and need_upgrade_active:
+ need_upgrade_self = True
+ need_upgrade_deployer += n2
+ else:
+ # no point in trying to redeploy with new version if active mgr is not on the new version
+ need_upgrade_deployer = []
if not need_upgrade_self:
# only after the mgr itself is upgraded can we expect daemons to have
if need_upgrade:
self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
- to_upgrade: List[Tuple[DaemonDescription, bool]] = []
- known_ok_to_stop: List[str] = []
- for d_entry in need_upgrade:
- d = d_entry[0]
- assert d.daemon_type is not None
- assert d.daemon_id is not None
- assert d.hostname is not None
-
- if not d.container_image_id:
- if d.container_image_name == target_image:
- logger.debug(
- 'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
- continue
-
- if known_ok_to_stop:
- if d.name() in known_ok_to_stop:
- logger.info(f'Upgrade: {d.name()} is also safe to restart')
- to_upgrade.append(d_entry)
- continue
-
- if d.daemon_type in ['mon', 'osd', 'mds']:
- # NOTE: known_ok_to_stop is an output argument for
- # _wait_for_ok_to_stop
- if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
- return
-
- to_upgrade.append(d_entry)
-
- # if we don't have a list of others to consider, stop now
- if not known_ok_to_stop:
- break
-
- num = 1
- for d_entry in to_upgrade:
- d = d_entry[0]
- assert d.daemon_type is not None
- assert d.daemon_id is not None
- assert d.hostname is not None
-
- self._update_upgrade_progress(done / len(daemons))
-
- # make sure host has latest container image
- out, errs, code = CephadmServe(self.mgr)._run_cephadm(
- d.hostname, '', 'inspect-image', [],
- image=target_image, no_fsid=True, error_ok=True)
- if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
- logger.info('Upgrade: Pulling %s on %s' % (target_image,
- d.hostname))
- self.upgrade_info_str = 'Pulling %s image on host %s' % (
- target_image, d.hostname)
- out, errs, code = CephadmServe(self.mgr)._run_cephadm(
- d.hostname, '', 'pull', [],
- image=target_image, no_fsid=True, error_ok=True)
- if code:
- self._fail_upgrade('UPGRADE_FAILED_PULL', {
- 'severity': 'warning',
- 'summary': 'Upgrade: failed to pull target image',
- 'count': 1,
- 'detail': [
- 'failed to pull %s on host %s' % (target_image,
- d.hostname)],
- })
- return
- r = json.loads(''.join(out))
- if not any(d in target_digests for d in r.get('repo_digests', [])):
- logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
- target_image, d.hostname, r['repo_digests'], target_digests))
- self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
- target_image, d.hostname, r['repo_digests'], target_digests)
- self.upgrade_state.target_digests = r['repo_digests']
- self._save_upgrade_state()
- return
-
- self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
-
- if len(to_upgrade) > 1:
- logger.info('Upgrade: Updating %s.%s (%d/%d)' %
- (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
- else:
- logger.info('Upgrade: Updating %s.%s' %
- (d.daemon_type, d.daemon_id))
- action = 'Upgrading' if not d_entry[1] else 'Redeploying'
- try:
- daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
- self.mgr._daemon_action(
- daemon_spec,
- 'redeploy',
- image=target_image if not d_entry[1] else None
- )
- except Exception as e:
- self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
- 'severity': 'warning',
- 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
- 'count': 1,
- 'detail': [
- f'Upgrade daemon: {d.name()}: {e}'
- ],
- })
- return
- num += 1
+ _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
+ if not _continue:
+ return
+ self._upgrade_daemons(to_upgrade, target_image, target_digests)
if to_upgrade:
return
+ self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
+
+ # following bits of _do_upgrade are for completing upgrade for given
+ # types. If we haven't actually finished upgrading all the daemons
+ # of this type, we should exit the loop here
+ _, n1, n2, _ = self._detect_need_upgrade(
+ self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
+ if n1 or n2:
+ continue
+
# complete mon upgrade?
if daemon_type == 'mon':
if not self.mgr.get("have_local_config_map"):
logger.info('Upgrade: Restarting mgr now that mons are running pacific')
need_upgrade_self = True
- if need_upgrade_self:
- try:
- self.mgr.mgr_service.fail_over()
- except OrchestratorError as e:
- self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
- 'severity': 'warning',
- 'summary': f'Upgrade: {e}',
- 'count': 1,
- 'detail': [
- 'The upgrade process needs to upgrade the mgr, '
- 'but it needs at least one standby to proceed.',
- ],
- })
- return
-
- return # unreachable code, as fail_over never returns
- elif daemon_type == 'mgr':
- if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
- del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
- self.mgr.set_health_checks(self.mgr.health_checks)
+ self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
# make sure 'ceph versions' agrees
ret, out_ver, err = self.mgr.check_mon_command({
'Upgrade: %d %s daemon(s) are %s != target %s' %
(count, daemon_type, short_version, target_version))
- # push down configs
- daemon_type_section = name_to_config_section(daemon_type)
- if image_settings.get(daemon_type_section) != target_image:
- logger.info('Upgrade: Setting container_image for all %s' %
- daemon_type)
- self.mgr.set_container_image(daemon_type_section, target_image)
- to_clean = []
- for section in image_settings.keys():
- if section.startswith(name_to_config_section(daemon_type) + '.'):
- to_clean.append(section)
- if to_clean:
- logger.debug('Upgrade: Cleaning up container_image for %s' %
- to_clean)
- for section in to_clean:
- ret, image, err = self.mgr.check_mon_command({
- 'prefix': 'config rm',
- 'name': 'container_image',
- 'who': section,
- })
-
- logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)
+ self._set_container_images(daemon_type, target_image, image_settings)
# complete osd upgrade?
if daemon_type == 'osd':
- osdmap = self.mgr.get("osd_map")
- osd_min_name = osdmap.get("require_osd_release", "argonaut")
- osd_min = ceph_release_to_major(osd_min_name)
- if osd_min < int(target_major):
- logger.info(
- f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
- ret, _, err = self.mgr.check_mon_command({
- 'prefix': 'osd require-osd-release',
- 'release': target_major_name,
- })
+ self._complete_osd_upgrade(target_major, target_major_name)
# complete mds upgrade?
- if daemon_type == 'mds' and self.upgrade_state.fs_original_max_mds:
- for i in self.mgr.get("fs_map")['filesystems']:
- fs_id = i["id"]
- fs_name = i['mdsmap']['fs_name']
- new_max = self.upgrade_state.fs_original_max_mds.get(fs_id)
- if new_max:
- self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
- fs_name, new_max
- ))
- ret, _, err = self.mgr.check_mon_command({
- 'prefix': 'fs set',
- 'fs_name': fs_name,
- 'var': 'max_mds',
- 'val': str(new_max),
- })
-
- self.upgrade_state.fs_original_max_mds = {}
- self._save_upgrade_state()
+ if daemon_type == 'mds':
+ self._complete_mds_upgrade()
+
+ # Make sure all metadata is up to date before saying we are done upgrading this daemon type
+ if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+ self.mgr.agent_helpers._request_ack_all_not_up_to_date()
+ return
+
+ logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)
# clean up
logger.info('Upgrade: Finalizing container_image settings')
'who': name_to_config_section(daemon_type),
})
- logger.info('Upgrade: Complete!')
- if self.upgrade_state.progress_id:
- self.mgr.remote('progress', 'complete',
- self.upgrade_state.progress_id)
- self.upgrade_state = None
- self._save_upgrade_state()
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'name': 'mon_mds_skip_sanity',
+ 'who': 'mon',
+ })
+
+ self._mark_upgrade_complete()
return