git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/upgrade.py
commit: import 15.2.5
index cdf432b62c50cb1dbe79caaef06536be5ecb5cad..0e72e0a7109233af43bfb2aa8feab32d56374bb5 100644
@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Optional
 
 import orchestrator
 from cephadm.utils import name_to_config_section
-from orchestrator import OrchestratorError
+from orchestrator import OrchestratorError, DaemonDescription
 
 if TYPE_CHECKING:
     from .module import CephadmOrchestrator
@@ -18,24 +18,56 @@ CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
 
 logger = logging.getLogger(__name__)
 
+
+class UpgradeState:
+    def __init__(self,
+                 target_name: str,
+                 progress_id: str,
+                 target_id: Optional[str] = None,
+                 target_version: Optional[str] = None,
+                 error: Optional[str] = None,
+                 paused: Optional[bool] = None,
+                 ):
+        self.target_name: str = target_name
+        self.progress_id: str = progress_id
+        self.target_id: Optional[str] = target_id
+        self.target_version: Optional[str] = target_version
+        self.error: Optional[str] = error
+        self.paused: bool = paused or False
+
+    def to_json(self) -> dict:
+        return {
+            'target_name': self.target_name,
+            'progress_id': self.progress_id,
+            'target_id': self.target_id,
+            'target_version': self.target_version,
+            'error': self.error,
+            'paused': self.paused,
+        }
+
+    @classmethod
+    def from_json(cls, data) -> 'UpgradeState':
+        return cls(**data)
+
+
 class CephadmUpgrade:
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr = mgr
 
         t = self.mgr.get_store('upgrade_state')
         if t:
-            self.upgrade_state = json.loads(t)
+            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
         else:
             self.upgrade_state = None
 
     def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
         r = orchestrator.UpgradeStatusSpec()
         if self.upgrade_state:
-            r.target_image = self.upgrade_state.get('target_name')
+            r.target_image = self.upgrade_state.target_name
             r.in_progress = True
-            if self.upgrade_state.get('error'):
-                r.message = 'Error: ' + self.upgrade_state.get('error')
-            elif self.upgrade_state.get('paused'):
+            if self.upgrade_state.error:
+                r.message = 'Error: ' + self.upgrade_state.error
+            elif self.upgrade_state.paused:
                 r.message = 'Upgrade paused'
         return r
 
@@ -58,19 +90,19 @@ class CephadmUpgrade:
         else:
             raise OrchestratorError('must specify either image or version')
         if self.upgrade_state:
-            if self.upgrade_state.get('target_name') != target_name:
+            if self.upgrade_state.target_name != target_name:
                 raise OrchestratorError(
                     'Upgrade to %s (not %s) already in progress' %
-                (self.upgrade_state.get('target_name'), target_name))
-            if self.upgrade_state.get('paused'):
-                del self.upgrade_state['paused']
+                    (self.upgrade_state.target_name, target_name))
+            if self.upgrade_state.paused:
+                self.upgrade_state.paused = False
                 self._save_upgrade_state()
-                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
-            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
-        self.upgrade_state = {
-            'target_name': target_name,
-            'progress_id': str(uuid.uuid4()),
-        }
+                return 'Resumed upgrade to %s' % self.upgrade_state.target_name
+            return 'Upgrade to %s in progress' % self.upgrade_state.target_name
+        self.upgrade_state = UpgradeState(
+            target_name=target_name,
+            progress_id=str(uuid.uuid4())
+        )
         self._update_upgrade_progress(0.0)
         self._save_upgrade_state()
         self._clear_upgrade_health_checks()
@@ -80,29 +112,29 @@ class CephadmUpgrade:
     def upgrade_pause(self) -> str:
         if not self.upgrade_state:
             raise OrchestratorError('No upgrade in progress')
-        if self.upgrade_state.get('paused'):
-            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
-        self.upgrade_state['paused'] = True
+        if self.upgrade_state.paused:
+            return 'Upgrade to %s already paused' % self.upgrade_state.target_name
+        self.upgrade_state.paused = True
         self._save_upgrade_state()
-        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+        return 'Paused upgrade to %s' % self.upgrade_state.target_name
 
     def upgrade_resume(self) -> str:
         if not self.upgrade_state:
             raise OrchestratorError('No upgrade in progress')
-        if not self.upgrade_state.get('paused'):
-            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
-        del self.upgrade_state['paused']
+        if not self.upgrade_state.paused:
+            return 'Upgrade to %s not paused' % self.upgrade_state.target_name
+        self.upgrade_state.paused = False
         self._save_upgrade_state()
         self.mgr.event.set()
-        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+        return 'Resumed upgrade to %s' % self.upgrade_state.target_name
 
     def upgrade_stop(self) -> str:
         if not self.upgrade_state:
             return 'No upgrade in progress'
-        target_name = self.upgrade_state.get('target_name')
-        if 'progress_id' in self.upgrade_state:
+        target_name = self.upgrade_state.target_name
+        if self.upgrade_state.progress_id:
             self.mgr.remote('progress', 'complete',
-                           self.upgrade_state['progress_id'])
+                            self.upgrade_state.progress_id)
         self.upgrade_state = None
         self._save_upgrade_state()
         self._clear_upgrade_health_checks()
@@ -114,34 +146,27 @@ class CephadmUpgrade:
         Returns false, if nothing was done.
         :return:
         """
-        if self.upgrade_state and not self.upgrade_state.get('paused'):
+        if self.upgrade_state and not self.upgrade_state.paused:
             self._do_upgrade()
             return True
         return False
 
-    def _wait_for_ok_to_stop(self, s) -> bool:
+    def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
         # only wait a little bit; the service might go away for something
         tries = 4
         while tries > 0:
-            if s.daemon_type not in ['mon', 'osd', 'mds']:
-                logger.info('Upgrade: It is presumed safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                return True
-            ret, out, err = self.mgr.mon_command({
-                'prefix': '%s ok-to-stop' % s.daemon_type,
-                'ids': [s.daemon_id],
-            })
-            if not self.upgrade_state or self.upgrade_state.get('paused'):
+            if not self.upgrade_state or self.upgrade_state.paused:
                 return False
-            if ret:
-                logger.info('Upgrade: It is NOT safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                time.sleep(15)
-                tries -= 1
-            else:
-                logger.info('Upgrade: It is safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
+
+            r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])
+
+            if not r.retval:
+                logger.info(f'Upgrade: {r.stdout}')
                 return True
+            logger.error(f'Upgrade: {r.stderr}')
+
+            time.sleep(15)
+            tries -= 1
         return False
 
     def _clear_upgrade_health_checks(self) -> None:
@@ -153,23 +178,32 @@ class CephadmUpgrade:
 
     def _fail_upgrade(self, alert_id, alert) -> None:
         logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
-                                                          alert['summary']))
-        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
-        self.upgrade_state['paused'] = True
+                                                        alert['summary']))
+        if not self.upgrade_state:
+            assert False, 'No upgrade in progress'
+
+        self.upgrade_state.error = alert_id + ': ' + alert['summary']
+        self.upgrade_state.paused = True
         self._save_upgrade_state()
         self.mgr.health_checks[alert_id] = alert
         self.mgr.set_health_checks(self.mgr.health_checks)
 
     def _update_upgrade_progress(self, progress) -> None:
-        if 'progress_id' not in self.upgrade_state:
-            self.upgrade_state['progress_id'] = str(uuid.uuid4())
+        if not self.upgrade_state:
+            assert False, 'No upgrade in progress'
+
+        if not self.upgrade_state.progress_id:
+            self.upgrade_state.progress_id = str(uuid.uuid4())
             self._save_upgrade_state()
-        self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'],
-                        ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
+        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
+                        ev_msg='Upgrade to %s' % self.upgrade_state.target_name,
                         ev_progress=progress)
 
     def _save_upgrade_state(self) -> None:
-        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))
+        if not self.upgrade_state:
+            self.mgr.set_store('upgrade_state', None)
+            return
+        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
 
     def _do_upgrade(self):
         # type: () -> None
@@ -177,8 +211,8 @@ class CephadmUpgrade:
             logger.debug('_do_upgrade no state, exiting')
             return
 
-        target_name = self.upgrade_state.get('target_name')
-        target_id = self.upgrade_state.get('target_id', None)
+        target_name = self.upgrade_state.target_name
+        target_id = self.upgrade_state.target_id
         if not target_id:
             # need to learn the container hash
             logger.info('Upgrade: First pull of %s' % target_name)
@@ -192,12 +226,12 @@ class CephadmUpgrade:
                     'detail': [str(e)],
                 })
                 return
-            self.upgrade_state['target_id'] = target_id
-            self.upgrade_state['target_version'] = target_version
+            self.upgrade_state.target_id = target_id
+            self.upgrade_state.target_version = target_version
             self._save_upgrade_state()
-        target_version = self.upgrade_state.get('target_version')
+        target_version = self.upgrade_state.target_version
         logger.info('Upgrade: Target is %s with id %s' % (target_name,
-                                                            target_id))
+                                                          target_id))
 
         # get all distinct container_image settings
         image_settings = {}
@@ -230,19 +264,19 @@ class CephadmUpgrade:
                 if daemon_type == 'mgr' and \
                    d.daemon_id == self.mgr.get_mgr_id():
                     logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
-                                  self.mgr.get_mgr_id())
+                                self.mgr.get_mgr_id())
                     need_upgrade_self = True
                     continue
 
                 # make sure host has latest container image
                 out, err, code = self.mgr._run_cephadm(
-                    d.hostname, None, 'inspect-image', [],
+                    d.hostname, '', 'inspect-image', [],
                     image=target_name, no_fsid=True, error_ok=True)
                 if code or json.loads(''.join(out)).get('image_id') != target_id:
                     logger.info('Upgrade: Pulling %s on %s' % (target_name,
-                                                                 d.hostname))
+                                                               d.hostname))
                     out, err, code = self.mgr._run_cephadm(
-                        d.hostname, None, 'pull', [],
+                        d.hostname, '', 'pull', [],
                         image=target_name, no_fsid=True, error_ok=True)
                     if code:
                         self._fail_upgrade('UPGRADE_FAILED_PULL', {
@@ -256,8 +290,9 @@ class CephadmUpgrade:
                         return
                     r = json.loads(''.join(out))
                     if r.get('image_id') != target_id:
-                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
-                        self.upgrade_state['target_id'] = r['image_id']
+                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
+                            target_name, d.hostname, r['image_id'], target_id))
+                        self.upgrade_state.target_id = r['image_id']
                         self._save_upgrade_state()
                         return
 
@@ -265,23 +300,19 @@ class CephadmUpgrade:
 
                 if not d.container_image_id:
                     if d.container_image_name == target_name:
-                        logger.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
+                        logger.debug(
+                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                         continue
                 if not self._wait_for_ok_to_stop(d):
                     return
                 logger.info('Upgrade: Redeploying %s.%s' %
-                              (d.daemon_type, d.daemon_id))
-                ret, out, err = self.mgr.check_mon_command({
-                    'prefix': 'config set',
-                    'name': 'container_image',
-                    'value': target_name,
-                    'who': name_to_config_section(daemon_type + '.' + d.daemon_id),
-                })
+                            (d.daemon_type, d.daemon_id))
                 self.mgr._daemon_action(
                     d.daemon_type,
                     d.daemon_id,
                     d.hostname,
-                    'redeploy'
+                    'redeploy',
+                    image=target_name
                 )
                 return
 
@@ -301,7 +332,7 @@ class CephadmUpgrade:
                     return
 
                 logger.info('Upgrade: there are %d other already-upgraded '
-                              'standby mgrs, failing over' % num)
+                            'standby mgrs, failing over' % num)
 
                 self._update_upgrade_progress(done / len(daemons))
 
@@ -317,10 +348,10 @@ class CephadmUpgrade:
                     self.mgr.set_health_checks(self.mgr.health_checks)
 
             # make sure 'ceph versions' agrees
-            ret, out, err = self.mgr.check_mon_command({
+            ret, out_ver, err = self.mgr.check_mon_command({
                 'prefix': 'versions',
             })
-            j = json.loads(out)
+            j = json.loads(out_ver)
             for version, count in j.get(daemon_type, {}).items():
                 if version != target_version:
                     logger.warning(
@@ -330,7 +361,7 @@ class CephadmUpgrade:
             # push down configs
             if image_settings.get(daemon_type) != target_name:
                 logger.info('Upgrade: Setting container_image for all %s...' %
-                              daemon_type)
+                            daemon_type)
                 ret, out, err = self.mgr.check_mon_command({
                     'prefix': 'config set',
                     'name': 'container_image',
@@ -343,7 +374,7 @@ class CephadmUpgrade:
                     to_clean.append(section)
             if to_clean:
                 logger.debug('Upgrade: Cleaning up container_image for %s...' %
-                               to_clean)
+                             to_clean)
                 for section in to_clean:
                     ret, image, err = self.mgr.check_mon_command({
                         'prefix': 'config rm',
@@ -352,7 +383,7 @@ class CephadmUpgrade:
                     })
 
             logger.info('Upgrade: All %s daemons are up to date.' %
-                          daemon_type)
+                        daemon_type)
 
         # clean up
         logger.info('Upgrade: Finalizing container_image settings')
@@ -370,9 +401,9 @@ class CephadmUpgrade:
             })
 
         logger.info('Upgrade: Complete!')
-        if 'progress_id' in self.upgrade_state:
+        if self.upgrade_state.progress_id:
             self.mgr.remote('progress', 'complete',
-                        self.upgrade_state['progress_id'])
+                            self.upgrade_state.progress_id)
         self.upgrade_state = None
         self._save_upgrade_state()
         return
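
The central refactor in this diff is that the persisted upgrade state, previously a bare dict in the mgr key/value store, becomes the typed UpgradeState object serialized via to_json()/from_json(). The following is a minimal sketch of that round trip, not part of the patch: the UpgradeState class is condensed from the hunk above, while fake_store and the example image name are assumed stand-ins for the real mgr set_store()/get_store() and target image.

    # Sketch only: UpgradeState condensed from the diff above; fake_store is a
    # hypothetical stand-in for the mgr's set_store()/get_store() key/value store.
    import json
    import uuid
    from typing import Optional

    class UpgradeState:
        def __init__(self, target_name: str, progress_id: str,
                     target_id: Optional[str] = None,
                     target_version: Optional[str] = None,
                     error: Optional[str] = None,
                     paused: Optional[bool] = None):
            self.target_name = target_name
            self.progress_id = progress_id
            self.target_id = target_id
            self.target_version = target_version
            self.error = error
            self.paused = paused or False

        def to_json(self) -> dict:
            return {'target_name': self.target_name, 'progress_id': self.progress_id,
                    'target_id': self.target_id, 'target_version': self.target_version,
                    'error': self.error, 'paused': self.paused}

        @classmethod
        def from_json(cls, data: dict) -> 'UpgradeState':
            return cls(**data)

    fake_store = {}  # stands in for the mgr store (assumption)
    state = UpgradeState(target_name='docker.io/ceph/ceph:v15.2.5',  # example value
                         progress_id=str(uuid.uuid4()))
    # _save_upgrade_state(): persist the state as JSON text
    fake_store['upgrade_state'] = json.dumps(state.to_json())
    # CephadmUpgrade.__init__(): load it back into a typed object
    restored = UpgradeState.from_json(json.loads(fake_store['upgrade_state']))
    assert restored.target_name == state.target_name and restored.paused is False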