# ceph/src/pybind/mgr/cephadm/upgrade.py (ceph.git, ceph 16.2.7)
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    # normal case:
    #   ceph/ceph -> docker.io/ceph/ceph
    # edge cases that shouldn't ever come up:
    #   ubuntu -> docker.io/ubuntu (ubuntu is an alias for library/ubuntu)
    # no change:
    #   quay.ceph.io/ceph/ceph -> no change
    #   docker.io/ubuntu -> no change
    bits = digest.split('/')
    if '.' not in bits[0] or len(bits) < 3:
        digest = 'docker.io/' + digest
    return digest
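
# For example, `ceph orch upgrade start --image ceph/ceph:v16.2.7` is
# normalized to docker.io/ceph/ceph:v16.2.7 here (upgrade_start() routes
# user-supplied image names through this helper), while a fully qualified
# name such as quay.ceph.io/ceph/ceph:v16.2.7 passes through unchanged.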


class UpgradeState:
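    """
    Persisted state of an in-progress upgrade.

    Serialized via to_json() and stored by CephadmUpgrade in the mgr
    key/value store under the 'upgrade_state' key.
    """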
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay

    def to_json(self) -> dict:
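        """
        Return a json.dumps()-able dict; from_json() is the inverse.
        """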
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
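        """
        Rebuild an UpgradeState from its to_json() form, mapping the legacy
        'repo_digest' field onto 'target_digests'.
        """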
        if data:
            c = {k: v for k, v in data.items()}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
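        """
        The image reference to upgrade to: the pinned repo digest once one
        is known (and use_repo_digest is enabled), otherwise the image name
        the user originally requested.
        """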
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
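        """
        Build the UpgradeStatusSpec reported by `ceph orch upgrade status`
        (reached via the orchestrator module).
        """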
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
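        """
        Return a progress string of the form '<done>/<total> daemons
        upgraded' plus the list of daemon types that are fully upgraded.
        """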
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(digest in self.upgrade_state.target_digests for digest in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types

    def _check_target_version(self, version: str) -> Optional[str]:
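        """
        Sanity-check a target version string. Return an error message if the
        upgrade should be refused (unparseable version, too old, downgrade,
        or too big a jump for the current mon/osd release), else None.
        """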
        try:
            (major, minor, _) = version.split('.', 2)
            assert int(minor) >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        if int(current_major) < int(major) - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if int(current_major) > int(major):
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if int(current_major) == int(major):
            if int(current_minor) > int(minor):
                return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < int(major) - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(major) - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None

    def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict:
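        """
        List upgrade candidates by querying the registry for tags of the
        given (or configured base) image; with tags=False the tag list is
        reduced to plain X.Y.Z version numbers, newest first.
        """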
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }
        ls = reg.get_tags(bare_image)
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r

    def upgrade_start(self, image: str, version: str) -> str:
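        """
        Start an upgrade to the given version or image (the version takes
        precedence if both are passed); this backs `ceph orch upgrade start`.
        Requires 'root' mode and at least two running mgr daemons.
        """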
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
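        """
        Poll ok_to_stop for the given daemon, up to 4 tries 15 seconds apart,
        bailing out early if the upgrade is paused or cancelled. Returns True
        once the daemon is safe to stop; names of other daemons that are also
        safe to stop are appended to `known` (an output argument).
        """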
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
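        """
        Record the given health alert, mark the upgrade as paused, and stash
        the error in the upgrade state so upgrade_status() reports it.
        """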
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
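        """
        Return a map of config section -> container_image value for every
        section that has an explicit container_image set.
        """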
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
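        """
        Before any MDS daemons are upgraded, disable standby-replay and scale
        every filesystem down to a single active MDS, remembering the
        original settings in the upgrade state so they can be restored
        afterwards. Returns False while still waiting for that to happen.
        """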
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info('Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
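        """
        Return True when the monmap contains more than two monitors; the mon
        ok-to-stop gate in _do_upgrade is only applied in that case.
        """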
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
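        """
        Return True if the filesystem served by this MDS has more daemons
        than its max_mds (i.e. standbys exist), so one can safely be asked
        to stop. MDS daemons not tied to any filesystem pass by default.
        """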
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

    def _do_upgrade(self):
        # type: () -> None
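        # One pass of the upgrade logic, invoked via continue_upgrade().
        # Roughly: learn the target image's id/version/digests on the first
        # pass, then walk CEPH_UPGRADE_ORDER one daemon type at a time,
        # redeploying daemons that are not yet running the target digests,
        # and finally push container_image settings and clean up.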
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)

            need_upgrade_self = False
            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                correct_digest = False
                if (any(digest in target_digests for digest in (d.container_image_digests or []))
                        or d.daemon_type in MONITORING_STACK_TYPES):
                    logger.debug('daemon %s.%s container digest correct' % (
                        daemon_type, d.daemon_id))
                    correct_digest = True
                    if any(digest in target_digests for digest in (d.deployed_by or [])):
                        logger.debug('daemon %s.%s deployed by correct version' % (
                            d.daemon_type, d.daemon_id))
                        done += 1
                        continue

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                if correct_digest:
                    logger.debug('daemon %s.%s not deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    need_upgrade_deployer.append((d, True))
                else:
                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                        daemon_type, d.daemon_id,
                        d.container_image_name, d.container_image_digests, d.version))
                    need_upgrade.append((d, False))

            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
            known_ok_to_stop: List[str] = []
            for d_entry in need_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue

                if known_ok_to_stop:
                    if d.name() in known_ok_to_stop:
                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
                        to_upgrade.append(d_entry)
                    continue

                if d.daemon_type == 'osd':
                    # NOTE: known_ok_to_stop is an output argument for
                    # _wait_for_ok_to_stop
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                to_upgrade.append(d_entry)

                # if we don't have a list of others to consider, stop now
                if not known_ok_to_stop:
                    break

            num = 1
            for d_entry in to_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                self._update_upgrade_progress(done / len(daemons))

                # make sure host has latest container image
                out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or not any(digest in target_digests for digest in json.loads(''.join(out)).get('repo_digests', [])):
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
                        target_image, d.hostname)
                    out, errs, code = CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if not any(digest in target_digests for digest in r.get('repo_digests', [])):
                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests))
                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests)
                        self.upgrade_state.target_digests = r['repo_digests']
                        self._save_upgrade_state()
                        return

                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

                if len(to_upgrade) > 1:
                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
                else:
                    logger.info('Upgrade: Updating %s.%s' %
                                (d.daemon_type, d.daemon_id))
                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                    self.mgr._daemon_action(
                        daemon_spec,
                        'redeploy',
                        image=target_image if not d_entry[1] else None
                    )
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return
                num += 1
            if to_upgrade:
                return

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            # push down configs
            daemon_type_section = name_to_config_section(daemon_type)
            if image_settings.get(daemon_type_section) != target_image:
                logger.info('Upgrade: Setting container_image for all %s' %
                            daemon_type)
                self.mgr.set_container_image(daemon_type_section, target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)

            # complete osd upgrade?
            if daemon_type == 'osd':
                osdmap = self.mgr.get("osd_map")
                osd_min_name = osdmap.get("require_osd_release", "argonaut")
                osd_min = ceph_release_to_major(osd_min_name)
                if osd_min < int(target_major):
                    logger.info(
                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'osd require-osd-release',
                        'release': target_major_name,
                    })

            # complete mds upgrade?
            if daemon_type == 'mds':
                if self.upgrade_state.fs_original_max_mds:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                        if new_max > 1:
                            self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                                fs_name, new_max
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'max_mds',
                                'val': str(new_max),
                            })

                    self.upgrade_state.fs_original_max_mds = {}
                    self._save_upgrade_state()
                if self.upgrade_state.fs_original_allow_standby_replay:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                        if asr:
                            self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                                fs_name
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'allow_standby_replay',
                                'val': '1'
                            })

                    self.upgrade_state.fs_original_allow_standby_replay = {}
                    self._save_upgrade_state()

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return