import json
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm.schedule import HostAssignment
import rados

from mgr_module import NFS_POOL_NAME
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

LAST_MIGRATION = 5

logger = logging.getLogger(__name__)


class Migrations:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why a global counter instead of per-spec versions?
        #
        # For the first migration:
        # the specs themselves don't change, only the scheduler does.
        # Adding a version to the specs at this point just felt wrong.
        #
        # Besides, the specs are only one part of cephadm that may need upgrades.
        # We also have the cache, the inventory, the config store, the upgrade logic
        # (imagine changing the upgrade code while an old upgrade is still in progress),
        # the naming of daemons, the fs layout of the daemons, etc.
        if self.mgr.migration_current is None:
            self.set(LAST_MIGRATION)

        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []
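        # Each queue entry has the form [service_id, pool, namespace], as appended
        # by queue_migrate_nfs_spec() at the bottom of this file (illustrative entry
        # with a hypothetical service id: ['foo', 'nfs-ganesha', 'foo']).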

        # For some migrations we don't need to do anything except
        # increment migration_current, so let's try to shortcut things here.
        self.migrate(True)

    def set(self, val: int) -> None:
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def is_migration_ongoing(self) -> bool:
        return self.mgr.migration_current != LAST_MIGRATION

    def verify_no_migration(self) -> None:
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
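        # Each step below only runs when migration_current matches its starting
        # version, so a single call walks the chain in order and stops at the
        # first step that returns False (i.e. could not complete yet).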
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

        if self.mgr.migration_current == 3:
            if self.migrate_3_4():
                self.set(4)

        if self.mgr.migration_current == 4:
            if self.migrate_4_5():
                self.set(5)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1
        New scheduler that takes PlacementSpec as the bound and not as a recommendation.
        I.e. the new scheduler won't suggest any new placements outside of the hosts
        specified by label etc.

        This means we have to make sure we're not removing any daemons directly after
        upgrading to the new scheduler.

        There is a potential race here:
        1. the user updates their spec to remove daemons
        2. the mgr gets upgraded to the new scheduler before the old scheduler has removed the daemon
        3. now we're converting the spec to explicit placement, thus reverting (1.)
        I think this is ok.
        """

        def interesting_specs() -> Iterator[ServiceSpec]:
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                draining_hosts=self.mgr.cache.get_draining_hosts(),
                daemons=existing_daemons,
            ).place()

            # We only have to migrate if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs, etc. no longer have a service id.
        This means the service names changed:

        mon.foo -> mon
        mgr.foo -> mgr

        This fixes the data structure consistency.
        """
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                spec = old_spec
            else:
                spec = self.mgr.spec_store.all_specs[new]
            spec.unmanaged = True
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
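        # Process the legacy NFS specs recorded by queue_migrate_nfs_spec(); entries
        # that already live in the new '.nfs' pool are skipped.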
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        renamed = False
        if service_id.startswith('ganesha-'):
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

        # read exports
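        # Legacy exports are stored as RADOS objects named 'export-<id>' in the old
        # pool/namespace; collect their contents so they can be re-applied through
        # 'nfs export apply' further down.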
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        object_iterator = ioctx.list_objects()
        exports = []
        while True:
            try:
                obj = object_iterator.__next__()
                if obj.key.startswith('export-'):
                    self.mgr.log.debug(f'reading {obj.key}')
                    exports.append(obj.read().decode())
            except StopIteration:
                break
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
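        # The grace db object is copied into the new pool/namespace only when the
        # legacy namespace differs from the service id.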
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons and
            # deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)

            # We have to remove the old daemons here as well, otherwise we'll end up with a port conflict.
            daemons = [d.name()
                       for d in self.mgr.cache.get_daemons_by_service(f'nfs.ganesha-{service_id}')]
            self.mgr.log.info(f'Removing old nfs.ganesha-{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)
        else:
            # redeploy all ganesha daemons to ensure that the daemon
            # cephx credentials are correct AND container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

            # re-save the service spec (without the pool and namespace properties!)
            spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
            self.mgr.spec_store.save(spec)

        # import exports
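        # The per-export credential lines (secret_access_key, user_id) are filtered
        # out before each export is re-applied; the new 'nfs export apply' flow is
        # assumed to regenerate them.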
        for export in exports:
            ex = ''
            for line in export.splitlines():
                if (
                        line.startswith('        secret_access_key =')
                        or line.startswith('        user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')

    def migrate_3_4(self) -> bool:
        # We can't set any host with the _admin label, but we're
        # going to warn when calling `ceph orch host rm...`
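        # Register a managed client.admin keyring targeted at hosts carrying the
        # _admin label, so cephadm maintains the admin keyring there (interpretation
        # of _client_keyring_set; not spelled out in this module).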
        if 'client.admin' not in self.mgr.keys.keys:
            self.mgr._client_keyring_set(
                entity='client.admin',
                placement='label:_admin',
            )
        return True

    def migrate_4_5(self) -> bool:
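        # Consolidate the three registry_* module options into a single
        # 'registry_credentials' JSON blob in the mgr store, then clear the old
        # per-option config keys.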
        registry_url = self.mgr.get_module_option('registry_url')
        registry_username = self.mgr.get_module_option('registry_username')
        registry_password = self.mgr.get_module_option('registry_password')
        if registry_url and registry_username and registry_password:

            registry_credentials = {'url': registry_url,
                                    'username': registry_username, 'password': registry_password}
            self.mgr.set_store('registry_credentials', json.dumps(registry_credentials))

            self.mgr.set_module_option('registry_url', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_url',
            })
            self.mgr.set_module_option('registry_username', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_username',
            })
            self.mgr.set_module_option('registry_password', None)
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': 'mgr',
                'key': 'mgr/cephadm/registry_password',
            })

            self.mgr.log.info('Done migrating registry login info')
        return True


def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is available
    and so on.
    """
    service_id = spec_dict['spec']['service_id']
    args = spec_dict['spec'].get('spec', {})
    pool = args.pop('pool', 'nfs-ganesha')
    ns = args.pop('namespace', service_id)
    queued = mgr.get_store('nfs_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append([service_id, pool, ns])
    mgr.set_store('nfs_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued nfs.{service_id} for migration')