# Extracted from: ceph/src/pybind/mgr/cephadm/upgrade.py
# (captured via the git.proxmox.com gitweb view of ceph.git)
5 from typing
import TYPE_CHECKING
, Optional
, Dict
, NamedTuple
8 from cephadm
.utils
import name_to_config_section
9 from orchestrator
import OrchestratorError
, DaemonDescription
12 from .module
import CephadmOrchestrator
# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']

# Module-level logger named after this module, per ceph-mgr convention.
logger = logging.getLogger(__name__)
26 target_id
: Optional
[str] = None,
27 repo_digest
: Optional
[str] = None,
28 target_version
: Optional
[str] = None,
29 error
: Optional
[str] = None,
30 paused
: Optional
[bool] = None,
32 self
._target
_name
: str = target_name
# Use CephadmUpgrade.target_image instead.
33 self
.progress_id
: str = progress_id
34 self
.target_id
: Optional
[str] = target_id
35 self
.repo_digest
: Optional
[str] = repo_digest
36 self
.target_version
: Optional
[str] = target_version
37 self
.error
: Optional
[str] = error
38 self
.paused
: bool = paused
or False
40 def to_json(self
) -> dict:
42 'target_name': self
._target
_name
,
43 'progress_id': self
.progress_id
,
44 'target_id': self
.target_id
,
45 'repo_digest': self
.repo_digest
,
46 'target_version': self
.target_version
,
48 'paused': self
.paused
,
52 def from_json(cls
, data
) -> Optional
['UpgradeState']:
60 def __init__(self
, mgr
: "CephadmOrchestrator"):
63 t
= self
.mgr
.get_store('upgrade_state')
65 self
.upgrade_state
: Optional
[UpgradeState
] = UpgradeState
.from_json(json
.loads(t
))
67 self
.upgrade_state
= None
def target_image(self) -> str:
    """Image reference the cluster is being upgraded to.

    Returns the learned repo digest when the mgr is configured to use
    digests and one is known; otherwise returns the image name the
    caller originally requested.
    """
    assert self.upgrade_state
    digest = self.upgrade_state.repo_digest
    if self.mgr.use_repo_digest and digest:
        return digest
    return self.upgrade_state._target_name
79 def upgrade_status(self
) -> orchestrator
.UpgradeStatusSpec
:
80 r
= orchestrator
.UpgradeStatusSpec()
81 if self
.upgrade_state
:
82 r
.target_image
= self
.target_image
84 if self
.upgrade_state
.error
:
85 r
.message
= 'Error: ' + self
.upgrade_state
.error
86 elif self
.upgrade_state
.paused
:
87 r
.message
= 'Upgrade paused'
90 def upgrade_start(self
, image
, version
) -> str:
91 if self
.mgr
.mode
!= 'root':
92 raise OrchestratorError('upgrade is not supported in %s mode' % (
96 (major
, minor
, patch
) = version
.split('.')
97 assert int(minor
) >= 0
98 assert int(patch
) >= 0
100 raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
101 if int(major
) < 15 or (int(major
) == 15 and int(minor
) < 2):
102 raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
103 target_name
= self
.mgr
.container_image_base
+ ':v' + version
107 raise OrchestratorError('must specify either image or version')
108 if self
.upgrade_state
:
109 if self
.upgrade_state
._target
_name
!= target_name
:
110 raise OrchestratorError(
111 'Upgrade to %s (not %s) already in progress' %
112 (self
.upgrade_state
._target
_name
, target_name
))
113 if self
.upgrade_state
.paused
:
114 self
.upgrade_state
.paused
= False
115 self
._save
_upgrade
_state
()
116 return 'Resumed upgrade to %s' % self
.target_image
117 return 'Upgrade to %s in progress' % self
.target_image
118 self
.upgrade_state
= UpgradeState(
119 target_name
=target_name
,
120 progress_id
=str(uuid
.uuid4())
122 self
._update
_upgrade
_progress
(0.0)
123 self
._save
_upgrade
_state
()
124 self
._clear
_upgrade
_health
_checks
()
126 return 'Initiating upgrade to %s' % (target_name
)
def upgrade_pause(self) -> str:
    """Pause the running upgrade; state is kept so it can be resumed.

    Returns a human-readable status string; raises OrchestratorError
    when no upgrade is in progress.
    """
    state = self.upgrade_state
    if not state:
        raise OrchestratorError('No upgrade in progress')
    if state.paused:
        return 'Upgrade to %s already paused' % self.target_image
    state.paused = True
    self._save_upgrade_state()
    return 'Paused upgrade to %s' % self.target_image
def upgrade_resume(self) -> str:
    """Resume a previously paused upgrade.

    Returns a human-readable status string; raises OrchestratorError
    when no upgrade is in progress.
    """
    if not self.upgrade_state:
        raise OrchestratorError('No upgrade in progress')
    if not self.upgrade_state.paused:
        return 'Upgrade to %s not paused' % self.target_image
    self.upgrade_state.paused = False
    self._save_upgrade_state()
    # NOTE(review): the capture this was recovered from skips the original
    # file's line 144 here -- confirm against upstream that no statement
    # (e.g. a serve-loop wake-up) is missing before the return.
    return 'Resumed upgrade to %s' % self.target_image
def upgrade_stop(self) -> str:
    """Abort the current upgrade: finish the progress event, drop the
    persisted state, and clear any upgrade health alerts.
    """
    if not self.upgrade_state:
        return 'No upgrade in progress'
    if self.upgrade_state.progress_id:
        # Mark the mgr progress-module event as complete.
        self.mgr.remote('progress', 'complete',
                        self.upgrade_state.progress_id)
    # Capture the image for the message before the state is discarded.
    target_image = self.target_image
    self.upgrade_state = None
    self._save_upgrade_state()
    self._clear_upgrade_health_checks()
    # NOTE(review): the capture this was recovered from skips the original
    # file's line 157 here -- confirm against upstream that nothing is
    # missing before the return.
    return 'Stopped upgrade to %s' % target_image
160 def continue_upgrade(self
) -> bool:
162 Returns false, if nothing was done.
165 if self
.upgrade_state
and not self
.upgrade_state
.paused
:
170 def _wait_for_ok_to_stop(self
, s
: DaemonDescription
) -> bool:
171 # only wait a little bit; the service might go away for something
174 if not self
.upgrade_state
or self
.upgrade_state
.paused
:
177 r
= self
.mgr
.cephadm_services
[s
.daemon_type
].ok_to_stop([s
.daemon_id
])
180 logger
.info(f
'Upgrade: {r.stdout}')
182 logger
.error(f
'Upgrade: {r.stderr}')
188 def _clear_upgrade_health_checks(self
) -> None:
189 for k
in ['UPGRADE_NO_STANDBY_MGR',
190 'UPGRADE_FAILED_PULL']:
191 if k
in self
.mgr
.health_checks
:
192 del self
.mgr
.health_checks
[k
]
193 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
def _fail_upgrade(self, alert_id, alert) -> None:
    """Pause the upgrade and raise a health alert explaining why.

    :param alert_id: health check code (e.g. 'UPGRADE_FAILED_PULL')
    :param alert: health check payload dict; must carry a 'summary' key
    """
    # The capture truncated this call mid-expression; the continuation is
    # restored from the two-%s format string and the alert['summary']
    # usage below.
    logger.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                    alert['summary']))
    if not self.upgrade_state:
        assert False, 'No upgrade in progress'

    # Record the failure on the persisted state so status reporting and
    # a later resume can see it.
    self.upgrade_state.error = alert_id + ': ' + alert['summary']
    self.upgrade_state.paused = True
    self._save_upgrade_state()
    self.mgr.health_checks[alert_id] = alert
    self.mgr.set_health_checks(self.mgr.health_checks)
207 def _update_upgrade_progress(self
, progress
) -> None:
208 if not self
.upgrade_state
:
209 assert False, 'No upgrade in progress'
211 if not self
.upgrade_state
.progress_id
:
212 self
.upgrade_state
.progress_id
= str(uuid
.uuid4())
213 self
._save
_upgrade
_state
()
214 self
.mgr
.remote('progress', 'update', self
.upgrade_state
.progress_id
,
215 ev_msg
='Upgrade to %s' % self
.target_image
,
216 ev_progress
=progress
)
218 def _save_upgrade_state(self
) -> None:
219 if not self
.upgrade_state
:
220 self
.mgr
.set_store('upgrade_state', None)
222 self
.mgr
.set_store('upgrade_state', json
.dumps(self
.upgrade_state
.to_json()))
224 def get_distinct_container_image_settings(self
) -> Dict
[str, str]:
225 # get all distinct container_image settings
227 ret
, out
, err
= self
.mgr
.check_mon_command({
228 'prefix': 'config dump',
231 config
= json
.loads(out
)
233 if opt
['name'] == 'container_image':
234 image_settings
[opt
['section']] = opt
['value']
235 return image_settings
237 def _do_upgrade(self
):
239 if not self
.upgrade_state
:
240 logger
.debug('_do_upgrade no state, exiting')
243 target_image
= self
.target_image
244 target_id
= self
.upgrade_state
.target_id
245 if not target_id
or (self
.mgr
.use_repo_digest
and not self
.upgrade_state
.repo_digest
):
246 # need to learn the container hash
247 logger
.info('Upgrade: First pull of %s' % target_image
)
249 target_id
, target_version
, repo_digest
= self
.mgr
._get
_container
_image
_info
(
251 except OrchestratorError
as e
:
252 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
253 'severity': 'warning',
254 'summary': 'Upgrade: failed to pull target image',
259 self
.upgrade_state
.target_id
= target_id
260 self
.upgrade_state
.target_version
= target_version
261 self
.upgrade_state
.repo_digest
= repo_digest
262 self
._save
_upgrade
_state
()
263 target_image
= self
.target_image
264 target_version
= self
.upgrade_state
.target_version
265 logger
.info('Upgrade: Target is %s with id %s' % (target_image
,
268 image_settings
= self
.get_distinct_container_image_settings()
270 daemons
= self
.mgr
.cache
.get_daemons()
272 for daemon_type
in CEPH_UPGRADE_ORDER
:
273 logger
.info('Upgrade: Checking %s daemons...' % daemon_type
)
274 need_upgrade_self
= False
276 if d
.daemon_type
!= daemon_type
:
278 if d
.container_image_id
== target_id
:
279 logger
.debug('daemon %s.%s version correct' % (
280 daemon_type
, d
.daemon_id
))
283 logger
.debug('daemon %s.%s not correct (%s, %s, %s)' % (
284 daemon_type
, d
.daemon_id
,
285 d
.container_image_name
, d
.container_image_id
, d
.version
))
287 if self
.mgr
.daemon_is_self(d
.daemon_type
, d
.daemon_id
):
288 logger
.info('Upgrade: Need to upgrade myself (mgr.%s)' %
289 self
.mgr
.get_mgr_id())
290 need_upgrade_self
= True
293 # make sure host has latest container image
294 out
, err
, code
= self
.mgr
._run
_cephadm
(
295 d
.hostname
, '', 'inspect-image', [],
296 image
=target_image
, no_fsid
=True, error_ok
=True)
297 if code
or json
.loads(''.join(out
)).get('image_id') != target_id
:
298 logger
.info('Upgrade: Pulling %s on %s' % (target_image
,
300 out
, err
, code
= self
.mgr
._run
_cephadm
(
301 d
.hostname
, '', 'pull', [],
302 image
=target_image
, no_fsid
=True, error_ok
=True)
304 self
._fail
_upgrade
('UPGRADE_FAILED_PULL', {
305 'severity': 'warning',
306 'summary': 'Upgrade: failed to pull target image',
309 'failed to pull %s on host %s' % (target_image
,
313 r
= json
.loads(''.join(out
))
314 if r
.get('image_id') != target_id
:
315 logger
.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
316 target_image
, d
.hostname
, r
['image_id'], target_id
))
317 self
.upgrade_state
.target_id
= r
['image_id']
318 self
._save
_upgrade
_state
()
321 self
._update
_upgrade
_progress
(done
/ len(daemons
))
323 if not d
.container_image_id
:
324 if d
.container_image_name
== target_image
:
326 'daemon %s has unknown container_image_id but has correct image name' % (d
.name()))
328 if not self
._wait
_for
_ok
_to
_stop
(d
):
330 logger
.info('Upgrade: Redeploying %s.%s' %
331 (d
.daemon_type
, d
.daemon_id
))
332 self
.mgr
._daemon
_action
(
341 if need_upgrade_self
:
343 self
.mgr
.mgr_service
.fail_over()
344 except OrchestratorError
as e
:
345 self
._fail
_upgrade
('UPGRADE_NO_STANDBY_MGR', {
346 'severity': 'warning',
347 'summary': f
'Upgrade: {e}',
350 'The upgrade process needs to upgrade the mgr, '
351 'but it needs at least one standby to proceed.',
356 return # unreachable code, as fail_over never returns
357 elif daemon_type
== 'mgr':
358 if 'UPGRADE_NO_STANDBY_MGR' in self
.mgr
.health_checks
:
359 del self
.mgr
.health_checks
['UPGRADE_NO_STANDBY_MGR']
360 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
362 # make sure 'ceph versions' agrees
363 ret
, out_ver
, err
= self
.mgr
.check_mon_command({
364 'prefix': 'versions',
366 j
= json
.loads(out_ver
)
367 for version
, count
in j
.get(daemon_type
, {}).items():
368 if version
!= target_version
:
370 'Upgrade: %d %s daemon(s) are %s != target %s' %
371 (count
, daemon_type
, version
, target_version
))
374 if image_settings
.get(daemon_type
) != target_image
:
375 logger
.info('Upgrade: Setting container_image for all %s...' %
377 self
.mgr
.set_container_image(name_to_config_section(daemon_type
), target_image
)
379 for section
in image_settings
.keys():
380 if section
.startswith(name_to_config_section(daemon_type
) + '.'):
381 to_clean
.append(section
)
383 logger
.debug('Upgrade: Cleaning up container_image for %s...' %
385 for section
in to_clean
:
386 ret
, image
, err
= self
.mgr
.check_mon_command({
387 'prefix': 'config rm',
388 'name': 'container_image',
392 logger
.info('Upgrade: All %s daemons are up to date.' %
396 logger
.info('Upgrade: Finalizing container_image settings')
397 self
.mgr
.set_container_image('global', target_image
)
399 for daemon_type
in CEPH_UPGRADE_ORDER
:
400 ret
, image
, err
= self
.mgr
.check_mon_command({
401 'prefix': 'config rm',
402 'name': 'container_image',
403 'who': name_to_config_section(daemon_type
),
406 logger
.info('Upgrade: Complete!')
407 if self
.upgrade_state
.progress_id
:
408 self
.mgr
.remote('progress', 'complete',
409 self
.upgrade_state
.progress_id
)
410 self
.upgrade_state
= None
411 self
._save
_upgrade
_state
()