import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
    MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
from cephadm.ssh import HostConnectionError
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest


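# UpgradeState is the persistent record of an in-flight upgrade. It is stored
# as JSON in the mgr key/value store (see CephadmUpgrade._save_upgrade_state)
# and reloaded when the mgr restarts or fails over; from_json() drops unknown
# keys and converts the older single 'repo_digest' field into the newer
# 'target_digests' list, so state written by an earlier cephadm can still be
# resumed.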
class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                              bool]] = fs_original_allow_standby_replay
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        valid_params = UpgradeState.__init__.__code__.co_varnames
        if data:
            c = {k: v for k, v in data.items() if k in valid_params}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
        'UPGRADE_OFFLINE_HOST'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = self._get_filtered_daemons()

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types

    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no filtering
        # parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons

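    # _get_current_version below parses the running mgr's version string; for
    # example (illustrative value), self.mgr.version ==
    # 'ceph version 17.2.6-0-gabc123 (...) quincy (stable)' yields
    # (17, 2, '17.2.6-0-gabc123 (...) quincy (stable)').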
    def _get_current_version(self) -> Tuple[int, int, str]:
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        return (int(current_major), int(current_minor), current_version)

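    # _check_target_version below returns None when the target version is
    # acceptable and an explanatory error string otherwise. For example
    # (illustrative, assuming the cluster currently runs 16.2.x):
    #   '17.2.6' -> None (one major version ahead)
    #   '14.2.0' -> 'cephadm only supports octopus (15.2.0) or later'
    #   '19.2.0' -> 'ceph can only upgrade 1 or 2 major versions at a time; ...'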
    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            v = version.split('.', 2)
            (major, minor) = (int(v[0]), int(v[1]))
            assert minor >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if major < 15 or (major == 15 and minor < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        (current_major, current_minor, current_version) = self._get_current_version()
        if current_major < major - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if current_major > major:
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if current_major == major:
            if current_minor > minor:
                return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < major - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < major - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None

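    # upgrade_ls below queries the registry for available tags; the returned
    # dict is shaped roughly like (tag/version values illustrative):
    #   {'image': 'quay.io/ceph/ceph', 'registry': 'quay.io',
    #    'bare_image': 'ceph/ceph', 'versions': ['17.2.6', '17.2.5', ...]}
    # or, with tags=True, a 'tags' list of raw tag names instead of 'versions'.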
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r

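    # upgrade_start below is the entry point for `ceph orch upgrade start`.
    # A typical staggered invocation (illustrative) looks like:
    #   ceph orch upgrade start --image quay.io/ceph/ceph:v17.2.6 \
    #       --daemon-types mgr,mon --hosts host1,host2 --limit 2
    # With no filters given, the whole cluster is upgraded in CEPH_UPGRADE_ORDER.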
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

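    # _validate_upgrade_filters below enforces the upgrade order for staggered
    # upgrades. For example (illustrative): requesting --daemon-types osd,mds
    # is rejected unless every daemon of a type earlier in CEPH_UPGRADE_ORDER
    # (e.g. mgr and mon) already runs the target digest; the raised error lists
    # the daemons that must be upgraded first.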
    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            if not latest:
                return []
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]

        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            target_id, target_version, target_digests = self.mgr.wait_async(
                CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons(
        ) if d.daemon_type not in MONITORING_STACK_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only supports services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated specially and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')

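    # The three methods below back `ceph orch upgrade pause`, `resume` and
    # `stop`: pause/resume only flip the persisted paused flag (resume also
    # clears the recorded error and upgrade health warnings), while stop
    # discards the UpgradeState entirely.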
    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.upgrade_state.error = ''
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        for alert_id in self.UPGRADE_ERRORS:
            self.mgr.remove_health_warning(alert_id)
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except HostConnectionError as e:
                self._fail_upgrade('UPGRADE_OFFLINE_HOST', {
                    'severity': 'error',
                    'summary': f'Upgrade: Failed to connect to host {e.hostname} at addr ({e.addr})',
                    'count': 1,
                    'detail': [f'SSH connection failed to {e.hostname} at addr ({e.addr}): {str(e)}'],
                })
                return False
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

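    # _wait_for_ok_to_stop below polls the owning service's ok_to_stop() up to
    # 4 times, 15 seconds apart, and returns False if the daemon never becomes
    # safe to stop (or the upgrade is paused/cancelled in the meantime).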
    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

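    # _fail_upgrade below pauses the upgrade and raises a health alert; callers
    # pass the usual health-check dict shape, e.g. (illustrative values):
    #   {'severity': 'warning', 'summary': 'Upgrade: failed to pull target image',
    #    'count': 1, 'detail': ['failed to pull <image> on host <host>']}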
    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

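    # _save_upgrade_state below persists the UpgradeState (or clears it) as
    # JSON in the mgr key/value store under the 'upgrade_state' key, which is
    # what __init__ reads back after a mgr restart or failover.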
    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info(
                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        # type () -> bool
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # type (DaemonDescription) -> bool

        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

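    # _detect_need_upgrade below returns a 4-tuple:
    #   (need_upgrade_self,      True if the active mgr itself still needs upgrading
    #    need_upgrade,           daemons on the wrong image digest -> full upgrade
    #    need_upgrade_deployer,  correct digest but stale deployed_by -> redeploy only
    #    done)                   count of daemons already fully upgraded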
    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
        # this function takes a list of daemons and the target container digests.
        # It goes through each daemon and checks whether the daemon's current
        # container digests match the target digests; a daemon is considered
        # upgraded to a given container image based on the digests it reports.
        # Comparing current digests against the targets tells us which daemons
        # still need to be upgraded.
        need_upgrade_self = False
        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
        done = 0
        if target_digests is None:
            target_digests = []
        for d in daemons:
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None
            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                continue
            correct_digest = False
            if (any(d in target_digests for d in (d.container_image_digests or []))
                    or d.daemon_type in MONITORING_STACK_TYPES):
                logger.debug('daemon %s.%s container digest correct' % (
                    d.daemon_type, d.daemon_id))
                correct_digest = True
                if any(d in target_digests for d in (d.deployed_by or [])):
                    logger.debug('daemon %s.%s deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    done += 1
                    continue

            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                            self.mgr.get_mgr_id())
                need_upgrade_self = True
                continue

            if correct_digest:
                logger.debug('daemon %s.%s not deployed by correct version' % (
                    d.daemon_type, d.daemon_id))
                need_upgrade_deployer.append((d, True))
            else:
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    d.daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_digests, d.version))
                need_upgrade.append((d, False))

        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)

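    # The helpers below work on (DaemonDescription, bool) pairs; the bool is
    # True for "redeploy only" entries coming from need_upgrade_deployer.
    # _to_upgrade additionally gates osd/mon/mds daemons on ok-to-stop before
    # admitting them into the batch handed to _upgrade_daemons.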
    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
        known_ok_to_stop: List[str] = []
        for d_entry in need_upgrade:
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            if not d.container_image_id:
                if d.container_image_name == target_image:
                    logger.debug(
                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                    continue

            if known_ok_to_stop:
                if d.name() in known_ok_to_stop:
                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
                    to_upgrade.append(d_entry)
                continue

            if d.daemon_type == 'osd':
                # NOTE: known_ok_to_stop is an output argument for
                # _wait_for_ok_to_stop
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            to_upgrade.append(d_entry)

            # if we don't have a list of others to consider, stop now
            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                break
        return True, to_upgrade

    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
        assert self.upgrade_state is not None
        num = 1
        if target_digests is None:
            target_digests = []
        for d_entry in to_upgrade:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
                self.mgr.log.info(
                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                return
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            # make sure host has latest container image
            out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                d.hostname, '', 'inspect-image', [],
                image=target_image, no_fsid=True, error_ok=True))
            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                           d.hostname))
                self.upgrade_info_str = 'Pulling %s image on host %s' % (
                    target_image, d.hostname)
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'pull', [],
                    image=target_image, no_fsid=True, error_ok=True))
                if code:
                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
                        'severity': 'warning',
                        'summary': 'Upgrade: failed to pull target image',
                        'count': 1,
                        'detail': [
                            'failed to pull %s on host %s' % (target_image,
                                                              d.hostname)],
                    })
                    return
                r = json.loads(''.join(out))
                if not any(d in target_digests for d in r.get('repo_digests', [])):
                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests))
                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests)
                    self.upgrade_state.target_digests = r['repo_digests']
                    self._save_upgrade_state()
                    return

            self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)

            if len(to_upgrade) > 1:
                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
            else:
                logger.info('Upgrade: Updating %s.%s' %
                            (d.daemon_type, d.daemon_id))
            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                self.mgr._daemon_action(
                    daemon_spec,
                    'redeploy',
                    image=target_image if not d_entry[1] else None
                )
                self.mgr.cache.metadata_up_to_date[d.hostname] = False
            except Exception as e:
                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                    'severity': 'warning',
                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                    'count': 1,
                    'detail': [
                        f'Upgrade daemon: {d.name()}: {e}'
                    ],
                })
                return
            num += 1
            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
                self.upgrade_state.remaining_count -= 1
                self._save_upgrade_state()

    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
        if need_upgrade_self:
            try:
                self.mgr.mgr_service.fail_over()
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                    'severity': 'warning',
                    'summary': f'Upgrade: {e}',
                    'count': 1,
                    'detail': [
                        'The upgrade process needs to upgrade the mgr, '
                        'but it needs at least one standby to proceed.',
                    ],
                })
                return

            return  # unreachable code, as fail_over never returns
        elif upgrading_mgrs:
            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
        # push down configs
        daemon_type_section = name_to_config_section(daemon_type)
        if image_settings.get(daemon_type_section) != target_image:
            logger.info('Upgrade: Setting container_image for all %s' %
                        daemon_type)
            self.mgr.set_container_image(daemon_type_section, target_image)
        to_clean = []
        for section in image_settings.keys():
            if section.startswith(name_to_config_section(daemon_type) + '.'):
                to_clean.append(section)
        if to_clean:
            logger.debug('Upgrade: Cleaning up container_image for %s' %
                         to_clean)
            for section in to_clean:
                ret, image, err = self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'name': 'container_image',
                    'who': section,
                })

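    # _complete_osd_upgrade below bumps require_osd_release to the target major
    # release; _do_upgrade only calls it once all osd daemons are upgraded, and
    # e.g. for a 17.x target it is the equivalent of running
    # `ceph osd require-osd-release quincy`.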
    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(target_major):
            logger.info(
                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
            ret, _, err = self.mgr.check_mon_command({
                'prefix': 'osd require-osd-release',
                'release': target_major_name,
            })

    def _complete_mds_upgrade(self) -> None:
        assert self.upgrade_state is not None
        if self.upgrade_state.fs_original_max_mds:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                if new_max > 1:
                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                        fs_name, new_max
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': str(new_max),
                    })

            self.upgrade_state.fs_original_max_mds = {}
            self._save_upgrade_state()
        if self.upgrade_state.fs_original_allow_standby_replay:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                if asr:
                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                        fs_name
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'allow_standby_replay',
                        'val': '1'
                    })

            self.upgrade_state.fs_original_allow_standby_replay = {}
            self._save_upgrade_state()

    def _mark_upgrade_complete(self) -> None:
        if not self.upgrade_state:
            logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
            return
        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()

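    # _do_upgrade below is what continue_upgrade() runs on each pass: it learns
    # the target image digests on the first pull, validates the target version,
    # then walks CEPH_UPGRADE_ORDER one daemon type at a time, upgrading or
    # redeploying daemons and finishing each type with the post-upgrade steps
    # implemented above (require_osd_release, max_mds restore, image settings).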
    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        if self.mgr.offline_hosts:
            # offline host(s), on top of potential connection errors when trying to upgrade a daemon
            # or pull an image, can cause issues where daemons are never ok to stop. Since evaluating
            # whether or not that risk is present for any given offline hosts is a difficult problem,
            # it's best to just fail upgrade cleanly so user can address the offline host(s)

            # the HostConnectionError expects a hostname and addr, so let's just take
            # one at random. It doesn't really matter which host we say we couldn't reach here.
            hostname: str = list(self.mgr.offline_hosts)[0]
            addr: str = self.mgr.inventory.get_addr(hostname)
            raise HostConnectionError(f'Host(s) were marked offline: {self.mgr.offline_hosts}', hostname, addr)

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
                    target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        self._mark_upgrade_complete()
        return