import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, \
    CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, \
    daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

logger = logging.getLogger(__name__)

CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
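# NOTE: the flag bit above mirrors CEPH_MDSMAP_ALLOW_STANDBY_REPLAY in Ceph's
# MDS map headers; it is tested against mdsmap['flags'] during MDS prep below.
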
def normalize_image_digest(digest: str, default_registry: str) -> str:
    # normal case:
    #   ceph/ceph -> docker.io/ceph/ceph
    # edge cases that shouldn't ever come up:
    #   ubuntu -> docker.io/ubuntu (ubuntu alias for library/ubuntu)
    # no change:
    #   quay.ceph.io/ceph/ceph -> no change
    #   docker.io/ubuntu -> no change
    bits = digest.split('/')
    if '.' not in bits[0] or len(bits) < 3:
        digest = 'docker.io/' + digest
    return digest
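# Illustrative behavior (examples, not from the original source):
#   normalize_image_digest('ceph/ceph', 'docker.io')              -> 'docker.io/ceph/ceph'
#   normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io') -> unchanged
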
class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
        }
    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        if data:
            c = {k: v for k, v in data.items()}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None
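    # A sketch of the persisted JSON this round-trips (illustrative values):
    #   {"target_name": "quay.io/ceph/ceph:v16.2.5", "progress_id": "<uuid>",
    #    "target_id": null, "target_digests": null, "target_version": null,
    #    "fs_original_max_mds": null, "fs_original_allow_standby_replay": null,
    #    "error": null, "paused": false}
    # Older states stored a single 'repo_digest'; from_json() rewrites it into
    # the 'target_digests' list.
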
class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_EXCEPTION',
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None
    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r
    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            (major, minor, _) = version.split('.', 2)
            assert int(minor) >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        if int(current_major) < int(major) - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if int(current_major) > int(major):
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if int(current_major) == int(major):
            if int(current_minor) > int(minor):
                return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < int(major) - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(major) - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None
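    # For example (hypothetical cluster versions): a 15.2.x cluster upgrading
    # to '17.2.0' passes these checks, while '18.2.0' is rejected as more than
    # a two-major-version jump and '14.2.0' is rejected as a downgrade.
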
    def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }
        ls = reg.get_tags(bare_image)
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r
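    # Illustrative result shape (hypothetical values):
    #   {'image': 'docker.io/ceph/ceph', 'registry': 'docker.io',
    #    'bare_image': 'ceph/ceph', 'versions': ['16.2.5', '16.2.4', ...]}
    # or, with tags=True, a 'tags' list of the raw registry tags instead.
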
    def upgrade_start(self, image: str, version: str) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()

        return 'Initiating upgrade to %s' % (target_name)
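    # Typically reached via the CLI, e.g. (illustrative values):
    #   ceph orch upgrade start --ceph-version 16.2.5
    #   ceph orch upgrade start --image quay.io/ceph/ceph:v16.2.5
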
    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image
    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()

        return 'Resumed upgrade to %s' % self.target_image
    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()

        return 'Stopped upgrade to %s' % target_image
    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False
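    # Presumably driven from the orchestrator's serve loop: each pass makes at
    # most one batch of progress and relies on the persisted UpgradeState to
    # pick up where the previous pass (or previous active mgr) left off.
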
    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False
    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)
    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)
    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)
    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings
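    # Illustrative return value (hypothetical sections/images):
    #   {'global': 'docker.io/ceph/ceph:v15',
    #    'client.rgw': 'docker.io/ceph/ceph:v15.2.1'}
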
    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade
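    # Rationale (summarizing the steps above): MDS daemons can only be safely
    # upgraded one rank at a time, so each filesystem is pinned to max_mds=1
    # and standby-replay is disabled first; the original settings are kept in
    # UpgradeState and restored once the mds daemons finish in _do_upgrade().
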
    def _enough_mons_for_ok_to_stop(self) -> bool:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2
    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # type (DaemonDescription) -> bool

        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop
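    # In other words: stopping an mds is only considered safe here when the
    # service has more daemons than active ranks (max_mds), i.e. at least one
    # standby exists to take over the stopped rank.
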
    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True
        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return
        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })
        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)

            need_upgrade_self = False
            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                correct_digest = False
                if (any(d in target_digests for d in (d.container_image_digests or []))
                        or d.daemon_type in MONITORING_STACK_TYPES):
                    logger.debug('daemon %s.%s container digest correct' % (
                        daemon_type, d.daemon_id))
                    correct_digest = True
                    if any(d in target_digests for d in (d.deployed_by or [])):
                        logger.debug('daemon %s.%s deployed by correct version' % (
                            d.daemon_type, d.daemon_id))
                        done += 1
                        continue

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                if correct_digest:
                    logger.debug('daemon %s.%s not deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    need_upgrade_deployer.append((d, True))
                else:
                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                        daemon_type, d.daemon_id,
                        d.container_image_name, d.container_image_digests, d.version))
                    need_upgrade.append((d, False))
            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return
            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
            known_ok_to_stop: List[str] = []
            for d_entry in need_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                    else:
                        continue

                if known_ok_to_stop:
                    if d.name() in known_ok_to_stop:
                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
                        to_upgrade.append(d_entry)
                    continue

                if d.daemon_type == 'osd':
                    # NOTE: known_ok_to_stop is an output argument for
                    # _wait_for_ok_to_stop
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                to_upgrade.append(d_entry)

                # if we don't have a list of others to consider, stop now
                if not known_ok_to_stop:
                    break
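            # Batching note: ok_to_stop() fills known_ok_to_stop with every
            # daemon it deems safe to stop together, so one batch of
            # co-stoppable daemons (e.g. a set of OSDs that can go down at
            # once) is upgraded per pass; otherwise only the single vetted
            # daemon is restarted.
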
            num = 1
            for d_entry in to_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                self._update_upgrade_progress(done / len(daemons))

                # make sure host has latest container image
                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
                        target_image, d.hostname)
                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if not any(d in target_digests for d in r.get('repo_digests', [])):
                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests))
                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests)
                        self.upgrade_state.target_digests = r['repo_digests']
                        self._save_upgrade_state()
                        return

                    self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
                if len(to_upgrade) > 1:
                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
                else:
                    logger.info('Upgrade: Updating %s.%s' %
                                (d.daemon_type, d.daemon_id))
                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                    self.mgr._daemon_action(
                        daemon_spec,
                        'redeploy',
                        image=target_image if not d_entry[1] else None
                    )
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return
                num += 1

            if to_upgrade:
                return
            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)
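            # After fail_over() a standby mgr takes over; the persisted
            # UpgradeState lets the new active mgr (presumably already running
            # the target image) resume this loop and upgrade the remaining
            # daemons, including its former peer.
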
            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))
            # push down configs
            daemon_type_section = name_to_config_section(daemon_type)
            if image_settings.get(daemon_type_section) != target_image:
                logger.info('Upgrade: Setting container_image for all %s' %
                            daemon_type)
                self.mgr.set_container_image(daemon_type_section, target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)
            # complete osd upgrade?
            if daemon_type == 'osd':
                osdmap = self.mgr.get("osd_map")
                osd_min_name = osdmap.get("require_osd_release", "argonaut")
                osd_min = ceph_release_to_major(osd_min_name)
                if osd_min < int(target_major):
                    logger.info(
                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'osd require-osd-release',
                        'release': target_major_name,
                    })
            # complete mds upgrade?
            if daemon_type == 'mds':
                if self.upgrade_state.fs_original_max_mds:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                        if new_max > 1:
                            self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                                fs_name, new_max
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'max_mds',
                                'val': str(new_max),
                            })

                    self.upgrade_state.fs_original_max_mds = {}
                    self._save_upgrade_state()
                if self.upgrade_state.fs_original_allow_standby_replay:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                        if asr:
                            self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                                fs_name
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'allow_standby_replay',
                                'val': '1',
                            })

                    self.upgrade_state.fs_original_allow_standby_replay = {}
                    self._save_upgrade_state()
        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return