import orchestrator
from cephadm.utils import name_to_config_section
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, DaemonDescription
if TYPE_CHECKING:
from .module import CephadmOrchestrator
logger = logging.getLogger(__name__)
+
+class UpgradeState:
+ """State of one upgrade, convertible to and from a JSON-compatible dict.
+
+ Carries the target image name, the id of the progress event, the
+ target image id and version (both optional), an optional error
+ message, and a paused flag. A ``paused`` value of None is stored as
+ False.
+ """
+
+ def __init__(self,
+ target_name: str,
+ progress_id: str,
+ target_id: Optional[str] = None,
+ target_version: Optional[str] = None,
+ error: Optional[str] = None,
+ paused: Optional[bool] = None,
+ ):
+ self.target_name: str = target_name
+ self.progress_id: str = progress_id
+ self.target_id: Optional[str] = target_id
+ self.target_version: Optional[str] = target_version
+ self.error: Optional[str] = error
+ self.paused: bool = paused or False
+
+ def to_json(self) -> dict:
+ """Return all fields as a plain dict suitable for ``json.dumps``."""
+ return {
+ 'target_name': self.target_name,
+ 'progress_id': self.progress_id,
+ 'target_id': self.target_id,
+ 'target_version': self.target_version,
+ 'error': self.error,
+ 'paused': self.paused,
+ }
+
+ @classmethod
+ def from_json(cls, data: dict) -> 'UpgradeState':
+ """Build an UpgradeState from a dict such as ``to_json`` returns.
+
+ Keys that ``__init__`` does not accept are dropped, so a stored
+ state carrying extra fields no longer makes construction fail with
+ a TypeError.
+ """
+ init_code = cls.__init__.__code__
+ # co_varnames lists the arguments first; index 0 is ``self``.
+ accepted = init_code.co_varnames[1:init_code.co_argcount]
+ return cls(**{k: v for k, v in data.items() if k in accepted})
+
+
class CephadmUpgrade:
def __init__(self, mgr: "CephadmOrchestrator"):
+ """Keep a handle on the orchestrator module and load any stored upgrade state."""
self.mgr = mgr
+ # a falsy value from the store means no upgrade is recorded
t = self.mgr.get_store('upgrade_state')
if t:
- self.upgrade_state = json.loads(t)
+ self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
else:
self.upgrade_state = None
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
+ """Report the current upgrade as an UpgradeStatusSpec.
+
+ With no upgrade recorded, the freshly constructed spec is returned
+ without any field being set here.
+ """
r = orchestrator.UpgradeStatusSpec()
if self.upgrade_state:
- r.target_image = self.upgrade_state.get('target_name')
+ r.target_image = self.upgrade_state.target_name
r.in_progress = True
- if self.upgrade_state.get('error'):
- r.message = 'Error: ' + self.upgrade_state.get('error')
- elif self.upgrade_state.get('paused'):
+ # an error message takes precedence over the plain "paused" notice
+ if self.upgrade_state.error:
+ r.message = 'Error: ' + self.upgrade_state.error
+ elif self.upgrade_state.paused:
r.message = 'Upgrade paused'
return r
else:
raise OrchestratorError('must specify either image or version')
if self.upgrade_state:
- if self.upgrade_state.get('target_name') != target_name:
+ if self.upgrade_state.target_name != target_name:
raise OrchestratorError(
'Upgrade to %s (not %s) already in progress' %
- (self.upgrade_state.get('target_name'), target_name))
- if self.upgrade_state.get('paused'):
- del self.upgrade_state['paused']
+ (self.upgrade_state.target_name, target_name))
+ if self.upgrade_state.paused:
+ self.upgrade_state.paused = False
self._save_upgrade_state()
- return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
- return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
- self.upgrade_state = {
- 'target_name': target_name,
- 'progress_id': str(uuid.uuid4()),
- }
+ return 'Resumed upgrade to %s' % self.upgrade_state.target_name
+ return 'Upgrade to %s in progress' % self.upgrade_state.target_name
+ self.upgrade_state = UpgradeState(
+ target_name=target_name,
+ progress_id=str(uuid.uuid4())
+ )
self._update_upgrade_progress(0.0)
self._save_upgrade_state()
self._clear_upgrade_health_checks()
def upgrade_pause(self) -> str:
+ """Mark the current upgrade as paused and persist that.
+
+ :return: a status message; if the upgrade is already paused the
+ state is left untouched.
+ :raises OrchestratorError: when no upgrade is in progress.
+ """
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
- if self.upgrade_state.get('paused'):
- return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
- self.upgrade_state['paused'] = True
+ if self.upgrade_state.paused:
+ return 'Upgrade to %s already paused' % self.upgrade_state.target_name
+ self.upgrade_state.paused = True
self._save_upgrade_state()
- return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+ return 'Paused upgrade to %s' % self.upgrade_state.target_name
def upgrade_resume(self) -> str:
+ """Clear the paused flag of the current upgrade and persist that.
+
+ :return: a status message; if the upgrade is not paused the state
+ is left untouched.
+ :raises OrchestratorError: when no upgrade is in progress.
+ """
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
- if not self.upgrade_state.get('paused'):
- return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
- del self.upgrade_state['paused']
+ if not self.upgrade_state.paused:
+ return 'Upgrade to %s not paused' % self.upgrade_state.target_name
+ self.upgrade_state.paused = False
self._save_upgrade_state()
+ # NOTE(review): presumably wakes the module's serve loop so the
+ # upgrade continues without waiting for the next cycle - confirm
self.mgr.event.set()
- return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+ return 'Resumed upgrade to %s' % self.upgrade_state.target_name
def upgrade_stop(self) -> str:
if not self.upgrade_state:
return 'No upgrade in progress'
- target_name = self.upgrade_state.get('target_name')
- if 'progress_id' in self.upgrade_state:
+ target_name = self.upgrade_state.target_name
+ if self.upgrade_state.progress_id:
self.mgr.remote('progress', 'complete',
- self.upgrade_state['progress_id'])
+ self.upgrade_state.progress_id)
self.upgrade_state = None
self._save_upgrade_state()
self._clear_upgrade_health_checks()
Returns false, if nothing was done.
:return:
"""
- if self.upgrade_state and not self.upgrade_state.get('paused'):
+ if self.upgrade_state and not self.upgrade_state.paused:
self._do_upgrade()
return True
return False
- def _wait_for_ok_to_stop(self, s) -> bool:
+ def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
+ """Wait until the service for daemon ``s`` reports it may be stopped.
+
+ Asks ``ok_to_stop`` of the service registered for ``s.daemon_type``
+ up to four times, sleeping 15 seconds after each refusal.
+
+ :return: True as soon as ``ok_to_stop`` returns a zero ``retval``;
+ False if the upgrade state is gone or paused, or if every try
+ was refused.
+ """
# only wait a little bit; the service might go away for something
tries = 4
while tries > 0:
- if s.daemon_type not in ['mon', 'osd', 'mds']:
- logger.info('Upgrade: It is presumed safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- return True
- ret, out, err = self.mgr.mon_command({
- 'prefix': '%s ok-to-stop' % s.daemon_type,
- 'ids': [s.daemon_id],
- })
- if not self.upgrade_state or self.upgrade_state.get('paused'):
+ if not self.upgrade_state or self.upgrade_state.paused:
return False
- if ret:
- logger.info('Upgrade: It is NOT safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- time.sleep(15)
- tries -= 1
- else:
- logger.info('Upgrade: It is safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
+
+ r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])
+
+ if not r.retval:
+ logger.info(f'Upgrade: {r.stdout}')
return True
+ # f-string is required here, otherwise the literal text
+ # "{r.stderr}" is logged instead of the actual error output
+ logger.error(f'Upgrade: {r.stderr}')
+
+ time.sleep(15)
+ tries -= 1
return False
def _clear_upgrade_health_checks(self) -> None:
def _fail_upgrade(self, alert_id, alert) -> None:
+ """Pause the upgrade because of ``alert`` and raise it as a health check.
+
+ Stores "<alert_id>: <summary>" as the upgrade error, sets the paused
+ flag, persists the state, and registers ``alert`` under ``alert_id``
+ in the module's health checks. ``alert`` must provide a 'summary'
+ entry.
+ """
logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
- alert['summary']))
- self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
- self.upgrade_state['paused'] = True
+ alert['summary']))
+ # failing an upgrade without a recorded state is treated as a
+ # programming error
+ if not self.upgrade_state:
+ assert False, 'No upgrade in progress'
+
+ self.upgrade_state.error = alert_id + ': ' + alert['summary']
+ self.upgrade_state.paused = True
self._save_upgrade_state()
self.mgr.health_checks[alert_id] = alert
self.mgr.set_health_checks(self.mgr.health_checks)
def _update_upgrade_progress(self, progress) -> None:
+ """Send ``progress`` for this upgrade to the 'progress' module.
+
+ If the state has no progress id yet, a new uuid4 is assigned and
+ persisted before the update is sent. ``progress`` is passed through
+ unchanged as ``ev_progress``.
+ """
- if 'progress_id' not in self.upgrade_state:
- self.upgrade_state['progress_id'] = str(uuid.uuid4())
+ # updating progress without a recorded state is treated as a
+ # programming error
+ if not self.upgrade_state:
+ assert False, 'No upgrade in progress'
+
+ if not self.upgrade_state.progress_id:
+ self.upgrade_state.progress_id = str(uuid.uuid4())
self._save_upgrade_state()
- self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'],
- ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
+ self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
+ ev_msg='Upgrade to %s' % self.upgrade_state.target_name,
ev_progress=progress)
def _save_upgrade_state(self) -> None:
+ """Write the upgrade state to the 'upgrade_state' store key.
+
+ The state is stored as the JSON dump of ``to_json()``; when there is
+ no upgrade, None is stored instead.
+ """
- self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))
+ if not self.upgrade_state:
+ self.mgr.set_store('upgrade_state', None)
+ return
+ self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
def _do_upgrade(self):
# type: () -> None
logger.debug('_do_upgrade no state, exiting')
return
- target_name = self.upgrade_state.get('target_name')
- target_id = self.upgrade_state.get('target_id', None)
+ target_name = self.upgrade_state.target_name
+ target_id = self.upgrade_state.target_id
if not target_id:
# need to learn the container hash
logger.info('Upgrade: First pull of %s' % target_name)
'detail': [str(e)],
})
return
- self.upgrade_state['target_id'] = target_id
- self.upgrade_state['target_version'] = target_version
+ self.upgrade_state.target_id = target_id
+ self.upgrade_state.target_version = target_version
self._save_upgrade_state()
- target_version = self.upgrade_state.get('target_version')
+ target_version = self.upgrade_state.target_version
logger.info('Upgrade: Target is %s with id %s' % (target_name,
- target_id))
+ target_id))
# get all distinct container_image settings
image_settings = {}
if daemon_type == 'mgr' and \
d.daemon_id == self.mgr.get_mgr_id():
logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
- self.mgr.get_mgr_id())
+ self.mgr.get_mgr_id())
need_upgrade_self = True
continue
# make sure host has latest container image
out, err, code = self.mgr._run_cephadm(
- d.hostname, None, 'inspect-image', [],
+ d.hostname, '', 'inspect-image', [],
image=target_name, no_fsid=True, error_ok=True)
if code or json.loads(''.join(out)).get('image_id') != target_id:
logger.info('Upgrade: Pulling %s on %s' % (target_name,
- d.hostname))
+ d.hostname))
out, err, code = self.mgr._run_cephadm(
- d.hostname, None, 'pull', [],
+ d.hostname, '', 'pull', [],
image=target_name, no_fsid=True, error_ok=True)
if code:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
return
r = json.loads(''.join(out))
if r.get('image_id') != target_id:
- logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
- self.upgrade_state['target_id'] = r['image_id']
+ logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
+ target_name, d.hostname, r['image_id'], target_id))
+ self.upgrade_state.target_id = r['image_id']
self._save_upgrade_state()
return
if not d.container_image_id:
if d.container_image_name == target_name:
- logger.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+ logger.debug(
+ 'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
continue
if not self._wait_for_ok_to_stop(d):
return
logger.info('Upgrade: Redeploying %s.%s' %
- (d.daemon_type, d.daemon_id))
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config set',
- 'name': 'container_image',
- 'value': target_name,
- 'who': name_to_config_section(daemon_type + '.' + d.daemon_id),
- })
+ (d.daemon_type, d.daemon_id))
self.mgr._daemon_action(
d.daemon_type,
d.daemon_id,
d.hostname,
- 'redeploy'
+ 'redeploy',
+ image=target_name
)
return
return
logger.info('Upgrade: there are %d other already-upgraded '
- 'standby mgrs, failing over' % num)
+ 'standby mgrs, failing over' % num)
self._update_upgrade_progress(done / len(daemons))
self.mgr.set_health_checks(self.mgr.health_checks)
# make sure 'ceph versions' agrees
- ret, out, err = self.mgr.check_mon_command({
+ ret, out_ver, err = self.mgr.check_mon_command({
'prefix': 'versions',
})
- j = json.loads(out)
+ j = json.loads(out_ver)
for version, count in j.get(daemon_type, {}).items():
if version != target_version:
logger.warning(
# push down configs
if image_settings.get(daemon_type) != target_name:
logger.info('Upgrade: Setting container_image for all %s...' %
- daemon_type)
+ daemon_type)
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
'name': 'container_image',
to_clean.append(section)
if to_clean:
logger.debug('Upgrade: Cleaning up container_image for %s...' %
- to_clean)
+ to_clean)
for section in to_clean:
ret, image, err = self.mgr.check_mon_command({
'prefix': 'config rm',
})
logger.info('Upgrade: All %s daemons are up to date.' %
- daemon_type)
+ daemon_type)
# clean up
logger.info('Upgrade: Finalizing container_image settings')
})
logger.info('Upgrade: Complete!')
- if 'progress_id' in self.upgrade_state:
+ if self.upgrade_state.progress_id:
self.mgr.remote('progress', 'complete',
- self.upgrade_state['progress_id'])
+ self.upgrade_state.progress_id)
self.upgrade_state = None
self._save_upgrade_state()
return