import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
    CEPH_TYPES, NON_CEPH_IMAGE_TYPES, GATEWAY_TYPES
from cephadm.ssh import HostConnectionError
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
CEPH_MDSMAP_NOT_JOINABLE = (1 << 0)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fail_fs: bool = False,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                              bool]] = fs_original_allow_standby_replay
        self.fail_fs = fail_fs
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fail_fs': self.fail_fs,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

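    # A serialized UpgradeState, as produced by to_json() above and persisted by
    # CephadmUpgrade._save_upgrade_state() under the 'upgrade_state' store key,
    # looks roughly like this (values are illustrative, not from a real cluster):
    #   {"target_name": "quay.io/ceph/ceph:v18.2.1", "progress_id": "<uuid>",
    #    "target_digests": ["quay.io/ceph/ceph@sha256:..."], "target_version": "18.2.1",
    #    "fail_fs": false, "paused": false, "daemon_types": null, "hosts": null,
    #    "services": null, "total_count": null, "remaining_count": null, ...}
    # from_json() below accepts this form and also tolerates the older
    # 'repo_digest' field by converting it into 'target_digests'.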
    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        valid_params = UpgradeState.__init__.__code__.co_varnames
        if data:
            c = {k: v for k, v in data.items() if k in valid_params}
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None


class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
        'UPGRADE_OFFLINE_HOST'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None
        self.upgrade_info_str: str = ''

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

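            # Illustrative examples of the status text built below (the exact
            # wording comes from the f-strings that follow):
            #   'Upgrading daemons of type(s) mgr,mon on host(s) host1'
            #   'Upgrading all daemon types on all hosts. Upgrade limited to 10 daemons (4 remaining).'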
            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
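        # Returns a progress summary plus the daemon types whose daemons are all
        # on the target digests, e.g. ('12/30 daemons upgraded', ['mgr', 'mon'])
        # (illustrative values).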
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = self._get_filtered_daemons()

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types

    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons set to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no filtering
        # parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons

    def _get_current_version(self) -> Tuple[int, int, str]:
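        # self.mgr.version is a string of the form
        # 'ceph version 18.2.1 (<sha1>) reef (stable)' (illustrative); this parses
        # it into (major, minor, everything after 'ceph version ').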
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        return (int(current_major), int(current_minor), current_version)

    def _check_target_version(self, version: str) -> Optional[str]:
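        # Returns an error string if the requested target is unacceptable, or
        # None if the upgrade may proceed. For example (illustrative): from a
        # 16.2.x cluster, 17.2.z and 18.2.z targets are accepted, while 19.x is
        # rejected as more than two major versions ahead.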
        try:
            v = version.split('.', 2)
            (major, minor) = (int(v[0]), int(v[1]))
            assert minor >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if major < 15 or (major == 15 and minor < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        (current_major, current_minor, current_version) = self._get_current_version()
        if current_major < major - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if current_major > major:
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if current_major == major:
            if current_minor > minor:
                return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < major - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < major - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
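        # Illustrative shape of the returned dict (with tags=False):
        #   {'image': 'quay.io/ceph/ceph:v18', 'registry': 'quay.io',
        #    'bare_image': 'ceph/ceph', 'versions': ['18.2.1', '18.2.0', ...]}
        # With tags=True a sorted 'tags' list is returned instead of 'versions'.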
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        if ':' in bare_image:
            # for our purposes, we don't want to use the tag here
            bare_image = bare_image.split(':')[0]
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r

    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
            'orchestrator', 'fail_fs', False))
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            fail_fs=fail_fs_value,
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            if not latest:
                return []
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]

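        # For instance (illustrative): with dtypes=['osd', 'mds'], the helper above
        # picks whichever of the two is later in CEPH_UPGRADE_ORDER and returns any
        # candidate daemons of earlier types (such as mgr or mon) that are not in
        # dtypes, i.e. daemons that must already be upgraded for the filter to be valid.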
        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            with self.mgr.async_timeout_handler('cephadm inspect-image'):
                target_id, target_version, target_digests = self.mgr.wait_async(
                    CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons(
        ) if d.daemon_type not in NON_CEPH_IMAGE_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only supports services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests, target_name)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated special and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.upgrade_state.error = ''
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        for alert_id in self.UPGRADE_ERRORS:
            self.mgr.remove_health_warning(alert_id)
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except HostConnectionError as e:
                self._fail_upgrade('UPGRADE_OFFLINE_HOST', {
                    'severity': 'error',
                    'summary': f'Upgrade: Failed to connect to host {e.hostname} at addr ({e.addr})',
                    'count': 1,
                    'detail': [f'SSH connection failed to {e.hostname} at addr ({e.addr}): {str(e)}'],
                })
                return False
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
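        # The result maps the config section each container_image option is set
        # on to its value, e.g. (illustrative):
        #   {'global': 'quay.io/ceph/ceph@sha256:...',
        #    'client.rgw.foo': 'quay.io/ceph/ceph:v17.2.6'}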
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                if self.upgrade_state.fail_fs:
                    if not (mdsmap['flags'] & CEPH_MDSMAP_NOT_JOINABLE) and \
                            len(mdsmap['up']) > 0:
                        self.mgr.log.info(f'Upgrade: failing fs {fs_name} for '
                                          f'rapid multi-rank mds upgrade')
                        ret, out, err = self.mgr.check_mon_command({
                            'prefix': 'fs fail',
                            'fs_name': fs_name
                        })
                        if ret != 0:
                            continue_upgrade = False
                        continue
                else:
                    self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                        fs_name
                    ))
                    if fscid not in self.upgrade_state.fs_original_max_mds:
                        self.upgrade_state.fs_original_max_mds[fscid] = \
                            mdsmap['max_mds']
                        self._save_upgrade_state()
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': '1',
                    })
                    continue_upgrade = False
                    continue

            if not self.upgrade_state.fail_fs:
                if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                    self.mgr.log.info(
                        'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (
                            fs_name))
                    time.sleep(10)
                    continue_upgrade = False
                    continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        # type () -> bool
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
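        # Only report "enough mons" when more than two exist, so a strict
        # majority (quorum) can survive one mon being restarted; with just two
        # mons, stopping either one would break quorum. _to_upgrade() only
        # applies the ok-to-stop wait for a mon when this returns True.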
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # type (DaemonDescription) -> bool

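        # Roughly: an MDS is considered stoppable here only if its filesystem
        # has more MDS daemons deployed than max_mds, i.e. at least one standby
        # that could take over while this one restarts.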
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None, target_name: Optional[str] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
        # This function takes a list of daemons and the target container digests
        # and checks, for each daemon, whether its current container digests
        # match the targets. Whether a daemon counts as upgraded to a given
        # container image is determined by the digests it reports, so comparing
        # them against the targets tells us which daemons still need upgrading.
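        # The return tuple is roughly: (does the active mgr itself need an
        # upgrade, daemons needing a full upgrade, daemons already on the right
        # image that only need a redeploy by the upgraded mgr, number of daemons
        # already done).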
        need_upgrade_self = False
        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
        done = 0
        if target_digests is None:
            target_digests = []
        if target_name is None:
            target_name = ''
        for d in daemons:
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None
            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                continue
            correct_image = False
            # If "use_repo_digest" is true, check whether the daemon's container
            # digests match the digests of the image we're upgrading to; if it is
            # false, check whether the daemon's image name matches the target image
            # name. The idea is to generally check if the daemon is already using
            # the image we're upgrading to or not. Additionally, since monitoring stack
            # daemons are included in the upgrade process but don't use the ceph images
            # we are assuming any monitoring stack daemon is on the "correct" image already
            if (
                (self.mgr.use_repo_digest and d.matches_digests(target_digests))
                or (not self.mgr.use_repo_digest and d.matches_image_name(target_name))
                or (d.daemon_type in NON_CEPH_IMAGE_TYPES)
            ):
                logger.debug('daemon %s.%s on correct image' % (
                    d.daemon_type, d.daemon_id))
                correct_image = True
                # do deployed_by check using digest no matter what. We don't care
                # what repo the image used to deploy the daemon was as long
                # as the image content is correct
                if any(d in target_digests for d in (d.deployed_by or [])):
                    logger.debug('daemon %s.%s deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    done += 1
                    continue

            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                            self.mgr.get_mgr_id())
                need_upgrade_self = True
                continue

            if correct_image:
                logger.debug('daemon %s.%s not deployed by correct version' % (
                    d.daemon_type, d.daemon_id))
                need_upgrade_deployer.append((d, True))
            else:
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    d.daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_digests, d.version))
                need_upgrade.append((d, False))

        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)

    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
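        # Walks need_upgrade and collects the daemons that are currently safe to
        # act on (ok-to-stop checks for osd/mon/mds); returns (False, partial
        # list) when we should wait and retry on a later pass.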
        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
        known_ok_to_stop: List[str] = []
        for d_entry in need_upgrade:
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            if not d.container_image_id:
                if d.container_image_name == target_image:
                    logger.debug(
                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                    continue

            if known_ok_to_stop:
                if d.name() in known_ok_to_stop:
                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
                    to_upgrade.append(d_entry)
                continue

            if d.daemon_type == 'osd':
                # NOTE: known_ok_to_stop is an output argument for
                # _wait_for_ok_to_stop
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                # when fail_fs is set to true, all MDS daemons will be moved to
                # up:standby state, so Cephadm won't be able to upgrade due to
                # this check and will warn with "It is NOT safe to stop
                # mds.<daemon_name> at this time: one or more filesystems is
                # currently degraded", therefore we bypass this check for that
                # case.
                assert self.upgrade_state is not None
                if not self.upgrade_state.fail_fs \
                        and not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            to_upgrade.append(d_entry)

            # if we don't have a list of others to consider, stop now
            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                break
        return True, to_upgrade

    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
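        # For each selected daemon: ensure its host has pulled an image whose
        # repo digests match the target (re-pulling if needed), then redeploy
        # the daemon on that image, decrementing remaining_count for real
        # upgrades so a staggered upgrade stops at its limit.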
        assert self.upgrade_state is not None
        num = 1
        if target_digests is None:
            target_digests = []
        for d_entry in to_upgrade:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
                self.mgr.log.info(
                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                return
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            # make sure host has latest container image
            with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'):
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True))
            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                           d.hostname))
                self.upgrade_info_str = 'Pulling %s image on host %s' % (
                    target_image, d.hostname)
                with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'):
                    out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True))
                if code:
                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
                        'severity': 'warning',
                        'summary': 'Upgrade: failed to pull target image',
                        'count': 1,
                        'detail': [
                            'failed to pull %s on host %s' % (target_image,
                                                              d.hostname)],
                    })
                    return
                r = json.loads(''.join(out))
                if not any(d in target_digests for d in r.get('repo_digests', [])):
                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests))
                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests)
                    self.upgrade_state.target_digests = r['repo_digests']
                    self._save_upgrade_state()
                    return

            self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)

            if len(to_upgrade) > 1:
                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
            else:
                logger.info('Upgrade: Updating %s.%s' %
                            (d.daemon_type, d.daemon_id))
            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                self.mgr._daemon_action(
                    daemon_spec,
                    'redeploy',
                    image=target_image if not d_entry[1] else None
                )
                self.mgr.cache.metadata_up_to_date[d.hostname] = False
            except Exception as e:
                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                    'severity': 'warning',
                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                    'count': 1,
                    'detail': [
                        f'Upgrade daemon: {d.name()}: {e}'
                    ],
                })
                return
            num += 1
            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
                self.upgrade_state.remaining_count -= 1
                self._save_upgrade_state()

    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
        if need_upgrade_self:
            try:
                self.mgr.mgr_service.fail_over()
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                    'severity': 'warning',
                    'summary': f'Upgrade: {e}',
                    'count': 1,
                    'detail': [
                        'The upgrade process needs to upgrade the mgr, '
                        'but it needs at least one standby to proceed.',
                    ],
                })
                return

            return  # unreachable code, as fail_over never returns
        elif upgrading_mgrs:
            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
        # push down configs
        daemon_type_section = name_to_config_section(daemon_type)
        if image_settings.get(daemon_type_section) != target_image:
            logger.info('Upgrade: Setting container_image for all %s' %
                        daemon_type)
            self.mgr.set_container_image(daemon_type_section, target_image)
        to_clean = []
        for section in image_settings.keys():
            if section.startswith(name_to_config_section(daemon_type) + '.'):
                to_clean.append(section)
        if to_clean:
            logger.debug('Upgrade: Cleaning up container_image for %s' %
                         to_clean)
            for section in to_clean:
                ret, image, err = self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'name': 'container_image',
                    'who': section,
                })

    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
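        # Once all OSDs run the target release, bump require_osd_release so the
        # cluster can rely on that release's OSD features; this is roughly the
        # same final step the manual upgrade procedure asks for.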
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(target_major):
            logger.info(
                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
            ret, _, err = self.mgr.check_mon_command({
                'prefix': 'osd require-osd-release',
                'release': target_major_name,
            })

    def _complete_mds_upgrade(self) -> None:
        assert self.upgrade_state is not None
        if self.upgrade_state.fail_fs:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fs_name = fs['mdsmap']['fs_name']
                self.mgr.log.info('Upgrade: Setting filesystem '
                                  f'{fs_name} Joinable')
                try:
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'joinable',
                        'val': 'true',
                    })
                except Exception as e:
                    logger.error("Failed to set fs joinable "
                                 f"true due to {e}")
                    raise OrchestratorError("Failed to set "
                                            "fs joinable true "
                                            f"due to {e}")
        elif self.upgrade_state.fs_original_max_mds:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                if new_max > 1:
                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                        fs_name, new_max
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': str(new_max),
                    })

            self.upgrade_state.fs_original_max_mds = {}
            self._save_upgrade_state()
        if self.upgrade_state.fs_original_allow_standby_replay:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                if asr:
                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                        fs_name
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'allow_standby_replay',
                        'val': '1'
                    })

            self.upgrade_state.fs_original_allow_standby_replay = {}
            self._save_upgrade_state()

    def _mark_upgrade_complete(self) -> None:
        if not self.upgrade_state:
            logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting')
            return
        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()

    def _do_upgrade(self):
        # type: () -> None
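        # One pass of the upgrade loop, driven via continue_upgrade(): learn the
        # target image digests on the first pass, then walk CEPH_UPGRADE_ORDER,
        # upgrading/redeploying daemons of each type and finalizing per-type
        # settings, and finally mark the upgrade complete and clean up the
        # container_image config options.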
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        if self.mgr.offline_hosts:
            # offline host(s), on top of potential connection errors when trying to upgrade a daemon
            # or pull an image, can cause issues where daemons are never ok to stop. Since evaluating
            # whether or not that risk is present for any given offline hosts is a difficult problem,
            # it's best to just fail upgrade cleanly so user can address the offline host(s)

            # the HostConnectionError expects a hostname and addr, so let's just take
            # one at random. It doesn't really matter which host we say we couldn't reach here.
            hostname: str = list(self.mgr.offline_hosts)[0]
            addr: str = self.mgr.inventory.get_addr(hostname)
            raise HostConnectionError(f'Host(s) were marked offline: {self.mgr.offline_hosts}', hostname, addr)

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                with self.mgr.async_timeout_handler(f'cephadm inspect-image (image {target_image})'):
                    target_id, target_version, target_digests = self.mgr.wait_async(
                        CephadmServe(self.mgr)._get_container_image_info(target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in NON_CEPH_IMAGE_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests, target_image)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and non-ceph-image daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in NON_CEPH_IMAGE_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests, target_image)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests, target_image)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        self._mark_upgrade_complete()
        return