# ceph/src/pybind/mgr/cephadm/upgrade.py (ceph v15.2.4)
1 import json
2 import logging
3 import time
4 import uuid
5 from typing import TYPE_CHECKING, Optional
6
7 import orchestrator
8 from cephadm.utils import name_to_config_section
9 from orchestrator import OrchestratorError
10
11 if TYPE_CHECKING:
12 from .module import CephadmOrchestrator
13
14
# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!  _do_upgrade() walks this list front to
# back: mgr first (so the orchestrator itself is upgraded via failover
# before anything else), then mons, then the rest.
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

# module-level logger, named after this module per logging convention
logger = logging.getLogger(__name__)
20
class CephadmUpgrade:
    """Drive a rolling upgrade of all cephadm-deployed ceph daemons.

    Upgrade state (target image name/id/version, progress id, and
    paused/error flags) lives in the ``self.upgrade_state`` dict and is
    persisted in the mgr store under 'upgrade_state' so an in-flight
    upgrade survives mgr restart/failover.  The actual work happens
    incrementally in _do_upgrade(), which the serve loop invokes via
    continue_upgrade() and which returns after each daemon action.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Restore any in-flight upgrade persisted by a previous mgr
        # instance; None means no upgrade is in progress.
        t = self.mgr.get_store('upgrade_state')
        if t:
            self.upgrade_state = json.loads(t)
        else:
            self.upgrade_state = None

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        """Return the current upgrade status (target image, in-progress,
        and any error/paused message)."""
        r = orchestrator.UpgradeStatusSpec()
        if self.upgrade_state:
            r.target_image = self.upgrade_state.get('target_name')
            r.in_progress = True
            if self.upgrade_state.get('error'):
                r.message = 'Error: ' + self.upgrade_state.get('error')
            elif self.upgrade_state.get('paused'):
                r.message = 'Upgrade paused'
        return r

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> str:
        """Begin (or resume) an upgrade to a container image or ceph version.

        Exactly one of *image* or *version* should be given; a version is
        translated to '<container_image_base>:vX.Y.Z'.

        :param image: explicit container image name, or None
        :param version: ceph version string 'X.Y.Z', or None
        :return: human-readable status message
        :raises OrchestratorError: if not in root mode, the arguments are
            invalid, the version predates octopus, or a different upgrade
            is already in progress.
        """
        if self.mgr.mode != 'root':
            raise OrchestratorError('upgrade is not supported in %s mode' % (
                self.mgr.mode))
        if version:
            try:
                # Tuple unpacking raises ValueError unless there are
                # exactly three dot-separated components, and int()
                # raises ValueError on non-numeric parts.  Validate all
                # three components here -- previously the major part was
                # never checked (a version like 'x.2.3' escaped as an
                # uncaught ValueError below), asserts were used (stripped
                # under -O), and a bare except swallowed everything
                # including KeyboardInterrupt.
                (major, minor, patch) = version.split('.')
                if int(major) < 0 or int(minor) < 0 or int(patch) < 0:
                    raise ValueError('version components must be non-negative')
            except ValueError:
                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
            target_name = self.mgr.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')
        if self.upgrade_state:
            # An upgrade is already running: only allow resuming the same
            # target, never silently switching targets mid-flight.
            if self.upgrade_state.get('target_name') != target_name:
                raise OrchestratorError(
                    'Upgrade to %s (not %s) already in progress' %
                    (self.upgrade_state.get('target_name'), target_name))
            if self.upgrade_state.get('paused'):
                del self.upgrade_state['paused']
                self._save_upgrade_state()
                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
        self.upgrade_state = {
            'target_name': target_name,
            'progress_id': str(uuid.uuid4()),
        }
        self._update_upgrade_progress(0.0)
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        # wake the serve loop so _do_upgrade() runs promptly
        self.mgr.event.set()
        return 'Initiating upgrade to %s' % (target_name)

    def upgrade_pause(self) -> str:
        """Pause the running upgrade; no further daemons are touched until
        resumed.

        :raises OrchestratorError: if no upgrade is in progress.
        """
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if self.upgrade_state.get('paused'):
            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')

    def upgrade_resume(self) -> str:
        """Resume a paused upgrade.

        :raises OrchestratorError: if no upgrade is in progress.
        """
        if not self.upgrade_state:
            raise OrchestratorError('No upgrade in progress')
        if not self.upgrade_state.get('paused'):
            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
        del self.upgrade_state['paused']
        self._save_upgrade_state()
        # wake the serve loop so the upgrade continues promptly
        self.mgr.event.set()
        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')

    def upgrade_stop(self) -> str:
        """Abort the current upgrade, clearing persisted state, progress
        events and any upgrade-related health alerts."""
        if not self.upgrade_state:
            return 'No upgrade in progress'
        target_name = self.upgrade_state.get('target_name')
        if 'progress_id' in self.upgrade_state:
            # close out the mgr progress-module event
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state['progress_id'])
        self.upgrade_state = None
        self._save_upgrade_state()
        self._clear_upgrade_health_checks()
        self.mgr.event.set()
        return 'Stopped upgrade to %s' % target_name

    def continue_upgrade(self) -> bool:
        """
        Run one increment of upgrade work if an unpaused upgrade exists.

        Returns false, if nothing was done.
        :return:
        """
        if self.upgrade_state and not self.upgrade_state.get('paused'):
            self._do_upgrade()
            return True
        return False

    def _wait_for_ok_to_stop(self, s) -> bool:
        """Return True once daemon *s* is safe to stop, polling the
        '<type> ok-to-stop' mon command a few times.

        Returns False if the daemon never becomes safe within the retry
        budget, or if the upgrade is stopped/paused while waiting.
        """
        # Only mon/osd/mds have quorum/availability constraints; anything
        # else is always safe to stop.  (Check hoisted out of the retry
        # loop -- the daemon type cannot change between iterations.)
        if s.daemon_type not in ['mon', 'osd', 'mds']:
            logger.info('Upgrade: It is presumed safe to stop %s.%s' %
                        (s.daemon_type, s.daemon_id))
            return True
        # only wait a little bit; the service might go away for something
        tries = 4
        while tries > 0:
            ret, out, err = self.mgr.mon_command({
                'prefix': '%s ok-to-stop' % s.daemon_type,
                'ids': [s.daemon_id],
            })
            # bail out if the upgrade was stopped or paused while waiting
            if not self.upgrade_state or self.upgrade_state.get('paused'):
                return False
            if ret:
                logger.info('Upgrade: It is NOT safe to stop %s.%s' %
                            (s.daemon_type, s.daemon_id))
                time.sleep(15)
                tries -= 1
            else:
                logger.info('Upgrade: It is safe to stop %s.%s' %
                            (s.daemon_type, s.daemon_id))
                return True
        return False

    def _clear_upgrade_health_checks(self) -> None:
        """Remove any upgrade-related health alerts this module raised."""
        for k in ['UPGRADE_NO_STANDBY_MGR',
                  'UPGRADE_FAILED_PULL']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _fail_upgrade(self, alert_id, alert) -> None:
        """Pause the upgrade, record *alert* as its error, and raise the
        corresponding health check.

        Only called while an upgrade is in progress (upgrade_state set).
        """
        logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                        alert['summary']))
        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        self.mgr.health_checks[alert_id] = alert
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _update_upgrade_progress(self, progress) -> None:
        """Push *progress* (a 0.0-1.0 fraction) to the mgr progress module,
        lazily allocating a progress event id if one is missing."""
        if 'progress_id' not in self.upgrade_state:
            self.upgrade_state['progress_id'] = str(uuid.uuid4())
            self._save_upgrade_state()
        self.mgr.remote('progress', 'update', self.upgrade_state['progress_id'],
                        ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
                        ev_progress=progress)

    def _save_upgrade_state(self) -> None:
        """Persist upgrade_state to the mgr store (survives failover)."""
        self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state))

    def _do_upgrade(self) -> None:
        """Perform one increment of upgrade work.

        Walks CEPH_UPGRADE_ORDER and, for the first out-of-date daemon
        found: ensures the target image is on the host, waits for
        ok-to-stop, pins its container_image config, redeploys it, and
        returns.  Upgrading the active mgr is handled specially by
        failing over to an already-upgraded standby.  When every daemon
        matches the target image id, config settings are finalized and
        upgrade state is cleared.
        """
        if not self.upgrade_state:
            logger.debug('_do_upgrade no state, exiting')
            return

        target_name = self.upgrade_state.get('target_name')
        target_id = self.upgrade_state.get('target_id', None)
        if not target_id:
            # need to learn the container hash
            logger.info('Upgrade: First pull of %s' % target_name)
            try:
                target_id, target_version = self.mgr._get_container_image_id(target_name)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            self.upgrade_state['target_id'] = target_id
            self.upgrade_state['target_version'] = target_version
            self._save_upgrade_state()
        target_version = self.upgrade_state.get('target_version')
        logger.info('Upgrade: Target is %s with id %s' % (target_name,
                                                          target_id))

        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']

        daemons = self.mgr.cache.get_daemons()
        done = 0
        for daemon_type in CEPH_UPGRADE_ORDER:
            logger.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    logger.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                # We cannot redeploy the mgr we are running in; defer it
                # and fail over to a standby instead (below).
                if daemon_type == 'mgr' and \
                   d.daemon_id == self.mgr.get_mgr_id():
                    logger.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                self.mgr.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self.mgr._run_cephadm(
                    d.hostname, None, 'inspect-image', [],
                    image=target_name, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    logger.info('Upgrade: Pulling %s on %s' % (target_name,
                                                               d.hostname))
                    out, err, code = self.mgr._run_cephadm(
                        d.hostname, None, 'pull', [],
                        image=target_name, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_name,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        # the tag moved upstream: adopt the new id and
                        # start this pass over
                        logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
                        self.upgrade_state['target_id'] = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_name:
                        logger.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                if not self._wait_for_ok_to_stop(d):
                    return
                logger.info('Upgrade: Redeploying %s.%s' %
                            (d.daemon_type, d.daemon_id))
                # pin this daemon's container_image so the redeploy picks
                # up the target image
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': name_to_config_section(daemon_type + '.' + d.daemon_id),
                })
                self.mgr._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy'
                )
                return

            if need_upgrade_self:
                mgr_map = self.mgr.get('mgr_map')
                num = len(mgr_map.get('standbys'))
                if not num:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': 'Upgrade: Need standby mgr daemon',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                logger.info('Upgrade: there are %d other already-upgraded '
                            'standby mgrs, failing over' % num)

                self._update_upgrade_progress(done / len(daemons))

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            elif daemon_type == 'mgr':
                # mgr upgrade finished; retire the no-standby alert if set
                if 'UPGRADE_NO_STANDBY_MGR' in self.mgr.health_checks:
                    del self.mgr.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.mgr.set_health_checks(self.mgr.health_checks)

            # make sure 'ceph versions' agrees
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    logger.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_name:
                logger.info('Upgrade: Setting container_image for all %s...' %
                            daemon_type)
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': name_to_config_section(daemon_type),
                })
            # drop the per-daemon overrides we pinned during redeploys
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                logger.debug('Upgrade: Cleaning up container_image for %s...' %
                             to_clean)
                for section in to_clean:
                    ret, image, err = self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            logger.info('Upgrade: All %s daemons are up to date.' %
                        daemon_type)

        # clean up
        logger.info('Upgrade: Finalizing container_image settings')
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': target_name,
            'who': 'global',
        })
        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mgr.check_mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': name_to_config_section(daemon_type),
            })

        logger.info('Upgrade: Complete!')
        if 'progress_id' in self.upgrade_state:
            self.mgr.remote('progress', 'complete',
                            self.upgrade_state['progress_id'])
        self.upgrade_state = None
        self._save_upgrade_state()
        return