import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any

import orchestrator
from cephadm.registry import Registry
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER, \
    MONITORING_STACK_TYPES, CEPH_TYPES, GATEWAY_TYPES
from cephadm.ssh import HostConnectionError
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus, daemon_type_to_service

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)


def normalize_image_digest(digest: str, default_registry: str) -> str:
    """
    Normal case:
    >>> normalize_image_digest('ceph/ceph', 'docker.io')
    'docker.io/ceph/ceph'

    No change:
    >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
    'quay.ceph.io/ceph/ceph'

    >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
    'docker.io/ubuntu'

    >>> normalize_image_digest('localhost/ceph', 'docker.io')
    'localhost/ceph'
    """
    known_shortnames = [
        'ceph/ceph',
        'ceph/daemon',
        'ceph/daemon-base',
    ]
    for image in known_shortnames:
        if digest.startswith(image):
            return f'{default_registry}/{digest}'
    return digest


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_digests: Optional[List[str]] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 fs_original_max_mds: Optional[Dict[str, int]] = None,
                 fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
                 daemon_types: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None,
                 services: Optional[List[str]] = None,
                 total_count: Optional[int] = None,
                 remaining_count: Optional[int] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_digests: Optional[List[str]] = target_digests
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False
        self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
        self.fs_original_allow_standby_replay: Optional[Dict[str, bool]] = fs_original_allow_standby_replay
        self.daemon_types = daemon_types
        self.hosts = hosts
        self.services = services
        self.total_count = total_count
        self.remaining_count = remaining_count

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_digests': self.target_digests,
            'target_version': self.target_version,
            'fs_original_max_mds': self.fs_original_max_mds,
            'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
            'error': self.error,
            'paused': self.paused,
            'daemon_types': self.daemon_types,
            'hosts': self.hosts,
            'services': self.services,
            'total_count': self.total_count,
            'remaining_count': self.remaining_count,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
        valid_params = UpgradeState.__init__.__code__.co_varnames
        if data:
            c = {k: v for k, v in data.items() if k in valid_params}
            # tolerate state persisted by older versions, which stored a
            # single 'repo_digest' rather than a list of target digests
            if 'repo_digest' in c:
                c['target_digests'] = [c.pop('repo_digest')]
            return cls(**c)
        else:
            return None

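# Illustrative round-trip of UpgradeState through JSON (the actual
# persistence via the mgr's get_store/set_store lives in CephadmUpgrade
# below; the image name here is made up):
#
#   state = UpgradeState(target_name='quay.io/ceph/ceph:v16.2.7',
#                        progress_id=str(uuid.uuid4()))
#   restored = UpgradeState.from_json(json.loads(json.dumps(state.to_json())))
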
class CephadmUpgrade:
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
        'UPGRADE_BAD_TARGET_VERSION',
        'UPGRADE_EXCEPTION',
        'UPGRADE_OFFLINE_HOST'
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.target_digests:
            return self.upgrade_state._target_name

        # FIXME: we assume the first digest is the best one to use
        return self.upgrade_state.target_digests[0]

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            r.progress, r.services_complete = self._get_upgrade_info()
            r.is_paused = self.upgrade_state.paused

            if self.upgrade_state.daemon_types is not None:
                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.services is not None:
                which_str = f'Upgrading daemons in service(s) {",".join(self.upgrade_state.services)}'
                if self.upgrade_state.hosts is not None:
                    which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
            elif self.upgrade_state.hosts is not None:
                which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
            else:
                which_str = 'Upgrading all daemon types on all hosts'
            if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
                which_str += f'. Upgrade limited to {self.upgrade_state.total_count} daemons ({self.upgrade_state.remaining_count} remaining).'
            r.which = which_str

            # accessing self.upgrade_info_str will throw an exception if it
            # has not been set in _do_upgrade yet
            try:
                r.message = self.upgrade_info_str
            except AttributeError:
                pass
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def _get_upgrade_info(self) -> Tuple[str, List[str]]:
        if not self.upgrade_state or not self.upgrade_state.target_digests:
            return '', []

        daemons = self._get_filtered_daemons()

        if any(not d.container_image_digests for d in daemons if d.daemon_type == 'mgr'):
            return '', []

        # a daemon counts as complete when one of its container image digests
        # matches a target digest
        completed_daemons = [(d.daemon_type, any(dgst in self.upgrade_state.target_digests for dgst in (
            d.container_image_digests or []))) for d in daemons if d.daemon_type]

        done = len([True for completion in completed_daemons if completion[1]])

        completed_types = list(set([completion[0] for completion in completed_daemons if all(
            c[1] for c in completed_daemons if c[0] == completion[0])]))

        return '%s/%s daemons upgraded' % (done, len(daemons)), completed_types

    def _get_filtered_daemons(self) -> List[DaemonDescription]:
        # Return the set of daemons set to be upgraded with our current
        # filtering parameters (or all daemons in upgrade order if no
        # filtering parameters are set).
        assert self.upgrade_state is not None
        if self.upgrade_state.daemon_types is not None:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        return daemons

    def _get_current_version(self) -> Tuple[int, int, str]:
        current_version = self.mgr.version.split('ceph version ')[1]
        (current_major, current_minor, _) = current_version.split('-')[0].split('.', 2)
        return (int(current_major), int(current_minor), current_version)

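    # Illustration with a made-up mgr version string: if self.mgr.version were
    # 'ceph version 16.2.7-123-gabcdef01 (...) pacific (stable)',
    # _get_current_version() would return
    # (16, 2, '16.2.7-123-gabcdef01 (...) pacific (stable)').
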
    def _check_target_version(self, version: str) -> Optional[str]:
        try:
            v = version.split('.', 2)
            (major, minor) = (int(v[0]), int(v[1]))
            assert minor >= 0
            # patch might be a number or {number}-g{sha1}
        except ValueError:
            return 'version must be in the form X.Y.Z (e.g., 15.2.3)'
        if major < 15 or (major == 15 and minor < 2):
            return 'cephadm only supports octopus (15.2.0) or later'

        # too far a jump?
        (current_major, current_minor, current_version) = self._get_current_version()
        if current_major < major - 2:
            return f'ceph can only upgrade 1 or 2 major versions at a time; {current_version} -> {version} is too big a jump'
        if current_major > major:
            return f'ceph cannot downgrade major versions (from {current_version} to {version})'
        if current_major == major:
            if current_minor > minor:
                return f'ceph cannot downgrade to a {"rc" if minor == 1 else "dev"} release'

        # check mon min
        monmap = self.mgr.get("mon_map")
        mon_min = monmap.get("min_mon_release", 0)
        if mon_min < major - 2:
            return f'min_mon_release ({mon_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        # check osd min
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < major - 2:
            return f'require_osd_release ({osd_min_name} or {osd_min}) < target {major} - 2; first complete an upgrade to an earlier release'

        return None

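    # Illustrative outcomes, assuming a cluster currently on 16.2.x with
    # sufficiently recent min_mon_release and require_osd_release:
    #   _check_target_version('17.2.0')  -> None (allowed)
    #   _check_target_version('14.2.22') -> 'cephadm only supports octopus (15.2.0) or later'
    #   _check_target_version('banana')  -> 'version must be in the form X.Y.Z (e.g., 15.2.3)'
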
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict:
        if not image:
            image = self.mgr.container_image_base
        reg_name, bare_image = image.split('/', 1)
        reg = Registry(reg_name)
        (current_major, current_minor, _) = self._get_current_version()
        versions = []
        r: Dict[Any, Any] = {
            "image": image,
            "registry": reg_name,
            "bare_image": bare_image,
        }

        try:
            ls = reg.get_tags(bare_image)
        except ValueError as e:
            raise OrchestratorError(f'{e}')
        if not tags:
            for t in ls:
                if t[0] != 'v':
                    continue
                v = t[1:].split('.')
                if len(v) != 3:
                    continue
                if '-' in v[2]:
                    continue
                v_major = int(v[0])
                v_minor = int(v[1])
                candidate_version = (v_major > current_major
                                     or (v_major == current_major and v_minor >= current_minor))
                if show_all_versions or candidate_version:
                    versions.append('.'.join(v))
            r["versions"] = sorted(
                versions,
                key=lambda k: list(map(int, k.split('.'))),
                reverse=True
            )
        else:
            r["tags"] = sorted(ls)
        return r

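    # Example shape of the dict returned by upgrade_ls (values illustrative):
    #   {'image': 'quay.io/ceph/ceph:v16', 'registry': 'quay.io',
    #    'bare_image': 'ceph/ceph:v16',
    #    'versions': ['17.2.0', '16.2.7', '16.2.6']}
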
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            version_error = self._check_target_version(version)
            if version_error:
                raise OrchestratorError(version_error)
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = normalize_image_digest(image, self.mgr.default_registry)
        else:
            raise OrchestratorError('must specify either image or version')

        if daemon_types is not None or services is not None or hosts is not None:
            self._validate_upgrade_filters(target_name, daemon_types, hosts, services)

        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image

        running_mgr_count = len([daemon for daemon in self.mgr.cache.get_daemons_by_type(
            'mgr') if daemon.status == DaemonDescriptionStatus.running])

        if running_mgr_count < 2:
            raise OrchestratorError('Need at least 2 running mgr daemons for upgrade')

        self.mgr.log.info('Upgrade: Started with target %s' % target_name)
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4()),
            daemon_types=daemon_types,
            hosts=hosts,
            services=services,
            total_count=limit,
            remaining_count=limit,
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

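    # Typical CLI invocations that end up in upgrade_start() (flags shown are
    # illustrative of the parameters above):
    #   ceph orch upgrade start --ceph-version 16.2.7
    #   ceph orch upgrade start --image quay.io/ceph/ceph:v16.2.7 \
    #       --daemon-types mgr,mon --limit 2
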
    def _validate_upgrade_filters(self, target_name: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None) -> None:
        def _latest_type(dtypes: List[str]) -> str:
            # [::-1] gives the list in reverse
            for daemon_type in CEPH_UPGRADE_ORDER[::-1]:
                if daemon_type in dtypes:
                    return daemon_type
            return ''

        def _get_earlier_daemons(dtypes: List[str], candidates: List[DaemonDescription]) -> List[DaemonDescription]:
            # this function takes a list of daemon types and first finds the daemon
            # type from that list that is latest in our upgrade order. Then, from
            # that latest type, it filters the list of candidate daemons received
            # for daemons with types earlier in the upgrade order than the latest
            # type found earlier. That filtered list of daemons is returned. The
            # purpose of this function is to help in finding daemons that must have
            # already been upgraded for the given filtering parameters (--daemon-types,
            # --services, --hosts) to be valid.
            latest = _latest_type(dtypes)
            if not latest:
                return []
            earlier_types = '|'.join(CEPH_UPGRADE_ORDER).split(latest)[0].split('|')[:-1]
            earlier_types = [t for t in earlier_types if t not in dtypes]
            return [d for d in candidates if d.daemon_type in earlier_types]

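        # Sketch of the string-splitting trick in _get_earlier_daemons: if
        # CEPH_UPGRADE_ORDER began ['mgr', 'mon', 'crash', 'osd', ...] and
        # latest == 'osd', then '|'.join(...) gives 'mgr|mon|crash|osd|...',
        # .split('osd')[0] gives 'mgr|mon|crash|', and .split('|')[:-1]
        # yields ['mgr', 'mon', 'crash'].
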
        if self.upgrade_state:
            raise OrchestratorError(
                'Cannot set values for --daemon-types, --services or --hosts when upgrade already in progress.')
        try:
            target_id, target_version, target_digests = self.mgr.wait_async(
                CephadmServe(self.mgr)._get_container_image_info(target_name))
        except OrchestratorError as e:
            raise OrchestratorError(f'Failed to pull {target_name}: {str(e)}')
        # what we need to do here is build a list of daemons that must already be upgraded
        # in order for the user's selection of daemons to upgrade to be valid. for example,
        # if they say --daemon-types 'osd,mds' but mons have not been upgraded, we block.
        daemons = [d for d in self.mgr.cache.get_daemons(
        ) if d.daemon_type not in MONITORING_STACK_TYPES]
        err_msg_base = 'Cannot start upgrade. '
        # "dtypes" will later be filled in with the types of daemons that will be upgraded with the given parameters
        dtypes = []
        if daemon_types is not None:
            dtypes = daemon_types
            if hosts is not None:
                dtypes = [_latest_type(dtypes)]
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than given types need upgrading.\n'
        elif services is not None:
            # for our purposes here we can effectively convert our list of services into the
            # set of daemon types the services contain. This works because we don't allow --services
            # and --daemon-types at the same time and we only allow services of the same type
            sspecs = [
                self.mgr.spec_store[s].spec for s in services if self.mgr.spec_store[s].spec is not None]
            stypes = list(set([s.service_type for s in sspecs]))
            if len(stypes) != 1:
                raise OrchestratorError('Doing upgrade by service only supports services of one type at '
                                        f'a time. Found service types: {stypes}')
            for stype in stypes:
                dtypes += orchestrator.service_to_daemon_types(stype)
            dtypes = list(set(dtypes))
            if hosts is not None:
                other_host_daemons = [
                    d for d in daemons if d.hostname is not None and d.hostname not in hosts]
                daemons = _get_earlier_daemons(dtypes, other_host_daemons)
            else:
                daemons = _get_earlier_daemons(dtypes, daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons from given services need upgrading.\n'
        elif hosts is not None:
            # hosts must be handled a bit differently. For this, we really need to find all the daemon types
            # that reside on hosts in the list of hosts we will upgrade. Then take the type from
            # that list that is latest in the upgrade order and check if any daemons on hosts not in the
            # provided list of hosts have a daemon with a type earlier in the upgrade order that is not upgraded.
            dtypes = list(
                set([d.daemon_type for d in daemons if d.daemon_type is not None and d.hostname in hosts]))
            other_hosts_daemons = [
                d for d in daemons if d.hostname is not None and d.hostname not in hosts]
            daemons = _get_earlier_daemons([_latest_type(dtypes)], other_hosts_daemons)
            err_msg_base += 'Daemons with types earlier in upgrade order than daemons on given host need upgrading.\n'
        need_upgrade_self, n1, n2, _ = self._detect_need_upgrade(daemons, target_digests)
        if need_upgrade_self and ('mgr' not in dtypes or (daemon_types is None and services is None)):
            # also report active mgr as needing to be upgraded. It is not included in the resulting list
            # by default as it is treated specially and handled via the need_upgrade_self bool
            n1.insert(0, (self.mgr.mgr_service.get_active_daemon(
                self.mgr.cache.get_daemons_by_type('mgr')), True))
        if n1 or n2:
            raise OrchestratorError(f'{err_msg_base}Please first upgrade '
                                    f'{", ".join(list(set([d[0].name() for d in n1] + [d[0].name() for d in n2])))}\n'
                                    f'NOTE: Enforced upgrade order is: {" -> ".join(CEPH_TYPES + GATEWAY_TYPES)}')

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self.mgr.log.info('Upgrade: Paused upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self.upgrade_state.error = ''
        self.mgr.log.info('Upgrade: Resumed upgrade to %s' % self.target_image)
        self._save_upgrade_state()
        self.mgr.event.set()
        for alert_id in self.UPGRADE_ERRORS:
            self.mgr.remove_health_warning(alert_id)
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.mgr.log.info('Upgrade: Stopped')
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            try:
                self._do_upgrade()
            except HostConnectionError as e:
                self._fail_upgrade('UPGRADE_OFFLINE_HOST', {
                    'severity': 'error',
                    'summary': f'Upgrade: Failed to connect to host {e.hostname} at addr ({e.addr})',
                    'count': 1,
                    'detail': [f'SSH connection failed to {e.hostname} at addr ({e.addr}): {str(e)}'],
                })
                return False
            except Exception as e:
                self._fail_upgrade('UPGRADE_EXCEPTION', {
                    'severity': 'error',
                    'summary': 'Upgrade: failed due to an unexpected exception',
                    'count': 1,
                    'detail': [f'Unexpected exception occurred during upgrade process: {str(e)}'],
                })
                return False
            return True
        return False

    def _wait_for_ok_to_stop(
            self, s: DaemonDescription,
            known: Optional[List[str]] = None,  # NOTE: output argument!
    ) -> bool:
        # only wait a little bit; the service might go away for some reason
        assert s.daemon_type is not None
        assert s.daemon_id is not None
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            # setting force flag to retain old functionality.
            # note that known is an output argument for ok_to_stop()
            r = self.mgr.cephadm_services[daemon_type_to_service(s.daemon_type)].ok_to_stop([
                s.daemon_id], known=known, force=True)

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.info(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

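    # _wait_for_ok_to_stop polls ok_to_stop up to 4 times, 15 seconds apart,
    # i.e. it waits roughly a minute before giving up.
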
    def _clear_upgrade_health_checks(self) -> None:
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
        assert alert_id in self.UPGRADE_ERRORS
        if not self.upgrade_state:
            # this could happen if the user canceled the upgrade while we
            # were doing something
            return

        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % (
                            self.upgrade_state.target_version or self.target_image
                        ),
                        ev_progress=progress,
                        add_to_ceph_s=True)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

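    # Example of what get_distinct_container_image_settings may return
    # (sections and image references are illustrative):
    #   {'global': 'quay.io/ceph/ceph@sha256:abc...',
    #    'mon': 'quay.io/ceph/ceph:v16.2.7'}
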
    def _prepare_for_mds_upgrade(
        self,
        target_major: str,
        need_upgrade: List[DaemonDescription]
    ) -> bool:
        # scale down all filesystems to 1 MDS
        assert self.upgrade_state
        if not self.upgrade_state.fs_original_max_mds:
            self.upgrade_state.fs_original_max_mds = {}
        if not self.upgrade_state.fs_original_allow_standby_replay:
            self.upgrade_state.fs_original_allow_standby_replay = {}
        fsmap = self.mgr.get("fs_map")
        continue_upgrade = True
        for fs in fsmap.get('filesystems', []):
            fscid = fs["id"]
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            # disable allow_standby_replay?
            if mdsmap['flags'] & CEPH_MDSMAP_ALLOW_STANDBY_REPLAY:
                self.mgr.log.info('Upgrade: Disabling standby-replay for filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_allow_standby_replay:
                    self.upgrade_state.fs_original_allow_standby_replay[fscid] = True
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'allow_standby_replay',
                    'val': '0',
                })
                continue_upgrade = False
                continue

            # scale down this filesystem?
            if mdsmap["max_mds"] > 1:
                self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
                    fs_name
                ))
                if fscid not in self.upgrade_state.fs_original_max_mds:
                    self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
                    self._save_upgrade_state()
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'fs set',
                    'fs_name': fs_name,
                    'var': 'max_mds',
                    'val': '1',
                })
                continue_upgrade = False
                continue

            if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
                self.mgr.log.info(
                    'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
                time.sleep(10)
                continue_upgrade = False
                continue

            if len(mdsmap['up']) == 0:
                self.mgr.log.warning(
                    "Upgrade: No mds is up; continuing upgrade procedure to poke things in the right direction")
                # This can happen because the current version MDS have
                # incompatible compatsets; the mons will not do any promotions.
                # We must upgrade to continue.
            elif len(mdsmap['up']) > 0:
                mdss = list(mdsmap['info'].values())
                assert len(mdss) == 1
                lone_mds = mdss[0]
                if lone_mds['state'] != 'up:active':
                    self.mgr.log.info('Upgrade: Waiting for mds.%s to be up:active (currently %s)' % (
                        lone_mds['name'],
                        lone_mds['state'],
                    ))
                    time.sleep(10)
                    continue_upgrade = False
                    continue
            else:
                assert False

        return continue_upgrade

    def _enough_mons_for_ok_to_stop(self) -> bool:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        return len(mons) > 2

    def _enough_mds_for_ok_to_stop(self, mds_daemon: DaemonDescription) -> bool:
        # find fs this mds daemon belongs to
        fsmap = self.mgr.get("fs_map")
        for fs in fsmap.get('filesystems', []):
            mdsmap = fs["mdsmap"]
            fs_name = mdsmap["fs_name"]

            assert mds_daemon.daemon_id
            if fs_name != mds_daemon.service_name().split('.', 1)[1]:
                # wrong fs for this mds daemon
                continue

            # get number of mds daemons for this fs
            mds_count = len(
                [daemon for daemon in self.mgr.cache.get_daemons_by_service(mds_daemon.service_name())])

            # standby mds daemons for this fs?
            if mdsmap["max_mds"] < mds_count:
                return True
            return False

        return True  # if mds has no fs it should pass ok-to-stop

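    # Illustration: a filesystem 'cephfs' with max_mds == 1 that is served by
    # service 'mds.cephfs' running two daemons has a standby available, so
    # _enough_mds_for_ok_to_stop returns True and one MDS may be stopped.
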
    def _detect_need_upgrade(self, daemons: List[DaemonDescription], target_digests: Optional[List[str]] = None) -> Tuple[bool, List[Tuple[DaemonDescription, bool]], List[Tuple[DaemonDescription, bool]], int]:
        # this function takes a list of daemons and container digests. It goes
        # through each daemon and checks whether that daemon's current container
        # digests match the target digests; whether a daemon is upgraded to a
        # certain container image is determined by the digests it reports. By
        # checking the current digests against the targets we can determine
        # which daemons still need to be upgraded.
        need_upgrade_self = False
        need_upgrade: List[Tuple[DaemonDescription, bool]] = []
        need_upgrade_deployer: List[Tuple[DaemonDescription, bool]] = []
        done = 0
        if target_digests is None:
            target_digests = []
        for d in daemons:
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None
            if self.mgr.use_agent and not self.mgr.cache.host_metadata_up_to_date(d.hostname):
                continue
            correct_digest = False
            if (any(dgst in target_digests for dgst in (d.container_image_digests or []))
                    or d.daemon_type in MONITORING_STACK_TYPES):
                logger.debug('daemon %s.%s container digest correct' % (
                    d.daemon_type, d.daemon_id))
                correct_digest = True
                if any(dgst in target_digests for dgst in (d.deployed_by or [])):
                    logger.debug('daemon %s.%s deployed by correct version' % (
                        d.daemon_type, d.daemon_id))
                    done += 1
                    continue

            if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                            self.mgr.get_mgr_id())
                need_upgrade_self = True
                continue

            if correct_digest:
                logger.debug('daemon %s.%s not deployed by correct version' % (
                    d.daemon_type, d.daemon_id))
                need_upgrade_deployer.append((d, True))
            else:
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    d.daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_digests, d.version))
                need_upgrade.append((d, False))

        return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)

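    # In the (DaemonDescription, bool) pairs built by _detect_need_upgrade,
    # the bool records whether the daemon already runs the correct image and
    # only needs a redeploy (True) rather than a real upgrade (False);
    # _upgrade_daemons later uses that flag to choose the image argument and
    # to avoid counting redeploys against an upgrade --limit.
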
    def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
        to_upgrade: List[Tuple[DaemonDescription, bool]] = []
        known_ok_to_stop: List[str] = []
        for d_entry in need_upgrade:
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            if not d.container_image_id:
                if d.container_image_name == target_image:
                    logger.debug(
                        'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                    continue

            if known_ok_to_stop:
                if d.name() in known_ok_to_stop:
                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
                    to_upgrade.append(d_entry)
                continue

            if d.daemon_type == 'osd':
                # NOTE: known_ok_to_stop is an output argument for
                # _wait_for_ok_to_stop
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                    return False, to_upgrade

            to_upgrade.append(d_entry)

            # if we don't have a list of others to consider, stop now
            if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
                break
        return True, to_upgrade

    def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
        assert self.upgrade_state is not None
        num = 1
        if target_digests is None:
            target_digests = []
        for d_entry in to_upgrade:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0 and not d_entry[1]:
                self.mgr.log.info(
                    f'Hit upgrade limit of {self.upgrade_state.total_count}. Stopping upgrade')
                return
            d = d_entry[0]
            assert d.daemon_type is not None
            assert d.daemon_id is not None
            assert d.hostname is not None

            # make sure host has latest container image
            out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                d.hostname, '', 'inspect-image', [],
                image=target_image, no_fsid=True, error_ok=True))
            if code or not any(d in target_digests for d in json.loads(''.join(out)).get('repo_digests', [])):
                logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                           d.hostname))
                self.upgrade_info_str = 'Pulling %s image on host %s' % (
                    target_image, d.hostname)
                out, errs, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    d.hostname, '', 'pull', [],
                    image=target_image, no_fsid=True, error_ok=True))
                if code:
                    self._fail_upgrade('UPGRADE_FAILED_PULL', {
                        'severity': 'warning',
                        'summary': 'Upgrade: failed to pull target image',
                        'count': 1,
                        'detail': [
                            'failed to pull %s on host %s' % (target_image,
                                                              d.hostname)],
                    })
                    return
                r = json.loads(''.join(out))
                if not any(d in target_digests for d in r.get('repo_digests', [])):
                    logger.info('Upgrade: image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests))
                    self.upgrade_info_str = 'Image %s pull on %s got new digests %s (not %s), restarting' % (
                        target_image, d.hostname, r['repo_digests'], target_digests)
                    self.upgrade_state.target_digests = r['repo_digests']
                    self._save_upgrade_state()
                    return

            self.upgrade_info_str = 'Currently upgrading %s daemons' % (d.daemon_type)

            if len(to_upgrade) > 1:
                logger.info('Upgrade: Updating %s.%s (%d/%d)' % (d.daemon_type, d.daemon_id, num, min(len(to_upgrade),
                            self.upgrade_state.remaining_count if self.upgrade_state.remaining_count is not None else 9999999)))
            else:
                logger.info('Upgrade: Updating %s.%s' %
                            (d.daemon_type, d.daemon_id))
            action = 'Upgrading' if not d_entry[1] else 'Redeploying'
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(d)
                self.mgr._daemon_action(
                    daemon_spec,
                    'redeploy',
                    image=target_image if not d_entry[1] else None
                )
                self.mgr.cache.metadata_up_to_date[d.hostname] = False
            except Exception as e:
                self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                    'severity': 'warning',
                    'summary': f'{action} daemon {d.name()} on host {d.hostname} failed.',
                    'count': 1,
                    'detail': [
                        f'Upgrade daemon: {d.name()}: {e}'
                    ],
                })
                return
            num += 1
            if self.upgrade_state.remaining_count is not None and not d_entry[1]:
                self.upgrade_state.remaining_count -= 1
                self._save_upgrade_state()

    def _handle_need_upgrade_self(self, need_upgrade_self: bool, upgrading_mgrs: bool) -> None:
        if need_upgrade_self:
            try:
                self.mgr.mgr_service.fail_over()
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                    'severity': 'warning',
                    'summary': f'Upgrade: {e}',
                    'count': 1,
                    'detail': [
                        'The upgrade process needs to upgrade the mgr, '
                        'but it needs at least one standby to proceed.',
                    ],
                })
                return

            return  # unreachable code, as fail_over never returns
        elif upgrading_mgrs:
            if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _set_container_images(self, daemon_type: str, target_image: str, image_settings: Dict[str, str]) -> None:
        # push down configs
        daemon_type_section = name_to_config_section(daemon_type)
        if image_settings.get(daemon_type_section) != target_image:
            logger.info('Upgrade: Setting container_image for all %s' %
                        daemon_type)
            self.mgr.set_container_image(daemon_type_section, target_image)
        to_clean = []
        for section in image_settings.keys():
            if section.startswith(name_to_config_section(daemon_type) + '.'):
                to_clean.append(section)
        if to_clean:
            logger.debug('Upgrade: Cleaning up container_image for %s' %
                         to_clean)
            for section in to_clean:
                ret, image, err = self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'name': 'container_image',
                    'who': section,
                })

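    # For example (assuming name_to_config_section('rgw') == 'client.rgw'):
    # _set_container_images sets container_image on the 'client.rgw' section
    # and removes stale per-service overrides such as 'client.rgw.myrealm'.
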
    def _complete_osd_upgrade(self, target_major: str, target_major_name: str) -> None:
        osdmap = self.mgr.get("osd_map")
        osd_min_name = osdmap.get("require_osd_release", "argonaut")
        osd_min = ceph_release_to_major(osd_min_name)
        if osd_min < int(target_major):
            logger.info(
                f'Upgrade: Setting require_osd_release to {target_major} {target_major_name}')
            ret, _, err = self.mgr.check_mon_command({
                'prefix': 'osd require-osd-release',
                'release': target_major_name,
            })

    def _complete_mds_upgrade(self) -> None:
        assert self.upgrade_state is not None
        if self.upgrade_state.fs_original_max_mds:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                new_max = self.upgrade_state.fs_original_max_mds.get(fscid, 1)
                if new_max > 1:
                    self.mgr.log.info('Upgrade: Scaling up filesystem %s max_mds to %d' % (
                        fs_name, new_max
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'max_mds',
                        'val': str(new_max),
                    })

            self.upgrade_state.fs_original_max_mds = {}
            self._save_upgrade_state()
        if self.upgrade_state.fs_original_allow_standby_replay:
            for fs in self.mgr.get("fs_map")['filesystems']:
                fscid = fs["id"]
                fs_name = fs['mdsmap']['fs_name']
                asr = self.upgrade_state.fs_original_allow_standby_replay.get(fscid, False)
                if asr:
                    self.mgr.log.info('Upgrade: Enabling allow_standby_replay on filesystem %s' % (
                        fs_name
                    ))
                    ret, _, err = self.mgr.check_mon_command({
                        'prefix': 'fs set',
                        'fs_name': fs_name,
                        'var': 'allow_standby_replay',
                        'val': '1'
                    })

            self.upgrade_state.fs_original_allow_standby_replay = {}
            self._save_upgrade_state()

    def _mark_upgrade_complete(self) -> None:
        if not self.upgrade_state:
            logger.debug('_mark_upgrade_complete: upgrade already marked complete, exiting')
            return
        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()

    def _do_upgrade(self) -> None:
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        if self.mgr.offline_hosts:
            # offline host(s), on top of potential connection errors when trying to upgrade a daemon
            # or pull an image, can cause issues where daemons are never ok to stop. Since evaluating
            # whether or not that risk is present for any given offline hosts is a difficult problem,
            # it's best to just fail upgrade cleanly so user can address the offline host(s)

            # the HostConnectionError expects a hostname and addr, so let's just take
            # one at random. It doesn't really matter which host we say we couldn't reach here.
            hostname: str = list(self.mgr.offline_hosts)[0]
            addr: str = self.mgr.inventory.get_addr(hostname)
            raise HostConnectionError(f'Host(s) were marked offline: {self.mgr.offline_hosts}', hostname, addr)

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        target_digests = self.upgrade_state.target_digests
        target_version = self.upgrade_state.target_version

        first = False
        if not target_id or not target_version or not target_digests:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
            try:
                target_id, target_version, target_digests = self.mgr.wait_async(CephadmServe(self.mgr)._get_container_image_info(
                    target_image))
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            if not target_version:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': ['unable to extract ceph version from container'],
                })
                return
            self.upgrade_state.target_id = target_id
            # extract the version portion of 'ceph version {version} ({sha1})'
            self.upgrade_state.target_version = target_version.split(' ')[2]
            self.upgrade_state.target_digests = target_digests
            self._save_upgrade_state()
            target_image = self.target_image
            first = True

        if target_digests is None:
            target_digests = []
        if target_version.startswith('ceph version '):
            # tolerate/fix upgrade state from older version
            self.upgrade_state.target_version = target_version.split(' ')[2]
            target_version = self.upgrade_state.target_version
        (target_major, _) = target_version.split('.', 1)
        target_major_name = self.mgr.lookup_release_name(int(target_major))

        if first:
            logger.info('Upgrade: Target is version %s (%s)' % (
                target_version, target_major_name))
            logger.info('Upgrade: Target container is %s, digests %s' % (
                target_image, target_digests))

        version_error = self._check_target_version(target_version)
        if version_error:
            self._fail_upgrade('UPGRADE_BAD_TARGET_VERSION', {
                'severity': 'error',
                'summary': f'Upgrade: cannot upgrade/downgrade to {target_version}',
                'count': 1,
                'detail': [version_error],
            })
            return

        image_settings = self.get_distinct_container_image_settings()

        # Older monitors (pre-v16.2.5) asserted that FSMap::compat ==
        # MDSMap::compat for all fs. This is no longer the case beginning in
        # v16.2.5. We must disable the sanity checks during upgrade.
        # N.B.: we don't bother confirming the operator has not already
        # disabled this or saving the config value.
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'mon_mds_skip_sanity',
            'value': '1',
            'who': 'mon',
        })

        if self.upgrade_state.daemon_types is not None:
            logger.debug(
                f'Filtering daemons to upgrade by daemon types: {self.upgrade_state.daemon_types}')
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in self.upgrade_state.daemon_types]
        elif self.upgrade_state.services is not None:
            logger.debug(
                f'Filtering daemons to upgrade by services: {self.upgrade_state.services}')
            daemons = []
            for service in self.upgrade_state.services:
                daemons += self.mgr.cache.get_daemons_by_service(service)
        else:
            daemons = [d for d in self.mgr.cache.get_daemons(
            ) if d.daemon_type in CEPH_UPGRADE_ORDER]
        if self.upgrade_state.hosts is not None:
            logger.debug(f'Filtering daemons to upgrade by hosts: {self.upgrade_state.hosts}')
            daemons = [d for d in daemons if d.hostname in self.upgrade_state.hosts]
        upgraded_daemon_count: int = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            if self.upgrade_state.remaining_count is not None and self.upgrade_state.remaining_count <= 0:
                # we hit our limit and should end the upgrade
                # except for cases where we only need to redeploy, but not actually upgrade
                # the image (which we don't count towards our limit). This case only occurs with mgr
                # and monitoring stack daemons. Additionally, this case is only valid if
                # the active mgr is already upgraded.
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    if daemon_type not in MONITORING_STACK_TYPES and daemon_type != 'mgr':
                        continue
                else:
                    self._mark_upgrade_complete()
                    return
            logger.debug('Upgrade: Checking %s daemons' % daemon_type)
            daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]

            need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                daemons_of_type, target_digests)
            upgraded_daemon_count += done
            self._update_upgrade_progress(upgraded_daemon_count / len(daemons))

            # make sure mgr and monitoring stack daemons are properly redeployed in staggered upgrade scenarios
            if daemon_type == 'mgr' or daemon_type in MONITORING_STACK_TYPES:
                if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                    need_upgrade_names = [d[0].name() for d in need_upgrade] + \
                        [d[0].name() for d in need_upgrade_deployer]
                    dds = [d for d in self.mgr.cache.get_daemons_by_type(
                        daemon_type) if d.name() not in need_upgrade_names]
                    need_upgrade_active, n1, n2, __ = self._detect_need_upgrade(dds, target_digests)
                    if not n1:
                        if not need_upgrade_self and need_upgrade_active:
                            need_upgrade_self = True
                        need_upgrade_deployer += n2
                else:
                    # no point in trying to redeploy with new version if active mgr is not on the new version
                    need_upgrade_deployer = []

            if any(d in target_digests for d in self.mgr.get_active_mgr_digests()):
                # only after the mgr itself is upgraded can we expect daemons to have
                # deployed_by == target_digests
                need_upgrade += need_upgrade_deployer

            # prepare filesystems for daemon upgrades?
            if (
                daemon_type == 'mds'
                and need_upgrade
                and not self._prepare_for_mds_upgrade(target_major, [d_entry[0] for d_entry in need_upgrade])
            ):
                return

            if need_upgrade:
                self.upgrade_info_str = 'Currently upgrading %s daemons' % (daemon_type)

            _continue, to_upgrade = self._to_upgrade(need_upgrade, target_image)
            if not _continue:
                return
            self._upgrade_daemons(to_upgrade, target_image, target_digests)
            if to_upgrade:
                return

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # following bits of _do_upgrade are for completing upgrade for given
            # types. If we haven't actually finished upgrading all the daemons
            # of this type, we should exit the loop here
            _, n1, n2, _ = self._detect_need_upgrade(
                self.mgr.cache.get_daemons_by_type(daemon_type), target_digests)
            if n1 or n2:
                continue

            # complete mon upgrade?
            if daemon_type == 'mon':
                if not self.mgr.get("have_local_config_map"):
                    logger.info('Upgrade: Restarting mgr now that mons are running pacific')
                    need_upgrade_self = True

            self._handle_need_upgrade_self(need_upgrade_self, daemon_type == 'mgr')

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                short_version = version.split(' ')[2]
                if short_version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, short_version, target_version))

            self._set_container_images(daemon_type, target_image, image_settings)

            # complete osd upgrade?
            if daemon_type == 'osd':
                self._complete_osd_upgrade(target_major, target_major_name)

            # complete mds upgrade?
            if daemon_type == 'mds':
                self._complete_mds_upgrade()

            # Make sure all metadata is up to date before saying we are done upgrading this daemon type
            if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.agent_helpers._request_ack_all_not_up_to_date()
                return

            logger.debug('Upgrade: Upgraded %s daemon(s).' % daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'name': 'mon_mds_skip_sanity',
            'who': 'mon',
        })

        self._mark_upgrade_complete()
        return