import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                             bool]] = fs_original_allow_standby_replay

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        if data:
            c = {k: v for k, v in data.items()}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None
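
# Illustrative note: an UpgradeState instance is persisted as plain JSON via
# mgr.set_store()/get_store() (the 'upgrade_state' key) using to_json()/from_json()
# above. A stored record has roughly the shape below; the values here are made up
# purely for illustration:
#
#   {"target_name": "quay.io/ceph/ceph:v16.2.7", "progress_id": "<uuid>",
#    "target_id": null, "target_digests": null, "target_version": null,
#    "fs_original_max_mds": null, "fs_original_allow_standby_replay": null,
#    "error": null, "paused": false}
#
# from_json() also accepts records written by older versions that used a single
# 'repo_digest' key, rewriting it into the newer 'target_digests' list.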


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
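
    # Illustrative example: _get_upgrade_info() yields a progress string plus the
    # daemon types whose members all run a target digest, e.g. something like
    # ('5/20 daemons upgraded', ['mgr', 'mon']) (numbers and types made up).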

    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            (major, minor, _) = version.split('.', 2)
            assert int(minor) >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        if int(current_major) < int(major) - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if int(current_major) > int(major):
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if int(current_major) == int(major):
            if int(current_minor) > int(minor):
                return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < int(major) - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(major) - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None
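
    # Illustrative examples (assuming, say, a cluster currently on 16.2.x):
    # _check_target_version('17.2.0') would return None (allowed, provided the
    # mon/osd minimum-release checks also pass), '14.2.22' would be rejected as
    # older than octopus, and '19.2.0' as more than two major versions ahead.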

    def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }
        ls = reg.get_tags(bare_image)
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r
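
    # Illustrative example: upgrade_ls() reports what the registry offers, e.g.
    # roughly {'image': 'quay.io/ceph/ceph', 'registry': 'quay.io',
    # 'bare_image': 'ceph/ceph', 'versions': ['17.1.0', '16.2.7', ...]} when
    # tags=False, or the raw tag list under 'tags' when tags=True (values made up).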

    def upgrade_start(self, image: str, version: str) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image
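
    # Context (assumed wiring, noted for orientation only): upgrade_start(),
    # upgrade_pause(), upgrade_resume() and upgrade_stop() above manage the
    # persisted UpgradeState, while continue_upgrade() below is expected to be
    # called repeatedly from the orchestrator's serve loop to advance the actual
    # work in _do_upgrade().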

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings
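
    # Illustrative example: the mapping returned above might look roughly like
    # {'global': 'quay.io/ceph/ceph:v16.2.5', 'mon': 'quay.io/ceph/ceph:v16.2.7'}
    # (values made up); _do_upgrade() compares it against the target image to
    # decide which config sections still need container_image set or cleaned up.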

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info(
                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        # type () -> bool
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # type (DaemonDescription) -> bool

        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
                    target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER]
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)

            need_upgrade_self = False
            need_upgrade: List[Tuple[DaemonDescription, bool]] = []
            need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None
                if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                    continue
                correct_digest = False
                if (any(d in target_digests for d in (d.container_image_digests or []))
                        or d.daemon_type in MONITORING_STACK_TYPES):
                    logger.debug('daemon %s.%s container digest correct' % (
                        daemon_type, d.daemon_id))
                    correct_digest = True
                    if any(d in target_digests for d in (d.deployed_by or [])):
                        logger.debug('daemon %s.%s deployed by correct version' % (
                            d.daemon_type, d.daemon_id))
                        done += 1
                        continue

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                if correct_digest:
                    logger.debug('daemon %s.%s not deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    need_upgrade_deployer.append((d, True))
                else:
                    logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                        daemon_type, d.daemon_id,
                        d.container_image_name, d.container_image_digests, d.version))
                    need_upgrade.append((d, False))

            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            to_upgrade: List[Tuple[DaemonDescription, bool]] = []
            known_ok_to_stop: List[str] = []
            for d_entry in need_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue

                if known_ok_to_stop:
                    if d.name() in known_ok_to_stop:
                        logger.info(f'Upgrade: {d.name()} is also safe to restart')
                        to_upgrade.append(d_entry)
                    continue

                if d.daemon_type == 'osd':
                    # NOTE: known_ok_to_stop is an output argument for
                    # _wait_for_ok_to_stop
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                        return

                to_upgrade.append(d_entry)

                # if we don't have a list of others to consider, stop now
                if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                    break

            num = 1
            for d_entry in to_upgrade:
                d = d_entry[0]
                assert d.daemon_type is not None
                assert d.daemon_id is not None
                assert d.hostname is not None

                self._update_upgrade_progress(done / len(daemons))

                # make sure host has latest container image
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True))
                if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    self.upgrade_info_str = 'Pulling %s image on host %s' % (
                        target_image, d.hostname)
                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True))
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if not any(d in target_digests for d in r.get('repo_digests', [])):
                        logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests))
                        self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                            target_image, d.hostname, r['repo_digests'], target_digests)
                        self.upgrade_state.target_digests = r['repo_digests']
                        self._save_upgrade_state()
                        return

                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

                if len(to_upgrade) > 1:
                    logger.info('Upgrade: Updating %s.%s (%d/%d)' %
                                (d.daemon_type, d.daemon_id, num, len(to_upgrade)))
                else:
                    logger.info('Upgrade: Updating %s.%s' %
                                (d.daemon_type, d.daemon_id))
                action = 'Upgrading' if not d_entry[1] else 'Redeploying'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                    self.mgr._daemon_action(
                        daemon_spec,
                        'redeploy',
                        image=target_image if not d_entry[1] else None
                    )
                    self.mgr.cache.metadata_up_to_date[d.hostname] = False
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return
                num += 1
            if to_upgrade:
                return

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            # push down configs
            daemon_type_section = name_to_config_section(daemon_type)
            if image_settings.get(daemon_type_section) != target_image:
                logger.info('Upgrade: Setting container_image for all %s' %
                            daemon_type)
                self.mgr.set_container_image(daemon_type_section, target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            # complete osd upgrade?
            if daemon_type == 'osd':
                osdmap = self.mgr.get("osd_map")
                osd_min_name = osdmap.get("require_osd_release", "argonaut")
                osd_min = ceph_release_to_major(osd_min_name)
                if osd_min < int(target_major):
                    logger.info(
                        f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'osd require-osd-release',
                        'release': target_major_name,
                    })

            # complete mds upgrade?
            if daemon_type == 'mds':
                if self.upgrade_state.fs_original_max_mds:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                        if new_max > 1:
                            self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                                fs_name, new_max
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'max_mds',
                                'val': str(new_max),
                            })

                    self.upgrade_state.fs_original_max_mds = {}
                    self._save_upgrade_state()
                if self.upgrade_state.fs_original_allow_standby_replay:
                    for fs in self.mgr.get("fs_map")['filesystems']:
                        fscid = fs["id"]
                        fs_name = fs['mdsmap']['fs_name']
                        asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                        if asr:
                            self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                                fs_name
                            ))
                            ret, _, err = self.mgr.check_mon_command({
                                'prefix': 'fs set',
                                'fs_name': fs_name,
                                'var': 'allow_standby_replay',
                                'val': '1'
                            })

                    self.upgrade_state.fs_original_allow_standby_replay = {}
                    self._save_upgrade_state()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return