import json
import re
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any, List

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec, RGWSpec
from cephadm.schedule import HostAssignment
from cephadm.utils import SpecialHostLabels
import rados

from mgr_module import NFS_POOL_NAME
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

LAST_MIGRATION = 6

logger = logging.getLogger(__name__)

class Migrations:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why a global counter instead of per-spec versions?
        #
        # For the first migration:
        # The specs don't change in (this) migration, but the scheduler does.
        # Adding the version to the specs at this time just felt wrong to me.
        #
        # Besides, the specs are only one part of cephadm that needs potential upgrades.
        # We have the cache, the inventory, the config store, the upgrade (imagine changing the
        # upgrade code while an old upgrade is still in progress), naming of daemons,
        # fs-layout of the daemons, etc.
        self.set_sane_migration_current()

        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []

        r = mgr.get_store('rgw_migration_queue')
        self.rgw_migration_queue = json.loads(r) if r else []

        # For some migrations we don't need to do anything except
        # increment migration_current.
        # Let's try to shortcut things here.
        self.migrate(True)

    def set(self, val: int) -> None:
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def set_sane_migration_current(self) -> None:
        # migration_current should always be an integer
        # between 0 and LAST_MIGRATION (inclusive) in order to
        # actually carry out migration. If we find
        # it is None or too high of a value here we should
        # set it to some sane value
        mc: Optional[int] = self.mgr.migration_current
        if mc is None:
            logger.info('Found migration_current of "None". Setting to last migration.')
            self.set(LAST_MIGRATION)
            return

        if mc > LAST_MIGRATION:
            logger.error(f'Found migration_current of {mc} when max should be {LAST_MIGRATION}. Setting back to 0.')
            # something has gone wrong and caused migration_current
            # to be higher than it should be able to be. Best option
            # we have here is to just set it back to 0
            self.set(0)

    def is_migration_ongoing(self) -> bool:
        self.set_sane_migration_current()
        mc: Optional[int] = self.mgr.migration_current
        return mc is None or mc < LAST_MIGRATION

    def verify_no_migration(self) -> None:
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

        if self.mgr.migration_current == 3:
            if self.migrate_3_4():
                self.set(4)

        if self.mgr.migration_current == 4:
            if self.migrate_4_5():
                self.set(5)

        if self.mgr.migration_current == 5:
            if self.migrate_5_6():
                self.set(6)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1
        New scheduler that takes PlacementSpec as a bound and not as a recommendation.
        I.e. the new scheduler won't suggest any new placements outside of the hosts
        specified by label etc.

        Which means we have to make sure we're not removing any daemons directly after
        upgrading to the new scheduler.

        There is a potential race here:
        1. user updates their spec to remove daemons
        2. mgr gets upgraded to the new scheduler, before the old scheduler removed the daemon
        3. now we're converting the spec to explicit placement, thus reverting (1.)
        I think this is ok.
        """

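        # Illustrative example (hypothetical hosts and spec, not taken from a real cluster):
        # a spec with placement {label: 'foo', count: 2} whose daemons currently run on
        # host1 and host2, but which the stricter scheduler would no longer fully match,
        # is re-saved with placement {hosts: ['host1', 'host2'], count: 2} so no daemons
        # are removed right after the upgrade.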
        def interesting_specs() -> Iterator[ServiceSpec]:
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                draining_hosts=self.mgr.cache.get_draining_hosts(),
                daemons=existing_daemons,
            ).place()

            # We have to migrate, only if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs, etc. no longer have a service id.
        Which means the service names changed:

        mon.foo -> mon
        mgr.foo -> mgr

        This fixes the data structure consistency.
        """
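        # Example of the cleanup below (hypothetical names): a spec stored under the old
        # name 'mon.foo' now reports service_name() == 'mon'. The spec that survives under
        # 'mon' (the old one if no 'mon' spec exists yet, otherwise the existing one) is
        # marked unmanaged and re-saved, and the stale 'mon.foo' entry is removed.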
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                spec = old_spec
            else:
                spec = self.mgr.spec_store.all_specs[new]
            spec.unmanaged = True
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        renamed = False
        if service_id.startswith('ganesha-'):
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

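        # Layout note (derived from the code below): the legacy deployment kept its
        # 'export-*' objects and 'grace' object in a user-chosen pool/namespace; the new
        # layout uses the common NFS_POOL_NAME pool with the service_id as the namespace,
        # and exports are re-imported through 'nfs export apply'.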
        # read exports
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        object_iterator = ioctx.list_objects()
        exports = []
        while True:
            try:
                obj = object_iterator.__next__()
                if obj.key.startswith('export-'):
                    self.mgr.log.debug(f'reading {obj.key}')
                    exports.append(obj.read().decode())
            except StopIteration:
                break
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons and
            # deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)

            # We have to remove the old daemons here as well, otherwise we'll end up with a port conflict.
            daemons = [d.name()
                       for d in self.mgr.cache.get_daemons_by_service(f'nfs.ganesha-{service_id}')]
            self.mgr.log.info(f'Removing old nfs.ganesha-{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)
        else:
            # redeploy all ganesha daemons to ensure that the daemon
            # cephx keys are correct AND container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

            # re-save service spec (without pool and namespace properties!)
            spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
            self.mgr.spec_store.save(spec)

        # import exports
        for export in exports:
            ex = ''
            for line in export.splitlines():
                if (
                    line.startswith(' secret_access_key =')
                    or line.startswith(' user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')

    def migrate_3_4(self) -> bool:
        # We can't set any host with the _admin label, but we're
        # going to warn when calling `ceph orch host rm...`
        if 'client.admin' not in self.mgr.keys.keys:
            self.mgr._client_keyring_set(
                entity='client.admin',
                placement=f'label:{SpecialHostLabels.ADMIN}',
            )
        return True

    def migrate_4_5(self) -> bool:
        registry_url = self.mgr.get_module_option('registry_url')
        registry_username = self.mgr.get_module_option('registry_username')
        registry_password = self.mgr.get_module_option('registry_password')
        if registry_url and registry_username and registry_password:

            registry_credentials = {'url': registry_url,
                                    'username': registry_username, 'password': registry_password}
            self.mgr.set_store('registry_credentials', json.dumps(registry_credentials))

            self.mgr.set_module_option('registry_url', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_url',
            })
            self.mgr.set_module_option('registry_username', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_username',
            })
            self.mgr.set_module_option('registry_password', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_password',
            })

            self.mgr.log.info('Done migrating registry login info')
        return True

    def migrate_rgw_spec(self, spec: Dict[Any, Any]) -> Optional[RGWSpec]:
        """ Migrate an old rgw spec to the new format."""
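        # Illustrative transformation (hypothetical argument values):
        #   {'spec': {'rgw_frontend_type': 'beast port=8080 tcp_nodelay=1', ...}}
        # becomes
        #   {'spec': {'rgw_frontend_type': 'beast',
        #             'rgw_frontend_extra_args': ['port=8080', 'tcp_nodelay=1'], ...}}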
        new_spec = spec.copy()
        field_content: List[str] = re.split(' +', new_spec['spec']['rgw_frontend_type'])
        valid_spec = False
        if 'beast' in field_content:
            new_spec['spec']['rgw_frontend_type'] = 'beast'
            field_content.remove('beast')
            valid_spec = True
        elif 'civetweb' in field_content:
            new_spec['spec']['rgw_frontend_type'] = 'civetweb'
            field_content.remove('civetweb')
            valid_spec = True
        else:
            # Error: should not happen, as that would be an invalid RGW spec. In that case
            # we keep the spec as it is, mark it as unmanaged to avoid the daemons being deleted,
            # and raise a health warning so the user can fix the issue manually later.
            self.mgr.log.error(
                f"Cannot migrate RGW spec, bad rgw_frontend_type value: {spec['spec']['rgw_frontend_type']}.")

        if valid_spec:
            new_spec['spec']['rgw_frontend_extra_args'] = []
            new_spec['spec']['rgw_frontend_extra_args'].extend(field_content)

        return RGWSpec.from_json(new_spec)

    def rgw_spec_needs_migration(self, spec: Dict[Any, Any]) -> bool:
        if 'spec' not in spec:
            # if users allowed cephadm to set up most of the
            # attributes, it's possible there is no "spec" section
            # inside the spec. In that case, no migration is needed
            return False
        return 'rgw_frontend_type' in spec['spec'] \
            and spec['spec']['rgw_frontend_type'] is not None \
            and spec['spec']['rgw_frontend_type'].strip() not in ['beast', 'civetweb']

    def migrate_5_6(self) -> bool:
        """
        Migration 5 -> 6

        The old RGW spec used to allow 'bad' values in the rgw_frontend_type field. For example,
        the following value used to be valid:

        rgw_frontend_type: "beast endpoint=10.16.96.54:8043 tcp_nodelay=1"

        As of the 17.2.6 release, these kinds of entries are no longer valid and a stricter check
        has been added to validate this field.

        This migration logic detects such 'bad' values and tries to transform them into the new
        valid format, where the rgw_frontend_type field can only be either 'beast' or 'civetweb'.
        Any extra arguments detected in the rgw_frontend_type field are parsed and passed via the
        new spec field rgw_frontend_extra_args.
        """
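        # Note (derived from queue_migrate_rgw_spec below): each queued entry is the stored
        # spec document, a dict whose 'spec' key holds the service spec JSON; that JSON may
        # in turn contain a nested 'spec' section carrying rgw_frontend_type.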
        self.mgr.log.debug(f'Starting rgw migration (queue length is {len(self.rgw_migration_queue)})')
        for s in self.rgw_migration_queue:
            spec = s['spec']
            if self.rgw_spec_needs_migration(spec):
                rgw_spec = self.migrate_rgw_spec(spec)
                if rgw_spec is not None:
                    logger.info(f"Migrating {spec} to new RGW with extra args format {rgw_spec}")
                    self.mgr.spec_store.save(rgw_spec)
            else:
                logger.info(f"No Migration is needed for rgw spec: {spec}")
        self.rgw_migration_queue = []
        return True

def queue_migrate_rgw_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    As part of 17.2.6 a stricter RGW spec validation has been added, so the field
    rgw_frontend_type can no longer be used to pass rgw-frontends parameters.
    """
    service_id = spec_dict['spec']['service_id']
    queued = mgr.get_store('rgw_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append(spec_dict)
    mgr.set_store('rgw_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued rgw.{service_id} for migration')

def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is available
    and so on.
    """
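    # Each queued entry is a [service_id, pool, namespace] triple; migrate_2_3()
    # drains this queue once the cluster is ready.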
    service_id = spec_dict['spec']['service_id']
    args = spec_dict['spec'].get('spec', {})
    pool = args.pop('pool', 'nfs-ganesha')
    ns = args.pop('namespace', service_id)
    queued = mgr.get_store('nfs_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append([service_id, pool, ns])
    mgr.set_store('nfs_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued nfs.{service_id} for migration')