]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/upgrade.py
5 from typing
import TYPE_CHECKING
, Optional
, Dict
, List
, Tuple
8 from cephadm
.serve
import CephadmServe
9 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
10 from cephadm
.utils
import ceph_release_to_major
, name_to_config_section
, CEPH_UPGRADE_ORDER
, MONITORING_STACK_TYPES
11 from orchestrator
import OrchestratorError
, DaemonDescription
, DaemonDescriptionStatus
, daemon_type_to_service
14 from .module
import CephadmOrchestrator
17 logger
= logging
.getLogger(__name__
)
def normalize_image_digest(digest: str, default_registry: str) -> str:
    """Ensure an image name/digest carries an explicit registry host.

    A name whose first path component does not look like a registry
    hostname is prefixed with ``default_registry``:
      ceph/ceph              -> docker.io/ceph/ceph
      ubuntu                 -> docker.io/ubuntu  (alias for library/ubuntu)
    Names that already start with a registry host are left alone:
      quay.ceph.io/ceph/ceph -> no change
      docker.io/ubuntu       -> no change
    """
    first = digest.split('/')[0]
    # Container tooling convention: a registry host contains a dot (domain),
    # a colon (port), or is the literal 'localhost'; bare repo names do not.
    if '.' not in first and ':' not in first and first != 'localhost':
        # Fix: honor the caller-supplied default_registry (the parameter was
        # previously ignored in favor of a hard-coded 'docker.io'), and drop
        # the old `len(bits) < 3` clause, which re-prefixed two-component
        # names like 'docker.io/ubuntu' despite the documented 'no change'.
        digest = default_registry + '/' + digest
    return digest
38 target_id
: Optional
[str] = None,
39 target_digests
: Optional
[List
[str]] = None,
40 target_version
: Optional
[str] = None,
41 error
: Optional
[str] = None,
42 paused
: Optional
[bool] = None,
43 fs_original_max_mds
: Optional
[Dict
[str, int]] = None,
45 self
._target
_name
: str = target_name
# Use CephadmUpgrade.target_image instead.
46 self
.progress_id
: str = progress_id
47 self
.target_id
: Optional
[str] = target_id
48 self
.target_digests
: Optional
[List
[str]] = target_digests
49 self
.target_version
: Optional
[str] = target_version
50 self
.error
: Optional
[str] = error
51 self
.paused
: bool = paused
or False
52 self
.fs_original_max_mds
: Optional
[Dict
[str, int]] = fs_original_max_mds
54 def to_json(self
) -> dict:
56 'target_name': self
._target
_name
,
57 'progress_id': self
.progress_id
,
58 'target_id': self
.target_id
,
59 'target_digests': self
.target_digests
,
60 'target_version': self
.target_version
,
61 'fs_original_max_mds': self
.fs_original_max_mds
,
63 'paused': self
.paused
,
67 def from_json(cls
, data
: dict) -> Optional
['UpgradeState']:
69 c
= {k
: v
for k
, v
in data
.items()}
70 if 'repo_digest' in c
:
71 c
['target_digests'] = [c
.pop('repo_digest')]
79 'UPGRADE_NO_STANDBY_MGR',
80 'UPGRADE_FAILED_PULL',
81 'UPGRADE_REDEPLOY_DAEMON',
82 'UPGRADE_BAD_TARGET_VERSION',
86 def __init__(self
, mgr
: "CephadmOrchestrator"):
89 t
= self
.mgr
.get_store('upgrade_state')
91 self
.upgrade_state
: Optional
[UpgradeState
] = UpgradeState
.from_json(json
.loads(t
))
93 self
.upgrade_state
= None
96 def target_image(self
) -> str:
97 assert self
.upgrade_state
98 if not self
.mgr
.use_repo_digest
:
99 return self
.upgrade_state
._target
_name
100 if not self
.upgrade_state
.target_digests
:
101 return self
.upgrade_state
._target
_name
103 # FIXME: we assume the first digest is the best one to use
104 return self
.upgrade_state
.target_digests
[0]
106 def upgrade_status(self
) -> orchestrator
.UpgradeStatusSpec
:
107 r
= orchestrator
.UpgradeStatusSpec()
108 if self
.upgrade_state
:
109 r
.target_image
= self
.target_image
111 r
.progress
, r
.services_complete
= self
._get
_upgrade
_info
()
112 # accessing self.upgrade_info_str will throw an exception if it
113 # has not been set in _do_upgrade yet
115 r
.message
= self
.upgrade_info_str
116 except AttributeError:
118 if self
.upgrade_state
.error
:
119 r
.message
= 'Error: ' + self
.upgrade_state
.error
120 elif self
.upgrade_state
.paused
:
121 r
.message
= 'Upgrade paused'
124 def _get_upgrade_info(self
) -> Tuple
[str, List
[str]]:
125 if not self
.upgrade_state
or not self
.upgrade_state
.target_digests
:
128 daemons
= [d
for d
in self
.mgr
.cache
.get_daemons() if d
.daemon_type
in CEPH_UPGRADE_ORDER
]
130 if any(not d
.container_image_digests
for d
in daemons
if d
.daemon_type
== 'mgr'):
133 completed_daemons
= [(d
.daemon_type
, any(d
in self
.upgrade_state
.target_digests
for d
in (
134 d
.container_image_digests
or []))) for d
in daemons
if d
.daemon_type
]
136 done
= len([True for completion
in completed_daemons
if completion
[1]])
138 completed_types
= list(set([completion
[0] for completion
in completed_daemons
if all(
139 c
[1] for c
in completed_daemons
if c
[0] == completion
[0])]))
141 return '%s/%s daemons upgraded' % (done
, len(daemons
)), completed_types
143 def _check_target_version(self
, version
: str) -> Optional
[str]:
145 (major
, minor
, _
) = version
.split('.', 2)
146 assert int(minor
) >= 0
147 # patch might be a number or {number}-g{sha1}
149 return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
150 if int(major
) < 15 or (int(major
) == 15 and int(minor
) < 2):
151 return 'cephadm only supports octopus (15.2.0) or later'
154 current_version
= self
.mgr
.version
.split('ceph version ')[1]
155 (current_major
, current_minor
, _
) = current_version
.split('-')[0].split('.', 2)
156 if int(current_major
) < int(major
) - 2:
157 return f
'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
158 if int(current_major
) > int(major
):
159 return f
'ceph cannot downgrade major versions (from {current_version} to {version})'
160 if int(current_major
) == int(major
):
161 if int(current_minor
) > int(minor
):
162 return f
'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'
165 monmap
= self
.mgr
.get("mon_map")
166 mon_min
= monmap
.get("min_mon_release", 0)
167 if mon_min
< int(major
) - 2:
168 return f
'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'
171 osdmap
= self
.mgr
.get("osd_map")
172 osd_min_name
= osdmap
.get("require_osd_release", "argonaut")
173 osd_min
= ceph_release_to_major(osd_min_name
)
174 if osd_min
< int(major
) - 2:
175 return f
'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'
179 def upgrade_start(self
, image
: str, version
: str) -> str:
180 if self
.mgr
.mode
!= 'root':
181 raise OrchestratorError('upgrade is not supported in %s mode' % (
184 version_error
= self
._check
_target
_version
(version
)
186 raise OrchestratorError(version_error
)
187 target_name
= self
.mgr
.container_image_base
+ ':v' + version
189 target_name
= normalize_image_digest(image
, self
.mgr
.default_registry
)
191 raise OrchestratorError('must specify either image or version')
192 if self
.upgrade_state
:
193 if self
.upgrade_state
._target
_name
!= target_name
:
194 raise OrchestratorError(
195 'Upgrade to %s (not %s) already in progress' %
196 (self
.upgrade_state
._target
_name
, target_name
))
197 if self
.upgrade_state
.paused
:
198 self
.upgrade_state
.paused
= False
199 self
._save
_upgrade
_state
()
200 return 'Resumed upgrade to %s' % self
.target_image
201 return 'Upgrade to %s in progress' % self
.target_image
203 running_mgr_count
= len([daemon
for daemon
in self
.mgr
.cache
.get_daemons_by_type(
204 'mgr') if daemon
.status
== DaemonDescriptionStatus
.running
])
206 if running_mgr_count
< 2:
207 raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')
209 self
.mgr
.log
.info('Upgrade: Started with target %s' % target_name
)
210 self
.upgrade_state
= UpgradeState(
211 target_name
=target_name
,
212 progress_id
=str(uuid
.uuid4())
214 self
._update
_upgrade
_progress
(0.0)
215 self
._save
_upgrade
_state
()
216 self
._clear
_upgrade
_health
_checks
()
218 return 'Initiating upgrade to %s' % (target_name
)
def upgrade_pause(self) -> str:
    """Pause the in-progress upgrade; idempotent if already paused.

    Raises OrchestratorError when no upgrade is active.
    """
    state = self.upgrade_state
    if not state:
        raise OrchestratorError('No upgrade in progress')
    target = self.target_image
    if state.paused:
        return 'Upgrade to %s already paused' % target
    state.paused = True
    self.mgr.log.info('Upgrade: Paused upgrade to %s' % target)
    self._save_upgrade_state()
    return 'Paused upgrade to %s' % target
def upgrade_resume(self) -> str:
    """Resume a paused upgrade; idempotent if not paused.

    Raises OrchestratorError when no upgrade is active.
    """
    state = self.upgrade_state
    if not state:
        raise OrchestratorError('No upgrade in progress')
    target = self.target_image
    if not state.paused:
        return 'Upgrade to %s not paused' % target
    state.paused = False
    self.mgr.log.info('Upgrade: Resumed upgrade to %s' % target)
    self._save_upgrade_state()
    return 'Resumed upgrade to %s' % target
def upgrade_stop(self) -> str:
    """Abort the current upgrade: close the progress event, drop the
    persisted state, and clear upgrade-related health alerts."""
    state = self.upgrade_state
    if not state:
        return 'No upgrade in progress'
    if state.progress_id:
        # close out the mgr progress-module event for this upgrade
        self.mgr.remote('progress', 'complete',
                        state.progress_id)
    target_image = self.target_image
    self.mgr.log.info('Upgrade: Stopped')
    self.upgrade_state = None
    self._save_upgrade_state()
    self._clear_upgrade_health_checks()
    return 'Stopped upgrade to %s' % target_image
255 def continue_upgrade(self
) -> bool:
257 Returns false, if nothing was done.
260 if self
.upgrade_state
and not self
.upgrade_state
.paused
:
263 except Exception as e
:
264 self
._fail
_upgrade
('UPGRADE_EXCEPTION', {
266 'summary': 'Upgrade: failed due to an unexpected exception',
268 'detail': [f
'Unexpected exception occurred during upgrade process: {str(e)}'],
274 def _wait_for_ok_to_stop(
275 self
, s
: DaemonDescription
,
276 known
: Optional
[List
[str]] = None, # NOTE: output argument!
278 # only wait a little bit; the service might go away for something
279 assert s
.daemon_type
is not None
280 assert s
.daemon_id
is not None
283 if not self
.upgrade_state
or self
.upgrade_state
.paused
:
286 # setting force flag to retain old functionality.
287 # note that known is an output argument for ok_to_stop()
288 r
= self
.mgr
.cephadm_services
[daemon_type_to_service(s
.daemon_type
)].ok_to_stop([
289 s
.daemon_id
], known
=known
, force
=True)
292 logger
.info(f
'Upgrade: {r.stdout}')
294 logger
.info(f
'Upgrade: {r.stderr}')
300 def _clear_upgrade_health_checks(self
) -> None:
301 for k
in self
.UPGRADE_ERRORS
:
302 if k
in self
.mgr
.health_checks
:
303 del self
.mgr
.health_checks
[k
]
304 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
306 def _fail_upgrade(self
, alert_id
: str, alert
: dict) -> None:
307 assert alert_id
in self
.UPGRADE_ERRORS
308 if not self
.upgrade_state
:
309 # this could happen if the user canceled the upgrade while we
310 # were doing something
313 logger
.error('Upgrade: Paused due to %s: %s' % (alert_id
,
315 self
.upgrade_state
.error
= alert_id
+ ': ' + alert
['summary']
316 self
.upgrade_state
.paused
= True
317 self
._save
_upgrade
_state
()
318 self
.mgr
.health_checks
[alert_id
] = alert
319 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
321 def _update_upgrade_progress(self
, progress
: float) -> None:
322 if not self
.upgrade_state
:
323 assert False, 'No upgrade in progress'
325 if not self
.upgrade_state
.progress_id
:
326 self
.upgrade_state
.progress_id
= str(uuid
.uuid4())
327 self
._save
_upgrade
_state
()
328 self
.mgr
.remote('progress', 'update', self
.upgrade_state
.progress_id
,
329 ev_msg
='Upgrade to %s' % (
330 self
.upgrade_state
.target_version
or self
.target_image
332 ev_progress
=progress
,
335 def _save_upgrade_state(self
) -> None:
336 if not self
.upgrade_state
:
337 self
.mgr
.set_store('upgrade_state', None)
339 self
.mgr
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
.to_json()))
341 def get_distinct_container_image_settings(self
) -> Dict
[str, str]:
342 # get all distinct container_image settings
344 ret
, out
, err
= self
.mgr
.check_mon_command({
345 'prefix': 'config dump',
348 config
= json
.loads(out
)
350 if opt
['name'] == 'container_image':
351 image_settings
[opt
['section']] = opt
['value']
352 return image_settings
354 def _prepare_for_mds_upgrade(
357 need_upgrade
: List
[DaemonDescription
]
359 # are any daemons running a different major version?
361 for name
, info
in self
.mgr
.get("mds_metadata").items():
362 version
= info
.get("ceph_version_short")
365 major_version
= version
.split('.')[0]
366 if not major_version
:
367 self
.mgr
.log
.info('Upgrade: mds.%s version is not known, will retry' % name
)
370 if int(major_version
) < int(target_major
):
374 self
.mgr
.log
.debug('Upgrade: All MDS daemons run same major version')
377 # scale down all filesystems to 1 MDS
378 assert self
.upgrade_state
379 if not self
.upgrade_state
.fs_original_max_mds
:
380 self
.upgrade_state
.fs_original_max_mds
= {}
381 fsmap
= self
.mgr
.get("fs_map")
382 continue_upgrade
= True
383 for i
in fsmap
.get('filesystems', []):
386 fs_name
= fs
["fs_name"]
388 # scale down this filesystem?
389 if fs
["max_mds"] > 1:
390 self
.mgr
.log
.info('Upgrade: Scaling down filesystem %s' % (
393 if fs_id
not in self
.upgrade_state
.fs_original_max_mds
:
394 self
.upgrade_state
.fs_original_max_mds
[fs_id
] = fs
['max_mds']
395 self
._save
_upgrade
_state
()
396 ret
, out
, err
= self
.mgr
.check_mon_command({
402 continue_upgrade
= False
405 if len(fs
['info']) > 1:
406 self
.mgr
.log
.info('Upgrade: Waiting for fs %s to scale down to 1 MDS' % (fs_name
))
408 continue_upgrade
= False
411 lone_mds
= list(fs
['info'].values())[0]
412 if lone_mds
['state'] != 'up:active':
413 self
.mgr
.log
.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
418 continue_upgrade
= False
421 return continue_upgrade
423 def _enough_mons_for_ok_to_stop(self
) -> bool:
425 ret
, out
, err
= self
.mgr
.check_mon_command({
426 'prefix': 'quorum_status',
431 raise OrchestratorError('failed to parse quorum status')
433 mons
= [m
['name'] for m
in j
['monmap']['mons']]
436 def _enough_mds_for_ok_to_stop(self
, mds_daemon
: DaemonDescription
) -> bool:
437 # type (DaemonDescription) -> bool
439 # find fs this mds daemon belongs to
440 fsmap
= self
.mgr
.get("fs_map")
441 for i
in fsmap
.get('filesystems', []):
443 fs_name
= fs
["fs_name"]
445 assert mds_daemon
.daemon_id
446 if fs_name
!= mds_daemon
.service_name().split('.', 1)[1]:
447 # wrong fs for this mds daemon
450 # get number of mds daemons for this fs
452 [daemon
for daemon
in self
.mgr
.cache
.get_daemons_by_service(mds_daemon
.service_name())])
454 # standby mds daemons for this fs?
455 if fs
["max_mds"] < mds_count
:
459 return True # if mds has no fs it should pass ok-to-stop
461 def _do_upgrade(self
):
463 if not self
.upgrade_state
:
464 logger
.debug('_do_upgrade no state, exiting')
467 target_image
= self
.target_image
468 target_id
= self
.upgrade_state
.target_id
469 target_digests
= self
.upgrade_state
.target_digests
470 target_version
= self
.upgrade_state
.target_version
473 if not target_id
or not target_version
or not target_digests
:
474 # need to learn the container hash
475 logger
.info('Upgrade: First pull of %s' % target_image
)
476 self
.upgrade_info_str
= 'Doing first pull of %s image' % (target_image
)
478 target_id
, target_version
, target_digests
= CephadmServe(self
.mgr
)._get
_container
_image
_info
(
480 except OrchestratorError
as e
:
481 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
482 'severity': 'warning',
483 'summary': 'Upgrade: failed to pull target image',
488 if not target_version
:
489 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
490 'severity': 'warning',
491 'summary': 'Upgrade: failed to pull target image',
493 'detail': ['unable to extract ceph version from container'],
496 self
.upgrade_state
.target_id
= target_id
497 # extract the version portion of 'ceph version {version} ({sha1})'
498 self
.upgrade_state
.target_version
= target_version
.split(' ')[2]
499 self
.upgrade_state
.target_digests
= target_digests
500 self
._save
_upgrade
_state
()
501 target_image
= self
.target_image
504 if target_digests
is None:
506 if target_version
.startswith('ceph version '):
507 # tolerate/fix upgrade state from older version
508 self
.upgrade_state
.target_version
= target_version
.split(' ')[2]
509 target_version
= self
.upgrade_state
.target_version
510 (target_major
, _
) = target_version
.split('.', 1)
511 target_major_name
= self
.mgr
.lookup_release_name(int(target_major
))
514 logger
.info('Upgrade: Target is version %s (%s)' % (
515 target_version
, target_major_name
))
516 logger
.info('Upgrade: Target container is %s, digests %s' % (
517 target_image
, target_digests
))
519 version_error
= self
._check
_target
_version
(target_version
)
521 self
._fail
_upgrade
('UPGRADE_BAD_TARGET_VERSION', {
523 'summary': f
'Upgrade: cannot upgrade/downgrade to {target_version}',
525 'detail': [version_error
],
529 image_settings
= self
.get_distinct_container_image_settings()
531 daemons
= [d
for d
in self
.mgr
.cache
.get_daemons() if d
.daemon_type
in CEPH_UPGRADE_ORDER
]
533 for daemon_type
in CEPH_UPGRADE_ORDER
:
534 logger
.debug('Upgrade: Checking %s daemons' % daemon_type
)
536 need_upgrade_self
= False
537 need_upgrade
: List
[Tuple
[DaemonDescription
, bool]] = []
538 need_upgrade_deployer
: List
[Tuple
[DaemonDescription
, bool]] = []
540 if d
.daemon_type
!= daemon_type
:
542 assert d
.daemon_type
is not None
543 assert d
.daemon_id
is not None
544 correct_digest
= False
545 if (any(d
in target_digests
for d
in (d
.container_image_digests
or []))
546 or d
.daemon_type
in MONITORING_STACK_TYPES
):
547 logger
.debug('daemon %s.%s container digest correct' % (
548 daemon_type
, d
.daemon_id
))
549 correct_digest
= True
550 if any(d
in target_digests
for d
in (d
.deployed_by
or [])):
551 logger
.debug('daemon %s.%s deployed by correct version' % (
552 d
.daemon_type
, d
.daemon_id
))
556 if self
.mgr
.daemon_is_self(d
.daemon_type
, d
.daemon_id
):
557 logger
.info('Upgrade: Need to upgrade myself (mgr.%s)' %
558 self
.mgr
.get_mgr_id())
559 need_upgrade_self
= True
563 logger
.debug('daemon %s.%s not deployed by correct version' % (
564 d
.daemon_type
, d
.daemon_id
))
565 need_upgrade_deployer
.append((d
, True))
567 logger
.debug('daemon %s.%s not correct (%s, %s, %s)' % (
568 daemon_type
, d
.daemon_id
,
569 d
.container_image_name
, d
.container_image_digests
, d
.version
))
570 need_upgrade
.append((d
, False))
572 if not need_upgrade_self
:
573 # only after the mgr itself is upgraded can we expect daemons to have
574 # deployed_by == target_digests
575 need_upgrade
+= need_upgrade_deployer
577 # prepare filesystems for daemon upgrades?
581 and not self
._prepare
_for
_mds
_upgrade
(target_major
, [d_entry
[0] for d_entry
in need_upgrade
])
586 self
.upgrade_info_str
= 'Currently upgrading %s daemons' % (daemon_type
)
588 to_upgrade
: List
[Tuple
[DaemonDescription
, bool]] = []
589 known_ok_to_stop
: List
[str] = []
590 for d_entry
in need_upgrade
:
592 assert d
.daemon_type
is not None
593 assert d
.daemon_id
is not None
594 assert d
.hostname
is not None
596 if not d
.container_image_id
:
597 if d
.container_image_name
== target_image
:
599 'daemon %s has unknown container_image_id but has correct image name' % (d
.name()))
603 if d
.name() in known_ok_to_stop
:
604 logger
.info(f
'Upgrade: {d.name()} is also safe to restart')
605 to_upgrade
.append(d_entry
)
608 if d
.daemon_type
== 'osd':
609 # NOTE: known_ok_to_stop is an output argument for
610 # _wait_for_ok_to_stop
611 if not self
._wait
_for
_ok
_to
_stop
(d
, known_ok_to_stop
):
614 if d
.daemon_type
== 'mon' and self
._enough
_mons
_for
_ok
_to
_stop
():
615 if not self
._wait
_for
_ok
_to
_stop
(d
, known_ok_to_stop
):
618 if d
.daemon_type
== 'mds' and self
._enough
_mds
_for
_ok
_to
_stop
(d
):
619 if not self
._wait
_for
_ok
_to
_stop
(d
, known_ok_to_stop
):
622 to_upgrade
.append(d_entry
)
624 # if we don't have a list of others to consider, stop now
625 if not known_ok_to_stop
:
629 for d_entry
in to_upgrade
:
631 assert d
.daemon_type
is not None
632 assert d
.daemon_id
is not None
633 assert d
.hostname
is not None
635 self
._update
_upgrade
_progress
(done
/ len(daemons
))
637 # make sure host has latest container image
638 out
, errs
, code
= CephadmServe(self
.mgr
)._run
_cephadm
(
639 d
.hostname
, '', 'inspect-image', [],
640 image
=target_image
, no_fsid
=True, error_ok
=True)
641 if code
or not any(d
in target_digests
for d
in json
.loads(''.join(out
)).get('repo_digests', [])):
642 logger
.info('Upgrade: Pulling %s on %s' % (target_image
,
644 self
.upgrade_info_str
= 'Pulling %s image on host %s' % (
645 target_image
, d
.hostname
)
646 out
, errs
, code
= CephadmServe(self
.mgr
)._run
_cephadm
(
647 d
.hostname
, '', 'pull', [],
648 image
=target_image
, no_fsid
=True, error_ok
=True)
650 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
651 'severity': 'warning',
652 'summary': 'Upgrade: failed to pull target image',
655 'failed to pull %s on host %s' % (target_image
,
659 r
= json
.loads(''.join(out
))
660 if not any(d
in target_digests
for d
in r
.get('repo_digests', [])):
661 logger
.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
662 target_image
, d
.hostname
, r
['repo_digests'], target_digests
))
663 self
.upgrade_info_str
= 'Image %s pull on %s got new digests %s (not %s), restarting' % (
664 target_image
, d
.hostname
, r
['repo_digests'], target_digests
)
665 self
.upgrade_state
.target_digests
= r
['repo_digests']
666 self
._save
_upgrade
_state
()
669 self
.upgrade_info_str
= 'Currently upgrading %s daemons' % (daemon_type
)
671 if len(to_upgrade
) > 1:
672 logger
.info('Upgrade: Updating %s.%s (%d/%d)' %
673 (d
.daemon_type
, d
.daemon_id
, num
, len(to_upgrade
)))
675 logger
.info('Upgrade: Updating %s.%s' %
676 (d
.daemon_type
, d
.daemon_id
))
677 action
= 'Upgrading' if not d_entry
[1] else 'Redeploying'
679 daemon_spec
= CephadmDaemonDeploySpec
.from_daemon_description(d
)
680 self
.mgr
._daemon
_action
(
683 image
=target_image
if not d_entry
[1] else None
685 except Exception as e
:
686 self
._fail
_upgrade
('UPGRADE_REDEPLOY_DAEMON', {
687 'severity': 'warning',
688 'summary': f
'{action} daemon {d.name()} on host {d.hostname} failed.',
691 f
'Upgrade daemon: {d.name()}: {e}'
699 # complete mon upgrade?
700 if daemon_type
== 'mon':
701 if not self
.mgr
.get("have_local_config_map"):
702 logger
.info('Upgrade: Restarting mgr now that mons are running pacific')
703 need_upgrade_self
= True
705 if need_upgrade_self
:
707 self
.mgr
.mgr_service
.fail_over()
708 except OrchestratorError
as e
:
709 self
._fail
_upgrade
('UPGRADE_NO_STANDBY_MGR', {
710 'severity': 'warning',
711 'summary': f
'Upgrade: {e}',
714 'The upgrade process needs to upgrade the mgr, '
715 'but it needs at least one standby to proceed.',
720 return # unreachable code, as fail_over never returns
721 elif daemon_type
== 'mgr':
722 if 'UPGRADE_NO_STANDBY_MGR' in self
.mgr
.health_checks
:
723 del self
.mgr
.health_checks
['UPGRADE_NO_STANDBY_MGR']
724 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
726 # make sure 'ceph versions' agrees
727 ret
, out_ver
, err
= self
.mgr
.check_mon_command({
728 'prefix': 'versions',
730 j
= json
.loads(out_ver
)
731 for version
, count
in j
.get(daemon_type
, {}).items():
732 short_version
= version
.split(' ')[2]
733 if short_version
!= target_version
:
735 'Upgrade: %d %s daemon(s) are %s != target %s' %
736 (count
, daemon_type
, short_version
, target_version
))
739 daemon_type_section
= name_to_config_section(daemon_type
)
740 if image_settings
.get(daemon_type_section
) != target_image
:
741 logger
.info('Upgrade: Setting container_image for all %s' %
743 self
.mgr
.set_container_image(daemon_type_section
, target_image
)
745 for section
in image_settings
.keys():
746 if section
.startswith(name_to_config_section(daemon_type
) + '.'):
747 to_clean
.append(section
)
749 logger
.debug('Upgrade: Cleaning up container_image for %s' %
751 for section
in to_clean
:
752 ret
, image
, err
= self
.mgr
.check_mon_command({
753 'prefix': 'config rm',
754 'name': 'container_image',
758 logger
.debug('Upgrade: All %s daemons are up to date.' % daemon_type
)
760 # complete osd upgrade?
761 if daemon_type
== 'osd':
762 osdmap
= self
.mgr
.get("osd_map")
763 osd_min_name
= osdmap
.get("require_osd_release", "argonaut")
764 osd_min
= ceph_release_to_major(osd_min_name
)
765 if osd_min
< int(target_major
):
767 f
'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
768 ret
, _
, err
= self
.mgr
.check_mon_command({
769 'prefix': 'osd require-osd-release',
770 'release': target_major_name
,
773 # complete mds upgrade?
774 if daemon_type
== 'mds' and self
.upgrade_state
.fs_original_max_mds
:
775 for i
in self
.mgr
.get("fs_map")['filesystems']:
777 fs_name
= i
['mdsmap']['fs_name']
778 new_max
= self
.upgrade_state
.fs_original_max_mds
.get(fs_id
)
780 self
.mgr
.log
.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
783 ret
, _
, err
= self
.mgr
.check_mon_command({
790 self
.upgrade_state
.fs_original_max_mds
= {}
791 self
._save
_upgrade
_state
()
794 logger
.info('Upgrade: Finalizing container_image settings')
795 self
.mgr
.set_container_image('global', target_image
)
797 for daemon_type
in CEPH_UPGRADE_ORDER
:
798 ret
, image
, err
= self
.mgr
.check_mon_command({
799 'prefix': 'config rm',
800 'name': 'container_image',
801 'who': name_to_config_section(daemon_type
),
804 logger
.info('Upgrade: Complete!')
805 if self
.upgrade_state
.progress_id
:
806 self
.mgr
.remote('progress', 'complete',
807 self
.upgrade_state
.progress_id
)
808 self
.upgrade_state
= None
809 self
._save
_upgrade
_state
()