]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import json |
2 | import logging | |
3 | import time | |
4 | import uuid | |
5 | from typing import TYPE_CHECKING, Optional | |
6 | ||
7 | import orchestrator | |
8 | from cephadm.utils import name_to_config_section | |
f6b5b4d7 | 9 | from orchestrator import OrchestratorError, DaemonDescription |
e306af50 TL |
10 | |
11 | if TYPE_CHECKING: | |
12 | from .module import CephadmOrchestrator | |
13 | ||
14 | ||
# Daemon types that run from the ceph container image.
# NOTE: the list order is the order in which daemon types are upgraded.
CEPH_UPGRADE_ORDER = [
    'mgr',
    'mon',
    'crash',
    'osd',
    'mds',
    'rgw',
    'rbd-mirror',
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
20 | ||
f6b5b4d7 TL |
21 | |
class UpgradeState:
    """JSON-serializable record describing a single upgrade.

    Attributes:
        target_name: name of the image being upgraded to.
        progress_id: identifier of the associated progress event.
        target_id: id of the target image, once known (else ``None``).
        target_version: version of the target image, once known (else ``None``).
        error: description of the failure that stopped the upgrade, if any.
        paused: whether the upgrade is paused; ``None`` is stored as ``False``.
    """

    # Single source of truth for the fields that are serialized / accepted.
    _FIELDS = (
        'target_name',
        'progress_id',
        'target_id',
        'target_version',
        'error',
        'paused',
    )

    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 ):
        self.target_name: str = target_name
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False

    def to_json(self) -> dict:
        """Return the state as a plain dict suitable for ``json.dumps``."""
        return {name: getattr(self, name) for name in self._FIELDS}

    @classmethod
    def from_json(cls, data: dict) -> 'UpgradeState':
        """Build an ``UpgradeState`` from a dict as produced by ``to_json``.

        Keys that are not known fields are ignored, so that a persisted state
        carrying extra entries does not make loading fail with ``TypeError``.
        Missing required keys (``target_name``, ``progress_id``) still raise
        ``TypeError``.
        """
        known = {key: value for key, value in data.items() if key in cls._FIELDS}
        return cls(**known)
51 | ||
52 | ||
e306af50 TL |
class CephadmUpgrade:
    """Drive a rolling upgrade of ceph daemons to a target container image.

    The upgrade currently in progress (if any) is held in
    ``self.upgrade_state`` and persisted as JSON in the mgr store under the
    key ``upgrade_state``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        """Bind to *mgr* and reload any upgrade state persisted in its store."""
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        """Return a status spec describing the upgrade in progress.

        With no upgrade in progress, a freshly constructed
        ``UpgradeStatusSpec`` is returned unmodified. An error message takes
        precedence over the "paused" message.
        """
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.upgrade_state.target_name
            r.in_progress = True
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def upgrade_start(self, image, version) -> str:
        """Start an upgrade to *version* or *image*, or resume a matching one.

        :param image: full image name to upgrade to; used when *version* is
            empty.
        :param version: target version of the form ``X.Y.Z``; takes precedence
            over *image* and is appended to ``mgr.container_image_base``.
        :return: a human-readable message describing what was done.
        :raises OrchestratorError: if the mgr is not in ``root`` mode, the
            version is malformed or older than 15.2.0, neither argument is
            given, or an upgrade to a different target is already in progress.
        """
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            bad_version = 'version must be in the form X.Y.Z (e.g., 15.2.3)'
            try:
                # All three components are parsed here so that a non-numeric
                # major is reported like any other malformed version.
                major, minor, patch = [int(part) for part in version.split('.')]
            except (AttributeError, ValueError):
                raise OrchestratorError(bad_version)
            if minor < 0 or patch < 0:
                raise OrchestratorError(bad_version)
            if major < 15 or (major == 15 and minor < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state.target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state.target_name, target_name))
            if self.upgrade_state.paused:
                # Same target and paused: behave exactly like an explicit resume.
                return self.upgrade_resume()
            return 'Upgrade to %s in progress' % self.upgrade_state.target_name
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        """Pause the upgrade in progress and persist the change.

        :raises OrchestratorError: if no upgrade is in progress.
        """
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.upgrade_state.target_name
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.upgrade_state.target_name

    def upgrade_resume(self) -> str:
        """Resume a paused upgrade, persist the change and signal the mgr event.

        :raises OrchestratorError: if no upgrade is in progress.
        """
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.upgrade_state.target_name
        self.upgrade_state.paused = False
        # Drop the error recorded by a previous failure; otherwise
        # upgrade_status() keeps reporting it while the upgrade is running.
        self.upgrade_state.error = None
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.upgrade_state.target_name

    def upgrade_stop(self) -> str:
        """Abort the upgrade in progress and clear its persisted state.

        Completes the associated progress event, removes the upgrade health
        checks and signals the mgr event. Returns a message instead of raising
        when there is nothing to stop.
        """
        if not self.upgrade_state:
            return 'No upgrade in progress'
        target_name = self.upgrade_state.target_name
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_name

    def continue_upgrade(self) -> bool:
        """Run one upgrade step if an upgrade is in progress and not paused.

        :return: ``True`` if ``_do_upgrade`` was invoked, ``False`` if nothing
            was done.
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            self._do_upgrade()
            return True
        return False

    def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
        """Poll the daemon's service until it reports that *s* may be stopped.

        Makes up to 4 attempts, sleeping 15 seconds after each failed one.

        :return: ``True`` as soon as ``ok_to_stop`` returns a zero ``retval``;
            ``False`` if the upgrade was stopped or paused meanwhile, or all
            attempts failed.
        """
        # only wait a little bit; the service might go away for something
        for _ in range(4):
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])

            if not r.retval:
                logger.info('Upgrade: %s', r.stdout)
                return True
            logger.error('Upgrade: %s', r.stderr)

            time.sleep(15)
        return False

    def _clear_upgrade_health_checks(self) -> None:
        """Remove the upgrade-related health checks and publish the result."""
        for k in ['UPGRADE_NO_STANDBY_MGR',
                  'UPGRADE_FAILED_PULL']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id, alert) -> None:
        """Pause the upgrade because of a failure and raise a health check.

        :param alert_id: health check key, also used as prefix of the stored
            error message.
        :param alert: health check payload; its ``'summary'`` entry is logged
            and recorded as the upgrade error.
        :raises AssertionError: if no upgrade is in progress.
        """
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        if not self.upgrade_state:
            # Explicit raise: a bare ``assert`` would vanish under ``python -O``.
            raise AssertionError('No upgrade in progress')

        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress) -> None:
        """Report *progress* for the upgrade to the ``progress`` module.

        A progress id is generated (and persisted) first if the state has none.

        :param progress: value forwarded as ``ev_progress``; callers in this
            class pass a fraction ``done / total``.
        :raises AssertionError: if no upgrade is in progress.
        """
        if not self.upgrade_state:
            # Explicit raise: a bare ``assert`` would vanish under ``python -O``.
            raise AssertionError('No upgrade in progress')

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % self.upgrade_state.target_name,
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
        """Persist ``self.upgrade_state`` to the mgr store (``None`` clears it)."""
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def _do_upgrade(self) -> None:
        """Perform the next step of the upgrade in progress.

        Daemon types are processed in ``CEPH_UPGRADE_ORDER``. The method
        returns after the first state-changing action: a failure that pauses
        the upgrade, a changed target image id, the redeploy of one daemon, or
        a mgr fail-over. Once every daemon type is up to date the
        ``container_image`` config is finalized and the upgrade state is
        cleared.
        """
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_name = self.upgrade_state.target_name
        target_id = self.upgrade_state.target_id
        if not target_id:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_name)
            try:
                target_id, target_version = self.mgr._get_container_image_id(target_name)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state.target_id = target_id
            self.upgrade_state.target_version = target_version
            self._save_upgrade_state()
        target_version = self.upgrade_state.target_version
        logger.info('Upgrade: Target is %s with id %s' % (target_name,
                                                          target_id))

        # get all distinct container_image settings
        image_settings = {}
        _, out, _ = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']

        daemons = self.mgr.cache.get_daemons()
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                if daemon_type == 'mgr' and \
                   d.daemon_id == self.mgr.get_mgr_id():
                    # The running mgr is handled after the other mgr daemons,
                    # via a fail-over (see below).
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_name, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_name,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_name, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_name,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        # The image name now resolves to a different id:
                        # record it and start over on the next call.
                        # NOTE(review): target_version is not refreshed here.
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
                            target_name, d.hostname, r['image_id'], target_id))
                        self.upgrade_state.target_id = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_name:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                self.mgr._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy',
                    image=target_name
                )
                # One redeploy per call.
                return

            if need_upgrade_self:
                mgr_map = self.mgr.get('mgr_map')
                # A missing or empty 'standbys' entry counts as no standby.
                num = len(mgr_map.get('standbys') or [])
                if not num:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': 'Upgrade: Need standby mgr daemon',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                logger.info('Upgrade: there are %d other already-upgraded '
                            'standby mgrs, failing over' % num)

                self._update_upgrade_progress(done / len(daemons))

                # fail over
                self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            _, out_ver, _ = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_name:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                self.mgr.check_mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': name_to_config_section(daemon_type),
                })
            # Per-daemon overrides below this type's section are removed.
            section_prefix = name_to_config_section(daemon_type) + '.'
            to_clean = [section for section in image_settings
                        if section.startswith(section_prefix)]
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': target_name,
            'who': 'global',
        })
        for daemon_type in CEPH_UPGRADE_ORDER:
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return