ceph/src/pybind/mgr/cephadm/upgrade.py
1 import json
2 import logging
3 import time
4 import uuid
5 from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any
6
7 import orchestrator
8 from cephadm.registry import Registry
9 from cephadm.serve import CephadmServe
10 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
11 from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
12 MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
13 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service
14
15 if TYPE_CHECKING:
16 from .module import CephadmOrchestrator
17
18
19 logger = logging.getLogger(__name__)
20
21 # from ceph_fs.h
22 CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
23
24
25 def normalize_image_digest(digest: str, default_registry: str) -> str:
26 """
27 Normal case:
28 >>> normalize_image_digest('ceph/ceph', 'docker.io')
29 'docker.io/ceph/ceph'
30
31 No change:
32 >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
33 'quay.ceph.io/ceph/ceph'
34
35 >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
36 'docker.io/ubuntu'
37
38 >>> normalize_image_digest('localhost/ceph', 'docker.io')
39 'localhost/ceph'
40 """
41 known_shortnames = [
42 'ceph/ceph',
43 'ceph/daemon',
44 'ceph/daemon-base',
45 ]
46 for image in known_shortnames:
47 if digest.startswith(image):
48 return f'{default_registry}/{digest}'
49 return digest
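# Illustration of the prefix match above, assuming 'quay.io' as the default
# registry (the tag is carried through unchanged):
#
#   >>> normalize_image_digest('ceph/daemon-base:latest', 'quay.io')
#   'quay.io/ceph/daemon-base:latest'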
50
51
52 class UpgradeState:
53 def __init__(self,
54 target_name: str,
55 progress_id: str,
56 target_id: Optional[str] = None,
57 target_digests: Optional[List[str]] = None,
58 target_version: Optional[str] = None,
59 error: Optional[str] = None,
60 paused: Optional[bool] = None,
61 fs_original_max_mds: Optional[Dict[str, int]] = None,
62 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
63 daemon_types: Optional[List[str]] = None,
64 hosts: Optional[List[str]] = None,
65 services: Optional[List[str]] = None,
66 total_count: Optional[int] = None,
67 remaining_count: Optional[int] = None,
68 ):
69 self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
70 self.progress_id: str = progress_id
71 self.target_id: Optional[str] = target_id
72 self.target_digests: Optional[List[str]] = target_digests
73 self.target_version: Optional[str] = target_version
74 self.error: Optional[str] = error
75 self.paused: bool = paused or False
76 self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
77 self.fs_original_allow_standby_replay: Optional[Dict[str,
78 bool]] = fs_original_allow_standby_replay
79 self.daemon_types = daemon_types
80 self.hosts = hosts
81 self.services = services
82 self.total_count = total_count
83 self.remaining_count = remaining_count
84
85 def to_json(self) -> dict:
86 return {
87 'target_name': self._target_name,
88 'progress_id': self.progress_id,
89 'target_id': self.target_id,
90 'target_digests': self.target_digests,
91 'target_version': self.target_version,
92 'fs_original_max_mds': self.fs_original_max_mds,
93 'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
94 'error': self.error,
95 'paused': self.paused,
96 'daemon_types': self.daemon_types,
97 'hosts': self.hosts,
98 'services': self.services,
99 'total_count': self.total_count,
100 'remaining_count': self.remaining_count,
101 }
102
103 @classmethod
104 def from_json(cls, data: dict) -> Optional['UpgradeState']:
105 valid_params = UpgradeState.__init__.__code__.co_varnames
106 if data:
107 c = {k: v for k, v in data.items() if k in valid_params}
108 if 'repo_digest' in data:
109 c['target_digests'] = [data['repo_digest']]
110 return cls(**c)
111 else:
112 return None
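# A minimal sketch of how from_json() is meant to be fed (the image name and
# field values here are hypothetical, not taken from a real cluster): unknown
# keys are silently dropped because they are filtered against __init__'s
# parameter names.
#
#   state = UpgradeState.from_json({
#       'target_name': 'quay.io/ceph/ceph:v16.2.7',
#       'progress_id': 'some-uuid',
#       'some_future_field': 123,
#   })
#   # 'some_future_field' is discarded; state.progress_id == 'some-uuid'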
113
114
115 class CephadmUpgrade:
116 UPGRADE_ERRORS = [
117 'UPGRADE_NO_STANDBY_MGR',
118 'UPGRADE_FAILED_PULL',
119 'UPGRADE_REDEPLOY_DAEMON',
120 'UPGRADE_BAD_TARGET_VERSION',
121 'UPGRADE_EXCEPTION'
122 ]
123
124 def __init__(self, mgr: "CephadmOrchestrator"):
125 self.mgr = mgr
126
127 t = self.mgr.get_store('upgrade_state')
128 if t:
129 self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
130 else:
131 self.upgrade_state = None
132
133 @property
134 def target_image(self) -> str:
135 assert self.upgrade_state
136 if not self.mgr.use_repo_digest:
137 return self.upgrade_state._target_name
138 if not self.upgrade_state.target_digests:
139 return self.upgrade_state._target_name
140
141 # FIXME: we assume the first digest is the best one to use
142 return self.upgrade_state.target_digests[0]
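# Note: when use_repo_digest is enabled and digests are known, the upgrade is
# keyed on an immutable image digest rather than the tag the user supplied, so
# a retagged image should not change what gets deployed mid-upgrade.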
143
144 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
145 r = orchestrator.UpgradeStatusSpec()
146 if self.upgrade_state:
147 r.target_image = self.target_image
148 r.in_progress = True
149 r.progress, r.services_complete = self._get_upgrade_info()
150 r.is_paused = self.upgrade_state.paused
151
152 if self.upgrade_state.daemon_types is not None:
153 which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
154 if self.upgrade_state.hosts is not None:
155 which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
156 elif self.upgrade_state.services is not None:
157 which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
158 if self.upgrade_state.hosts is not None:
159 which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
160 elif self.upgrade_state.hosts is not None:
161 which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
162 else:
163 which_str = 'Upgrading all daemon types on all hosts'
164 if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
165 which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
166 r.which = which_str
167
168 # accessing self.upgrade_info_str will raise an AttributeError if it
169 # has not been set in _do_upgrade yet
170 try:
171 r.message = self.upgrade_info_str
172 except AttributeError:
173 pass
174 if self.upgrade_state.error:
175 r.message = 'Error: ' + self.upgrade_state.error
176 elif self.upgrade_state.paused:
177 r.message = 'Upgrade paused'
178 return r
179
180 def _get_upgrade_info(self) -> Tuple[str, List[str]]:
181 if not self.upgrade_state or not self.upgrade_state.target_digests:
182 return '', []
183
184 daemons = self._get_filtered_daemons()
185
186 if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
187 return '', []
188
189 completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
190 d.container_image_digests or []))) for d in daemons if d.daemon_type]
191
192 done = len([True for completion in completed_daemons if completion[1]])
193
194 completed_types = list(set([completion[0] for completion in completed_daemons if all(
195 c[1] for c in completed_daemons if c[0] == completion[0])]))
196
197 return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
198
199 def _get_filtered_daemons(self) -> List[DaemonDescription]:
200 # Return the set of daemons to be upgraded with our current
201 # filtering parameters (or all daemons in upgrade order if no filtering
202 # parameters are set).
203 assert self.upgrade_state is not None
204 if self.upgrade_state.daemon_types is not None:
205 daemons = [d for d in self.mgr.cache.get_daemons(
206 ) if d.daemon_type in self.upgrade_state.daemon_types]
207 elif self.upgrade_state.services is not None:
208 daemons = []
209 for service in self.upgrade_state.services:
210 daemons += self.mgr.cache.get_daemons_by_service(service)
211 else:
212 daemons = [d for d in self.mgr.cache.get_daemons(
213 ) if d.daemon_type in CEPH_UPGRADE_ORDER]
214 if self.upgrade_state.hosts is not None:
215 daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
216 return daemons
217
218 def _get_current_version(self) -> Tuple[int, int, str]:
219 current_version = self.mgr.version.split('ceph version ')[1]
220 (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
221 return (int(current_major), int(current_minor), current_version)
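# For example (hypothetical version string): if self.mgr.version is
# 'ceph version 16.2.7 (abc123) pacific (stable)', this returns
# (16, 2, '16.2.7 (abc123) pacific (stable)'); the third element keeps
# everything after 'ceph version '.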
222
223 def _check_target_version(self, version: str) -> Optional[str]:
224 try:
225 v = version.split('.', 2)
226 (major, minor) = (int(v[0]), int(v[1]))
227 assert minor >= 0
228 # patch might be a number or {number}-g{sha1}
229 except ValueError:
230 return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
231 if major < 15 or (major == 15 and minor < 2):
232 return 'cephadm only supports octopus (15.2.0) or later'
233
234 # too far a jump?
235 (current_major, current_minor, current_version) = self._get_current_version()
236 if current_major < major - 2:
237 return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
238 if current_major > major:
239 return f'ceph cannot downgrade major versions (from {current_version} to {version})'
240 if current_major == major:
241 if current_minor > minor:
242 return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'
243
244 # check mon min
245 monmap = self.mgr.get("mon_map")
246 mon_min = monmap.get("min_mon_release", 0)
247 if mon_min < major - 2:
248 return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'
249
250 # check osd min
251 osdmap = self.mgr.get("osd_map")
252 osd_min_name = osdmap.get("require_osd_release", "argonaut")
253 osd_min = ceph_release_to_major(osd_min_name)
254 if osd_min < major - 2:
255 return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'
256
257 return None
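# A worked example of the checks above (hypothetical cluster state): with a
# current version of 15.2.13, a target of 17.2.0 passes the jump check
# (15 >= 17 - 2), a target of 18.2.0 is rejected as too big a jump, and a
# target of 14.2.22 is rejected as a downgrade. min_mon_release and
# require_osd_release must also be no more than two majors behind the target.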
258
259 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
260 if not image:
261 image = self.mgr.container_image_base
262 reg_name, bare_image = image.split('/', 1)
263 reg = Registry(reg_name)
264 (current_major, current_minor, _) = self._get_current_version()
265 versions = []
266 r: Dict[Any, Any] = {
267 "image": image,
268 "registry": reg_name,
269 "bare_image": bare_image,
270 }
271
272 try:
273 ls = reg.get_tags(bare_image)
274 except ValueError as e:
275 raise OrchestratorError(f'{e}')
276 if not tags:
277 for t in ls:
278 if t[0] != 'v':
279 continue
280 v = t[1:].split('.')
281 if len(v) != 3:
282 continue
283 if '-' in v[2]:
284 continue
285 v_major = int(v[0])
286 v_minor = int(v[1])
287 candidate_version = (v_major > current_major
288 or (v_major == current_major and v_minor >= current_minor))
289 if show_all_versions or candidate_version:
290 versions.append('.'.join(v))
291 r["versions"] = sorted(
292 versions,
293 key=lambda k: list(map(int, k.split('.'))),
294 reverse=True
295 )
296 else:
297 r["tags"] = sorted(ls)
298 return r
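# Shape of the returned dict (illustrative values only):
#
#   {'image': 'quay.io/ceph/ceph',
#    'registry': 'quay.io',
#    'bare_image': 'ceph/ceph',
#    'versions': ['17.2.0', '16.2.7', '16.2.6']}   # or 'tags': [...] with --tags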
299
300 def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
301 hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
302 if self.mgr.mode != 'root':
303 raise OrchestratorError('upgrade is not supported in %s mode' % (
304 self.mgr.mode))
305 if version:
306 version_error = self._check_target_version(version)
307 if version_error:
308 raise OrchestratorError(version_error)
309 target_name = self.mgr.container_image_base + ':v' + version
310 elif image:
311 target_name = normalize_image_digest(image, self.mgr.default_registry)
312 else:
313 raise OrchestratorError('must specify either image or version')
314
315 if daemon_types is not None or services is not None or hosts is not None:
316 self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
317
318 if self.upgrade_state:
319 if self.upgrade_state._target_name != target_name:
320 raise OrchestratorError(
321 'Upgrade to %s (not %s) already in progress' %
322 (self.upgrade_state._target_name, target_name))
323 if self.upgrade_state.paused:
324 self.upgrade_state.paused = False
325 self._save_upgrade_state()
326 return 'Resumed upgrade to %s' % self.target_image
327 return 'Upgrade to %s in progress' % self.target_image
328
329 running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
330 'mgr') if daemon.status == DaemonDescriptionStatus.running])
331
332 if running_mgr_count < 2:
333 raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')
334
335 self.mgr.log.info('Upgrade: Started with target %s' % target_name)
336 self.upgrade_state = UpgradeState(
337 target_name=target_name,
338 progress_id=str(uuid.uuid4()),
339 daemon_types=daemon_types,
340 hosts=hosts,
341 services=services,
342 total_count=limit,
343 remaining_count=limit,
344 )
345 self._update_upgrade_progress(0.0)
346 self._save_upgrade_state()
347 self._clear_upgrade_health_checks()
348 self.mgr.event.set()
349 return 'Initiating upgrade to %s' % (target_name)
350
351 def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
352 def _latest_type(dtypes: List[str]) -> str:
353 # [::-1] gives the list in reverse
354 for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
355 if daemon_type in dtypes:
356 return daemon_type
357 return ''
358
359 def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
360 # this function takes a list of daemon types and first finds the daemon
361 # type from that list that is latest in our upgrade order. It then filters
362 # the received list of candidate daemons down to those whose types come
363 # earlier in the upgrade order than that latest type, and returns the
364 # filtered list. The purpose of this function is to help find daemons
365 # that must have already been upgraded for the given filtering parameters
366 # (--daemon-types, --services, --hosts) to be valid, since daemons earlier
367 # in the order have to be upgraded first.
368 latest = _latest_type(dtypes)
369 if not latest:
370 return []
371 earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
372 earlier_types = [t for t in earlier_types if t not in dtypes]
373 return [d for d in candidates if d.daemon_type in earlier_types]
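# To see what the string trick above computes, take a shortened, hypothetical
# upgrade order ['mgr', 'mon', 'osd', 'mds'] with latest == 'osd':
#   '|'.join(order)        -> 'mgr|mon|osd|mds'
#   .split('osd')[0]       -> 'mgr|mon|'
#   .split('|')[:-1]       -> ['mgr', 'mon']
# i.e. every type strictly earlier than 'osd' in the upgrade order.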
374
375 if self.upgrade_state:
376 raise OrchestratorError(
377 'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
378 try:
379 target_id, target_version, target_digests = self.mgr.wait_async(
380 CephadmServe(self.mgr)._get_container_image_info(target_name))
381 except OrchestratorError as e:
382 raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
383 # what we need to do here is build a list of daemons that must already be upgraded
384 # in order for the user's selection of daemons to upgrade to be valid. for example,
385 # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
386 daemons = [d for d in self.mgr.cache.get_daemons(
387 ) if d.daemon_type not in MONITORING_STACK_TYPES]
388 err_msg_base = 'Cannot start upgrade. '
389 # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
390 dtypes = []
391 if daemon_types is not None:
392 dtypes = daemon_types
393 if hosts is not None:
394 dtypes = [_latest_type(dtypes)]
395 other_host_daemons = [
396 d for d in daemons if d.hostname is not None and d.hostname not in hosts]
397 daemons = _get_earlier_daemons(dtypes, other_host_daemons)
398 else:
399 daemons = _get_earlier_daemons(dtypes, daemons)
400 err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
401 elif services is not None:
402 # for our purposes here we can effectively convert our list of services into the
403 # set of daemon types the services contain. This works because we don't allow --services
404 # and --daemon-types at the same time and we only allow services of the same type
405 sspecs = [
406 self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
407 stypes = list(set([s.service_type for s in sspecs]))
408 if len(stypes) != 1:
409 raise OrchestratorError('Doing upgrade by service only supports services of one type at '
410 f'a time. Found service types: {stypes}')
411 for stype in stypes:
412 dtypes += orchestrator.service_to_daemon_types(stype)
413 dtypes = list(set(dtypes))
414 if hosts is not None:
415 other_host_daemons = [
416 d for d in daemons if d.hostname is not None and d.hostname not in hosts]
417 daemons = _get_earlier_daemons(dtypes, other_host_daemons)
418 else:
419 daemons = _get_earlier_daemons(dtypes, daemons)
420 err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
421 elif hosts is not None:
422 # hosts must be handled a bit differently. For this, we really need to find all the daemon types
423 # that reside on the hosts we will upgrade. Then take the type from that list that is latest
424 # in the upgrade order and check whether any host outside the provided list still has a daemon
425 # with a type earlier in the upgrade order that has not been upgraded.
426 dtypes = list(
427 set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
428 other_hosts_daemons = [
429 d for d in daemons if d.hostname is not None and d.hostname not in hosts]
430 daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
431 err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
432 need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
433 if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
434 # also report active mgr as needing to be upgraded. It is not included in the resulting list
435 # by default as it is treated specially and handled via the need_upgrade_self bool
436 n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
437 self.mgr.cache.get_daemons_by_type('mgr')), True))
438 if n1 or n2:
439 raise OrchestratorError(f'{err_msg_base}Please first upgrade '
440 f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
441 f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')
442
443 def upgrade_pause(self) -> str:
444 if not self.upgrade_state:
445 raise OrchestratorError('No upgrade in progress')
446 if self.upgrade_state.paused:
447 return 'Upgrade to %s already paused' % self.target_image
448 self.upgrade_state.paused = True
449 self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
450 self._save_upgrade_state()
451 return 'Paused upgrade to %s' % self.target_image
452
453 def upgrade_resume(self) -> str:
454 if not self.upgrade_state:
455 raise OrchestratorError('No upgrade in progress')
456 if not self.upgrade_state.paused:
457 return 'Upgrade to %s not paused' % self.target_image
458 self.upgrade_state.paused = False
459 self.upgrade_state.error = ''
460 self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
461 self._save_upgrade_state()
462 self.mgr.event.set()
463 return 'Resumed upgrade to %s' % self.target_image
464
465 def upgrade_stop(self) -> str:
466 if not self.upgrade_state:
467 return 'No upgrade in progress'
468 if self.upgrade_state.progress_id:
469 self.mgr.remote('progress', 'complete',
470 self.upgrade_state.progress_id)
471 target_image = self.target_image
472 self.mgr.log.info('Upgrade: Stopped')
473 self.upgrade_state = None
474 self._save_upgrade_state()
475 self._clear_upgrade_health_checks()
476 self.mgr.event.set()
477 return 'Stopped upgrade to %s' % target_image
478
479 def continue_upgrade(self) -> bool:
480 """
481 Returns False if nothing was done.
482 :return: True if an upgrade step was attempted, False otherwise.
483 """
484 if self.upgrade_state and not self.upgrade_state.paused:
485 try:
486 self._do_upgrade()
487 except Exception as e:
488 self._fail_upgrade('UPGRADE_EXCEPTION', {
489 'severity': 'error',
490 'summary': 'Upgrade: failed due to an unexpected exception',
491 'count': 1,
492 'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
493 })
494 return False
495 return True
496 return False
497
498 def _wait_for_ok_to_stop(
499 self, s: DaemonDescription,
500 known: Optional[List[str]] = None, # NOTE: output argument!
501 ) -> bool:
502 # only wait a little bit; the daemon might go away for some other reason
503 assert s.daemon_type is not None
504 assert s.daemon_id is not None
505 tries = 4
506 while tries > 0:
507 if not self.upgrade_state or self.upgrade_state.paused:
508 return False
509
510 # setting force flag to retain old functionality.
511 # note that known is an output argument for ok_to_stop()
512 r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
513 s.daemon_id], known=known, force=True)
514
515 if not r.retval:
516 logger.info(f'Upgrade: {r.stdout}')
517 return True
518 logger.info(f'Upgrade: {r.stderr}')
519
520 time.sleep(15)
521 tries -= 1
522 return False
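# In the worst case this waits roughly a minute per pass (4 tries, 15 seconds
# apart) before giving up and letting the serve loop retry later.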
523
524 def _clear_upgrade_health_checks(self) -> None:
525 for k in self.UPGRADE_ERRORS:
526 if k in self.mgr.health_checks:
527 del self.mgr.health_checks[k]
528 self.mgr.set_health_checks(self.mgr.health_checks)
529
530 def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
531 assert alert_id in self.UPGRADE_ERRORS
532 if not self.upgrade_state:
533 # this could happen if the user canceled the upgrade while we
534 # were doing something
535 return
536
537 logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
538 alert['summary']))
539 self.upgrade_state.error = alert_id + ': ' + alert['summary']
540 self.upgrade_state.paused = True
541 self._save_upgrade_state()
542 self.mgr.health_checks[alert_id] = alert
543 self.mgr.set_health_checks(self.mgr.health_checks)
544
545 def _update_upgrade_progress(self, progress: float) -> None:
546 if not self.upgrade_state:
547 assert False, 'No upgrade in progress'
548
549 if not self.upgrade_state.progress_id:
550 self.upgrade_state.progress_id = str(uuid.uuid4())
551 self._save_upgrade_state()
552 self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
553 ev_msg='Upgrade to %s' % (
554 self.upgrade_state.target_version or self.target_image
555 ),
556 ev_progress=progress,
557 add_to_ceph_s=True)
558
559 def _save_upgrade_state(self) -> None:
560 if not self.upgrade_state:
561 self.mgr.set_store('upgrade_state', None)
562 return
563 self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
564
565 def get_distinct_container_image_settings(self) -> Dict[str, str]:
566 # get all distinct container_image settings
567 image_settings = {}
568 ret, out, err = self.mgr.check_mon_command({
569 'prefix': 'config dump',
570 'format': 'json',
571 })
572 config = json.loads(out)
573 for opt in config:
574 if opt['name'] == 'container_image':
575 image_settings[opt['section']] = opt['value']
576 return image_settings
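# The result maps config sections to their container_image values, e.g.
# (illustrative): {'global': 'quay.io/ceph/ceph:v16.2.7',
#                  'mon': 'quay.io/ceph/ceph:v16.2.6'}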
577
578 def _prepare_for_mds_upgrade(
579 self,
580 target_major: str,
581 need_upgrade: List[DaemonDescription]
582 ) -> bool:
583 # scale down all filesystems to 1 MDS
584 assert self.upgrade_state
585 if not self.upgrade_state.fs_original_max_mds:
586 self.upgrade_state.fs_original_max_mds = {}
587 if not self.upgrade_state.fs_original_allow_standby_replay:
588 self.upgrade_state.fs_original_allow_standby_replay = {}
589 fsmap = self.mgr.get("fs_map")
590 continue_upgrade = True
591 for fs in fsmap.get('filesystems', []):
592 fscid = fs["id"]
593 mdsmap = fs["mdsmap"]
594 fs_name = mdsmap["fs_name"]
595
596 # disable allow_standby_replay?
597 if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
598 self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
599 fs_name
600 ))
601 if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
602 self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
603 self._save_upgrade_state()
604 ret, out, err = self.mgr.check_mon_command({
605 'prefix': 'fs set',
606 'fs_name': fs_name,
607 'var': 'allow_standby_replay',
608 'val': '0',
609 })
610 continue_upgrade = False
611 continue
612
613 # scale down this filesystem?
614 if mdsmap["max_mds"] > 1:
615 self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
616 fs_name
617 ))
618 if fscid not in self.upgrade_state.fs_original_max_mds:
619 self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
620 self._save_upgrade_state()
621 ret, out, err = self.mgr.check_mon_command({
622 'prefix': 'fs set',
623 'fs_name': fs_name,
624 'var': 'max_mds',
625 'val': '1',
626 })
627 continue_upgrade = False
628 continue
629
630 if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
631 self.mgr.log.info(
632 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
633 time.sleep(10)
634 continue_upgrade = False
635 continue
636
637 if len(mdsmap['up']) == 0:
638 self.mgr.log.warning(
639 "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
640 # This can happen because the MDS daemons at the current version have
641 # incompatible compatsets; the mons will not do any promotions.
642 # We must upgrade to continue.
643 elif len(mdsmap['up']) > 0:
644 mdss = list(mdsmap['info'].values())
645 assert len(mdss) == 1
646 lone_mds = mdss[0]
647 if lone_mds['state'] != 'up:active':
648 self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
649 lone_mds['name'],
650 lone_mds['state'],
651 ))
652 time.sleep(10)
653 continue_upgrade = False
654 continue
655 else:
656 assert False
657
658 return continue_upgrade
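# This prep step is re-run on every pass of the upgrade loop; it keeps
# returning False (so the caller backs off and retries) until every filesystem
# has standby-replay disabled, max_mds scaled down to 1, and a single MDS in
# up:active (the no-MDS-up case is tolerated, as noted above).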
659
660 def _enough_mons_for_ok_to_stop(self) -> bool:
661 # type: () -> bool
662 ret, out, err = self.mgr.check_mon_command({
663 'prefix': 'quorum_status',
664 })
665 try:
666 j = json.loads(out)
667 except Exception:
668 raise OrchestratorError('failed to parse quorum status')
669
670 mons = [m['name'] for m in j['monmap']['mons']]
671 return len(mons) > 2
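# e.g. with 3 mons in the monmap this returns True (we can afford to ask one
# to stop); with 2 or fewer it returns False and the ok-to-stop wait is skipped.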
672
673 def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
674 # type: (DaemonDescription) -> bool
675
676 # find fs this mds daemon belongs to
677 fsmap = self.mgr.get("fs_map")
678 for fs in fsmap.get('filesystems', []):
679 mdsmap = fs["mdsmap"]
680 fs_name = mdsmap["fs_name"]
681
682 assert mds_daemon.daemon_id
683 if fs_name != mds_daemon.service_name().split('.', 1)[1]:
684 # wrong fs for this mds daemon
685 continue
686
687 # get number of mds daemons for this fs
688 mds_count = len(
689 [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])
690
691 # standby mds daemons for this fs?
692 if mdsmap["max_mds"] < mds_count:
693 return True
694 return False
695
696 return True # if mds has no fs it should pass ok-to-stop
697
698 def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
699 # this function takes a list of daemons and the target container digests. It
700 # goes through each daemon and checks whether that daemon's current container
701 # digests match the target digests, since a daemon is considered upgraded to a
702 # given container image based on the digests it reports. By comparing the
703 # current digests against the targets we can determine which daemons still
704 # need to be upgraded.
705 need_upgrade_self = False
706 need_upgrade: List[Tuple[DaemonDescription, bool]] = []
707 need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
708 done = 0
709 if target_digests is None:
710 target_digests = []
711 for d in daemons:
712 assert d.daemon_type is not None
713 assert d.daemon_id is not None
714 assert d.hostname is not None
715 if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
716 continue
717 correct_digest = False
718 if (any(d in target_digests for d in (d.container_image_digests or []))
719 or d.daemon_type in MONITORING_STACK_TYPES):
720 logger.debug('daemon %s.%s container digest correct' % (
721 d.daemon_type, d.daemon_id))
722 correct_digest = True
723 if any(d in target_digests for d in (d.deployed_by or [])):
724 logger.debug('daemon %s.%s deployed by correct version' % (
725 d.daemon_type, d.daemon_id))
726 done += 1
727 continue
728
729 if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
730 logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
731 self.mgr.get_mgr_id())
732 need_upgrade_self = True
733 continue
734
735 if correct_digest:
736 logger.debug('daemon %s.%s not deployed by correct version' % (
737 d.daemon_type, d.daemon_id))
738 need_upgrade_deployer.append((d, True))
739 else:
740 logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
741 d.daemon_type, d.daemon_id,
742 d.container_image_name, d.container_image_digests, d.version))
743 need_upgrade.append((d, False))
744
745 return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
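# Return tuple, in order: does the active mgr itself need upgrading, daemons on
# the wrong image digest, daemons on the right digest but deployed by an old
# mgr (redeploy only), and the count of daemons already done.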
746
747 def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
748 to_upgrade: List[Tuple[DaemonDescription, bool]] = []
749 known_ok_to_stop: List[str] = []
750 for d_entry in need_upgrade:
751 d = d_entry[0]
752 assert d.daemon_type is not None
753 assert d.daemon_id is not None
754 assert d.hostname is not None
755
756 if not d.container_image_id:
757 if d.container_image_name == target_image:
758 logger.debug(
759 'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
760 continue
761
762 if known_ok_to_stop:
763 if d.name() in known_ok_to_stop:
764 logger.info(f'Upgrade: {d.name()} is also safe to restart')
765 to_upgrade.append(d_entry)
766 continue
767
768 if d.daemon_type == 'osd':
769 # NOTE: known_ok_to_stop is an output argument for
770 # _wait_for_ok_to_stop
771 if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
772 return False, to_upgrade
773
774 if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
775 if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
776 return False, to_upgrade
777
778 if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
779 if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
780 return False, to_upgrade
781
782 to_upgrade.append(d_entry)
783
784 # if we don't have a list of others to consider, stop now
785 if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
786 break
787 return True, to_upgrade
788
789 def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
790 assert self.upgrade_state is not None
791 num = 1
792 if target_digests is None:
793 target_digests = []
794 for d_entry in to_upgrade:
795 if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
796 self.mgr.log.info(
797 f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
798 return
799 d = d_entry[0]
800 assert d.daemon_type is not None
801 assert d.daemon_id is not None
802 assert d.hostname is not None
803
804 # make sure host has latest container image
805 out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
806 d.hostname, '', 'inspect-image', [],
807 image=target_image, no_fsid=True, error_ok=True))
808 if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
809 logger.info('Upgrade: Pulling %s on %s' % (target_image,
810 d.hostname))
811 self.upgrade_info_str = 'Pulling %s image on host %s' % (
812 target_image, d.hostname)
813 out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
814 d.hostname, '', 'pull', [],
815 image=target_image, no_fsid=True, error_ok=True))
816 if code:
817 self._fail_upgrade('UPGRADE_FAILED_PULL', {
818 'severity': 'warning',
819 'summary': 'Upgrade: failed to pull target image',
820 'count': 1,
821 'detail': [
822 'failed to pull %s on host %s' % (target_image,
823 d.hostname)],
824 })
825 return
826 r = json.loads(''.join(out))
827 if not any(d in target_digests for d in r.get('repo_digests', [])):
828 logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
829 target_image, d.hostname, r['repo_digests'], target_digests))
830 self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
831 target_image, d.hostname, r['repo_digests'], target_digests)
832 self.upgrade_state.target_digests = r['repo_digests']
833 self._save_upgrade_state()
834 return
835
836 self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)
837
838 if len(to_upgrade) > 1:
839 logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
840 self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
841 else:
842 logger.info('Upgrade: Updating %s.%s' %
843 (d.daemon_type, d.daemon_id))
844 action = 'Upgrading' if not d_entry[1] else 'Redeploying'
845 try:
846 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
847 self.mgr._daemon_action(
848 daemon_spec,
849 'redeploy',
850 image=target_image if not d_entry[1] else None
851 )
852 self.mgr.cache.metadata_up_to_date[d.hostname] = False
853 except Exception as e:
854 self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
855 'severity': 'warning',
856 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
857 'count': 1,
858 'detail': [
859 f'Upgrade daemon: {d.name()}: {e}'
860 ],
861 })
862 return
863 num += 1
864 if self.upgrade_state.remaining_count is not None and not d_entry[1]:
865 self.upgrade_state.remaining_count -= 1
866 self._save_upgrade_state()
867
868 def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
869 if need_upgrade_self:
870 try:
871 self.mgr.mgr_service.fail_over()
872 except OrchestratorError as e:
873 self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
874 'severity': 'warning',
875 'summary': f'Upgrade: {e}',
876 'count': 1,
877 'detail': [
878 'The upgrade process needs to upgrade the mgr, '
879 'but it needs at least one standby to proceed.',
880 ],
881 })
882 return
883
884 return # unreachable code, as fail_over never returns
885 elif upgrading_mgrs:
886 if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
887 del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
888 self.mgr.set_health_checks(self.mgr.health_checks)
889
890 def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
891 # push down configs
892 daemon_type_section = name_to_config_section(daemon_type)
893 if image_settings.get(daemon_type_section) != target_image:
894 logger.info('Upgrade: Setting container_image for all %s' %
895 daemon_type)
896 self.mgr.set_container_image(daemon_type_section, target_image)
897 to_clean = []
898 for section in image_settings.keys():
899 if section.startswith(name_to_config_section(daemon_type) + '.'):
900 to_clean.append(section)
901 if to_clean:
902 logger.debug('Upgrade: Cleaning up container_image for %s' %
903 to_clean)
904 for section in to_clean:
905 ret, image, err = self.mgr.check_mon_command({
906 'prefix': 'config rm',
907 'name': 'container_image',
908 'who': section,
909 })
910
911 def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
912 osdmap = self.mgr.get("osd_map")
913 osd_min_name = osdmap.get("require_osd_release", "argonaut")
914 osd_min = ceph_release_to_major(osd_min_name)
915 if osd_min < int(target_major):
916 logger.info(
917 f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
918 ret, _, err = self.mgr.check_mon_command({
919 'prefix': 'osd require-osd-release',
920 'release': target_major_name,
921 })
922
923 def _complete_mds_upgrade(self) -> None:
924 assert self.upgrade_state is not None
925 if self.upgrade_state.fs_original_max_mds:
926 for fs in self.mgr.get("fs_map")['filesystems']:
927 fscid = fs["id"]
928 fs_name = fs['mdsmap']['fs_name']
929 new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
930 if new_max > 1:
931 self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
932 fs_name, new_max
933 ))
934 ret, _, err = self.mgr.check_mon_command({
935 'prefix': 'fs set',
936 'fs_name': fs_name,
937 'var': 'max_mds',
938 'val': str(new_max),
939 })
940
941 self.upgrade_state.fs_original_max_mds = {}
942 self._save_upgrade_state()
943 if self.upgrade_state.fs_original_allow_standby_replay:
944 for fs in self.mgr.get("fs_map")['filesystems']:
945 fscid = fs["id"]
946 fs_name = fs['mdsmap']['fs_name']
947 asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
948 if asr:
949 self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
950 fs_name
951 ))
952 ret, _, err = self.mgr.check_mon_command({
953 'prefix': 'fs set',
954 'fs_name': fs_name,
955 'var': 'allow_standby_replay',
956 'val': '1'
957 })
958
959 self.upgrade_state.fs_original_allow_standby_replay = {}
960 self._save_upgrade_state()
961
962 def _mark_upgrade_complete(self) -> None:
963 if not self.upgrade_state:
964 logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
965 return
966 logger.info('Upgrade: Complete!')
967 if self.upgrade_state.progress_id:
968 self.mgr.remote('progress', 'complete',
969 self.upgrade_state.progress_id)
970 self.upgrade_state = None
971 self._save_upgrade_state()
972
973 def _do_upgrade(self):
974 # type: () -> None
975 if not self.upgrade_state:
976 logger.debug('_do_upgrade no state, exiting')
977 return
978
979 target_image = self.target_image
980 target_id = self.upgrade_state.target_id
981 target_digests = self.upgrade_state.target_digests
982 target_version = self.upgrade_state.target_version
983
984 first = False
985 if not target_id or not target_version or not target_digests:
986 # need to learn the container hash
987 logger.info('Upgrade: First pull of %s' % target_image)
988 self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
989 try:
990 target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
991 target_image))
992 except OrchestratorError as e:
993 self._fail_upgrade('UPGRADE_FAILED_PULL', {
994 'severity': 'warning',
995 'summary': 'Upgrade: failed to pull target image',
996 'count': 1,
997 'detail': [str(e)],
998 })
999 return
1000 if not target_version:
1001 self._fail_upgrade('UPGRADE_FAILED_PULL', {
1002 'severity': 'warning',
1003 'summary': 'Upgrade: failed to pull target image',
1004 'count': 1,
1005 'detail': ['unable to extract ceph version from container'],
1006 })
1007 return
1008 self.upgrade_state.target_id = target_id
1009 # extract the version portion of 'ceph version {version} ({sha1})'
1010 self.upgrade_state.target_version = target_version.split(' ')[2]
1011 self.upgrade_state.target_digests = target_digests
1012 self._save_upgrade_state()
1013 target_image = self.target_image
1014 first = True
1015
1016 if target_digests is None:
1017 target_digests = []
1018 if target_version.startswith('ceph version '):
1019 # tolerate/fix upgrade state from older version
1020 self.upgrade_state.target_version = target_version.split(' ')[2]
1021 target_version = self.upgrade_state.target_version
1022 (target_major, _) = target_version.split('.', 1)
1023 target_major_name = self.mgr.lookup_release_name(int(target_major))
1024
1025 if first:
1026 logger.info('Upgrade: Target is version %s (%s)' % (
1027 target_version, target_major_name))
1028 logger.info('Upgrade: Target container is %s, digests %s' % (
1029 target_image, target_digests))
1030
1031 version_error = self._check_target_version(target_version)
1032 if version_error:
1033 self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
1034 'severity': 'error',
1035 'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
1036 'count': 1,
1037 'detail': [version_error],
1038 })
1039 return
1040
1041 image_settings = self.get_distinct_container_image_settings()
1042
1043 # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
1044 # MDSMap::compat for all fs. This is no longer the case beginning in
1045 # v16.2.5. We must disable the sanity checks during upgrade.
1046 # N.B.: we don't bother confirming the operator has not already
1047 # disabled this or saving the config value.
1048 self.mgr.check_mon_command({
1049 'prefix': 'config set',
1050 'name': 'mon_mds_skip_sanity',
1051 'value': '1',
1052 'who': 'mon',
1053 })
1054
1055 if self.upgrade_state.daemon_types is not None:
1056 logger.debug(
1057 f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
1058 daemons = [d for d in self.mgr.cache.get_daemons(
1059 ) if d.daemon_type in self.upgrade_state.daemon_types]
1060 elif self.upgrade_state.services is not None:
1061 logger.debug(
1062 f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
1063 daemons = []
1064 for service in self.upgrade_state.services:
1065 daemons += self.mgr.cache.get_daemons_by_service(service)
1066 else:
1067 daemons = [d for d in self.mgr.cache.get_daemons(
1068 ) if d.daemon_type in CEPH_UPGRADE_ORDER]
1069 if self.upgrade_state.hosts is not None:
1070 logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
1071 daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
1072 upgraded_daemon_count: int = 0
1073 for daemon_type in CEPH_UPGRADE_ORDER:
1074 if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
1075 # we hit our limit and should end the upgrade
1076 # except for cases where we only need to redeploy, but not actually upgrade
1077 # the image (which we don't count towards our limit). This case only occurs with mgr
1078 # and monitoring stack daemons. Additionally, this case is only valid if
1079 # the active mgr is already upgraded.
1080 if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
1081 if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
1082 continue
1083 else:
1084 self._mark_upgrade_complete()
1085 return
1086 logger.debug('Upgrade: Checking %s daemons' % daemon_type)
1087 daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
1088
1089 need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
1090 daemons_of_type, target_digests)
1091 upgraded_daemon_count += done
1092 self._update_upgrade_progress(upgraded_daemon_count / len(daemons))
1093
1094 # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
1095 if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
1096 if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
1097 need_upgrade_names = [d[0].name() for d in need_upgrade] + \
1098 [d[0].name() for d in need_upgrade_deployer]
1099 dds = [d for d in self.mgr.cache.get_daemons_by_type(
1100 daemon_type) if d.name() not in need_upgrade_names]
1101 need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
1102 if not n1:
1103 if not need_upgrade_self and need_upgrade_active:
1104 need_upgrade_self = True
1105 need_upgrade_deployer += n2
1106 else:
1107 # no point in trying to redeploy with new version if active mgr is not on the new version
1108 need_upgrade_deployer = []
1109
1110 if not need_upgrade_self:
1111 # only after the mgr itself is upgraded can we expect daemons to have
1112 # deployed_by == target_digests
1113 need_upgrade += need_upgrade_deployer
1114
1115 # prepare filesystems for daemon upgrades?
1116 if (
1117 daemon_type == 'mds'
1118 and need_upgrade
1119 and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
1120 ):
1121 return
1122
1123 if need_upgrade:
1124 self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)
1125
1126 _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
1127 if not _continue:
1128 return
1129 self._upgrade_daemons(to_upgrade, target_image, target_digests)
1130 if to_upgrade:
1131 return
1132
1133 self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
1134
1135 # the following bits of _do_upgrade are for completing the upgrade for a given
1136 # daemon type. If we haven't actually finished upgrading all the daemons
1137 # of this type, skip these completion steps and move on to the next type
1138 _, n1, n2, _ = self._detect_need_upgrade(
1139 self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
1140 if n1 or n2:
1141 continue
1142
1143 # complete mon upgrade?
1144 if daemon_type == 'mon':
1145 if not self.mgr.get("have_local_config_map"):
1146 logger.info('Upgrade: Restarting mgr now that mons are running pacific')
1147 need_upgrade_self = True
1148
1149 self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')
1150
1151 # make sure 'ceph versions' agrees
1152 ret, out_ver, err = self.mgr.check_mon_command({
1153 'prefix': 'versions',
1154 })
1155 j = json.loads(out_ver)
1156 for version, count in j.get(daemon_type, {}).items():
1157 short_version = version.split(' ')[2]
1158 if short_version != target_version:
1159 logger.warning(
1160 'Upgrade: %d %s daemon(s) are %s != target %s' %
1161 (count, daemon_type, short_version, target_version))
1162
1163 self._set_container_images(daemon_type, target_image, image_settings)
1164
1165 # complete osd upgrade?
1166 if daemon_type == 'osd':
1167 self._complete_osd_upgrade(target_major, target_major_name)
1168
1169 # complete mds upgrade?
1170 if daemon_type == 'mds':
1171 self._complete_mds_upgrade()
1172
1173 # Make sure all metadata is up to date before saying we are done upgrading this daemon type
1174 if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
1175 self.mgr.agent_helpers._request_ack_all_not_up_to_date()
1176 return
1177
1178 logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)
1179
1180 # clean up
1181 logger.info('Upgrade: Finalizing container_image settings')
1182 self.mgr.set_container_image('global', target_image)
1183
1184 for daemon_type in CEPH_UPGRADE_ORDER:
1185 ret, image, err = self.mgr.check_mon_command({
1186 'prefix': 'config rm',
1187 'name': 'container_image',
1188 'who': name_to_config_section(daemon_type),
1189 })
1190
1191 self.mgr.check_mon_command({
1192 'prefix': 'config rm',
1193 'name': 'mon_mds_skip_sanity',
1194 'who': 'mon',
1195 })
1196
1197 self._mark_upgrade_complete()
1198 return