# ceph/src/pybind/mgr/cephadm/migrations.py (ceph quincy 17.2.6)
a4b75251 1import json
f6b5b4d7 2import logging
a4b75251 3from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any
f6b5b4d7
TL
4
5from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
6from cephadm.schedule import HostAssignment
a4b75251 7import rados
f6b5b4d7 8
a4b75251 9from mgr_module import NFS_POOL_NAME
f67539c2 10from orchestrator import OrchestratorError, DaemonDescription
f6b5b4d7
TL
11
12if TYPE_CHECKING:
13 from .module import CephadmOrchestrator
14
20effc67 15LAST_MIGRATION = 5
f6b5b4d7
TL
16
17logger = logging.getLogger(__name__)
18
19
20class Migrations:
21 def __init__(self, mgr: "CephadmOrchestrator"):
22 self.mgr = mgr
23
24 # Why having a global counter, instead of spec versions?
25 #
26 # for the first migration:
27 # The specs don't change in (this) migration. but the scheduler here.
28 # Adding the version to the specs at this time just felt wrong to me.
29 #
30 # And the specs are only another part of cephadm which needs potential upgrades.
31 # We have the cache, the inventory, the config store, the upgrade (imagine changing the
32 # upgrade code, while an old upgrade is still in progress), naming of daemons,
33 # fs-layout of the daemons, etc.
39ae355f 34 self.set_sane_migration_current()
a4b75251
TL
35
36 v = mgr.get_store('nfs_migration_queue')
37 self.nfs_migration_queue = json.loads(v) if v else []
f6b5b4d7
TL
38
39 # for some migrations, we don't need to do anything except for
20effc67 40 # incrementing migration_current.
f6b5b4d7 41 # let's try to shortcut things here.
a4b75251 42 self.migrate(True)
f6b5b4d7 43
adb31ebb 44 def set(self, val: int) -> None:
f6b5b4d7
TL
45 self.mgr.set_module_option('migration_current', val)
46 self.mgr.migration_current = val
47
39ae355f
TL
48 def set_sane_migration_current(self) -> None:
49 # migration current should always be an integer
50 # between 0 and LAST_MIGRATION (inclusive) in order to
51 # actually carry out migration. If we find
52 # it is None or too high of a value here we should
53 # set it to some sane value
54 mc: Optional[int] = self.mgr.migration_current
55 if mc is None:
56 logger.info('Found migration_current of "None". Setting to last migration.')
57 self.set(LAST_MIGRATION)
58 return
59
60 if mc > LAST_MIGRATION:
61 logger.error(f'Found migration_current of {mc} when max should be {LAST_MIGRATION}. Setting back to 0.')
62 # something has gone wrong and caused migration_current
63 # to be higher than it should be able to be. Best option
64 # we have here is to just set it back to 0
65 self.set(0)
66
adb31ebb 67 def is_migration_ongoing(self) -> bool:
39ae355f
TL
68 self.set_sane_migration_current()
69 mc: Optional[int] = self.mgr.migration_current
70 return mc is None or mc < LAST_MIGRATION
f6b5b4d7 71
adb31ebb 72 def verify_no_migration(self) -> None:
f6b5b4d7
TL
73 if self.is_migration_ongoing():
74 # this is raised in module.serve()
75 raise OrchestratorError(
76 "cephadm migration still ongoing. Please wait, until the migration is complete.")
77
a4b75251 78 def migrate(self, startup: bool = False) -> None:
f6b5b4d7
TL
79 if self.mgr.migration_current == 0:
80 if self.migrate_0_1():
81 self.set(1)
82
83 if self.mgr.migration_current == 1:
84 if self.migrate_1_2():
85 self.set(2)
86
a4b75251
TL
87 if self.mgr.migration_current == 2 and not startup:
88 if self.migrate_2_3():
89 self.set(3)
90
20effc67
TL
91 if self.mgr.migration_current == 3:
92 if self.migrate_3_4():
93 self.set(4)
94
95 if self.mgr.migration_current == 4:
96 if self.migrate_4_5():
97 self.set(5)
98
f6b5b4d7
TL
99 def migrate_0_1(self) -> bool:
100 """
101 Migration 0 -> 1
102 New scheduler that takes PlacementSpec as the bound and not as recommendation.
103 I.e. the new scheduler won't suggest any new placements outside of the hosts
104 specified by label etc.
105
106 Which means, we have to make sure, we're not removing any daemons directly after
107 upgrading to the new scheduler.
108
109 There is a potential race here:
110 1. user updates his spec to remove daemons
111 2. mgr gets upgraded to new scheduler, before the old scheduler removed the daemon
112 3. now, we're converting the spec to explicit placement, thus reverting (1.)
113 I think this is ok.
114 """
115
116 def interesting_specs() -> Iterator[ServiceSpec]:
f67539c2 117 for s in self.mgr.spec_store.all_specs.values():
f6b5b4d7
TL
118 if s.unmanaged:
119 continue
120 p = s.placement
121 if p is None:
122 continue
123 if p.count is None:
124 continue
125 if not p.hosts and not p.host_pattern and not p.label:
126 continue
127 yield s
128
129 def convert_to_explicit(spec: ServiceSpec) -> None:
f67539c2
TL
130 existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
131 placements, to_add, to_remove = HostAssignment(
f6b5b4d7 132 spec=spec,
f91f0fd5 133 hosts=self.mgr.inventory.all_specs(),
20effc67 134 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
2a845540 135 draining_hosts=self.mgr.cache.get_draining_hosts(),
f67539c2 136 daemons=existing_daemons,
f6b5b4d7
TL
137 ).place()
138
f6b5b4d7
TL
139 # We have to migrate, only if the new scheduler would remove daemons
140 if len(placements) >= len(existing_daemons):
141 return
142
f67539c2
TL
143 def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
144 if d.hostname in old_hosts:
145 return old_hosts[d.hostname]
146 else:
147 assert d.hostname
148 return HostPlacementSpec(d.hostname, '', '')
149
f6b5b4d7 150 old_hosts = {h.hostname: h for h in spec.placement.hosts}
f67539c2 151 new_hosts = [to_hostname(d) for d in existing_daemons]
f6b5b4d7
TL
152
153 new_placement = PlacementSpec(
154 hosts=new_hosts,
155 count=spec.placement.count
156 )
157
158 new_spec = ServiceSpec.from_json(spec.to_json())
159 new_spec.placement = new_placement
160
161 logger.info(f"Migrating {spec.one_line_str()} to explicit placement")
162
163 self.mgr.spec_store.save(new_spec)
164
165 specs = list(interesting_specs())
166 if not specs:
167 return True # nothing to do. shortcut
168
169 if not self.mgr.cache.daemon_cache_filled():
170 logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
171 return False
172
173 for spec in specs:
174 convert_to_explicit(spec)
175
176 return True
177
178 def migrate_1_2(self) -> bool:
179 """
180 After 15.2.4, we unified some service IDs: MONs, MGRs etc no longer have a service id.
181 Which means, the service names changed:
182
183 mon.foo -> mon
184 mgr.foo -> mgr
185
186 This fixes the data structure consistency
187 """
188 bad_specs = {}
f67539c2 189 for name, spec in self.mgr.spec_store.all_specs.items():
f6b5b4d7
TL
190 if name != spec.service_name():
191 bad_specs[name] = (spec.service_name(), spec)
192
193 for old, (new, old_spec) in bad_specs.items():
f67539c2 194 if new not in self.mgr.spec_store.all_specs:
f6b5b4d7
TL
195 spec = old_spec
196 else:
f67539c2 197 spec = self.mgr.spec_store.all_specs[new]
f6b5b4d7
TL
198 spec.unmanaged = True
199 self.mgr.spec_store.save(spec)
f67539c2 200 self.mgr.spec_store.finally_rm(old)
f6b5b4d7
TL
201
202 return True
a4b75251
TL
203
204 def migrate_2_3(self) -> bool:
205 if self.nfs_migration_queue:
206 from nfs.cluster import create_ganesha_pool
207
208 create_ganesha_pool(self.mgr)
209 for service_id, pool, ns in self.nfs_migration_queue:
210 if pool != '.nfs':
211 self.migrate_nfs_spec(service_id, pool, ns)
212 self.nfs_migration_queue = []
213 self.mgr.log.info('Done migrating all NFS services')
214 return True
215
216 def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
217 renamed = False
218 if service_id.startswith('ganesha-'):
219 service_id = service_id[8:]
220 renamed = True
221
222 self.mgr.log.info(
223 f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
224 )
225
226 # read exports
227 ioctx = self.mgr.rados.open_ioctx(pool)
228 if ns is not None:
229 ioctx.set_namespace(ns)
230 object_iterator = ioctx.list_objects()
231 exports = []
232 while True:
233 try:
234 obj = object_iterator.__next__()
235 if obj.key.startswith('export-'):
236 self.mgr.log.debug(f'reading {obj.key}')
237 exports.append(obj.read().decode())
238 except StopIteration:
239 break
240 self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')
241
242 # copy grace file
243 if service_id != ns:
244 try:
245 grace = ioctx.read("grace")
246 new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
247 new_ioctx.set_namespace(service_id)
248 new_ioctx.write_full("grace", grace)
249 self.mgr.log.info('Migrated nfs-ganesha grace file')
250 except rados.ObjectNotFound:
251 self.mgr.log.debug('failed to read old grace file; skipping')
252
253 if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
254 # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons and
255 # deploy new ones.
256 self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
257 spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
258 self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
259 spec.service_id = service_id
260 self.mgr.spec_store.save(spec, True)
20effc67
TL
261
262 # We have to remove the old daemons here as well, otherwise we'll end up with a port conflict.
263 daemons = [d.name()
264 for d in self.mgr.cache.get_daemons_by_service(f'nfs.ganesha-{service_id}')]
265 self.mgr.log.info(f'Removing old nfs.ganesha-{service_id} daemons {daemons}')
266 self.mgr.remove_daemons(daemons)
a4b75251
TL
267 else:
268 # redeploy all ganesha daemons to ensures that the daemon
269 # cephx are correct AND container configs are set up properly
270 daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
271 self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
272 self.mgr.remove_daemons(daemons)
273
274 # re-save service spec (without pool and namespace properties!)
275 spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
276 self.mgr.spec_store.save(spec)
277
278 # import exports
279 for export in exports:
280 ex = ''
281 for line in export.splitlines():
282 if (
283 line.startswith(' secret_access_key =')
284 or line.startswith(' user_id =')
285 ):
286 continue
287 ex += line + '\n'
288 self.mgr.log.debug(f'importing export: {ex}')
289 ret, out, err = self.mgr.mon_command({
290 'prefix': 'nfs export apply',
291 'cluster_id': service_id
292 }, inbuf=ex)
293 if ret:
294 self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
295 self.mgr.log.info(f'Done migrating nfs.{service_id}')
296
20effc67
TL
297 def migrate_3_4(self) -> bool:
298 # We can't set any host with the _admin label, but we're
299 # going to warn when calling `ceph orch host rm...`
300 if 'client.admin' not in self.mgr.keys.keys:
301 self.mgr._client_keyring_set(
302 entity='client.admin',
303 placement='label:_admin',
304 )
305 return True
306
307 def migrate_4_5(self) -> bool:
308 registry_url = self.mgr.get_module_option('registry_url')
309 registry_username = self.mgr.get_module_option('registry_username')
310 registry_password = self.mgr.get_module_option('registry_password')
311 if registry_url and registry_username and registry_password:
312
313 registry_credentials = {'url': registry_url,
314 'username': registry_username, 'password': registry_password}
315 self.mgr.set_store('registry_credentials', json.dumps(registry_credentials))
316
317 self.mgr.set_module_option('registry_url', None)
318 self.mgr.check_mon_command({
319 'prefix': 'config rm',
320 'who': 'mgr',
321 'key': 'mgr/cephadm/registry_url',
322 })
323 self.mgr.set_module_option('registry_username', None)
324 self.mgr.check_mon_command({
325 'prefix': 'config rm',
326 'who': 'mgr',
327 'key': 'mgr/cephadm/registry_username',
328 })
329 self.mgr.set_module_option('registry_password', None)
330 self.mgr.check_mon_command({
331 'prefix': 'config rm',
332 'who': 'mgr',
333 'key': 'mgr/cephadm/registry_password',
334 })
335
336 self.mgr.log.info('Done migrating registry login info')
337 return True
338
a4b75251
TL
339
340def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
341 """
342 After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
343 Queue up a migration to process later, once we are sure that RADOS is available
344 and so on.
345 """
346 service_id = spec_dict['spec']['service_id']
347 args = spec_dict['spec'].get('spec', {})
348 pool = args.pop('pool', 'nfs-ganesha')
349 ns = args.pop('namespace', service_id)
350 queued = mgr.get_store('nfs_migration_queue') or '[]'
351 ls = json.loads(queued)
352 ls.append([service_id, pool, ns])
353 mgr.set_store('nfs_migration_queue', json.dumps(ls))
354 mgr.log.info(f'Queued nfs.{service_id} for migration')