import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional, Dict, NamedTuple

import orchestrator
from cephadm.utils import name_to_config_section
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

logger = logging.getLogger(__name__)

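# UpgradeState is the record of an in-flight upgrade. It is serialized with
# to_json()/from_json() and persisted in the mgr key/value store (see
# CephadmUpgrade._save_upgrade_state below), so an upgrade survives a mgr
# restart or failover.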
class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 repo_digest: Optional[str] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 ):
        self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.repo_digest: Optional[str] = repo_digest
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False

    def to_json(self) -> dict:
        return {
            'target_name': self._target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'repo_digest': self.repo_digest,
            'target_version': self.target_version,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data) -> Optional['UpgradeState']:
        if data:
            return cls(**data)
        else:
            return None

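# For reference, the JSON persisted under the 'upgrade_state' store key has the
# shape below (values are illustrative, not taken from a real cluster):
#
#   {
#       "target_name": "docker.io/ceph/ceph:v15.2.8",
#       "progress_id": "0e85f4f0-...",
#       "target_id": null,
#       "repo_digest": null,
#       "target_version": null,
#       "error": null,
#       "paused": false
#   }
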
class CephadmUpgrade:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

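    # The image to use for pulls and redeploys: prefer the immutable repo
    # digest once it is known (and use_repo_digest is enabled), otherwise fall
    # back to the name/tag the user asked for.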
    @property
    def target_image(self) -> str:
        assert self.upgrade_state
        if not self.mgr.use_repo_digest:
            return self.upgrade_state._target_name
        if not self.upgrade_state.repo_digest:
            return self.upgrade_state._target_name

        return self.upgrade_state.repo_digest

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.target_image
            r.in_progress = True
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

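    # upgrade_start/pause/resume/stop back the 'ceph orch upgrade ...' CLI
    # commands (wired up in the orchestrator module, not shown here). Callers
    # pass either an explicit container image or an X.Y.Z Ceph version, which
    # is resolved against mgr.container_image_base.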
    def upgrade_start(self, image, version) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            try:
                (major, minor, patch) = version.split('.')
                assert int(major) >= 0
                assert int(minor) >= 0
                assert int(patch) >= 0
            except (ValueError, AssertionError):
                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state._target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state._target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.target_image
            return 'Upgrade to %s in progress' % self.target_image
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % target_name

    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.target_image
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.target_image

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.target_image
        self.upgrade_state.paused = False
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.target_image

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        target_image = self.target_image
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_image

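    # continue_upgrade is intended to be polled by the orchestrator's serve
    # loop (an assumption about the caller, based on the mgr.event wake-ups
    # above); each pass of _do_upgrade() redeploys at most one daemon before
    # returning, so progress is made one step per call.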
    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done (no upgrade in progress, or the
        upgrade is paused).
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            self._do_upgrade()
            return True
        return False

    def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
        # only wait a little bit; the service might go away for some other reason
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.error(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

    def _clear_upgrade_health_checks(self) -> None:
        for k in ['UPGRADE_NO_STANDBY_MGR',
                  'UPGRADE_FAILED_PULL']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id, alert) -> None:
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % self.target_image,
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

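    # Collect every 'container_image' option currently set in the mon config
    # store, keyed by config section (e.g. 'global', 'mon', 'osd.3'). The
    # upgrade loop uses this map to decide which per-daemon-type settings to
    # push and which per-daemon overrides to clean up.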
    def get_distinct_container_image_settings(self) -> Dict[str, str]:
        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']
        return image_settings

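    # One pass of the upgrade state machine. The overall flow, as implemented
    # below:
    #   1. Resolve the target image id (and repo digest) with a first pull.
    #   2. For each daemon type, in CEPH_UPGRADE_ORDER:
    #      - ensure the target image is present on each host,
    #      - wait for ok-to-stop, then redeploy one daemon and return
    #        (the next pass continues where this one left off),
    #      - fail over the mgr instead of redeploying ourselves,
    #      - once all daemons of the type are done, push the container_image
    #        config for that type and warn if 'ceph versions' still disagrees.
    #   3. Finally, set the global container_image and clean up per-type keys.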
    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_image = self.target_image
        target_id = self.upgrade_state.target_id
        if not target_id or (self.mgr.use_repo_digest and not self.upgrade_state.repo_digest):
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_image)
            try:
                target_id, target_version, repo_digest = self.mgr._get_container_image_info(
                    target_image)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state.target_id = target_id
            self.upgrade_state.target_version = target_version
            self.upgrade_state.repo_digest = repo_digest
            self._save_upgrade_state()
            target_image = self.target_image
        target_version = self.upgrade_state.target_version
        logger.info('Upgrade: Target is %s with id %s' % (target_image,
                                                          target_id))

        image_settings = self.get_distinct_container_image_settings()

        daemons = self.mgr.cache.get_daemons()
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                if self.mgr.daemon_is_self(d.daemon_type, d.daemon_id):
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_image, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_image,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_image, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_image,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
                            target_image, d.hostname, r['image_id'], target_id))
                        self.upgrade_state.target_id = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_image:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                self.mgr._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy',
                    image=target_image
                )
                return

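            # Reaching this point means every daemon of this type is already on
            # the target image, except (possibly) the active mgr, which can only
            # be upgraded by failing over to a standby.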
            if need_upgrade_self:
                try:
                    self.mgr.mgr_service.fail_over()
                except OrchestratorError as e:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': f'Upgrade: {e}',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                return  # unreachable code, as fail_over never returns
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_image:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                self.mgr.set_container_image(name_to_config_section(daemon_type), target_image)
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        self.mgr.set_container_image('global', target_image)

        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return