# ceph/src/pybind/mgr/cephadm/upgrade.py
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple

import orchestrator
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, \
    CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
from orchestrator import OrchestratorError, DaemonDescription, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

logger = logging.getLogger(__name__)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    # normal case:
    #   ceph/ceph -> docker.io/ceph/ceph
    # edge cases that shouldn't ever come up:
    #   ubuntu -> docker.io/ubuntu (ubuntu alias for library/ubuntu)
    # no change:
    #   quay.ceph.io/ceph/ceph -> no change
    #   docker.io/ubuntu -> no change
    bits = digest.split('/')
    if '.' not in bits[0] or len(bits) < 3:
        digest = 'docker.io/' + digest
    return digest
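

# Illustrative sketch (not part of the upstream module): how the helper above
# behaves for the documented inputs. Note that default_registry is unused by
# this version of the helper; the prefix is hard-coded to 'docker.io'.
def _example_normalize_image_digest() -> None:
    # an unqualified repo gains the docker.io prefix
    assert normalize_image_digest('ceph/ceph', 'docker.io') == 'docker.io/ceph/ceph'
    # a fully qualified image with a registry host is left alone
    assert normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io') == 'quay.ceph.io/ceph/ceph'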


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        if data:
            c = {k: v for k, v in data.items()}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None
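

# Illustrative sketch (not part of the upstream module): persisting and
# restoring an UpgradeState, including the legacy 'repo_digest' key that
# from_json() rewrites into 'target_digests'. The digest strings are made up.
def _example_upgrade_state_round_trip() -> None:
    state = UpgradeState(target_name='docker.io/ceph/ceph:v16.2.0', progress_id='1234')
    restored = UpgradeState.from_json(json.loads(json.dumps(state.to_json())))
    assert restored is not None and restored.paused is False
    legacy = UpgradeState.from_json({'target_name': 'ceph/ceph:v15',
                                     'progress_id': 'abcd',
                                     'repo_digest': 'docker.io/ceph/ceph@sha256:aaaa'})
    assert legacy is not None and legacy.target_digests == ['docker.io/ceph/ceph@sha256:aaaa']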


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_EXCEPTION',
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name
        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            # no useful digest info yet; the upgrade is still in a very early stage
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s ceph daemons upgraded' % (done, len(daemons)), completed_types
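
    # Illustrative sketch (not part of the upstream class): the per-type
    # completion logic above, reduced to plain (daemon_type, digest_ok) tuples.
    @staticmethod
    def _example_completion_summary() -> None:
        completed_daemons = [('mgr', True), ('mon', True), ('mon', False), ('osd', True)]
        done = len([True for completion in completed_daemons if completion[1]])
        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))
        assert done == 3
        # 'mon' is excluded: one of the two mons still runs the old digest
        assert sorted(completed_types) == ['mgr', 'osd']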

    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            (major, minor, _) = version.split('.', 2)
            assert int(minor) >= 0
            # patch might be a number or {number}-g{sha1}
        except (ValueError, AssertionError):
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        if int(current_major) < int(major) - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if int(current_major) > int(major):
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if int(current_major) == int(major):
            if int(current_minor) > int(minor):
                return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < int(major) - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(major) - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None
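
    # Illustrative sketch (not part of the upstream class): the version-string
    # parsing the check above relies on. The mgr reports a string such as
    # 'ceph version 16.2.5-45-g1234abcd (1234abcd) pacific (stable)'.
    @staticmethod
    def _example_version_parsing() -> None:
        reported = 'ceph version 16.2.5-45-g1234abcd (1234abcd) pacific (stable)'
        current_version = reported.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        assert (current_major, current_minor) == ('16', '2')
        # a bare target such as '15.2.3' splits the same way
        (major, minor, patch) = '15.2.3'.split('.', 2)
        assert (major, minor, patch) == ('15', '2', '3')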

    def upgrade_start(self, image: str, version: str) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image
        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns false, if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)
            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        )

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings
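
    # Illustrative sketch (not part of the upstream class): the shape of the
    # 'config dump' JSON that the method above consumes.
    @staticmethod
    def _example_image_settings() -> None:
        out = json.dumps([
            {'section': 'global', 'name': 'container_image', 'value': 'docker.io/ceph/ceph:v16'},
            {'section': 'mon', 'name': 'public_network', 'value': '10.0.0.0/8'},
            {'section': 'client.rgw', 'name': 'container_image', 'value': 'docker.io/ceph/ceph:v15'},
        ])
        image_settings = {}
        for opt in json.loads(out):
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        assert image_settings == {'global': 'docker.io/ceph/ceph:v16',
                                  'client.rgw': 'docker.io/ceph/ceph:v15'}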

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # are any daemons running a different major version?
        scale_down = False
        for name, info in self.mgr.get("mds_metadata").items():
            version = info.get("ceph_version_short")
            major_version = None
            if version:
                major_version = version.split('.')[0]
            if not major_version:
                # could not determine the version of this mds; be safe and retry
                self.mgr.log.info('Upgrade: mds.%s version is not known, will retry' % name)
                time.sleep(5)
                return False
            if int(major_version) < int(target_major):
                scale_down = True

        if not scale_down:
            self.mgr.log.debug('Upgrade: All MDS daemons run same major version')
            return True

        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for i in fsmap.get('filesystems', []):
            fs = i["mdsmap"]
            fs_id = i["id"]
            fs_name = fs["fs_name"]

            # scale down this filesystem?
            if fs["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name))
                if fs_id not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fs_id] = fs['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if len(fs['info']) > 1:
                self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            lone_mds = list(fs['info'].values())[0]
            if lone_mds['state'] != 'up:active':
                self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                    lone_mds['name'],
                    lone_mds['state'],
                ))
                time.sleep(10)
                continue_upgrade = False
                continue

        return continue_upgrade
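
    # Illustrative sketch (not part of the upstream class): the fs_map fields
    # the method above inspects when deciding whether a filesystem still needs
    # to be scaled down to a single active MDS.
    @staticmethod
    def _example_mds_scale_down_decision() -> None:
        fs = {'fs_name': 'cephfs', 'max_mds': 2,
              'info': {'gid_1': {'name': 'a', 'state': 'up:active'},
                       'gid_2': {'name': 'b', 'state': 'up:active'}}}
        # max_mds > 1: the loop would first set max_mds back to 1 ...
        assert fs['max_mds'] > 1
        # ... and then wait until a single up:active daemon remains in 'info'
        assert len(fs['info']) > 1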

    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)

            need_upgrade_self = False
            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                correct_digest = False
                if (any(d in target_digests for d in (d.container_image_digests or []))
                        or d.daemon_type in MONITORING_STACK_TYPES):
                    logger.debug('daemon %s.%s container digest correct' % (
                        daemon_type, d.daemon_id))
                    correct_digest = True
                    if any(d in target_digests for d in (d.deployed_by or [])):
                        logger.debug('daemon %s.%s deployed by correct version' % (
                            d.daemon_type, d.daemon_id))
                        done += 1
                        continue

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                if correct_digest:
                    logger.debug('daemon %s.%s not deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    need_upgrade_deployer.append((d, True))
                else:
                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                        daemon_type, d.daemon_id,
                        d.container_image_name, d.container_image_digests, d.version))
                    need_upgrade.append((d, False))

            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
            known_ok_to_stop: List[str] = []

            for d_entry in need_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                    else:
                        continue

                if known_ok_to_stop:
                    if d.name() in known_ok_to_stop:
                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
                        to_upgrade.append(d_entry)
                    continue

                if d.daemon_type in ['mon', 'osd', 'mds']:
                    # NOTE: known_ok_to_stop is an output argument for
                    # _wait_for_ok_to_stop
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                to_upgrade.append(d_entry)

                # if we don't have a list of others to consider, stop now
                if not known_ok_to_stop:
                    break

            num = 1
            for d_entry in to_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                self._update_upgrade_progress(done / len(daemons))

                # make sure host has latest container image
                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
                        target_image, d.hostname)
                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if not any(d in target_digests for d in r.get('repo_digests', [])):
                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests))
                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests)
                        self.upgrade_state.target_digests = r['repo_digests']
                        self._save_upgrade_state()
                        return

                    self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

                if len(to_upgrade) > 1:
                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
                else:
                    logger.info('Upgrade: Updating %s.%s' %
                                (d.daemon_type, d.daemon_id))
                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                    self.mgr._daemon_action(
                        daemon_spec,
                        'redeploy',
                        image=target_image if not d_entry[1] else None
                    )
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return
                num += 1

            if to_upgrade:
                return

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            # push down configs
            daemon_type_section = name_to_config_section(daemon_type)
            if image_settings.get(daemon_type_section) != target_image:
                logger.info('Upgrade: Setting container_image for all %s' %
                            daemon_type)
                self.mgr.set_container_image(daemon_type_section, target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)

            # complete osd upgrade?
            if daemon_type == 'osd':
                osdmap = self.mgr.get("osd_map")
                osd_min_name = osdmap.get("require_osd_release", "argonaut")
                osd_min = ceph_release_to_major(osd_min_name)
                if osd_min < int(target_major):
                    logger.info(
                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'osd require-osd-release',
                        'release': target_major_name,
                    })

            # complete mds upgrade?
            if daemon_type == 'mds' and self.upgrade_state.fs_original_max_mds:
                for i in self.mgr.get("fs_map")['filesystems']:
                    fs_id = i["id"]
                    fs_name = i['mdsmap']['fs_name']
                    new_max = self.upgrade_state.fs_original_max_mds.get(fs_id)
                    if new_max:
                        self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                            fs_name, new_max
                        ))
                        ret, _, err = self.mgr.check_mon_command({
                            'prefix': 'fs set',
                            'fs_name': fs_name,
                            'var': 'max_mds',
                            'val': str(new_max),
                        })

                self.upgrade_state.fs_original_max_mds = {}
                self._save_upgrade_state()

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return
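

# Illustrative sketch (not part of the upstream module): the digest test used
# throughout _do_upgrade() to decide whether a daemon or a freshly pulled
# image already matches one of the upgrade target digests.
def _example_digest_match() -> None:
    target_digests = ['docker.io/ceph/ceph@sha256:aaaa']
    repo_digests = ['docker.io/ceph/ceph@sha256:aaaa',
                    'quay.io/ceph/ceph@sha256:bbbb']
    assert any(d in target_digests for d in repo_digests)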