import json
import re
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any, List

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec, RGWSpec
from cephadm.schedule import HostAssignment
import rados

from mgr_module import NFS_POOL_NAME
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

LAST_MIGRATION = 6

logger = logging.getLogger(__name__)

class Migrations:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why a global counter instead of per-spec versions?
        #
        # For the first migration:
        # The specs don't change in (this) migration, but the scheduler does.
        # Adding a version to the specs at this point just felt wrong to me.
        #
        # Besides, the specs are only one part of cephadm that potentially needs upgrades.
        # We have the cache, the inventory, the config store, the upgrade (imagine changing the
        # upgrade code while an old upgrade is still in progress), naming of daemons,
        # fs-layout of the daemons, etc.
        self.set_sane_migration_current()

        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []

        r = mgr.get_store('rgw_migration_queue')
        self.rgw_migration_queue = json.loads(r) if r else []

        # For some migrations, we don't need to do anything except
        # increment migration_current.
        # Let's try to shortcut things here.
        self.migrate(True)

    def set(self, val: int) -> None:
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def set_sane_migration_current(self) -> None:
        # migration_current should always be an integer
        # between 0 and LAST_MIGRATION (inclusive) in order to
        # actually carry out migrations. If we find
        # it is None or too high a value here, we should
        # set it to some sane value.
        mc: Optional[int] = self.mgr.migration_current
        if mc is None:
            logger.info('Found migration_current of "None". Setting to last migration.')
            self.set(LAST_MIGRATION)
            return

        if mc > LAST_MIGRATION:
            logger.error(
                f'Found migration_current of {mc} when max should be {LAST_MIGRATION}. Setting back to 0.')
            # Something has gone wrong and caused migration_current
            # to be higher than it should be able to be. The best option
            # we have here is to just set it back to 0.
            self.set(0)

    def is_migration_ongoing(self) -> bool:
        self.set_sane_migration_current()
        mc: Optional[int] = self.mgr.migration_current
        return mc is None or mc < LAST_MIGRATION

    def verify_no_migration(self) -> None:
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
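        # Note: the chained ``if`` blocks below fall through on purpose: each
        # successful migration bumps migration_current, so a cluster that is
        # several migrations behind can catch up in a single call.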
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

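        # The 2 -> 3 migration is skipped while the module is starting up,
        # presumably because it touches RADOS (see queue_migrate_nfs_spec),
        # which may not be usable that early.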
        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

        if self.mgr.migration_current == 3:
            if self.migrate_3_4():
                self.set(4)

        if self.mgr.migration_current == 4:
            if self.migrate_4_5():
                self.set(5)

        if self.mgr.migration_current == 5:
            if self.migrate_5_6():
                self.set(6)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1
        The new scheduler takes the PlacementSpec as a bound, not as a recommendation.
        I.e. the new scheduler won't suggest any new placements outside of the hosts
        specified by label etc.

        This means we have to make sure we're not removing any daemons directly after
        upgrading to the new scheduler.

        There is a potential race here:
        1. the user updates their spec to remove daemons
        2. the mgr gets upgraded to the new scheduler before the old scheduler removed the daemons
        3. now we're converting the spec to explicit placement, thus reverting (1.)
        I think this is ok.
        """

        def interesting_specs() -> Iterator[ServiceSpec]:
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                draining_hosts=self.mgr.cache.get_draining_hosts(),
                daemons=existing_daemons,
            ).place()

            # We only have to migrate if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs etc. no longer have a service id.
        This means the service names changed:

        mon.foo -> mon
        mgr.foo -> mgr

        This fixes the data structure consistency.
        """
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                spec = old_spec
            else:
                spec = self.mgr.spec_store.all_specs[new]
            spec.unmanaged = True
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        renamed = False
        if service_id.startswith('ganesha-'):
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

        # read exports
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        object_iterator = ioctx.list_objects()
        exports = []
        while True:
            try:
                obj = object_iterator.__next__()
                if obj.key.startswith('export-'):
                    self.mgr.log.debug(f'reading {obj.key}')
                    exports.append(obj.read().decode())
            except StopIteration:
                break
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons and
            # deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)

            # We have to remove the old daemons here as well, otherwise we'll end up
            # with a port conflict.
            daemons = [d.name()
                       for d in self.mgr.cache.get_daemons_by_service(f'nfs.ganesha-{service_id}')]
            self.mgr.log.info(f'Removing old nfs.ganesha-{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)
        else:
            # redeploy all ganesha daemons to ensure that the daemon
            # cephx credentials are correct AND container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

        # re-save service spec (without pool and namespace properties!)
        spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
        self.mgr.spec_store.save(spec)

        # import exports
        for export in exports:
            ex = ''
            for line in export.splitlines():
                if (
                        line.startswith(' secret_access_key =')
                        or line.startswith(' user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')

    def migrate_3_4(self) -> bool:
        # We can't set any host with the _admin label, but we're
        # going to warn when calling `ceph orch host rm...`
        if 'client.admin' not in self.mgr.keys.keys:
            self.mgr._client_keyring_set(
                entity='client.admin',
                placement='label:_admin',
            )
        return True

    def migrate_4_5(self) -> bool:
        registry_url = self.mgr.get_module_option('registry_url')
        registry_username = self.mgr.get_module_option('registry_username')
        registry_password = self.mgr.get_module_option('registry_password')
        if registry_url and registry_username and registry_password:

            registry_credentials = {'url': registry_url,
                                    'username': registry_username, 'password': registry_password}
            self.mgr.set_store('registry_credentials', json.dumps(registry_credentials))

            self.mgr.set_module_option('registry_url', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_url',
            })
            self.mgr.set_module_option('registry_username', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_username',
            })
            self.mgr.set_module_option('registry_password', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_password',
            })

            self.mgr.log.info('Done migrating registry login info')
        return True

    def migrate_rgw_spec(self, spec: Dict[Any, Any]) -> Optional[RGWSpec]:
        """Migrate an old rgw spec to the new format."""
        new_spec = spec.copy()
        field_content: List[str] = re.split(' +', new_spec['spec']['rgw_frontend_type'])
        valid_spec = False
        if 'beast' in field_content:
            new_spec['spec']['rgw_frontend_type'] = 'beast'
            field_content.remove('beast')
            valid_spec = True
        elif 'civetweb' in field_content:
            new_spec['spec']['rgw_frontend_type'] = 'civetweb'
            field_content.remove('civetweb')
            valid_spec = True
        else:
            # Error: should not happen, as that would be an invalid RGW spec. In that case
            # we keep the spec as-is, mark it as unmanaged to avoid the daemons being deleted,
            # and raise a health warning so the user can fix the issue manually later.
            self.mgr.log.error(
                f"Cannot migrate RGW spec, bad rgw_frontend_type value: {spec['spec']['rgw_frontend_type']}.")

        if valid_spec:
            new_spec['spec']['rgw_frontend_extra_args'] = []
            new_spec['spec']['rgw_frontend_extra_args'].extend(field_content)

        return RGWSpec.from_json(new_spec)

    def rgw_spec_needs_migration(self, spec: Dict[Any, Any]) -> bool:
        if 'spec' not in spec:
            # if users allowed cephadm to set up most of the
            # attributes, it's possible there is no "spec" section
            # inside the spec. In that case, no migration is needed
            return False
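        # For illustration (hypothetical value): 'beast port=8000' needs migration,
        # while a plain 'beast' or 'civetweb' does not.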
        return 'rgw_frontend_type' in spec['spec'] \
            and spec['spec']['rgw_frontend_type'] is not None \
            and spec['spec']['rgw_frontend_type'].strip() not in ['beast', 'civetweb']

    def migrate_5_6(self) -> bool:
        """
        Migration 5 -> 6

        The old RGW spec used to allow 'bad' values in the rgw_frontend_type field. For example,
        the following value used to be valid:

          rgw_frontend_type: "beast endpoint=10.16.96.54:8043 tcp_nodelay=1"

        As of the 17.2.6 release, these kinds of entries are no longer valid and a stricter
        check has been added to validate this field.

        This migration logic detects these 'bad' values and tries to transform them into the
        new valid format, where the rgw_frontend_type field can only be either 'beast' or
        'civetweb'. Any extra arguments detected in the rgw_frontend_type field are parsed and
        moved to the new spec field rgw_frontend_extra_args.
        """
        self.mgr.log.debug(
            f'Starting rgw migration (queue length is {len(self.rgw_migration_queue)})')
        for s in self.rgw_migration_queue:
            spec = s['spec']
            if self.rgw_spec_needs_migration(spec):
                rgw_spec = self.migrate_rgw_spec(spec)
                if rgw_spec is not None:
                    logger.info(f"Migrating {spec} to the new RGW spec format (extra args): {rgw_spec}")
                    self.mgr.spec_store.save(rgw_spec)
            else:
                logger.info(f"No migration is needed for rgw spec: {spec}")
        self.rgw_migration_queue = []
        return True


def queue_migrate_rgw_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    As part of 17.2.6, a stricter RGW spec validation has been added, so the field
    rgw_frontend_type can no longer be used to pass rgw frontend parameters.
    """
    service_id = spec_dict['spec']['service_id']
    queued = mgr.get_store('rgw_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append(spec_dict)
    mgr.set_store('rgw_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued rgw.{service_id} for migration')


def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is available
    and so on.
    """
    service_id = spec_dict['spec']['service_id']
    args = spec_dict['spec'].get('spec', {})
    pool = args.pop('pool', 'nfs-ganesha')
    ns = args.pop('namespace', service_id)
    queued = mgr.get_store('nfs_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append([service_id, pool, ns])
    mgr.set_store('nfs_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued nfs.{service_id} for migration')