import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
    CEPH_TYPES, NON_CEPH_IMAGE_TYPES, GATEWAY_TYPES
from cephadm.ssh import HostConnectionError
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    # imported only for type checking to avoid a circular import at runtime
    from .module import CephadmOrchestrator

logger = logging.getLogger(__name__)

# MDSMap flag bits; these must stay in sync with the values the mons use.
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
CEPH_MDSMAP_NOT_JOINABLE = (1 << 0)

def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fail_fs: bool = False,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
        self.fail_fs = fail_fs
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fail_fs': self.fail_fs,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        valid_params = UpgradeState.__init__.__code__.co_varnames
        if data:
            c = {k: v for k, v in data.items() if k in valid_params}
            if 'repo_digest' in c:
                # handle upgrade state persisted by an older cephadm version
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None
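
    # A minimal round-trip sketch (the digest value below is invented, purely
    # illustrative): older state stored a single 'repo_digest' key, which
    # from_json() folds into the newer 'target_digests' list, e.g.
    #   UpgradeState.from_json({'target_name': 'quay.io/ceph/ceph:v17',
    #                           'progress_id': 'some-uuid',
    #                           'repo_digest': 'quay.io/ceph/ceph@sha256:0123...'})
    # yields a state whose target_digests == ['quay.io/ceph/ceph@sha256:0123...'].
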

class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
        'UPGRADE_OFFLINE_HOST',
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None
        self.upgrade_info_str: str = ''

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = self._get_filtered_daemons()

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            # all mgr daemons need known digests before we can report progress
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types
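
    # Worked example (daemon types and flags invented for illustration): with
    # completed_daemons == [('mgr', True), ('mon', True), ('mon', False)],
    # done == 2 and completed_types == ['mgr'] ('mon' is excluded because not
    # every mon entry is True), so this reports '2/3 daemons upgraded'.
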
    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no
        # filtering parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons

    def _get_current_version(self) -> Tuple[int, int, str]:
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        return (int(current_major), int(current_minor), current_version)
    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            v = version.split('.', 2)
            (major, minor) = (int(v[0]), int(v[1]))
            # patch might be a number or {number}-g{sha1}
        except (ValueError, IndexError):
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if major < 15 or (major == 15 and minor < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        (current_major, current_minor, current_version) = self._get_current_version()
        if current_major < major - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if current_major > major:
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if current_major == major:
            if current_minor > minor:
                return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < major - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < major - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None
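
    # Illustrative outcomes (cluster version assumed, not taken from a real
    # system): on a 17.2.x cluster with mon/osd release requirements already
    # current, _check_target_version('18.2.0') returns None (allowed), while
    # '16.2.10' trips the major-version downgrade check and '20.1.0' is
    # rejected as more than a two-major-version jump.
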
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        if ':' in bare_image:
            # for our purposes, we don't want to use the tag here
            bare_image = bare_image.split(':')[0]
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()

        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True)
        else:
            r["tags"] = sorted(ls)
        return r
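
    # Example result shape (registry and version values invented): for image
    # 'quay.io/ceph/ceph' this returns something like
    #   {'image': 'quay.io/ceph/ceph', 'registry': 'quay.io',
    #    'bare_image': 'ceph/ceph', 'versions': ['18.2.0', '17.2.6', ...]}
    # or, with tags=True, a 'tags' list of raw tag names instead of 'versions'.
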
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None,
                      limit: Optional[int] = None) -> str:
        fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
            'orchestrator', 'fail_fs', False))
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            fail_fs=fail_fs_value,
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None,
                                  hosts: Optional[List[str]] = None,
                                  services: Optional[List[str]] = None) -> None:
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            if not latest:
                return []
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]
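
        # Worked example of the split trick above (order abbreviated, purely
        # illustrative): if CEPH_UPGRADE_ORDER were ['mgr', 'mon', 'crash', 'osd']
        # and latest == 'osd', then '|'.join(...) == 'mgr|mon|crash|osd', and
        # splitting on 'osd' then on '|' recovers ['mgr', 'mon', 'crash'], i.e.
        # all types that must already be upgraded before 'osd'.
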
        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            with self.mgr.async_timeout_handler('cephadm inspect-image'):
                target_id, target_version, target_digests = self.mgr.wait_async(
                    CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons(
        ) if d.daemon_type not in NON_CEPH_IMAGE_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes: List[str] = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only supports services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests, target_name)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated special and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.upgrade_state.error = ''
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()  # wake up the serve loop
        for alert_id in self.UPGRADE_ERRORS:
            self.mgr.remove_health_warning(alert_id)
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()  # wake up the serve loop
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except HostConnectionError as e:
                self._fail_upgrade('UPGRADE_OFFLINE_HOST', {
                    'severity': 'error',
                    'summary': f'Upgrade: Failed to connect to host {e.hostname} at addr ({e.addr})',
                    'count': 1,
                    'detail': [f'SSH connection failed to {e.hostname} at addr ({e.addr}): {str(e)}'],
                })
                return False
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)
            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)
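
    # Expected alert shape (keys taken from the call sites in this file; the
    # detail text below is invented):
    #   _fail_upgrade('UPGRADE_FAILED_PULL', {
    #       'severity': 'warning',
    #       'summary': 'Upgrade: failed to pull target image',
    #       'count': 1,
    #       'detail': ['failed to pull quay.io/ceph/ceph:v18 on host host1'],
    #   })
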
    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

) -> None:
584 if not self
.upgrade_state
:
585 self
.mgr
.set_store('upgrade_state', None)
587 self
.mgr
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
.to_json()))
    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings
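
    # Example return value (sections and image refs invented): a cluster that
    # pinned the mon image separately might yield
    #   {'global': 'quay.io/ceph/ceph:v17',
    #    'mon': 'quay.io/ceph/ceph@sha256:89ab...'}
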
    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                if self.upgrade_state.fail_fs:
                    if not (mdsmap['flags'] & CEPH_MDSMAP_NOT_JOINABLE) and \
                            len(mdsmap['up']) > 0:
                        self.mgr.log.info(f'Upgrade: failing fs {fs_name} for '
                                          f'rapid multi-rank mds upgrade')
                        ret, out, err = self.mgr.check_mon_command({
                            'prefix': 'fs fail',
                            'fs_name': fs_name,
                        })
                        if ret != 0:
                            continue_upgrade = False
                        continue
                else:
                    self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                        fs_name))
                    if fscid not in self.upgrade_state.fs_original_max_mds:
                        self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                        self._save_upgrade_state()
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': '1',
                    })
                    continue_upgrade = False
                    continue

            if not self.upgrade_state.fail_fs:
                if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                    self.mgr.log.info(
                        'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                    time.sleep(10)
                    continue_upgrade = False
                    continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop
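
    # Illustrative check (counts invented): for an fs with max_mds == 1 and two
    # mds daemons deployed for its service, max_mds (1) < mds_count (2), so a
    # standby exists and stopping one mds is considered safe.
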
    def _detect_need_upgrade(self, daemons: List[DaemonDescription],
                             target_digests: Optional[List[str]] = None,
                             target_name: Optional[str] = None
                             ) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
        # this function takes a list of daemons and container digests. The purpose
        # is to go through each daemon and check if the current container digests
        # for that daemon match the target digests. The purpose being that we determine
        # if a daemon is upgraded to a certain container image or not based on what
        # container digests it has. By checking the current digests against the
        # targets we can determine which daemons still need to be upgraded
        need_upgrade_self = False
        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
        done = 0
        if target_digests is None:
            target_digests = []
        if target_name is None:
            target_name = ''
        for d in daemons:
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None
            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                continue
            correct_image = False
            # check if the container digest for the digest we're upgrading to matches
            # the container digest for the daemon if "use_repo_digest" setting is true
            # or that the image name matches the daemon's image name if "use_repo_digest"
            # is false. The idea is to generally check if the daemon is already using
            # the image we're upgrading to or not. Additionally, since monitoring stack
            # daemons are included in the upgrade process but don't use the ceph images
            # we are assuming any monitoring stack daemon is on the "correct" image already
            if (
                (self.mgr.use_repo_digest and d.matches_digests(target_digests))
                or (not self.mgr.use_repo_digest and d.matches_image_name(target_name))
                or (d.daemon_type in NON_CEPH_IMAGE_TYPES)
            ):
                logger.debug('daemon %s.%s on correct image' % (
                    d.daemon_type, d.daemon_id))
                correct_image = True
                # do deployed_by check using digest no matter what. We don't care
                # what repo the image used to deploy the daemon was as long
                # as the image content is correct
                if any(d in target_digests for d in (d.deployed_by or [])):
                    logger.debug('daemon %s.%s deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    done += 1
                    continue

            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                            self.mgr.get_mgr_id())
                need_upgrade_self = True
                continue

            if correct_image:
                logger.debug('daemon %s.%s not deployed by correct version' % (
                    d.daemon_type, d.daemon_id))
                need_upgrade_deployer.append((d, True))
            else:
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    d.daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_digests, d.version))
                need_upgrade.append((d, False))

        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
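
    # Return tuple, by position (a summary of the logic above, not upstream
    # documentation):
    #   [0] need_upgrade_self: the active mgr itself must be redeployed
    #   [1] need_upgrade: daemons on the wrong image (flagged (d, False))
    #   [2] need_upgrade_deployer: right image, but deployed by an old
    #       cephadm/mgr, so they only need a redeploy (flagged (d, True))
    #   [3] done: count of daemons already fully upgraded
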
    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]],
                    target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
        known_ok_to_stop: List[str] = []
        for d_entry in need_upgrade:
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            if not d.container_image_id:
                if d.container_image_name == target_image:
                    logger.debug(
                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                else:
                    continue

            if d.name() in known_ok_to_stop:
                logger.info(f'Upgrade: {d.name()} is also safe to restart')
                to_upgrade.append(d_entry)
                continue

            if d.daemon_type == 'osd':
                # NOTE: known_ok_to_stop is an output argument for
                # _wait_for_ok_to_stop
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                # when fail_fs is set to true, all MDS daemons will be moved to
                # up:standby state, so Cephadm won't be able to upgrade due to
                # this check and will warn with "It is NOT safe to stop
                # mds.<daemon_name> at this time: one or more filesystems is
                # currently degraded", therefore we bypass this check for that
                # case
                assert self.upgrade_state is not None
                if not self.upgrade_state.fail_fs \
                        and not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            to_upgrade.append(d_entry)

            # if we don't have a list of others to consider, stop now
            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                break
        return True, to_upgrade
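
    # Batching sketch (daemon names invented): ok_to_stop() can report that a
    # whole set such as ['osd.3', 'osd.7'] is safe to stop at once via the
    # known_ok_to_stop output list; later iterations then fast-path those
    # daemons into to_upgrade without re-running the safety check.
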
    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]],
                         target_image: str, target_digests: Optional[List[str]] = None) -> None:
        assert self.upgrade_state is not None
        num = 1
        if target_digests is None:
            target_digests = []
        for d_entry in to_upgrade:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
                self.mgr.log.info(
                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                return
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            # make sure host has latest container image
            with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'):
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True))
            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                           d.hostname))
                self.upgrade_info_str = 'Pulling %s image on host %s' % (
                    target_image, d.hostname)
                with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'):
                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True))
                if code:
                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
                        'severity': 'warning',
                        'summary': 'Upgrade: failed to pull target image',
                        'count': 1,
                        'detail': [
                            'failed to pull %s on host %s' % (target_image,
                                                              d.hostname)],
                    })
                    return
                r = json.loads(''.join(out))
                if not any(d in target_digests for d in r.get('repo_digests', [])):
                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests))
                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests)
                    self.upgrade_state.target_digests = r['repo_digests']
                    self._save_upgrade_state()
                    return

                self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)

            if len(to_upgrade) > 1:
                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
            else:
                logger.info('Upgrade: Updating %s.%s' %
                            (d.daemon_type, d.daemon_id))
            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                self.mgr._daemon_action(
                    daemon_spec,
                    'redeploy',
                    image=target_image if not d_entry[1] else None
                )
                self.mgr.cache.metadata_up_to_date[d.hostname] = False
            except Exception as e:
                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                    'severity': 'warning',
                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                    'count': 1,
                    'detail': [
                        f'Upgrade daemon: {d.name()}: {e}'
                    ],
                })
                return
            num += 1
            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
                self.upgrade_state.remaining_count -= 1
                self._save_upgrade_state()
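
    # Staggered-upgrade accounting sketch (numbers invented): with
    # 'ceph orch upgrade start --limit 2', remaining_count starts at 2 and only
    # true upgrades (d_entry[1] is False) decrement it; pure redeploys of
    # already-upgraded daemons are not counted against the limit.
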
    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
        if need_upgrade_self:
            try:
                self.mgr.mgr_service.fail_over()
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                    'severity': 'warning',
                    'summary': f'Upgrade: {e}',
                    'count': 1,
                    'detail': [
                        'The upgrade process needs to upgrade the mgr, '
                        'but it needs at least one standby to proceed.',
                    ],
                })
                return

            return  # unreachable code, as fail_over never returns
        elif upgrading_mgrs:
            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _set_container_images(self, daemon_type: str, target_image: str,
                              image_settings: Dict[str, str]) -> None:
        # push down configs
        daemon_type_section = name_to_config_section(daemon_type)
        if image_settings.get(daemon_type_section) != target_image:
            logger.info('Upgrade: Setting container_image for all %s' %
                        daemon_type)
            self.mgr.set_container_image(daemon_type_section, target_image)
        to_clean = []
        for section in image_settings.keys():
            if section.startswith(name_to_config_section(daemon_type) + '.'):
                to_clean.append(section)
        if to_clean:
            logger.debug('Upgrade: Cleaning up container_image for %s' %
                         to_clean)
            for section in to_clean:
                ret, image, err = self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'name': 'container_image',
                    'who': section,
                })
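
    # Example cleanup (section name invented): with daemon_type 'osd', a stale
    # per-daemon override in config section 'osd.3' would be removed via
    # 'config rm' so every osd inherits the type-level container_image set above.
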
    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(target_major):
            logger.info(
                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
            ret, _, err = self.mgr.check_mon_command({
                'prefix': 'osd require-osd-release',
                'release': target_major_name,
            })

    def _complete_mds_upgrade(self) -> None:
        assert self.upgrade_state is not None
        if self.upgrade_state.fail_fs:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fs_name = fs['mdsmap']['fs_name']
                self.mgr.log.info('Upgrade: Setting filesystem '
                                  f'{fs_name} Joinable')
                try:
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'joinable',
                        'val': 'true',
                    })
                except Exception as e:
                    logger.error("Failed to set fs joinable "
                                 f"true due to {e}")
                    raise OrchestratorError("Failed to set "
                                            f"fs joinable true due to {e}")
        elif self.upgrade_state.fs_original_max_mds:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                if new_max > 1:
                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                        fs_name, new_max))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': str(new_max),
                    })

            self.upgrade_state.fs_original_max_mds = {}
            self._save_upgrade_state()
        if self.upgrade_state.fs_original_allow_standby_replay:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                if asr:
                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                        fs_name))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'allow_standby_replay',
                        'val': '1',
                    })

            self.upgrade_state.fs_original_allow_standby_replay = {}
            self._save_upgrade_state()

    def _mark_upgrade_complete(self) -> None:
        if not self.upgrade_state:
            logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
            return
        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()

    def _do_upgrade(self) -> None:
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        if self.mgr.offline_hosts:
            # offline host(s), on top of potential connection errors when trying to upgrade a daemon
            # or pull an image, can cause issues where daemons are never ok to stop. Since evaluating
            # whether or not that risk is present for any given offline hosts is a difficult problem,
            # it's best to just fail upgrade cleanly so user can address the offline host(s)

            # the HostConnectionError expects a hostname and addr, so let's just take
            # one at random. It doesn't really matter which host we say we couldn't reach here.
            hostname: str = list(self.mgr.offline_hosts)[0]
            addr: str = self.mgr.inventory.get_addr(hostname)
            raise HostConnectionError(f'Host(s) were marked offline: {self.mgr.offline_hosts}', hostname, addr)

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                with self.mgr.async_timeout_handler(f'cephadm inspect-image (image {target_image})'):
                    target_id, target_version, target_digests = self.mgr.wait_async(
                        CephadmServe(self.mgr)._get_container_image_info(target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
        })

        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade,
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in NON_CEPH_IMAGE_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests, target_image)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and non-ceph-image daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in NON_CEPH_IMAGE_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests, target_image)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests, target_image)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
        })

        self._mark_upgrade_complete()