ceph/src/pybind/mgr/cephadm/upgrade.py
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional

import orchestrator
from cephadm.utils import name_to_config_section
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
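# The mgr comes first so that the module driving the upgrade is itself running
# the new code before any other daemon is touched; the remaining types follow
# in the order below.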
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

logger = logging.getLogger(__name__)


class UpgradeState:
    def __init__(self,
                 target_name: str,
                 progress_id: str,
                 target_id: Optional[str] = None,
                 target_version: Optional[str] = None,
                 error: Optional[str] = None,
                 paused: Optional[bool] = None,
                 ):
        self.target_name: str = target_name
        self.progress_id: str = progress_id
        self.target_id: Optional[str] = target_id
        self.target_version: Optional[str] = target_version
        self.error: Optional[str] = error
        self.paused: bool = paused or False

    def to_json(self) -> dict:
        return {
            'target_name': self.target_name,
            'progress_id': self.progress_id,
            'target_id': self.target_id,
            'target_version': self.target_version,
            'error': self.error,
            'paused': self.paused,
        }

    @classmethod
    def from_json(cls, data) -> 'UpgradeState':
        return cls(**data)

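# A minimal sketch of how UpgradeState round-trips through JSON (the image name
# below is illustrative):
#
#   state = UpgradeState(target_name='docker.io/ceph/ceph:v15.2.5',
#                        progress_id=str(uuid.uuid4()))
#   blob = json.dumps(state.to_json())       # what _save_upgrade_state() persists
#   state = UpgradeState.from_json(json.loads(blob))
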
class CephadmUpgrade:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
        else:
            self.upgrade_state = None

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.upgrade_state.target_name
            r.in_progress = True
            if self.upgrade_state.error:
                r.message = 'Error: ' + self.upgrade_state.error
            elif self.upgrade_state.paused:
                r.message = 'Upgrade paused'
        return r

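    # upgrade_status() backs the `ceph orch upgrade status` CLI output; it only
    # reports the persisted state and never mutates it.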
    def upgrade_start(self, image, version) -> str:
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            try:
                (major, minor, patch) = version.split('.')
                assert int(minor) >= 0
                assert int(patch) >= 0
            except (ValueError, AssertionError):
                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            if self.upgrade_state.target_name != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state.target_name, target_name))
            if self.upgrade_state.paused:
                self.upgrade_state.paused = False
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.upgrade_state.target_name
            return 'Upgrade to %s in progress' % self.upgrade_state.target_name
        self.upgrade_state = UpgradeState(
            target_name=target_name,
            progress_id=str(uuid.uuid4())
        )
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

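    # upgrade_start() is the entry point behind the CLI, e.g. (illustrative
    # invocations): `ceph orch upgrade start --ceph-version 15.2.5` or
    # `ceph orch upgrade start --image <image>`. A bare version is expanded to
    # container_image_base + ':v' + version, so both paths end up with a single
    # target_name.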
    def upgrade_pause(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.paused:
            return 'Upgrade to %s already paused' % self.upgrade_state.target_name
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.upgrade_state.target_name

    def upgrade_resume(self) -> str:
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.paused:
            return 'Upgrade to %s not paused' % self.upgrade_state.target_name
        self.upgrade_state.paused = False
        self._save_upgrade_state()
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.upgrade_state.target_name

    def upgrade_stop(self) -> str:
        if not self.upgrade_state:
            return 'No upgrade in progress'
        target_name = self.upgrade_state.target_name
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_name

    def continue_upgrade(self) -> bool:
        """
        Returns False if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.paused:
            self._do_upgrade()
            return True
        return False

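    # continue_upgrade() is meant to be polled repeatedly by the module's
    # background loop; True means a _do_upgrade() pass was attempted, False
    # means there is nothing to do (no upgrade, or it is paused).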
    def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
        # only wait a little bit; the service might go away for some reason
        tries = 4
        while tries > 0:
            if not self.upgrade_state or self.upgrade_state.paused:
                return False

            r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])

            if not r.retval:
                logger.info(f'Upgrade: {r.stdout}')
                return True
            logger.error(f'Upgrade: {r.stderr}')

            time.sleep(15)
            tries -= 1
        return False

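    # Note: with 4 tries and a 15 second sleep, _wait_for_ok_to_stop() gives a
    # daemon roughly a minute to become safe to stop before giving up; it also
    # bails out early if the upgrade is stopped or paused in the meantime.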
    def _clear_upgrade_health_checks(self) -> None:
        for k in ['UPGRADE_NO_STANDBY_MGR',
                  'UPGRADE_FAILED_PULL']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id, alert) -> None:
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        self.upgrade_state.error = alert_id + ': ' + alert['summary']
        self.upgrade_state.paused = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

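    # _fail_upgrade() pauses the upgrade and surfaces the failure as a cluster
    # health check; the check is cleared again by _clear_upgrade_health_checks()
    # when an upgrade is (re)started or stopped.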
    def _update_upgrade_progress(self, progress) -> None:
        if not self.upgrade_state:
            assert False, 'No upgrade in progress'

        if not self.upgrade_state.progress_id:
            self.upgrade_state.progress_id = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
                        ev_msg='Upgrade to %s' % self.upgrade_state.target_name,
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
        if not self.upgrade_state:
            self.mgr.set_store('upgrade_state', None)
            return
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))

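    # _save_upgrade_state() persists the serialized state under the mgr module's
    # 'upgrade_state' key (passing None removes it), which is what __init__
    # reads back after a mgr restart or failover.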
    def _do_upgrade(self):
        # type: () -> None
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

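        # Each pass below makes incremental progress and returns early after
        # kicking off one action (an image pull, a daemon redeploy, or a mgr
        # failover); the next call re-evaluates the cluster state from scratch.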
        target_name = self.upgrade_state.target_name
        target_id = self.upgrade_state.target_id
        if not target_id:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_name)
            try:
                target_id, target_version = self.mgr._get_container_image_id(target_name)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state.target_id = target_id
            self.upgrade_state.target_version = target_version
            self._save_upgrade_state()
        target_version = self.upgrade_state.target_version
        logger.info('Upgrade: Target is %s with id %s' % (target_name,
                                                          target_id))

        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']

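        # `config dump --format json` yields a flat list of options; the entries
        # picked up here look roughly like (values illustrative):
        #   {"section": "global", "name": "container_image",
        #    "value": "docker.io/ceph/ceph:v15.2.4", ...}
        # so image_settings maps a config section to its currently set image.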
        daemons = self.mgr.cache.get_daemons()
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                if daemon_type == 'mgr' and \
                   d.daemon_id == self.mgr.get_mgr_id():
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, '', 'inspect-image', [],
                    image=target_name, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_name,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, '', 'pull', [],
                        image=target_name, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_name,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
                            target_name, d.hostname, r['image_id'], target_id))
                        self.upgrade_state.target_id = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_name:
                        logger.debug(
                            'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                self.mgr._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy',
                    image=target_name
                )
                return

            if need_upgrade_self:
                mgr_map = self.mgr.get('mgr_map')
                num = len(mgr_map.get('standbys'))
                if not num:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': 'Upgrade: Need standby mgr daemon',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                logger.info('Upgrade: there are %d other already-upgraded '
                            'standby mgrs, failing over' % num)

                self._update_upgrade_progress(done / len(daemons))

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
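                # Once `ceph mgr fail` hands control to an already-upgraded
                # standby, that mgr resumes the upgrade from the persisted
                # upgrade_state; this (old) daemon does no further work here.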
                return
            elif daemon_type == 'mgr':
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out_ver, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out_ver)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

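            # The `ceph versions` check above is advisory only: mismatches are
            # logged as warnings and do not block the config push below.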
            # push down configs
            if image_settings.get(daemon_type) != target_name:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': name_to_config_section(daemon_type),
                })
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
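        # Point the global container_image at the new target and drop the
        # per-daemon-type overrides set during the upgrade, so daemons created
        # later also use the new image.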
        logger.info('Upgrade: Finalizing container_image settings')
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': target_name,
            'who': 'global',
        })
        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if self.upgrade_state.progress_id:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state.progress_id)
        self.upgrade_state = None
        self._save_upgrade_state()
        return