# ceph/src/pybind/mgr/cephadm/upgrade.py (ceph v16.2.6)
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple

import orchestrator
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

def normalize_image_digest(digest: str, default_registry: str) -> str:
    # normal case:
    #   ceph/ceph -> docker.io/ceph/ceph
    # edge cases that shouldn't ever come up:
    #   ubuntu -> docker.io/ubuntu   (ubuntu is an alias for library/ubuntu)
    # no change:
    #   quay.ceph.io/ceph/ceph -> quay.ceph.io/ceph/ceph
    #   docker.io/ubuntu -> docker.io/ubuntu
    bits = digest.split('/')
    if '.' not in bits[0] or len(bits) < 3:
        digest = 'docker.io/' + digest
    return digest
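# A minimal usage sketch (illustrative only; the image names below are examples,
# not values taken from a real cluster):
#
#   normalize_image_digest('ceph/ceph:v16.2.6', 'docker.io')
#       -> 'docker.io/ceph/ceph:v16.2.6'      (unqualified name gains a registry)
#   normalize_image_digest('quay.io/ceph/ceph:v16.2.6', 'docker.io')
#       -> 'quay.io/ceph/ceph:v16.2.6'        (already qualified, unchanged)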


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        if data:
            c = {k: v for k, v in data.items()}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None
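    # Persistence sketch (hedged illustration, not a stored example): to_json()
    # emits a plain dict that from_json() can reconstruct, e.g.
    #
    #   st = UpgradeState(target_name='quay.io/ceph/ceph:v16.2.6', progress_id='...')
    #   UpgradeState.from_json(st.to_json())._target_name
    #       -> 'quay.io/ceph/ceph:v16.2.6'
    #
    # from_json() also tolerates older serialized state that used the singular
    # 'repo_digest' key by folding it into the newer 'target_digests' list.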


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
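    # For example (illustrative numbers only): with 7 of 10 ceph daemons already on
    # a target digest, this returns ('7/10 daemons upgraded', ['mon', 'mgr']), where
    # the list holds only the daemon types whose members have all been upgraded.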

    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            (major, minor, _) = version.split('.', 2)
            assert int(minor) >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        if int(current_major) < int(major) - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if int(current_major) > int(major):
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if int(current_major) == int(major):
            if int(current_minor) > int(minor):
                return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < int(major) - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(major) - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None
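    # Hedged examples of the checks above (the running version is hypothetical, and
    # the "allowed" case assumes the mon/osd release floors are recent enough):
    #
    #   running 15.2.13, target '16.2.6'  -> None (allowed)
    #   running 15.2.13, target '14.2.22' -> 'ceph cannot downgrade major versions ...'
    #   target '16.2'                     -> 'version must be in the form X.Y.Z ...'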

    def upgrade_start(self, image: str, version: str) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)
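    # Example of how the target image name is derived above (container_image_base
    # is a cluster setting; the value shown is only an assumption):
    #
    #   version='16.2.6', container_image_base='quay.io/ceph/ceph'
    #       -> target_name='quay.io/ceph/ceph:v16.2.6'
    #   image='ceph/ceph:v16.2.6'
    #       -> target_name='docker.io/ceph/ceph:v16.2.6' (via normalize_image_digest)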

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Return False if nothing was done.
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the daemon might go away for some other reason
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False
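    # Note: the loop above polls ok-to-stop at most 4 times, 15 seconds apart, so a
    # daemon that never becomes safe to stop blocks this pass for roughly a minute
    # before the caller gives up and the upgrade loop retries on a later pass.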

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings
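    # Sketch of the expected result (section and image names are illustrative): a
    # 'config dump' entry like
    #   {'section': 'global', 'name': 'container_image', 'value': 'quay.io/ceph/ceph:v16.2.5'}
    # becomes
    #   {'global': 'quay.io/ceph/ceph:v16.2.5'}
    # with one key per config section that has container_image set.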

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # are any daemons running a different major version?
        scale_down = False
        for name, info in self.mgr.get("mds_metadata").items():
            version = info.get("ceph_version_short")
            major_version = None
            if version:
                major_version = version.split('.')[0]
            if not major_version:
                self.mgr.log.info('Upgrade: mds.%s version is not known, will retry' % name)
                time.sleep(5)
                return False
            if int(major_version) < int(target_major):
                scale_down = True

        if not scale_down:
            self.mgr.log.debug('Upgrade: All MDS daemons run the same major version')
            return True

        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for i in fsmap.get('filesystems', []):
            fs = i["mdsmap"]
            fs_id = i["id"]
            fs_name = fs["fs_name"]

            # scale down this filesystem?
            if fs["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fs_id not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fs_id] = fs['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if len(fs['info']) > 1:
                self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            lone_mds = list(fs['info'].values())[0]
            if lone_mds['state'] != 'up:active':
                self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                    lone_mds['name'],
                    lone_mds['state'],
                ))
                time.sleep(10)
                continue_upgrade = False
                continue

        return continue_upgrade
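    # In short: when any MDS still runs an older major version, every filesystem is
    # squeezed down to a single active MDS (the original max_mds is saved so it can
    # be restored at the end of the MDS phase in _do_upgrade), and this method keeps
    # returning False until each filesystem is down to one up:active daemon.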

    def _enough_mons_for_ok_to_stop(self) -> bool:
        # type: () -> bool
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2
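    # For example, a monmap with 3 or more mons returns True (stopping one still
    # leaves a majority); with only 1 or 2 mons it returns False, and _do_upgrade
    # restarts the mon without applying the ok-to-stop gate.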

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # type: (DaemonDescription) -> bool

        # find the fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for i in fsmap.get('filesystems', []):
            fs = i["mdsmap"]
            fs_name = fs["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # any standby mds daemons for this fs?
            if fs["max_mds"] < mds_count:
                return True
            return False

        return True  # if the mds belongs to no fs it should pass ok-to-stop
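    # For example (illustrative): a filesystem with max_mds=1 that has 2 mds daemons
    # deployed (one standby) returns True, while max_mds=1 with a single daemon
    # returns False, so the lone MDS is upgraded without waiting on ok-to-stop.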

    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)

            need_upgrade_self = False
            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                correct_digest = False
                if (any(d in target_digests for d in (d.container_image_digests or []))
                        or d.daemon_type in MONITORING_STACK_TYPES):
                    logger.debug('daemon %s.%s container digest correct' % (
                        daemon_type, d.daemon_id))
                    correct_digest = True
                    if any(d in target_digests for d in (d.deployed_by or [])):
                        logger.debug('daemon %s.%s deployed by correct version' % (
                            d.daemon_type, d.daemon_id))
                        done += 1
                        continue

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                if correct_digest:
                    logger.debug('daemon %s.%s not deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    need_upgrade_deployer.append((d, True))
                else:
                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                        daemon_type, d.daemon_id,
                        d.container_image_name, d.container_image_digests, d.version))
                    need_upgrade.append((d, False))

            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
            known_ok_to_stop: List[str] = []
            for d_entry in need_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue

                if known_ok_to_stop:
                    if d.name() in known_ok_to_stop:
                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
                        to_upgrade.append(d_entry)
                    continue

                if d.daemon_type == 'osd':
                    # NOTE: known_ok_to_stop is an output argument for
                    # _wait_for_ok_to_stop
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                to_upgrade.append(d_entry)

                # if we don't have a list of others to consider, stop now
                if not known_ok_to_stop:
                    break

            num = 1
            for d_entry in to_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                self._update_upgrade_progress(done / len(daemons))

                # make sure host has latest container image
                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
                        target_image, d.hostname)
                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if not any(d in target_digests for d in r.get('repo_digests', [])):
                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests))
                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests)
                        self.upgrade_state.target_digests = r['repo_digests']
                        self._save_upgrade_state()
                        return

                    self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

                if len(to_upgrade) > 1:
                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
                else:
                    logger.info('Upgrade: Updating %s.%s' %
                                (d.daemon_type, d.daemon_id))
                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                    self.mgr._daemon_action(
                        daemon_spec,
                        'redeploy',
                        image=target_image if not d_entry[1] else None
                    )
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return
                num += 1
            if to_upgrade:
                return

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            # push down configs
            daemon_type_section = name_to_config_section(daemon_type)
            if image_settings.get(daemon_type_section) != target_image:
                logger.info('Upgrade: Setting container_image for all %s' %
                            daemon_type)
                self.mgr.set_container_image(daemon_type_section, target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)

            # complete osd upgrade?
            if daemon_type == 'osd':
                osdmap = self.mgr.get("osd_map")
                osd_min_name = osdmap.get("require_osd_release", "argonaut")
                osd_min = ceph_release_to_major(osd_min_name)
                if osd_min < int(target_major):
                    logger.info(
                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'osd require-osd-release',
                        'release': target_major_name,
                    })

            # complete mds upgrade?
            if daemon_type == 'mds' and self.upgrade_state.fs_original_max_mds:
                for i in self.mgr.get("fs_map")['filesystems']:
                    fs_id = i["id"]
                    fs_name = i['mdsmap']['fs_name']
                    new_max = self.upgrade_state.fs_original_max_mds.get(fs_id)
                    if new_max:
                        self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                            fs_name, new_max
                        ))
                        ret, _, err = self.mgr.check_mon_command({
                            'prefix': 'fs set',
                            'fs_name': fs_name,
                            'var': 'max_mds',
                            'val': str(new_max),
                        })

                self.upgrade_state.fs_original_max_mds = {}
                self._save_upgrade_state()

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return
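# Overall flow of _do_upgrade, for orientation (derived from the code above):
#  1. resolve the target image id/version/digests on the first pass and persist them
#  2. validate the target version against the running cluster
#  3. walk CEPH_UPGRADE_ORDER; for each daemon type, pull the image where needed,
#     gate restarts with ok-to-stop, and redeploy out-of-date daemons
#  4. after each type: fail over the mgr if needed, push container_image config,
#     and finalize per-type steps (require_osd_release, restoring MDS max_mds)
#  5. once every type is done, set the global container_image and clear the state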