# Source: ceph/src/pybind/mgr/cephadm/upgrade.py (scraped from a gitweb mirror; formatting reconstructed)
import json
import logging
import time
import uuid
from typing import TYPE_CHECKING, Optional

import orchestrator
from cephadm.utils import name_to_config_section
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator
# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

# module-level logger, named after this module per the stdlib logging convention
logger = logging.getLogger(__name__)
def __init__(self,
             target_name: str,
             progress_id: str,
             target_id: Optional[str] = None,
             target_version: Optional[str] = None,
             error: Optional[str] = None,
             paused: Optional[bool] = None,
             ):
    """Persisted state of an in-flight upgrade.

    :param target_name: container image (name or name:tag) being upgraded to
    :param progress_id: id used for events in the mgr 'progress' module
    :param target_id: resolved container image id, once known
    :param target_version: resolved ceph version of the target image
    :param error: human-readable failure text, if the upgrade failed
    :param paused: whether the upgrade is paused; None normalizes to False
    """
    # NOTE(review): the def line and the first two parameters were lost in the
    # scrape; reconstructed from the attribute assignments below — confirm
    # against the upstream file.
    self.target_name: str = target_name
    self.progress_id: str = progress_id
    self.target_id: Optional[str] = target_id
    self.target_version: Optional[str] = target_version
    self.error: Optional[str] = error
    # normalize None -> False so self.paused is always a bool
    self.paused: bool = paused or False
def to_json(self) -> dict:
    """Serialize this state to a json-friendly dict (inverse of from_json)."""
    return {
        'target_name': self.target_name,
        'progress_id': self.progress_id,
        'target_id': self.target_id,
        'target_version': self.target_version,
        # the scrape lost this entry (original line 44); 'error' must round-trip
        # because from_json feeds this dict back into __init__ via cls(**data)
        'error': self.error,
        'paused': self.paused,
    }
@classmethod
def from_json(cls, data) -> 'UpgradeState':
    """Rebuild an UpgradeState from a dict produced by to_json().

    :param data: dict whose keys match __init__'s parameter names
    """
    # NOTE(review): the body was lost in the scrape; cls(**data) is the only
    # construction consistent with to_json()'s key set — confirm upstream.
    return cls(**data)
def __init__(self, mgr: "CephadmOrchestrator"):
    """Load any persisted upgrade state from the mgr's key/value store.

    :param mgr: the owning CephadmOrchestrator module instance
    """
    # the scrape lost this assignment; self.mgr is read on the next line,
    # so it must be set first
    self.mgr = mgr
    t = self.mgr.get_store('upgrade_state')
    if t:
        # an upgrade was in flight when the mgr last stopped — resume it
        self.upgrade_state: Optional[UpgradeState] = UpgradeState.from_json(json.loads(t))
    else:
        self.upgrade_state = None
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
    """Report the current upgrade target and status as an UpgradeStatusSpec."""
    r = orchestrator.UpgradeStatusSpec()
    if self.upgrade_state:
        r.target_image = self.upgrade_state.target_name
        # NOTE(review): a line between target_image and the error check was
        # lost in the scrape; presumably it flagged the upgrade as active —
        # confirm against upstream.
        r.in_progress = True
        if self.upgrade_state.error:
            r.message = 'Error: ' + self.upgrade_state.error
        elif self.upgrade_state.paused:
            r.message = 'Upgrade paused'
    # FIX: SOURCE built r but never returned it (return statement lost)
    return r
def upgrade_start(self, image, version) -> str:
    """Start (or resume) an upgrade to the given container image or version.

    :param image: explicit container image name, or None
    :param version: ceph version string 'X.Y.Z', or None
    :raises OrchestratorError: unsupported mode, malformed/too-old version,
        neither image nor version given, or a different upgrade already running
    :return: human-readable status message
    """
    if self.mgr.mode != 'root':
        raise OrchestratorError('upgrade is not supported in %s mode' % (
            self.mgr.mode))
    if version:
        try:
            # basic X.Y.Z sanity check on the requested version
            (major, minor, patch) = version.split('.')
            assert int(minor) >= 0
            assert int(patch) >= 0
        except (ValueError, AssertionError):
            raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
        if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
            raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
        target_name = self.mgr.container_image_base + ':v' + version
    elif image:
        target_name = image
    else:
        raise OrchestratorError('must specify either image or version')
    if self.upgrade_state:
        if self.upgrade_state.target_name != target_name:
            raise OrchestratorError(
                'Upgrade to %s (not %s) already in progress' %
                (self.upgrade_state.target_name, target_name))
        if self.upgrade_state.paused:
            # same target, currently paused -> just resume it
            self.upgrade_state.paused = False
            self._save_upgrade_state()
            return 'Resumed upgrade to %s' % self.upgrade_state.target_name
        return 'Upgrade to %s in progress' % self.upgrade_state.target_name
    # begin a fresh upgrade
    self.upgrade_state = UpgradeState(
        target_name=target_name,
        progress_id=str(uuid.uuid4())
    )
    self._update_upgrade_progress(0.0)
    self._save_upgrade_state()
    self._clear_upgrade_health_checks()
    return 'Initiating upgrade to %s' % (target_name)
def upgrade_pause(self) -> str:
    """Pause the running upgrade; error out if none is in progress."""
    state = self.upgrade_state
    if not state:
        raise OrchestratorError('No upgrade in progress')
    if state.paused:
        # already paused: nothing to do, just say so
        return 'Upgrade to %s already paused' % state.target_name
    state.paused = True
    self._save_upgrade_state()
    return 'Paused upgrade to %s' % state.target_name
def upgrade_resume(self) -> str:
    """Resume a paused upgrade; error out if none is in progress."""
    state = self.upgrade_state
    if not state:
        raise OrchestratorError('No upgrade in progress')
    if not state.paused:
        # not paused: nothing to resume
        return 'Upgrade to %s not paused' % state.target_name
    state.paused = False
    self._save_upgrade_state()
    return 'Resumed upgrade to %s' % state.target_name
def upgrade_stop(self) -> str:
    """Abort any in-flight upgrade, clearing progress events and health alerts."""
    state = self.upgrade_state
    if not state:
        return 'No upgrade in progress'
    target_name = state.target_name
    if state.progress_id:
        # close out this upgrade's event in the mgr 'progress' module
        self.mgr.remote('progress', 'complete', state.progress_id)
    self.upgrade_state = None
    self._save_upgrade_state()
    self._clear_upgrade_health_checks()
    return 'Stopped upgrade to %s' % target_name
def continue_upgrade(self) -> bool:
    """Run one round of the upgrade, if one is active and not paused.

    Returns false, if nothing was done.
    """
    # NOTE(review): the body after this check was lost in the scrape; driving
    # _do_upgrade() and reporting True is the only behavior consistent with
    # the docstring and the _do_upgrade definition below — confirm upstream.
    if self.upgrade_state and not self.upgrade_state.paused:
        self._do_upgrade()
        return True
    return False
def _wait_for_ok_to_stop(self, s: DaemonDescription) -> bool:
    """Poll until daemon *s* reports it is safe to stop, or give up.

    :param s: description of the daemon we want to restart/redeploy
    :return: True if ok-to-stop succeeded; False if the upgrade was
        stopped/paused meanwhile or we gave up waiting
    """
    # only wait a little bit; the service might go away for something
    # NOTE(review): the retry-loop lines were lost in the scrape; the bound
    # and sleep interval below are reconstructions — confirm upstream.
    tries = 4
    while tries > 0:
        if not self.upgrade_state or self.upgrade_state.paused:
            return False
        r = self.mgr.cephadm_services[s.daemon_type].ok_to_stop([s.daemon_id])
        if not r.retval:
            logger.info(f'Upgrade: {r.stdout}')
            return True
        # FIX: original string lacked the f-prefix, so the literal text
        # '{r.stderr}' was logged instead of the captured stderr
        logger.error(f'Upgrade: {r.stderr}')
        time.sleep(15)
        tries -= 1
    return False
def _clear_upgrade_health_checks(self) -> None:
    """Drop any upgrade-related health alerts and republish the check set."""
    checks = self.mgr.health_checks
    for alert_id in ('UPGRADE_NO_STANDBY_MGR', 'UPGRADE_FAILED_PULL'):
        # pop() with a default removes the key only if present
        checks.pop(alert_id, None)
    self.mgr.set_health_checks(checks)
def _fail_upgrade(self, alert_id, alert) -> None:
    """Pause the upgrade, record the failure, and raise a health alert.

    :param alert_id: health-check key, e.g. 'UPGRADE_FAILED_PULL'
    :param alert: health-check dict; its 'summary' is used as the error text
    """
    # NOTE(review): the continuation of this logger call was lost in the
    # scrape; alert['summary'] is the value used for the error below, so it
    # is the natural second operand — confirm upstream.
    logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                    alert['summary']))
    if not self.upgrade_state:
        # internal invariant: callers only invoke this mid-upgrade
        assert False, 'No upgrade in progress'

    self.upgrade_state.error = alert_id + ': ' + alert['summary']
    self.upgrade_state.paused = True
    self._save_upgrade_state()
    self.mgr.health_checks[alert_id] = alert
    self.mgr.set_health_checks(self.mgr.health_checks)
def _update_upgrade_progress(self, progress) -> None:
    """Report upgrade completion (0.0-1.0) via the mgr 'progress' module."""
    state = self.upgrade_state
    if not state:
        assert False, 'No upgrade in progress'

    if not state.progress_id:
        # first report for this upgrade: allocate an event id and persist it
        state.progress_id = str(uuid.uuid4())
        self._save_upgrade_state()
    ev_msg = 'Upgrade to %s' % state.target_name
    self.mgr.remote('progress', 'update', state.progress_id,
                    ev_msg=ev_msg,
                    ev_progress=progress)
def _save_upgrade_state(self) -> None:
    """Persist self.upgrade_state in the mgr kv store (None clears it)."""
    if not self.upgrade_state:
        self.mgr.set_store('upgrade_state', None)
        # FIX: the scrape lost this early return; without it we would fall
        # through and call to_json() on None below
        return
    self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
208 def _do_upgrade(self
):
210 if not self
.upgrade_state
:
211 logger
.debug('_do_upgrade no state, exiting')
214 target_name
= self
.upgrade_state
.target_name
215 target_id
= self
.upgrade_state
.target_id
217 # need to learn the container hash
218 logger
.info('Upgrade: First pull of %s' % target_name
)
220 target_id
, target_version
= self
.mgr
._get
_container
_image
_id
(target_name
)
221 except OrchestratorError
as e
:
222 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
223 'severity': 'warning',
224 'summary': 'Upgrade: failed to pull target image',
229 self
.upgrade_state
.target_id
= target_id
230 self
.upgrade_state
.target_version
= target_version
231 self
._save
_upgrade
_state
()
232 target_version
= self
.upgrade_state
.target_version
233 logger
.info('Upgrade: Target is %s with id %s' % (target_name
,
236 # get all distinct container_image settings
238 ret
, out
, err
= self
.mgr
.check_mon_command({
239 'prefix': 'config dump',
242 config
= json
.loads(out
)
244 if opt
['name'] == 'container_image':
245 image_settings
[opt
['section']] = opt
['value']
247 daemons
= self
.mgr
.cache
.get_daemons()
249 for daemon_type
in CEPH_UPGRADE_ORDER
:
250 logger
.info('Upgrade: Checking %s daemons...' % daemon_type
)
251 need_upgrade_self
= False
253 if d
.daemon_type
!= daemon_type
:
255 if d
.container_image_id
== target_id
:
256 logger
.debug('daemon %s.%s version correct' % (
257 daemon_type
, d
.daemon_id
))
260 logger
.debug('daemon %s.%s not correct (%s, %s, %s)' % (
261 daemon_type
, d
.daemon_id
,
262 d
.container_image_name
, d
.container_image_id
, d
.version
))
264 if daemon_type
== 'mgr' and \
265 d
.daemon_id
== self
.mgr
.get_mgr_id():
266 logger
.info('Upgrade: Need to upgrade myself (mgr.%s)' %
267 self
.mgr
.get_mgr_id())
268 need_upgrade_self
= True
271 # make sure host has latest container image
272 out
, err
, code
= self
.mgr
._run
_cephadm
(
273 d
.hostname
, '', 'inspect-image', [],
274 image
=target_name
, no_fsid
=True, error_ok
=True)
275 if code
or json
.loads(''.join(out
)).get('image_id') != target_id
:
276 logger
.info('Upgrade: Pulling %s on %s' % (target_name
,
278 out
, err
, code
= self
.mgr
._run
_cephadm
(
279 d
.hostname
, '', 'pull', [],
280 image
=target_name
, no_fsid
=True, error_ok
=True)
282 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
283 'severity': 'warning',
284 'summary': 'Upgrade: failed to pull target image',
287 'failed to pull %s on host %s' % (target_name
,
291 r
= json
.loads(''.join(out
))
292 if r
.get('image_id') != target_id
:
293 logger
.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
294 target_name
, d
.hostname
, r
['image_id'], target_id
))
295 self
.upgrade_state
.target_id
= r
['image_id']
296 self
._save
_upgrade
_state
()
299 self
._update
_upgrade
_progress
(done
/ len(daemons
))
301 if not d
.container_image_id
:
302 if d
.container_image_name
== target_name
:
304 'daemon %s has unknown container_image_id but has correct image name' % (d
.name()))
306 if not self
._wait
_for
_ok
_to
_stop
(d
):
308 logger
.info('Upgrade: Redeploying %s.%s' %
309 (d
.daemon_type
, d
.daemon_id
))
310 self
.mgr
._daemon
_action
(
319 if need_upgrade_self
:
320 mgr_map
= self
.mgr
.get('mgr_map')
321 num
= len(mgr_map
.get('standbys'))
323 self
._fail
_upgrade
('UPGRADE_NO_STANDBY_MGR', {
324 'severity': 'warning',
325 'summary': 'Upgrade: Need standby mgr daemon',
328 'The upgrade process needs to upgrade the mgr, '
329 'but it needs at least one standby to proceed.',
334 logger
.info('Upgrade: there are %d other already-upgraded '
335 'standby mgrs, failing over' % num
)
337 self
._update
_upgrade
_progress
(done
/ len(daemons
))
340 ret
, out
, err
= self
.mgr
.check_mon_command({
341 'prefix': 'mgr fail',
342 'who': self
.mgr
.get_mgr_id(),
345 elif daemon_type
== 'mgr':
346 if 'UPGRADE_NO_STANDBY_MGR' in self
.mgr
.health_checks
:
347 del self
.mgr
.health_checks
['UPGRADE_NO_STANDBY_MGR']
348 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
350 # make sure 'ceph versions' agrees
351 ret
, out_ver
, err
= self
.mgr
.check_mon_command({
352 'prefix': 'versions',
354 j
= json
.loads(out_ver
)
355 for version
, count
in j
.get(daemon_type
, {}).items():
356 if version
!= target_version
:
358 'Upgrade: %d %s daemon(s) are %s != target %s' %
359 (count
, daemon_type
, version
, target_version
))
362 if image_settings
.get(daemon_type
) != target_name
:
363 logger
.info('Upgrade: Setting container_image for all %s...' %
365 ret
, out
, err
= self
.mgr
.check_mon_command({
366 'prefix': 'config set',
367 'name': 'container_image',
368 'value': target_name
,
369 'who': name_to_config_section(daemon_type
),
372 for section
in image_settings
.keys():
373 if section
.startswith(name_to_config_section(daemon_type
) + '.'):
374 to_clean
.append(section
)
376 logger
.debug('Upgrade: Cleaning up container_image for %s...' %
378 for section
in to_clean
:
379 ret
, image
, err
= self
.mgr
.check_mon_command({
380 'prefix': 'config rm',
381 'name': 'container_image',
385 logger
.info('Upgrade: All %s daemons are up to date.' %
389 logger
.info('Upgrade: Finalizing container_image settings')
390 ret
, out
, err
= self
.mgr
.check_mon_command({
391 'prefix': 'config set',
392 'name': 'container_image',
393 'value': target_name
,
396 for daemon_type
in CEPH_UPGRADE_ORDER
:
397 ret
, image
, err
= self
.mgr
.check_mon_command({
398 'prefix': 'config rm',
399 'name': 'container_image',
400 'who': name_to_config_section(daemon_type
),
403 logger
.info('Upgrade: Complete!')
404 if self
.upgrade_state
.progress_id
:
405 self
.mgr
.remote('progress', 'complete',
406 self
.upgrade_state
.progress_id
)
407 self
.upgrade_state
= None
408 self
._save
_upgrade
_state
()