]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import json |
2 | import logging | |
3 | import time | |
4 | import uuid | |
a4b75251 | 5 | from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any |
e306af50 TL |
6 | |
7 | import orchestrator | |
a4b75251 | 8 | from cephadm.registry import Registry |
f67539c2 TL |
9 | from cephadm.serve import CephadmServe |
10 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec | |
11 | from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, MONITORING_STACK_TYPES | |
b3b6e05e | 12 | from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service |
e306af50 TL |
13 | |
14 | if TYPE_CHECKING: | |
15 | from .module import CephadmOrchestrator | |
16 | ||
17 | ||
e306af50 TL |
18 | logger = logging.getLogger(__name__) |
19 | ||
a4b75251 TL |
20 | # from ceph_fs.h |
21 | CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5) | |
22 | ||
f6b5b4d7 | 23 | |
def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Prepend the default registry to well-known short image names.

    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    # Short names that are known to refer to images on the default registry.
    known_shortnames = (
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    )
    # str.startswith accepts a tuple of prefixes; this also matches
    # tagged/digested forms like 'ceph/ceph:v15'.
    if digest.startswith(known_shortnames):
        return f'{default_registry}/{digest}'
    return digest
49 | ||
50 | ||
f6b5b4d7 TL |
class UpgradeState:
    """State of an in-progress cephadm upgrade.

    Serialized with ``to_json`` (persisted in the mon KV store by
    ``CephadmUpgrade._save_upgrade_state``) and restored with ``from_json``,
    which also migrates the legacy single ``repo_digest`` key to the newer
    ``target_digests`` list.
    """

    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None
                 ):
        # Use CephadmUpgrade.target_image to read this; it prefers the
        # resolved repo digest over the user-supplied name.
        self._target_name: str = target_name
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        # normalize None -> False so .paused is always a bool
        self.paused: bool = paused or False
        # original CephFS settings, remembered so they can be restored
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                             bool]] = fs_original_allow_standby_replay

    def to_json(self) -> dict:
        """Return a JSON-serializable dict of this state."""
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        """Reconstruct an UpgradeState from ``to_json`` output.

        Returns None for empty/missing data.
        """
        if not data:
            return None
        c = dict(data)  # shallow copy; don't mutate the caller's dict
        # migrate older persisted state that stored a single 'repo_digest'
        if 'repo_digest' in c:
            c['target_digests'] = [c.pop('repo_digest')]
        return cls(**c)
f6b5b4d7 TL |
96 | |
97 | ||
e306af50 | 98 | class CephadmUpgrade: |
adb31ebb TL |
    # Health-check identifiers that _fail_upgrade() may raise (it asserts the
    # id is in this list) and that _clear_upgrade_health_checks() removes.
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION'
    ]
e306af50 TL |
107 | def __init__(self, mgr: "CephadmOrchestrator"): |
108 | self.mgr = mgr | |
109 | ||
110 | t = self.mgr.get_store('upgrade_state') | |
111 | if t: | |
f6b5b4d7 | 112 | self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t)) |
e306af50 TL |
113 | else: |
114 | self.upgrade_state = None | |
115 | ||
f91f0fd5 TL |
116 | @property |
117 | def target_image(self) -> str: | |
118 | assert self.upgrade_state | |
119 | if not self.mgr.use_repo_digest: | |
120 | return self.upgrade_state._target_name | |
f67539c2 | 121 | if not self.upgrade_state.target_digests: |
f91f0fd5 TL |
122 | return self.upgrade_state._target_name |
123 | ||
f67539c2 TL |
124 | # FIXME: we assume the first digest is the best one to use |
125 | return self.upgrade_state.target_digests[0] | |
f91f0fd5 | 126 | |
e306af50 TL |
127 | def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: |
128 | r = orchestrator.UpgradeStatusSpec() | |
129 | if self.upgrade_state: | |
f91f0fd5 | 130 | r.target_image = self.target_image |
e306af50 | 131 | r.in_progress = True |
f67539c2 TL |
132 | r.progress, r.services_complete = self._get_upgrade_info() |
133 | # accessing self.upgrade_info_str will throw an exception if it | |
134 | # has not been set in _do_upgrade yet | |
135 | try: | |
136 | r.message = self.upgrade_info_str | |
137 | except AttributeError: | |
138 | pass | |
f6b5b4d7 TL |
139 | if self.upgrade_state.error: |
140 | r.message = 'Error: ' + self.upgrade_state.error | |
141 | elif self.upgrade_state.paused: | |
e306af50 TL |
142 | r.message = 'Upgrade paused' |
143 | return r | |
144 | ||
f67539c2 TL |
145 | def _get_upgrade_info(self) -> Tuple[str, List[str]]: |
146 | if not self.upgrade_state or not self.upgrade_state.target_digests: | |
147 | return '', [] | |
148 | ||
149 | daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER] | |
150 | ||
151 | if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'): | |
152 | return '', [] | |
153 | ||
154 | completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in ( | |
155 | d.container_image_digests or []))) for d in daemons if d.daemon_type] | |
156 | ||
157 | done = len([True for completion in completed_daemons if completion[1]]) | |
158 | ||
159 | completed_types = list(set([completion[0] for completion in completed_daemons if all( | |
160 | c[1] for c in completed_daemons if c[0] == completion[0])])) | |
161 | ||
522d829b | 162 | return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types |
f67539c2 TL |
163 | |
164 | def _check_target_version(self, version: str) -> Optional[str]: | |
165 | try: | |
166 | (major, minor, _) = version.split('.', 2) | |
167 | assert int(minor) >= 0 | |
168 | # patch might be a number or {number}-g{sha1} | |
169 | except ValueError: | |
170 | return 'version must be in the form X.Y.Z (e.g., 15.2.3)' | |
171 | if int(major) < 15 or (int(major) == 15 and int(minor) < 2): | |
172 | return 'cephadm only supports octopus (15.2.0) or later' | |
173 | ||
174 | # to far a jump? | |
175 | current_version = self.mgr.version.split('ceph version ')[1] | |
176 | (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2) | |
177 | if int(current_major) < int(major) - 2: | |
178 | return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump' | |
179 | if int(current_major) > int(major): | |
180 | return f'ceph cannot downgrade major versions (from {current_version} to {version})' | |
181 | if int(current_major) == int(major): | |
182 | if int(current_minor) > int(minor): | |
183 | return f'ceph cannot downgrade to a {"rc" if minor == "1" else "dev"} release' | |
184 | ||
185 | # check mon min | |
186 | monmap = self.mgr.get("mon_map") | |
187 | mon_min = monmap.get("min_mon_release", 0) | |
188 | if mon_min < int(major) - 2: | |
189 | return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release' | |
190 | ||
191 | # check osd min | |
192 | osdmap = self.mgr.get("osd_map") | |
193 | osd_min_name = osdmap.get("require_osd_release", "argonaut") | |
194 | osd_min = ceph_release_to_major(osd_min_name) | |
195 | if osd_min < int(major) - 2: | |
196 | return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release' | |
197 | ||
198 | return None | |
199 | ||
a4b75251 TL |
200 | def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict: |
201 | if not image: | |
202 | image = self.mgr.container_image_base | |
203 | reg_name, bare_image = image.split('/', 1) | |
204 | reg = Registry(reg_name) | |
205 | versions = [] | |
206 | r: Dict[Any, Any] = { | |
207 | "image": image, | |
208 | "registry": reg_name, | |
209 | "bare_image": bare_image, | |
210 | } | |
211 | ls = reg.get_tags(bare_image) | |
212 | if not tags: | |
213 | for t in ls: | |
214 | if t[0] != 'v': | |
215 | continue | |
216 | v = t[1:].split('.') | |
217 | if len(v) != 3: | |
218 | continue | |
219 | if '-' in v[2]: | |
220 | continue | |
221 | versions.append('.'.join(v)) | |
222 | r["versions"] = sorted( | |
223 | versions, | |
224 | key=lambda k: list(map(int, k.split('.'))), | |
225 | reverse=True | |
226 | ) | |
227 | else: | |
228 | r["tags"] = sorted(ls) | |
229 | return r | |
230 | ||
    def upgrade_start(self, image: str, version: str) -> str:
        """Begin (or resume) an upgrade to `image` or ceph `version`.

        Exactly one of `image`/`version` must be given.  Returns a status
        string; raises OrchestratorError for invalid requests, a conflicting
        in-progress upgrade, or insufficient running mgr daemons.
        """
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            # validate before building the target image name from the version
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            # an upgrade already exists: only allow resuming the same target
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        # we will redeploy the active mgr during upgrade, so a standby must exist
        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        # drop any stale alerts from a previous upgrade attempt
        self._clear_upgrade_health_checks()
        # wake the serve loop so the upgrade starts promptly
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)
271 | ||
272 | def upgrade_pause(self) -> str: | |
273 | if not self.upgrade_state: | |
274 | raise OrchestratorError('No upgrade in progress') | |
f6b5b4d7 | 275 | if self.upgrade_state.paused: |
f91f0fd5 | 276 | return 'Upgrade to %s already paused' % self.target_image |
f6b5b4d7 | 277 | self.upgrade_state.paused = True |
f67539c2 | 278 | self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image) |
e306af50 | 279 | self._save_upgrade_state() |
f91f0fd5 | 280 | return 'Paused upgrade to %s' % self.target_image |
e306af50 TL |
281 | |
282 | def upgrade_resume(self) -> str: | |
283 | if not self.upgrade_state: | |
284 | raise OrchestratorError('No upgrade in progress') | |
f6b5b4d7 | 285 | if not self.upgrade_state.paused: |
f91f0fd5 | 286 | return 'Upgrade to %s not paused' % self.target_image |
f6b5b4d7 | 287 | self.upgrade_state.paused = False |
f67539c2 | 288 | self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image) |
e306af50 TL |
289 | self._save_upgrade_state() |
290 | self.mgr.event.set() | |
f91f0fd5 | 291 | return 'Resumed upgrade to %s' % self.target_image |
e306af50 TL |
292 | |
293 | def upgrade_stop(self) -> str: | |
294 | if not self.upgrade_state: | |
295 | return 'No upgrade in progress' | |
f6b5b4d7 | 296 | if self.upgrade_state.progress_id: |
e306af50 | 297 | self.mgr.remote('progress', 'complete', |
f6b5b4d7 | 298 | self.upgrade_state.progress_id) |
f91f0fd5 | 299 | target_image = self.target_image |
f67539c2 | 300 | self.mgr.log.info('Upgrade: Stopped') |
e306af50 TL |
301 | self.upgrade_state = None |
302 | self._save_upgrade_state() | |
303 | self._clear_upgrade_health_checks() | |
304 | self.mgr.event.set() | |
f91f0fd5 | 305 | return 'Stopped upgrade to %s' % target_image |
e306af50 TL |
306 | |
307 | def continue_upgrade(self) -> bool: | |
308 | """ | |
309 | Returns false, if nothing was done. | |
310 | :return: | |
311 | """ | |
f6b5b4d7 | 312 | if self.upgrade_state and not self.upgrade_state.paused: |
f67539c2 TL |
313 | try: |
314 | self._do_upgrade() | |
315 | except Exception as e: | |
316 | self._fail_upgrade('UPGRADE_EXCEPTION', { | |
317 | 'severity': 'error', | |
318 | 'summary': 'Upgrade: failed due to an unexpected exception', | |
319 | 'count': 1, | |
320 | 'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'], | |
321 | }) | |
322 | return False | |
e306af50 TL |
323 | return True |
324 | return False | |
325 | ||
f67539c2 TL |
326 | def _wait_for_ok_to_stop( |
327 | self, s: DaemonDescription, | |
328 | known: Optional[List[str]] = None, # NOTE: output argument! | |
329 | ) -> bool: | |
e306af50 | 330 | # only wait a little bit; the service might go away for something |
f67539c2 TL |
331 | assert s.daemon_type is not None |
332 | assert s.daemon_id is not None | |
e306af50 TL |
333 | tries = 4 |
334 | while tries > 0: | |
f6b5b4d7 | 335 | if not self.upgrade_state or self.upgrade_state.paused: |
e306af50 | 336 | return False |
f6b5b4d7 | 337 | |
f67539c2 TL |
338 | # setting force flag to retain old functionality. |
339 | # note that known is an output argument for ok_to_stop() | |
340 | r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([ | |
341 | s.daemon_id], known=known, force=True) | |
f6b5b4d7 TL |
342 | |
343 | if not r.retval: | |
344 | logger.info(f'Upgrade: {r.stdout}') | |
e306af50 | 345 | return True |
f67539c2 | 346 | logger.info(f'Upgrade: {r.stderr}') |
f6b5b4d7 TL |
347 | |
348 | time.sleep(15) | |
349 | tries -= 1 | |
e306af50 TL |
350 | return False |
351 | ||
352 | def _clear_upgrade_health_checks(self) -> None: | |
adb31ebb | 353 | for k in self.UPGRADE_ERRORS: |
e306af50 TL |
354 | if k in self.mgr.health_checks: |
355 | del self.mgr.health_checks[k] | |
356 | self.mgr.set_health_checks(self.mgr.health_checks) | |
357 | ||
adb31ebb TL |
358 | def _fail_upgrade(self, alert_id: str, alert: dict) -> None: |
359 | assert alert_id in self.UPGRADE_ERRORS | |
f6b5b4d7 | 360 | if not self.upgrade_state: |
f67539c2 TL |
361 | # this could happen if the user canceled the upgrade while we |
362 | # were doing something | |
363 | return | |
f6b5b4d7 | 364 | |
f67539c2 TL |
365 | logger.error('Upgrade: Paused due to %s: %s' % (alert_id, |
366 | alert['summary'])) | |
f6b5b4d7 TL |
367 | self.upgrade_state.error = alert_id + ': ' + alert['summary'] |
368 | self.upgrade_state.paused = True | |
e306af50 TL |
369 | self._save_upgrade_state() |
370 | self.mgr.health_checks[alert_id] = alert | |
371 | self.mgr.set_health_checks(self.mgr.health_checks) | |
372 | ||
    def _update_upgrade_progress(self, progress: float) -> None:
        """Report `progress` (0.0-1.0) to the mgr progress module.

        Lazily creates the progress event id (and persists it) on first use.
        Must only be called while an upgrade is in progress.
        """
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            # first report: allocate and persist a progress event id
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)
e306af50 TL |
386 | |
387 | def _save_upgrade_state(self) -> None: | |
f6b5b4d7 TL |
388 | if not self.upgrade_state: |
389 | self.mgr.set_store('upgrade_state', None) | |
390 | return | |
391 | self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json())) | |
e306af50 | 392 | |
f91f0fd5 TL |
393 | def get_distinct_container_image_settings(self) -> Dict[str, str]: |
394 | # get all distinct container_image settings | |
395 | image_settings = {} | |
396 | ret, out, err = self.mgr.check_mon_command({ | |
397 | 'prefix': 'config dump', | |
398 | 'format': 'json', | |
399 | }) | |
400 | config = json.loads(out) | |
401 | for opt in config: | |
402 | if opt['name'] == 'container_image': | |
403 | image_settings[opt['section']] = opt['value'] | |
404 | return image_settings | |
405 | ||
f67539c2 TL |
    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        """Quiesce CephFS before upgrading MDS daemons.

        Disables allow_standby_replay and scales every filesystem down to a
        single active MDS, remembering the original settings in the upgrade
        state (so they can be restored later — restoration is not in this
        method).  Returns True once every fs is down to one active MDS and it
        is safe to proceed; False while more settling time is needed.

        NOTE(review): `target_major` and `need_upgrade` are currently unused
        by this body.
        """
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                # remember that it was enabled so it can be re-enabled later
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                # remember the original max_mds so it can be restored later
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            # wait until only rank 0 is in and at most one MDS is up
            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info(
                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                # the single remaining MDS must be fully active before upgrade
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade
487 | ||
b3b6e05e TL |
488 | def _enough_mons_for_ok_to_stop(self) -> bool: |
489 | # type () -> bool | |
490 | ret, out, err = self.mgr.check_mon_command({ | |
491 | 'prefix': 'quorum_status', | |
492 | }) | |
493 | try: | |
494 | j = json.loads(out) | |
495 | except Exception: | |
496 | raise OrchestratorError('failed to parse quorum status') | |
497 | ||
498 | mons = [m['name'] for m in j['monmap']['mons']] | |
499 | return len(mons) > 2 | |
500 | ||
    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        """True when the fs served by `mds_daemon` has spare (standby) MDS
        daemons, i.e. more daemons than max_mds, so one may be stopped.

        An MDS whose service matches no filesystem passes ok-to-stop.
        """
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            # service name is 'mds.<fs_name>'; compare the fs part
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            # note: decision is made on the first matching fs only
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop
525 | ||
e306af50 TL |
526 | def _do_upgrade(self): |
527 | # type: () -> None | |
528 | if not self.upgrade_state: | |
529 | logger.debug('_do_upgrade no state, exiting') | |
530 | return | |
531 | ||
f91f0fd5 | 532 | target_image = self.target_image |
f6b5b4d7 | 533 | target_id = self.upgrade_state.target_id |
f67539c2 TL |
534 | target_digests = self.upgrade_state.target_digests |
535 | target_version = self.upgrade_state.target_version | |
536 | ||
537 | first = False | |
538 | if not target_id or not target_version or not target_digests: | |
e306af50 | 539 | # need to learn the container hash |
f91f0fd5 | 540 | logger.info('Upgrade: First pull of %s' % target_image) |
20effc67 | 541 | self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image) |
e306af50 | 542 | try: |
20effc67 TL |
543 | target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info( |
544 | target_image)) | |
e306af50 TL |
545 | except OrchestratorError as e: |
546 | self._fail_upgrade('UPGRADE_FAILED_PULL', { | |
547 | 'severity': 'warning', | |
548 | 'summary': 'Upgrade: failed to pull target image', | |
549 | 'count': 1, | |
550 | 'detail': [str(e)], | |
551 | }) | |
552 | return | |
f67539c2 TL |
553 | if not target_version: |
554 | self._fail_upgrade('UPGRADE_FAILED_PULL', { | |
555 | 'severity': 'warning', | |
556 | 'summary': 'Upgrade: failed to pull target image', | |
557 | 'count': 1, | |
558 | 'detail': ['unable to extract ceph version from container'], | |
559 | }) | |
560 | return | |
f6b5b4d7 | 561 | self.upgrade_state.target_id = target_id |
f67539c2 TL |
562 | # extract the version portion of 'ceph version {version} ({sha1})' |
563 | self.upgrade_state.target_version = target_version.split(' ')[2] | |
564 | self.upgrade_state.target_digests = target_digests | |
e306af50 | 565 | self._save_upgrade_state() |
f91f0fd5 | 566 | target_image = self.target_image |
f67539c2 TL |
567 | first = True |
568 | ||
569 | if target_digests is None: | |
570 | target_digests = [] | |
571 | if target_version.startswith('ceph version '): | |
572 | # tolerate/fix upgrade state from older version | |
573 | self.upgrade_state.target_version = target_version.split(' ')[2] | |
574 | target_version = self.upgrade_state.target_version | |
575 | (target_major, _) = target_version.split('.', 1) | |
576 | target_major_name = self.mgr.lookup_release_name(int(target_major)) | |
577 | ||
578 | if first: | |
579 | logger.info('Upgrade: Target is version %s (%s)' % ( | |
580 | target_version, target_major_name)) | |
581 | logger.info('Upgrade: Target container is %s, digests %s' % ( | |
582 | target_image, target_digests)) | |
583 | ||
584 | version_error = self._check_target_version(target_version) | |
585 | if version_error: | |
586 | self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', { | |
587 | 'severity': 'error', | |
588 | 'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}', | |
589 | 'count': 1, | |
590 | 'detail': [version_error], | |
591 | }) | |
592 | return | |
e306af50 | 593 | |
f91f0fd5 | 594 | image_settings = self.get_distinct_container_image_settings() |
e306af50 | 595 | |
a4b75251 TL |
596 | # Older monitors (pre-v16.2.5) asserted that FSMap::compat == |
597 | # MDSMap::compat for all fs. This is no longer the case beginning in | |
598 | # v16.2.5. We must disable the sanity checks during upgrade. | |
599 | # N.B.: we don't bother confirming the operator has not already | |
600 | # disabled this or saving the config value. | |
601 | self.mgr.check_mon_command({ | |
602 | 'prefix': 'config set', | |
603 | 'name': 'mon_mds_skip_sanity', | |
604 | 'value': '1', | |
605 | 'who': 'mon', | |
606 | }) | |
607 | ||
f67539c2 | 608 | daemons = [d for d in self.mgr.cache.get_daemons() if d.daemon_type in CEPH_UPGRADE_ORDER] |
e306af50 TL |
609 | done = 0 |
610 | for daemon_type in CEPH_UPGRADE_ORDER: | |
f67539c2 TL |
611 | logger.debug('Upgrade: Checking %s daemons' % daemon_type) |
612 | ||
e306af50 | 613 | need_upgrade_self = False |
f67539c2 TL |
614 | need_upgrade: List[Tuple[DaemonDescription, bool]] = [] |
615 | need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = [] | |
e306af50 TL |
616 | for d in daemons: |
617 | if d.daemon_type != daemon_type: | |
618 | continue | |
f67539c2 TL |
619 | assert d.daemon_type is not None |
620 | assert d.daemon_id is not None | |
20effc67 TL |
621 | assert d.hostname is not None |
622 | if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname): | |
623 | continue | |
f67539c2 TL |
624 | correct_digest = False |
625 | if (any(d in target_digests for d in (d.container_image_digests or [])) | |
626 | or d.daemon_type in MONITORING_STACK_TYPES): | |
627 | logger.debug('daemon %s.%s container digest correct' % ( | |
e306af50 | 628 | daemon_type, d.daemon_id)) |
f67539c2 TL |
629 | correct_digest = True |
630 | if any(d in target_digests for d in (d.deployed_by or [])): | |
631 | logger.debug('daemon %s.%s deployed by correct version' % ( | |
632 | d.daemon_type, d.daemon_id)) | |
633 | done += 1 | |
634 | continue | |
e306af50 | 635 | |
f91f0fd5 | 636 | if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id): |
e306af50 | 637 | logger.info('Upgrade: Need to upgrade myself (mgr.%s)' % |
f6b5b4d7 | 638 | self.mgr.get_mgr_id()) |
e306af50 TL |
639 | need_upgrade_self = True |
640 | continue | |
641 | ||
f67539c2 TL |
642 | if correct_digest: |
643 | logger.debug('daemon %s.%s not deployed by correct version' % ( | |
644 | d.daemon_type, d.daemon_id)) | |
645 | need_upgrade_deployer.append((d, True)) | |
646 | else: | |
647 | logger.debug('daemon %s.%s not correct (%s, %s, %s)' % ( | |
648 | daemon_type, d.daemon_id, | |
649 | d.container_image_name, d.container_image_digests, d.version)) | |
650 | need_upgrade.append((d, False)) | |
651 | ||
652 | if not need_upgrade_self: | |
653 | # only after the mgr itself is upgraded can we expect daemons to have | |
654 | # deployed_by == target_digests | |
655 | need_upgrade += need_upgrade_deployer | |
656 | ||
657 | # prepare filesystems for daemon upgrades? | |
658 | if ( | |
659 | daemon_type == 'mds' | |
660 | and need_upgrade | |
661 | and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade]) | |
662 | ): | |
663 | return | |
664 | ||
665 | if need_upgrade: | |
666 | self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type) | |
667 | ||
668 | to_upgrade: List[Tuple[DaemonDescription, bool]] = [] | |
669 | known_ok_to_stop: List[str] = [] | |
670 | for d_entry in need_upgrade: | |
671 | d = d_entry[0] | |
672 | assert d.daemon_type is not None | |
673 | assert d.daemon_id is not None | |
674 | assert d.hostname is not None | |
675 | ||
676 | if not d.container_image_id: | |
677 | if d.container_image_name == target_image: | |
678 | logger.debug( | |
679 | 'daemon %s has unknown container_image_id but has correct image name' % (d.name())) | |
680 | continue | |
681 | ||
682 | if known_ok_to_stop: | |
683 | if d.name() in known_ok_to_stop: | |
684 | logger.info(f'Upgrade: {d.name()} is also safe to restart') | |
685 | to_upgrade.append(d_entry) | |
686 | continue | |
687 | ||
b3b6e05e | 688 | if d.daemon_type == 'osd': |
f67539c2 TL |
689 | # NOTE: known_ok_to_stop is an output argument for |
690 | # _wait_for_ok_to_stop | |
691 | if not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
692 | return | |
693 | ||
b3b6e05e TL |
694 | if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop(): |
695 | if not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
696 | return | |
697 | ||
698 | if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d): | |
699 | if not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
700 | return | |
701 | ||
f67539c2 TL |
702 | to_upgrade.append(d_entry) |
703 | ||
704 | # if we don't have a list of others to consider, stop now | |
20effc67 | 705 | if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop: |
f67539c2 TL |
706 | break |
707 | ||
708 | num = 1 | |
709 | for d_entry in to_upgrade: | |
710 | d = d_entry[0] | |
711 | assert d.daemon_type is not None | |
712 | assert d.daemon_id is not None | |
713 | assert d.hostname is not None | |
714 | ||
715 | self._update_upgrade_progress(done / len(daemons)) | |
716 | ||
e306af50 | 717 | # make sure host has latest container image |
20effc67 | 718 | out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( |
f6b5b4d7 | 719 | d.hostname, '', 'inspect-image', [], |
20effc67 | 720 | image=target_image, no_fsid=True, error_ok=True)) |
f67539c2 | 721 | if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])): |
f91f0fd5 | 722 | logger.info('Upgrade: Pulling %s on %s' % (target_image, |
f6b5b4d7 | 723 | d.hostname)) |
f67539c2 TL |
724 | self.upgrade_info_str = 'Pulling %s image on host %s' % ( |
725 | target_image, d.hostname) | |
20effc67 | 726 | out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( |
f6b5b4d7 | 727 | d.hostname, '', 'pull', [], |
20effc67 | 728 | image=target_image, no_fsid=True, error_ok=True)) |
e306af50 TL |
729 | if code: |
730 | self._fail_upgrade('UPGRADE_FAILED_PULL', { | |
731 | 'severity': 'warning', | |
732 | 'summary': 'Upgrade: failed to pull target image', | |
733 | 'count': 1, | |
734 | 'detail': [ | |
f91f0fd5 | 735 | 'failed to pull %s on host %s' % (target_image, |
e306af50 TL |
736 | d.hostname)], |
737 | }) | |
738 | return | |
739 | r = json.loads(''.join(out)) | |
f67539c2 TL |
740 | if not any(d in target_digests for d in r.get('repo_digests', [])): |
741 | logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % ( | |
742 | target_image, d.hostname, r['repo_digests'], target_digests)) | |
743 | self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % ( | |
744 | target_image, d.hostname, r['repo_digests'], target_digests) | |
745 | self.upgrade_state.target_digests = r['repo_digests'] | |
e306af50 TL |
746 | self._save_upgrade_state() |
747 | return | |
748 | ||
f67539c2 | 749 | self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type) |
e306af50 | 750 | |
f67539c2 TL |
751 | if len(to_upgrade) > 1: |
752 | logger.info('Upgrade: Updating %s.%s (%d/%d)' % | |
753 | (d.daemon_type, d.daemon_id, num, len(to_upgrade))) | |
754 | else: | |
755 | logger.info('Upgrade: Updating %s.%s' % | |
756 | (d.daemon_type, d.daemon_id)) | |
757 | action = 'Upgrading' if not d_entry[1] else 'Redeploying' | |
adb31ebb | 758 | try: |
f67539c2 | 759 | daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d) |
adb31ebb | 760 | self.mgr._daemon_action( |
f67539c2 | 761 | daemon_spec, |
adb31ebb | 762 | 'redeploy', |
f67539c2 | 763 | image=target_image if not d_entry[1] else None |
adb31ebb | 764 | ) |
20effc67 | 765 | self.mgr.cache.metadata_up_to_date[d.hostname] = False |
adb31ebb TL |
766 | except Exception as e: |
767 | self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', { | |
768 | 'severity': 'warning', | |
f67539c2 | 769 | 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.', |
adb31ebb TL |
770 | 'count': 1, |
771 | 'detail': [ | |
772 | f'Upgrade daemon: {d.name()}: {e}' | |
773 | ], | |
774 | }) | |
f67539c2 TL |
775 | return |
776 | num += 1 | |
777 | if to_upgrade: | |
e306af50 TL |
778 | return |
779 | ||
f67539c2 TL |
780 | # complete mon upgrade? |
781 | if daemon_type == 'mon': | |
782 | if not self.mgr.get("have_local_config_map"): | |
783 | logger.info('Upgrade: Restarting mgr now that mons are running pacific') | |
784 | need_upgrade_self = True | |
785 | ||
e306af50 | 786 | if need_upgrade_self: |
f91f0fd5 TL |
787 | try: |
788 | self.mgr.mgr_service.fail_over() | |
789 | except OrchestratorError as e: | |
e306af50 TL |
790 | self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', { |
791 | 'severity': 'warning', | |
f91f0fd5 | 792 | 'summary': f'Upgrade: {e}', |
e306af50 TL |
793 | 'count': 1, |
794 | 'detail': [ | |
795 | 'The upgrade process needs to upgrade the mgr, ' | |
796 | 'but it needs at least one standby to proceed.', | |
797 | ], | |
798 | }) | |
799 | return | |
800 | ||
f91f0fd5 | 801 | return # unreachable code, as fail_over never returns |
e306af50 TL |
802 | elif daemon_type == 'mgr': |
803 | if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks: | |
804 | del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR'] | |
805 | self.mgr.set_health_checks(self.mgr.health_checks) | |
806 | ||
807 | # make sure 'ceph versions' agrees | |
f6b5b4d7 | 808 | ret, out_ver, err = self.mgr.check_mon_command({ |
e306af50 TL |
809 | 'prefix': 'versions', |
810 | }) | |
f6b5b4d7 | 811 | j = json.loads(out_ver) |
e306af50 | 812 | for version, count in j.get(daemon_type, {}).items(): |
f67539c2 TL |
813 | short_version = version.split(' ')[2] |
814 | if short_version != target_version: | |
e306af50 TL |
815 | logger.warning( |
816 | 'Upgrade: %d %s daemon(s) are %s != target %s' % | |
f67539c2 | 817 | (count, daemon_type, short_version, target_version)) |
e306af50 TL |
818 | |
819 | # push down configs | |
f67539c2 TL |
820 | daemon_type_section = name_to_config_section(daemon_type) |
821 | if image_settings.get(daemon_type_section) != target_image: | |
822 | logger.info('Upgrade: Setting container_image for all %s' % | |
f6b5b4d7 | 823 | daemon_type) |
f67539c2 | 824 | self.mgr.set_container_image(daemon_type_section, target_image) |
e306af50 TL |
825 | to_clean = [] |
826 | for section in image_settings.keys(): | |
827 | if section.startswith(name_to_config_section(daemon_type) + '.'): | |
828 | to_clean.append(section) | |
829 | if to_clean: | |
f67539c2 | 830 | logger.debug('Upgrade: Cleaning up container_image for %s' % |
f6b5b4d7 | 831 | to_clean) |
e306af50 TL |
832 | for section in to_clean: |
833 | ret, image, err = self.mgr.check_mon_command({ | |
834 | 'prefix': 'config rm', | |
835 | 'name': 'container_image', | |
836 | 'who': section, | |
837 | }) | |
838 | ||
f67539c2 TL |
839 | # complete osd upgrade? |
840 | if daemon_type == 'osd': | |
841 | osdmap = self.mgr.get("osd_map") | |
842 | osd_min_name = osdmap.get("require_osd_release", "argonaut") | |
843 | osd_min = ceph_release_to_major(osd_min_name) | |
844 | if osd_min < int(target_major): | |
845 | logger.info( | |
846 | f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}') | |
847 | ret, _, err = self.mgr.check_mon_command({ | |
848 | 'prefix': 'osd require-osd-release', | |
849 | 'release': target_major_name, | |
850 | }) | |
851 | ||
852 | # complete mds upgrade? | |
a4b75251 TL |
853 | if daemon_type == 'mds': |
854 | if self.upgrade_state.fs_original_max_mds: | |
855 | for fs in self.mgr.get("fs_map")['filesystems']: | |
856 | fscid = fs["id"] | |
857 | fs_name = fs['mdsmap']['fs_name'] | |
858 | new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1) | |
859 | if new_max > 1: | |
860 | self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % ( | |
861 | fs_name, new_max | |
862 | )) | |
863 | ret, _, err = self.mgr.check_mon_command({ | |
864 | 'prefix': 'fs set', | |
865 | 'fs_name': fs_name, | |
866 | 'var': 'max_mds', | |
867 | 'val': str(new_max), | |
868 | }) | |
869 | ||
870 | self.upgrade_state.fs_original_max_mds = {} | |
871 | self._save_upgrade_state() | |
872 | if self.upgrade_state.fs_original_allow_standby_replay: | |
873 | for fs in self.mgr.get("fs_map")['filesystems']: | |
874 | fscid = fs["id"] | |
875 | fs_name = fs['mdsmap']['fs_name'] | |
876 | asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False) | |
877 | if asr: | |
878 | self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % ( | |
879 | fs_name | |
880 | )) | |
881 | ret, _, err = self.mgr.check_mon_command({ | |
882 | 'prefix': 'fs set', | |
883 | 'fs_name': fs_name, | |
884 | 'var': 'allow_standby_replay', | |
885 | 'val': '1' | |
886 | }) | |
887 | ||
888 | self.upgrade_state.fs_original_allow_standby_replay = {} | |
889 | self._save_upgrade_state() | |
e306af50 | 890 | |
20effc67 TL |
891 | # Make sure all metadata is up to date before saying we are done upgrading this daemon type |
892 | if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date(): | |
893 | self.mgr.agent_helpers._request_ack_all_not_up_to_date() | |
894 | return | |
895 | ||
896 | logger.debug('Upgrade: All %s daemons are up to date.' % daemon_type) | |
897 | ||
e306af50 TL |
898 | # clean up |
899 | logger.info('Upgrade: Finalizing container_image settings') | |
f91f0fd5 TL |
900 | self.mgr.set_container_image('global', target_image) |
901 | ||
e306af50 TL |
902 | for daemon_type in CEPH_UPGRADE_ORDER: |
903 | ret, image, err = self.mgr.check_mon_command({ | |
904 | 'prefix': 'config rm', | |
905 | 'name': 'container_image', | |
906 | 'who': name_to_config_section(daemon_type), | |
907 | }) | |
908 | ||
a4b75251 TL |
909 | self.mgr.check_mon_command({ |
910 | 'prefix': 'config rm', | |
911 | 'name': 'mon_mds_skip_sanity', | |
912 | 'who': 'mon', | |
913 | }) | |
914 | ||
e306af50 | 915 | logger.info('Upgrade: Complete!') |
f6b5b4d7 | 916 | if self.upgrade_state.progress_id: |
e306af50 | 917 | self.mgr.remote('progress', 'complete', |
f6b5b4d7 | 918 | self.upgrade_state.progress_id) |
e306af50 TL |
919 | self.upgrade_state = None |
920 | self._save_upgrade_state() | |
921 | return |