import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
    MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator
logger = logging.getLogger(__name__)

CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest

class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                              bool]] = fs_original_allow_standby_replay
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        valid_params = UpgradeState.__init__.__code__.co_varnames
        if data:
            c = {k: v for k, v in data.items() if k in valid_params}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None

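    # A minimal round-trip sketch (hypothetical values; in practice the state
    # is only rebuilt from the mon store blob saved by _save_upgrade_state()):
    #
    #   st = UpgradeState(target_name='quay.io/ceph/ceph:v16.2.7',
    #                     progress_id=str(uuid.uuid4()))
    #   st2 = UpgradeState.from_json(st.to_json())
    #
    # from_json() also tolerates the legacy 'repo_digest' key, folding it into
    # the newer 'target_digests' list before calling the constructor.
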
class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = self._get_filtered_daemons()

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types

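    # Illustrative sketch of the tuple this returns (hypothetical cluster):
    # with 3 mons already carrying a target digest and 3 mgrs still on the old
    # image, completed_daemons holds three ('mon', True) and three
    # ('mgr', False) entries, so the result is
    # ('3/6 daemons upgraded', ['mon']).
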
    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons set to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no
        # filtering parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons()
                       if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons()
                       if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons

    def _get_current_version(self) -> Tuple[int, int, str]:
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        return (int(current_major), int(current_minor), current_version)

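    # Illustrative sketch (hypothetical version string): if self.mgr.version
    # is 'ceph version 16.2.7-123-gabcdef0 (...) pacific (stable)', then
    #   split('ceph version ')[1]    -> '16.2.7-123-gabcdef0 (...)'
    #   split('-')[0].split('.', 2)  -> ('16', '2', '7')
    # and the method returns (16, 2, '16.2.7-123-gabcdef0 (...)').
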
    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            v = version.split('.', 2)
            (major, minor) = (int(v[0]), int(v[1]))
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if major < 15 or (major == 15 and minor < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too big a jump?
        (current_major, current_minor, current_version) = self._get_current_version()
        if current_major < major - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if current_major > major:
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if current_major == major:
            if current_minor > minor:
                return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < major - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < major - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None

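    # Illustrative sketch (hypothetical versions): with a current version of
    # 15.2.8, a target of '17.2.0' passes the jump check (17 - 2 <= 15) while
    # '18.2.0' is rejected as too big a jump; '14.2.22' is rejected earlier by
    # the octopus floor; and with a current version of 16.2.7, a target of
    # '15.2.0' trips the major-version downgrade check.
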
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()

        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r

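    # Illustrative sketch of the returned dict (hypothetical registry data):
    #   {'image': 'quay.io/ceph/ceph', 'registry': 'quay.io',
    #    'bare_image': 'ceph/ceph', 'versions': ['16.2.7', '16.2.6']}
    # or, with tags=True, a 'tags' key carrying the raw sorted tag list
    # instead of 'versions'.
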
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        return 'Initiating upgrade to %s' % (target_name)

    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]

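        # Illustrative sketch (hypothetical order): if CEPH_UPGRADE_ORDER were
        # ['mgr', 'mon', 'crash', 'osd', 'mds'] and dtypes == ['osd'], then
        #   '|'.join(CEPH_UPGRADE_ORDER).split('osd')[0]  -> 'mgr|mon|crash|'
        #   .split('|')[:-1]                              -> ['mgr', 'mon', 'crash']
        # so every mgr/mon/crash candidate is reported as needing upgrade first.
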
        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            target_id, target_version, target_digests = self.mgr.wait_async(
                CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons()
                   if d.daemon_type not in MONITORING_STACK_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only supports services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated special and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.upgrade_state.error = ''
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns false, if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)
            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress)

) -> None:
560 if not self
.upgrade_state
:
561 self
.mgr
.set_store('upgrade_state', None)
563 self
.mgr
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
.to_json()))
565 def get_distinct_container_image_settings(self
) -> Dict
[str, str]:
566 # get all distinct container_image settings
568 ret
, out
, err
= self
.mgr
.check_mon_command({
569 'prefix': 'config dump',
572 config
= json
.loads(out
)
574 if opt
['name'] == 'container_image':
575 image_settings
[opt
['section']] = opt
['value']
576 return image_settings
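    # Illustrative sketch (hypothetical config dump entries): options like
    #   {'section': 'global',     'name': 'container_image', 'value': 'quay.io/ceph/ceph:v16'}
    #   {'section': 'client.rgw', 'name': 'container_image', 'value': 'quay.io/ceph/ceph:v15'}
    # collapse to:
    #   {'global': 'quay.io/ceph/ceph:v16', 'client.rgw': 'quay.io/ceph/ceph:v15'}
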
    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info(
                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
        # this function takes a list of daemons and container digests. The purpose
        # is to go through each daemon and check if the current container digests
        # for that daemon match the target digests. The purpose being that we determine
        # if a daemon is upgraded to a certain container image or not based on what
        # container digests it has. By checking the current digests against the
        # targets we can determine which daemons still need to be upgraded
        need_upgrade_self = False
        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
        done = 0
        if target_digests is None:
            target_digests = []
        for d in daemons:
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None
            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                continue
            correct_digest = False
            if (any(d in target_digests for d in (d.container_image_digests or []))
                    or d.daemon_type in MONITORING_STACK_TYPES):
                logger.debug('daemon %s.%s container digest correct' % (
                    d.daemon_type, d.daemon_id))
                correct_digest = True
                if any(d in target_digests for d in (d.deployed_by or [])):
                    logger.debug('daemon %s.%s deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    done += 1
                    continue

            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                            self.mgr.get_mgr_id())
                need_upgrade_self = True
                continue

            if correct_digest:
                logger.debug('daemon %s.%s not deployed by correct version' % (
                    d.daemon_type, d.daemon_id))
                need_upgrade_deployer.append((d, True))
            else:
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    d.daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_digests, d.version))
                need_upgrade.append((d, False))

        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)

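    # Illustrative sketch (hypothetical digests): with target_digests ==
    # ['quay.io/ceph/ceph@sha256:aaa...'], a daemon whose
    # container_image_digests include that digest but whose deployed_by does
    # not lands in need_upgrade_deployer (redeploy only); a daemon matching
    # neither lands in need_upgrade; one matching both only increments done.
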
    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
        known_ok_to_stop: List[str] = []
        for d_entry in need_upgrade:
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            if not d.container_image_id:
                if d.container_image_name == target_image:
                    logger.debug(
                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                else:
                    continue

            if known_ok_to_stop:
                if d.name() in known_ok_to_stop:
                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
                    to_upgrade.append(d_entry)
                continue

            if d.daemon_type == 'osd':
                # NOTE: known_ok_to_stop is an output argument for
                # _wait_for_ok_to_stop
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            to_upgrade.append(d_entry)

            # if we don't have a list of others to consider, stop now
            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                break
        return True, to_upgrade

    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
        assert self.upgrade_state is not None
        num = 1
        if target_digests is None:
            target_digests = []
        for d_entry in to_upgrade:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
                self.mgr.log.info(
                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                return
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            # make sure host has latest container image
            out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                d.hostname, '', 'inspect-image', [],
                image=target_image, no_fsid=True, error_ok=True))
            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                           d.hostname))
                self.upgrade_info_str = 'Pulling %s image on host %s' % (
                    target_image, d.hostname)
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'pull', [],
                    image=target_image, no_fsid=True, error_ok=True))
                if code:
                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
                        'severity': 'warning',
                        'summary': 'Upgrade: failed to pull target image',
                        'count': 1,
                        'detail': [
                            'failed to pull %s on host %s' % (target_image,
                                                              d.hostname)],
                    })
                    return
                r = json.loads(''.join(out))
                if not any(d in target_digests for d in r.get('repo_digests', [])):
                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests))
                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests)
                    self.upgrade_state.target_digests = r['repo_digests']
                    self._save_upgrade_state()
                    return

            self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)

            if len(to_upgrade) > 1:
                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
            else:
                logger.info('Upgrade: Updating %s.%s' %
                            (d.daemon_type, d.daemon_id))
            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                self.mgr._daemon_action(
                    daemon_spec,
                    'redeploy',
                    image=target_image if not d_entry[1] else None
                )
                self.mgr.cache.metadata_up_to_date[d.hostname] = False
            except Exception as e:
                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                    'severity': 'warning',
                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                    'count': 1,
                    'detail': [
                        f'Upgrade daemon: {d.name()}: {e}'
                    ],
                })
                return
            num += 1
            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
                self.upgrade_state.remaining_count -= 1
                self._save_upgrade_state()

    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
        if need_upgrade_self:
            try:
                self.mgr.mgr_service.fail_over()
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                    'severity': 'warning',
                    'summary': f'Upgrade: {e}',
                    'count': 1,
                    'detail': [
                        'The upgrade process needs to upgrade the mgr, '
                        'but it needs at least one standby to proceed.',
                    ],
                })
                return

            return  # unreachable code, as fail_over never returns
        elif upgrading_mgrs:
            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
        # push down configs
        daemon_type_section = name_to_config_section(daemon_type)
        if image_settings.get(daemon_type_section) != target_image:
            logger.info('Upgrade: Setting container_image for all %s' %
                        daemon_type)
            self.mgr.set_container_image(daemon_type_section, target_image)
        to_clean = []
        for section in image_settings.keys():
            if section.startswith(name_to_config_section(daemon_type) + '.'):
                to_clean.append(section)
        if to_clean:
            logger.debug('Upgrade: Cleaning up container_image for %s' %
                         to_clean)
            for section in to_clean:
                ret, image, err = self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'name': 'container_image',
                    'who': section,
                })

    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(target_major):
            logger.info(
                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
            ret, _, err = self.mgr.check_mon_command({
                'prefix': 'osd require-osd-release',
                'release': target_major_name,
            })

    def _complete_mds_upgrade(self) -> None:
        assert self.upgrade_state is not None
        if self.upgrade_state.fs_original_max_mds:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                if new_max > 1:
                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                        fs_name, new_max
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': str(new_max),
                    })

            self.upgrade_state.fs_original_max_mds = {}
            self._save_upgrade_state()
        if self.upgrade_state.fs_original_allow_standby_replay:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                if asr:
                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                        fs_name
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'allow_standby_replay',
                        'val': '1',
                    })

            self.upgrade_state.fs_original_allow_standby_replay = {}
            self._save_upgrade_state()

    def _mark_upgrade_complete(self) -> None:
        if not self.upgrade_state:
            logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
            return
        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()

    def _do_upgrade(self) -> None:
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
                    target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons()
                       if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons()
                       if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if not need_upgrade_self:
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        self._mark_upgrade_complete()