import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, NamedTuple

import orchestrator
from cephadm.utils import name_to_config_section
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

logger = logging.getLogger(__name__)


class UpgradeState:
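    """State of the currently running upgrade.

    Persisted via CephadmUpgrade._save_upgrade_state to the module's
    key/value store, so an in-flight upgrade survives a mgr restart or
    failover.
    """
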
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 repo_digest: Optional[str] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.repo_digest: Optional[str] = repo_digest
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'repo_digest': self.repo_digest,
            'target_version': self.target_version,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data: dict) -> Optional['UpgradeState']:
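        """Recreate an UpgradeState from a dict produced by to_json(); returns None for empty input."""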
        if data:
            return cls(**data)
        else:
            return None


class CephadmUpgrade:
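    """Drives the `ceph orch upgrade ...` workflow for the cephadm module.

    The CLI handlers in module.py delegate here, e.g.::

        ceph orch upgrade start --ceph-version 15.2.9
        ceph orch upgrade status
        ceph orch upgrade pause | resume | stop
    """
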
    UPGRADE_ERRORS = [
        'UPGRADE_NO_STANDBY_MGR',
        'UPGRADE_FAILED_PULL',
        'UPGRADE_REDEPLOY_DAEMON',
    ]

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    @property
    def target_image(self) -> str:
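        """The image we are upgrading to: the repo digest when use_repo_digest
        is enabled and a digest is known, otherwise the user-supplied name."""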
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.repo_digest:
            return self.upgrade_state._target_name

        return self.upgrade_state.repo_digest

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
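        """Build the status report shown by `ceph orch upgrade status`."""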
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

    def upgrade_start(self, image: str, version: str) -> str:
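        """Start an upgrade to the given container image or Ceph version.

        A bare version is translated into <container_image_base>:v<version>.
        If an upgrade to the same target is already in progress, it is resumed
        or reported as such.
        """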
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            try:
                (major, minor, patch) = version.split('.')
                assert int(minor) >= 0
                assert int(patch) >= 0
            except (ValueError, AssertionError):
                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
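        """Pause the upgrade in progress; no further daemons are redeployed until it is resumed."""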
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
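        """Resume a paused upgrade and wake the orchestrator event loop (self.mgr.event)."""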
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
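        """Abort the upgrade: drop the persisted state and clear any upgrade health warnings."""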
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

    def continue_upgrade(self) -> bool:
        """
        Run one upgrade iteration, if an upgrade is in progress and not paused.

        :return: False if nothing was done, True otherwise.
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            self._do_upgrade()
            return True
        return False

    def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
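        """Ask the daemon's service whether it is safe to stop, retrying a few
        times; gives up (returns False) if the upgrade is stopped or paused, or
        if the daemon never becomes safe to stop."""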
        # only wait a little while; the service might be going away for some other reason
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.error(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
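        """Remove any UPGRADE_* health checks raised by a previous failure."""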
        for k in self.UPGRADE_ERRORS:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id: str, alert: dict) -> None:
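        """Pause the upgrade and raise a health check describing why;
        alert_id must be one of UPGRADE_ERRORS."""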
        assert alert_id in self.UPGRADE_ERRORS
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress: float) -> None:
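        """Report upgrade progress (0.0-1.0) to the mgr progress module."""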
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % self.target_image,
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
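        """Persist (or clear) the current UpgradeState in the module's key/value store."""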
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

    def get_distinct_container_image_settings(self) -> Dict[str, str]:
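        """Return a map of config section ('global', 'mon', 'osd.3', ...) to
        its container_image setting."""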
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

    def _do_upgrade(self):
        # type: () -> None
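        """Run one pass of the upgrade: resolve the target image id if needed,
        then walk CEPH_UPGRADE_ORDER and redeploy any daemon that is not yet on
        the target image.  Called repeatedly via continue_upgrade() until every
        daemon has converged.
        """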
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        if not target_id or (self.mgr.use_repo_digest and not self.upgrade_state.repo_digest):
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            try:
                target_id, target_version, repo_digest = self.mgr._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state.target_id = target_id
            self.upgrade_state.target_version = target_version
            self.upgrade_state.repo_digest = repo_digest
            self._save_upgrade_state()
            target_image = self.target_image
        target_version = self.upgrade_state.target_version
        logger.info('Upgrade: Target is %s with id %s' % (target_image,
                                                          target_id))

        image_settings = self.get_distinct_container_image_settings()

        daemons = self.mgr.cache.get_daemons()
        done = 0
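        # walk the daemon types in upgrade order (mgr first, rbd-mirror last),
        # redeploying any daemon that is not yet running the target image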
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
                            target_image, d.hostname, r['image_id'], target_id))
                        self.upgrade_state.target_id = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                try:
                    self.mgr._daemon_action(
                        d.daemon_type,
                        d.daemon_id,
                        d.hostname,
                        'redeploy',
                        image=target_image
                    )
                except Exception as e:
                    self._fail_upgrade('UPGRADE_REDEPLOY_DAEMON', {
                        'severity': 'warning',
                        'summary': f'Upgrading daemon {d.name()} on host {d.hostname} failed.',
                        'count': 1,
                        'detail': [
                            f'Upgrade daemon: {d.name()}: {e}'
                        ],
                    })
                    return

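            # the active mgr cannot redeploy itself; fail over to a standby and
            # let the new active mgr pick the upgrade back up from the
            # persisted state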
            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_image:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                self.mgr.set_container_image(name_to_config_section(daemon_type), target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return