]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import json |
2 | import logging | |
3 | import time | |
4 | import uuid | |
1e59de90 | 5 | from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast |
e306af50 TL |
6 | |
7 | import orchestrator | |
a4b75251 | 8 | from cephadm.registry import Registry |
f67539c2 TL |
9 | from cephadm.serve import CephadmServe |
10 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec | |
33c7a0ef | 11 | from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \ |
aee94f69 | 12 | CEPH_TYPES, NON_CEPH_IMAGE_TYPES, GATEWAY_TYPES |
39ae355f | 13 | from cephadm.ssh import HostConnectionError |
b3b6e05e | 14 | from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service |
e306af50 TL |
15 | |
16 | if TYPE_CHECKING: | |
17 | from .module import CephadmOrchestrator | |
18 | ||
19 | ||
e306af50 TL |
20 | logger = logging.getLogger(__name__) |
21 | ||
a4b75251 TL |
22 | # from ceph_fs.h |
23 | CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5) | |
1e59de90 | 24 | CEPH_MDSMAP_NOT_JOINABLE = (1 << 0) |
a4b75251 | 25 | |
f6b5b4d7 | 26 | |
def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Prepend the default registry to well-known Ceph image shortnames.

    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    # startswith (not ==) so tag/digest suffixes like 'ceph/ceph:v17' match too.
    known_shortnames = (
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    )
    if digest.startswith(known_shortnames):
        return f'{default_registry}/{digest}'
    return digest
52 | ||
53 | ||
f6b5b4d7 TL |
class UpgradeState:
    """Serializable snapshot of an in-flight ``ceph orch upgrade``.

    Persisted as JSON in the mon store (via CephadmUpgrade._save_upgrade_state)
    so the upgrade survives mgr restart/failover.
    """

    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fail_fs: bool = False,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        # `paused` may arrive as None from old JSON blobs; normalize to bool.
        self.paused: bool = paused or False
        # Original per-fs settings stashed so they can be restored post-upgrade.
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str,
                                                             bool]] = fs_original_allow_standby_replay
        self.fail_fs = fail_fs
        # Staggered-upgrade filters (None means "no filter").
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        """Return a JSON-serializable dict mirroring the constructor kwargs."""
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fail_fs': self.fail_fs,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        """Rebuild an UpgradeState from persisted JSON; None for empty data.

        Unknown keys are dropped so state written by older/newer versions
        still loads; the legacy 'repo_digest' key maps to 'target_digests'.
        """
        if not data:
            return None
        # BUGFIX: co_varnames lists *all* local variable names of __init__,
        # not only its parameters; slice to co_argcount so stray JSON keys
        # can never collide with a local and be passed as bogus kwargs.
        init_code = cls.__init__.__code__
        valid_params = init_code.co_varnames[:init_code.co_argcount]
        c = {k: v for k, v in data.items() if k in valid_params}
        # BUGFIX: check the raw input for the legacy key — 'repo_digest' is
        # not a constructor parameter, so the old code filtered it out of `c`
        # before this check and the migration never fired.
        if 'repo_digest' in data:
            c['target_digests'] = [data['repo_digest']]
        return cls(**c)
f6b5b4d7 TL |
118 | |
119 | ||
class CephadmUpgrade:
    # All health-alert ids this module may raise via _fail_upgrade; they are
    # cleared wholesale by _clear_upgrade_health_checks and upgrade_resume.
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
        'UPGRADE_OFFLINE_HOST'
    ]
129 | ||
e306af50 TL |
    def __init__(self, mgr: "CephadmOrchestrator"):
        """Bind to the orchestrator and restore any persisted upgrade state."""
        self.mgr = mgr

        # An in-flight upgrade survives mgr failover: its state is stored as
        # JSON in the mon store and reloaded here.
        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None
        # Human-readable status line surfaced via upgrade_status().
        self.upgrade_info_str: str = ''
e306af50 | 139 | |
f91f0fd5 TL |
140 | @property |
141 | def target_image(self) -> str: | |
142 | assert self.upgrade_state | |
143 | if not self.mgr.use_repo_digest: | |
144 | return self.upgrade_state._target_name | |
f67539c2 | 145 | if not self.upgrade_state.target_digests: |
f91f0fd5 TL |
146 | return self.upgrade_state._target_name |
147 | ||
f67539c2 TL |
148 | # FIXME: we assume the first digest is the best one to use |
149 | return self.upgrade_state.target_digests[0] | |
f91f0fd5 | 150 | |
e306af50 TL |
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        """Build the status object backing `ceph orch upgrade status`."""
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

            # Describe which daemons this (possibly staggered) upgrade covers.
            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            # Error/paused messages take precedence over the info string.
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r
186 | ||
f67539c2 TL |
187 | def _get_upgrade_info(self) -> Tuple[str, List[str]]: |
188 | if not self.upgrade_state or not self.upgrade_state.target_digests: | |
189 | return '', [] | |
190 | ||
33c7a0ef | 191 | daemons = self._get_filtered_daemons() |
f67539c2 TL |
192 | |
193 | if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'): | |
194 | return '', [] | |
195 | ||
196 | completed_daemons = [(d.daemon_type, any(d in self.upgrade_state.target_digests for d in ( | |
197 | d.container_image_digests or []))) for d in daemons if d.daemon_type] | |
198 | ||
199 | done = len([True for completion in completed_daemons if completion[1]]) | |
200 | ||
201 | completed_types = list(set([completion[0] for completion in completed_daemons if all( | |
202 | c[1] for c in completed_daemons if c[0] == completion[0])])) | |
203 | ||
522d829b | 204 | return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types |
f67539c2 | 205 | |
33c7a0ef TL |
    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons set to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no
        # filtering parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        # The host filter composes with any of the selections above.
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons
224 | ||
225 | def _get_current_version(self) -> Tuple[int, int, str]: | |
226 | current_version = self.mgr.version.split('ceph version ')[1] | |
227 | (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2) | |
228 | return (int(current_major), int(current_minor), current_version) | |
229 | ||
f67539c2 TL |
230 | def _check_target_version(self, version: str) -> Optional[str]: |
231 | try: | |
33c7a0ef TL |
232 | v = version.split('.', 2) |
233 | (major, minor) = (int(v[0]), int(v[1])) | |
234 | assert minor >= 0 | |
f67539c2 TL |
235 | # patch might be a number or {number}-g{sha1} |
236 | except ValueError: | |
237 | return 'version must be in the form X.Y.Z (e.g., 15.2.3)' | |
33c7a0ef | 238 | if major < 15 or (major == 15 and minor < 2): |
f67539c2 TL |
239 | return 'cephadm only supports octopus (15.2.0) or later' |
240 | ||
241 | # to far a jump? | |
33c7a0ef TL |
242 | (current_major, current_minor, current_version) = self._get_current_version() |
243 | if current_major < major - 2: | |
f67539c2 | 244 | return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump' |
33c7a0ef | 245 | if current_major > major: |
f67539c2 | 246 | return f'ceph cannot downgrade major versions (from {current_version} to {version})' |
33c7a0ef TL |
247 | if current_major == major: |
248 | if current_minor > minor: | |
249 | return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release' | |
f67539c2 TL |
250 | |
251 | # check mon min | |
252 | monmap = self.mgr.get("mon_map") | |
253 | mon_min = monmap.get("min_mon_release", 0) | |
33c7a0ef | 254 | if mon_min < major - 2: |
f67539c2 TL |
255 | return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release' |
256 | ||
257 | # check osd min | |
258 | osdmap = self.mgr.get("osd_map") | |
259 | osd_min_name = osdmap.get("require_osd_release", "argonaut") | |
260 | osd_min = ceph_release_to_major(osd_min_name) | |
33c7a0ef | 261 | if osd_min < major - 2: |
f67539c2 TL |
262 | return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release' |
263 | ||
264 | return None | |
265 | ||
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
        """List upgrade candidates available in the image's registry.

        With ``tags`` False, only well-formed release tags (vX.Y.Z with no
        '-suffix') are considered; unless ``show_all_versions`` is set, the
        result is limited to versions >= the currently running release.
        With ``tags`` True, the raw sorted tag list is returned instead.
        """
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        if ':' in bare_image:
            # for our purposes, we don't want to use the tag here
            bare_image = bare_image.split(':')[0]
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    # skip dev/point-build style tags such as v17.2.3-20220903
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                # candidate == same or newer than what we are running now
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            # numeric (not lexicographic) descending sort: 10.x after 9.x etc.
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r
309 | ||
33c7a0ef TL |
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        """Begin (or resume) an upgrade to the given image or version.

        Raises OrchestratorError on invalid targets, conflicting in-progress
        upgrades, invalid staggered-upgrade filters, or <2 running mgrs.
        """
        fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
            'orchestrator', 'fail_fs', False))
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        # A version implies our own container image base; an explicit image
        # is normalized against the default registry.
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        # Staggered upgrade: validate the filters before touching any state.
        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        # We need a standby mgr to fail over to while the active one upgrades.
        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            fail_fs=fail_fs_value,
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        # wake the serve loop so the upgrade starts immediately
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)
363 | ||
33c7a0ef TL |
    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
        """Reject staggered-upgrade filters that would violate upgrade order.

        Raises OrchestratorError if daemons of types earlier in
        CEPH_UPGRADE_ORDER than the selected set still need upgrading.
        """
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            if not latest:
                return []
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]

        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            with self.mgr.async_timeout_handler('cephadm inspect-image'):
                target_id, target_version, target_digests = self.mgr.wait_async(
                    CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons(
        ) if d.daemon_type not in NON_CEPH_IMAGE_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only support services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests, target_name)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated special and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')
456 | ||
e306af50 TL |
457 | def upgrade_pause(self) -> str: |
458 | if not self.upgrade_state: | |
459 | raise OrchestratorError('No upgrade in progress') | |
f6b5b4d7 | 460 | if self.upgrade_state.paused: |
f91f0fd5 | 461 | return 'Upgrade to %s already paused' % self.target_image |
f6b5b4d7 | 462 | self.upgrade_state.paused = True |
f67539c2 | 463 | self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image) |
e306af50 | 464 | self._save_upgrade_state() |
f91f0fd5 | 465 | return 'Paused upgrade to %s' % self.target_image |
e306af50 TL |
466 | |
467 | def upgrade_resume(self) -> str: | |
468 | if not self.upgrade_state: | |
469 | raise OrchestratorError('No upgrade in progress') | |
f6b5b4d7 | 470 | if not self.upgrade_state.paused: |
f91f0fd5 | 471 | return 'Upgrade to %s not paused' % self.target_image |
f6b5b4d7 | 472 | self.upgrade_state.paused = False |
2a845540 | 473 | self.upgrade_state.error = '' |
f67539c2 | 474 | self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image) |
e306af50 TL |
475 | self._save_upgrade_state() |
476 | self.mgr.event.set() | |
39ae355f TL |
477 | for alert_id in self.UPGRADE_ERRORS: |
478 | self.mgr.remove_health_warning(alert_id) | |
f91f0fd5 | 479 | return 'Resumed upgrade to %s' % self.target_image |
e306af50 TL |
480 | |
481 | def upgrade_stop(self) -> str: | |
482 | if not self.upgrade_state: | |
483 | return 'No upgrade in progress' | |
f6b5b4d7 | 484 | if self.upgrade_state.progress_id: |
e306af50 | 485 | self.mgr.remote('progress', 'complete', |
f6b5b4d7 | 486 | self.upgrade_state.progress_id) |
f91f0fd5 | 487 | target_image = self.target_image |
f67539c2 | 488 | self.mgr.log.info('Upgrade: Stopped') |
e306af50 TL |
489 | self.upgrade_state = None |
490 | self._save_upgrade_state() | |
491 | self._clear_upgrade_health_checks() | |
492 | self.mgr.event.set() | |
f91f0fd5 | 493 | return 'Stopped upgrade to %s' % target_image |
e306af50 TL |
494 | |
    def continue_upgrade(self) -> bool:
        """
        Run one step of an active, unpaused upgrade.

        Returns False if nothing was done (no upgrade, paused, or the step
        failed — in which case the upgrade is paused with a health alert).
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            # HostConnectionError is matched before the generic handler so
            # offline hosts get their own, more actionable alert.
            except HostConnectionError as e:
                self._fail_upgrade('UPGRADE_OFFLINE_HOST', {
                    'severity': 'error',
                    'summary': f'Upgrade: Failed to connect to host {e.hostname} at addr ({e.addr})',
                    'count': 1,
                    'detail': [f'SSH connection failed to {e.hostname} at addr ({e.addr}): {str(e)}'],
                })
                return False
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False
521 | ||
f67539c2 TL |
    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        """Poll the daemon's service until it reports ok-to-stop.

        Retries up to 4 times, 15s apart; returns False on timeout or if the
        upgrade is canceled/paused meanwhile.
        """
        # only wait a little bit; the service might go away for something
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            # bail out promptly if the user paused or stopped the upgrade
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            # retval == 0 means it is safe to stop the daemon
            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False
547 | ||
548 | def _clear_upgrade_health_checks(self) -> None: | |
adb31ebb | 549 | for k in self.UPGRADE_ERRORS: |
e306af50 TL |
550 | if k in self.mgr.health_checks: |
551 | del self.mgr.health_checks[k] | |
552 | self.mgr.set_health_checks(self.mgr.health_checks) | |
553 | ||
adb31ebb TL |
554 | def _fail_upgrade(self, alert_id: str, alert: dict) -> None: |
555 | assert alert_id in self.UPGRADE_ERRORS | |
f6b5b4d7 | 556 | if not self.upgrade_state: |
f67539c2 TL |
557 | # this could happen if the user canceled the upgrade while we |
558 | # were doing something | |
559 | return | |
f6b5b4d7 | 560 | |
f67539c2 TL |
561 | logger.error('Upgrade: Paused due to %s: %s' % (alert_id, |
562 | alert['summary'])) | |
f6b5b4d7 TL |
563 | self.upgrade_state.error = alert_id + ': ' + alert['summary'] |
564 | self.upgrade_state.paused = True | |
e306af50 TL |
565 | self._save_upgrade_state() |
566 | self.mgr.health_checks[alert_id] = alert | |
567 | self.mgr.set_health_checks(self.mgr.health_checks) | |
568 | ||
adb31ebb | 569 | def _update_upgrade_progress(self, progress: float) -> None: |
f6b5b4d7 TL |
570 | if not self.upgrade_state: |
571 | assert False, 'No upgrade in progress' | |
572 | ||
573 | if not self.upgrade_state.progress_id: | |
574 | self.upgrade_state.progress_id = str(uuid.uuid4()) | |
e306af50 | 575 | self._save_upgrade_state() |
f6b5b4d7 | 576 | self.mgr.remote('progress', 'update', self.upgrade_state.progress_id, |
f67539c2 TL |
577 | ev_msg='Upgrade to %s' % ( |
578 | self.upgrade_state.target_version or self.target_image | |
579 | ), | |
580 | ev_progress=progress, | |
581 | add_to_ceph_s=True) | |
e306af50 TL |
582 | |
583 | def _save_upgrade_state(self) -> None: | |
f6b5b4d7 TL |
584 | if not self.upgrade_state: |
585 | self.mgr.set_store('upgrade_state', None) | |
586 | return | |
587 | self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json())) | |
e306af50 | 588 | |
f91f0fd5 TL |
589 | def get_distinct_container_image_settings(self) -> Dict[str, str]: |
590 | # get all distinct container_image settings | |
591 | image_settings = {} | |
592 | ret, out, err = self.mgr.check_mon_command({ | |
593 | 'prefix': 'config dump', | |
594 | 'format': 'json', | |
595 | }) | |
596 | config = json.loads(out) | |
597 | for opt in config: | |
598 | if opt['name'] == 'container_image': | |
599 | image_settings[opt['section']] = opt['value'] | |
600 | return image_settings | |
601 | ||
f67539c2 TL |
    def _prepare_for_mds_upgrade(
            self,
            target_major: str,
            need_upgrade: List[DaemonDescription]
    ) -> bool:
        """Get every CephFS into an MDS-upgradable shape; True when ready.

        Returns False (caller retries later) whenever a mon command was just
        issued or we are still waiting for the fs to settle.
        """
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                # remember the original setting so it can be restored later
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                if self.upgrade_state.fail_fs:
                    # fail_fs mode: mark the fs not-joinable instead of
                    # scaling down, enabling rapid multi-rank MDS upgrade
                    if not (mdsmap['flags'] & CEPH_MDSMAP_NOT_JOINABLE) and \
                            len(mdsmap['up']) > 0:
                        self.mgr.log.info(f'Upgrade: failing fs {fs_name} for '
                                          f'rapid multi-rank mds upgrade')
                        ret, out, err = self.mgr.check_mon_command({
                            'prefix': 'fs fail',
                            'fs_name': fs_name
                        })
                        if ret != 0:
                            continue_upgrade = False
                            continue
                else:
                    self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                        fs_name
                    ))
                    # remember the original max_mds for post-upgrade restore
                    if fscid not in self.upgrade_state.fs_original_max_mds:
                        self.upgrade_state.fs_original_max_mds[fscid] = \
                            mdsmap['max_mds']
                        self._save_upgrade_state()
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': '1',
                    })
                    continue_upgrade = False
                    continue

            if not self.upgrade_state.fail_fs:
                # wait until exactly one rank (rank 0) is in and at most one
                # MDS remains up before proceeding
                if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                    self.mgr.log.info(
                        'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (
                            fs_name))
                    time.sleep(10)
                    continue_upgrade = False
                    continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade
699 | ||
b3b6e05e TL |
700 | def _enough_mons_for_ok_to_stop(self) -> bool: |
701 | # type () -> bool | |
702 | ret, out, err = self.mgr.check_mon_command({ | |
703 | 'prefix': 'quorum_status', | |
704 | }) | |
705 | try: | |
706 | j = json.loads(out) | |
707 | except Exception: | |
708 | raise OrchestratorError('failed to parse quorum status') | |
709 | ||
710 | mons = [m['name'] for m in j['monmap']['mons']] | |
711 | return len(mons) > 2 | |
712 | ||
713 | def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool: | |
714 | # type (DaemonDescription) -> bool | |
715 | ||
716 | # find fs this mds daemon belongs to | |
717 | fsmap = self.mgr.get("fs_map") | |
a4b75251 TL |
718 | for fs in fsmap.get('filesystems', []): |
719 | mdsmap = fs["mdsmap"] | |
720 | fs_name = mdsmap["fs_name"] | |
b3b6e05e TL |
721 | |
722 | assert mds_daemon.daemon_id | |
723 | if fs_name != mds_daemon.service_name().split('.', 1)[1]: | |
724 | # wrong fs for this mds daemon | |
725 | continue | |
726 | ||
727 | # get number of mds daemons for this fs | |
728 | mds_count = len( | |
729 | [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())]) | |
730 | ||
731 | # standby mds daemons for this fs? | |
a4b75251 | 732 | if mdsmap["max_mds"] < mds_count: |
b3b6e05e TL |
733 | return True |
734 | return False | |
735 | ||
736 | return True # if mds has no fs it should pass ok-to-stop | |
737 | ||
1e59de90 | 738 | def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None, target_name: Optional[str] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]: |
33c7a0ef TL |
739 | # this function takes a list of daemons and container digests. The purpose |
740 | # is to go through each daemon and check if the current container digests | |
741 | # for that daemon match the target digests. The purpose being that we determine | |
742 | # if a daemon is upgraded to a certain container image or not based on what | |
743 | # container digests it has. By checking the current digests against the | |
744 | # targets we can determine which daemons still need to be upgraded | |
745 | need_upgrade_self = False | |
746 | need_upgrade: List[Tuple[DaemonDescription, bool]] = [] | |
747 | need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = [] | |
748 | done = 0 | |
749 | if target_digests is None: | |
750 | target_digests = [] | |
1e59de90 TL |
751 | if target_name is None: |
752 | target_name = '' | |
33c7a0ef TL |
753 | for d in daemons: |
754 | assert d.daemon_type is not None | |
755 | assert d.daemon_id is not None | |
756 | assert d.hostname is not None | |
757 | if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname): | |
758 | continue | |
1e59de90 TL |
759 | correct_image = False |
760 | # check if the container digest for the digest we're upgrading to matches | |
761 | # the container digest for the daemon if "use_repo_digest" setting is true | |
762 | # or that the image name matches the daemon's image name if "use_repo_digest" | |
763 | # is false. The idea is to generally check if the daemon is already using | |
764 | # the image we're upgrading to or not. Additionally, since monitoring stack | |
765 | # daemons are included in the upgrade process but don't use the ceph images | |
766 | # we are assuming any monitoring stack daemon is on the "correct" image already | |
767 | if ( | |
768 | (self.mgr.use_repo_digest and d.matches_digests(target_digests)) | |
769 | or (not self.mgr.use_repo_digest and d.matches_image_name(target_name)) | |
aee94f69 | 770 | or (d.daemon_type in NON_CEPH_IMAGE_TYPES) |
1e59de90 TL |
771 | ): |
772 | logger.debug('daemon %s.%s on correct image' % ( | |
33c7a0ef | 773 | d.daemon_type, d.daemon_id)) |
1e59de90 TL |
774 | correct_image = True |
775 | # do deployed_by check using digest no matter what. We don't care | |
776 | # what repo the image used to deploy the daemon was as long | |
777 | # as the image content is correct | |
33c7a0ef TL |
778 | if any(d in target_digests for d in (d.deployed_by or [])): |
779 | logger.debug('daemon %s.%s deployed by correct version' % ( | |
780 | d.daemon_type, d.daemon_id)) | |
781 | done += 1 | |
782 | continue | |
783 | ||
784 | if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id): | |
785 | logger.info('Upgrade: Need to upgrade myself (mgr.%s)' % | |
786 | self.mgr.get_mgr_id()) | |
787 | need_upgrade_self = True | |
788 | continue | |
789 | ||
1e59de90 | 790 | if correct_image: |
33c7a0ef TL |
791 | logger.debug('daemon %s.%s not deployed by correct version' % ( |
792 | d.daemon_type, d.daemon_id)) | |
793 | need_upgrade_deployer.append((d, True)) | |
794 | else: | |
795 | logger.debug('daemon %s.%s not correct (%s, %s, %s)' % ( | |
796 | d.daemon_type, d.daemon_id, | |
797 | d.container_image_name, d.container_image_digests, d.version)) | |
798 | need_upgrade.append((d, False)) | |
799 | ||
800 | return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done) | |
801 | ||
802 | def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]: | |
803 | to_upgrade: List[Tuple[DaemonDescription, bool]] = [] | |
804 | known_ok_to_stop: List[str] = [] | |
805 | for d_entry in need_upgrade: | |
806 | d = d_entry[0] | |
807 | assert d.daemon_type is not None | |
808 | assert d.daemon_id is not None | |
809 | assert d.hostname is not None | |
810 | ||
811 | if not d.container_image_id: | |
812 | if d.container_image_name == target_image: | |
813 | logger.debug( | |
814 | 'daemon %s has unknown container_image_id but has correct image name' % (d.name())) | |
815 | continue | |
816 | ||
817 | if known_ok_to_stop: | |
818 | if d.name() in known_ok_to_stop: | |
819 | logger.info(f'Upgrade: {d.name()} is also safe to restart') | |
820 | to_upgrade.append(d_entry) | |
821 | continue | |
822 | ||
823 | if d.daemon_type == 'osd': | |
824 | # NOTE: known_ok_to_stop is an output argument for | |
825 | # _wait_for_ok_to_stop | |
826 | if not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
827 | return False, to_upgrade | |
828 | ||
829 | if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop(): | |
830 | if not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
831 | return False, to_upgrade | |
832 | ||
833 | if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d): | |
1e59de90 TL |
834 | # when fail_fs is set to true, all MDS daemons will be moved to |
835 | # up:standby state, so Cephadm won't be able to upgrade due to | |
836 | # this check and and will warn with "It is NOT safe to stop | |
837 | # mds.<daemon_name> at this time: one or more filesystems is | |
838 | # currently degraded", therefore we bypass this check for that | |
839 | # case. | |
840 | assert self.upgrade_state is not None | |
841 | if not self.upgrade_state.fail_fs \ | |
842 | and not self._wait_for_ok_to_stop(d, known_ok_to_stop): | |
33c7a0ef TL |
843 | return False, to_upgrade |
844 | ||
845 | to_upgrade.append(d_entry) | |
846 | ||
847 | # if we don't have a list of others to consider, stop now | |
848 | if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop: | |
849 | break | |
850 | return True, to_upgrade | |
851 | ||
852 | def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None: | |
853 | assert self.upgrade_state is not None | |
854 | num = 1 | |
855 | if target_digests is None: | |
856 | target_digests = [] | |
857 | for d_entry in to_upgrade: | |
858 | if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]: | |
859 | self.mgr.log.info( | |
860 | f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade') | |
861 | return | |
862 | d = d_entry[0] | |
863 | assert d.daemon_type is not None | |
864 | assert d.daemon_id is not None | |
865 | assert d.hostname is not None | |
866 | ||
867 | # make sure host has latest container image | |
1e59de90 TL |
868 | with self.mgr.async_timeout_handler(d.hostname, 'cephadm inspect-image'): |
869 | out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( | |
870 | d.hostname, '', 'inspect-image', [], | |
871 | image=target_image, no_fsid=True, error_ok=True)) | |
33c7a0ef TL |
872 | if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])): |
873 | logger.info('Upgrade: Pulling %s on %s' % (target_image, | |
874 | d.hostname)) | |
875 | self.upgrade_info_str = 'Pulling %s image on host %s' % ( | |
876 | target_image, d.hostname) | |
1e59de90 TL |
877 | with self.mgr.async_timeout_handler(d.hostname, 'cephadm pull'): |
878 | out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm( | |
879 | d.hostname, '', 'pull', [], | |
880 | image=target_image, no_fsid=True, error_ok=True)) | |
33c7a0ef TL |
881 | if code: |
882 | self._fail_upgrade('UPGRADE_FAILED_PULL', { | |
883 | 'severity': 'warning', | |
884 | 'summary': 'Upgrade: failed to pull target image', | |
885 | 'count': 1, | |
886 | 'detail': [ | |
887 | 'failed to pull %s on host %s' % (target_image, | |
888 | d.hostname)], | |
889 | }) | |
890 | return | |
891 | r = json.loads(''.join(out)) | |
892 | if not any(d in target_digests for d in r.get('repo_digests', [])): | |
893 | logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % ( | |
894 | target_image, d.hostname, r['repo_digests'], target_digests)) | |
895 | self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % ( | |
896 | target_image, d.hostname, r['repo_digests'], target_digests) | |
897 | self.upgrade_state.target_digests = r['repo_digests'] | |
898 | self._save_upgrade_state() | |
899 | return | |
900 | ||
901 | self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type) | |
902 | ||
903 | if len(to_upgrade) > 1: | |
904 | logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade), | |
905 | self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999))) | |
906 | else: | |
907 | logger.info('Upgrade: Updating %s.%s' % | |
908 | (d.daemon_type, d.daemon_id)) | |
909 | action = 'Upgrading' if not d_entry[1] else 'Redeploying' | |
910 | try: | |
911 | daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d) | |
912 | self.mgr._daemon_action( | |
913 | daemon_spec, | |
914 | 'redeploy', | |
915 | image=target_image if not d_entry[1] else None | |
916 | ) | |
917 | self.mgr.cache.metadata_up_to_date[d.hostname] = False | |
918 | except Exception as e: | |
919 | self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', { | |
920 | 'severity': 'warning', | |
921 | 'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.', | |
922 | 'count': 1, | |
923 | 'detail': [ | |
924 | f'Upgrade daemon: {d.name()}: {e}' | |
925 | ], | |
926 | }) | |
927 | return | |
928 | num += 1 | |
929 | if self.upgrade_state.remaining_count is not None and not d_entry[1]: | |
930 | self.upgrade_state.remaining_count -= 1 | |
931 | self._save_upgrade_state() | |
932 | ||
933 | def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None: | |
934 | if need_upgrade_self: | |
935 | try: | |
936 | self.mgr.mgr_service.fail_over() | |
937 | except OrchestratorError as e: | |
938 | self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', { | |
939 | 'severity': 'warning', | |
940 | 'summary': f'Upgrade: {e}', | |
941 | 'count': 1, | |
942 | 'detail': [ | |
943 | 'The upgrade process needs to upgrade the mgr, ' | |
944 | 'but it needs at least one standby to proceed.', | |
945 | ], | |
946 | }) | |
947 | return | |
948 | ||
949 | return # unreachable code, as fail_over never returns | |
950 | elif upgrading_mgrs: | |
951 | if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks: | |
952 | del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR'] | |
953 | self.mgr.set_health_checks(self.mgr.health_checks) | |
954 | ||
955 | def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None: | |
956 | # push down configs | |
957 | daemon_type_section = name_to_config_section(daemon_type) | |
958 | if image_settings.get(daemon_type_section) != target_image: | |
959 | logger.info('Upgrade: Setting container_image for all %s' % | |
960 | daemon_type) | |
961 | self.mgr.set_container_image(daemon_type_section, target_image) | |
962 | to_clean = [] | |
963 | for section in image_settings.keys(): | |
964 | if section.startswith(name_to_config_section(daemon_type) + '.'): | |
965 | to_clean.append(section) | |
966 | if to_clean: | |
967 | logger.debug('Upgrade: Cleaning up container_image for %s' % | |
968 | to_clean) | |
969 | for section in to_clean: | |
970 | ret, image, err = self.mgr.check_mon_command({ | |
971 | 'prefix': 'config rm', | |
972 | 'name': 'container_image', | |
973 | 'who': section, | |
974 | }) | |
975 | ||
976 | def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None: | |
977 | osdmap = self.mgr.get("osd_map") | |
978 | osd_min_name = osdmap.get("require_osd_release", "argonaut") | |
979 | osd_min = ceph_release_to_major(osd_min_name) | |
980 | if osd_min < int(target_major): | |
981 | logger.info( | |
982 | f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}') | |
983 | ret, _, err = self.mgr.check_mon_command({ | |
984 | 'prefix': 'osd require-osd-release', | |
985 | 'release': target_major_name, | |
986 | }) | |
987 | ||
988 | def _complete_mds_upgrade(self) -> None: | |
989 | assert self.upgrade_state is not None | |
1e59de90 TL |
990 | if self.upgrade_state.fail_fs: |
991 | for fs in self.mgr.get("fs_map")['filesystems']: | |
992 | fs_name = fs['mdsmap']['fs_name'] | |
993 | self.mgr.log.info('Upgrade: Setting filesystem ' | |
994 | f'{fs_name} Joinable') | |
995 | try: | |
996 | ret, _, err = self.mgr.check_mon_command({ | |
997 | 'prefix': 'fs set', | |
998 | 'fs_name': fs_name, | |
999 | 'var': 'joinable', | |
1000 | 'val': 'true', | |
1001 | }) | |
1002 | except Exception as e: | |
1003 | logger.error("Failed to set fs joinable " | |
1004 | f"true due to {e}") | |
1005 | raise OrchestratorError("Failed to set" | |
1006 | "fs joinable true" | |
1007 | f"due to {e}") | |
1008 | elif self.upgrade_state.fs_original_max_mds: | |
33c7a0ef TL |
1009 | for fs in self.mgr.get("fs_map")['filesystems']: |
1010 | fscid = fs["id"] | |
1011 | fs_name = fs['mdsmap']['fs_name'] | |
1012 | new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1) | |
1013 | if new_max > 1: | |
1014 | self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % ( | |
1015 | fs_name, new_max | |
1016 | )) | |
1017 | ret, _, err = self.mgr.check_mon_command({ | |
1018 | 'prefix': 'fs set', | |
1019 | 'fs_name': fs_name, | |
1020 | 'var': 'max_mds', | |
1021 | 'val': str(new_max), | |
1022 | }) | |
1023 | ||
1024 | self.upgrade_state.fs_original_max_mds = {} | |
1025 | self._save_upgrade_state() | |
1026 | if self.upgrade_state.fs_original_allow_standby_replay: | |
1027 | for fs in self.mgr.get("fs_map")['filesystems']: | |
1028 | fscid = fs["id"] | |
1029 | fs_name = fs['mdsmap']['fs_name'] | |
1030 | asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False) | |
1031 | if asr: | |
1032 | self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % ( | |
1033 | fs_name | |
1034 | )) | |
1035 | ret, _, err = self.mgr.check_mon_command({ | |
1036 | 'prefix': 'fs set', | |
1037 | 'fs_name': fs_name, | |
1038 | 'var': 'allow_standby_replay', | |
1039 | 'val': '1' | |
1040 | }) | |
1041 | ||
1042 | self.upgrade_state.fs_original_allow_standby_replay = {} | |
1043 | self._save_upgrade_state() | |
1044 | ||
1045 | def _mark_upgrade_complete(self) -> None: | |
1046 | if not self.upgrade_state: | |
1047 | logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting') | |
1048 | return | |
1049 | logger.info('Upgrade: Complete!') | |
1050 | if self.upgrade_state.progress_id: | |
1051 | self.mgr.remote('progress', 'complete', | |
1052 | self.upgrade_state.progress_id) | |
1053 | self.upgrade_state = None | |
1054 | self._save_upgrade_state() | |
1055 | ||
e306af50 TL |
    def _do_upgrade(self):
        # type: () -> None
        """Drive one pass of the upgrade state machine.

        Called repeatedly by the serve loop. Each pass: resolves the target
        image/version (on the first call), then walks daemon types in
        CEPH_UPGRADE_ORDER, upgrading what is safe to restart. Returns early
        whenever work was scheduled or must be waited on; only when every
        daemon type completes does it finalize image settings and mark the
        upgrade complete.
        """
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        if self.mgr.offline_hosts:
            # offline host(s), on top of potential connection errors when trying to upgrade a daemon
            # or pull an image, can cause issues where daemons are never ok to stop. Since evaluating
            # whether or not that risk is present for any given offline hosts is a difficult problem,
            # it's best to just fail upgrade cleanly so user can address the offline host(s)

            # the HostConnectionError expects a hostname and addr, so let's just take
            # one at random. It doesn't really matter which host we say we couldn't reach here.
            hostname: str = list(self.mgr.offline_hosts)[0]
            addr: str = self.mgr.inventory.get_addr(hostname)
            raise HostConnectionError(f'Host(s) were marked offline: {self.mgr.offline_hosts}', hostname, addr)

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                with self.mgr.async_timeout_handler(f'cephadm inspect-image (image {target_image})'):
                    target_id, target_version, target_digests = self.mgr.wait_async(
                        CephadmServe(self.mgr)._get_container_image_info(target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        # staggered upgrades may restrict the daemon set by type, service, and/or host
        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.daemon_types}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in NON_CEPH_IMAGE_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests, target_image)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and non-ceph-image daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in NON_CEPH_IMAGE_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests, target_image)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                # work was scheduled this pass; come back later for the rest
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests, target_image)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        # re-enable the mds sanity checks disabled at the start of this pass
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        self._mark_upgrade_complete()
        return