import json
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm.schedule import HostAssignment
import rados

from mgr_module import NFS_POOL_NAME
from orchestrator import OrchestratorError, DaemonDescription

if TYPE_CHECKING:
    from .module import CephadmOrchestrator

LAST_MIGRATION = 3

logger = logging.getLogger(__name__)


class Migrations:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why a global counter instead of per-spec versions?
        #
        # For the first migration the specs themselves don't change; only the
        # scheduler does. Adding a version field to the specs just for that
        # felt wrong.
        #
        # Besides, the specs are only one of several parts of cephadm that may
        # need upgrades: the cache, the inventory, the config store, the
        # upgrade logic itself (imagine changing the upgrade code while an old
        # upgrade is still in progress), the naming of daemons, the fs layout
        # of the daemons, etc.
        if self.mgr.migration_current is None:
            self.set(LAST_MIGRATION)

        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []

        # Some migrations don't need to do anything except setting
        # migration_current = 1; try to shortcut those right here.
        self.migrate(True)

    def set(self, val: int) -> None:
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def is_migration_ongoing(self) -> bool:
        return self.mgr.migration_current != LAST_MIGRATION

    def verify_no_migration(self) -> None:
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
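        """
        Run any pending migrations in order. Progress is persisted after every
        completed step via self.set(), so an interrupted run can resume later.
        """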
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

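        # Migration 2 -> 3 needs RADOS and the nfs module, so it is skipped
        # while the module is still starting up and left to a later migrate()
        # call (see queue_migrate_nfs_spec below).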
        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1

        The new scheduler treats the PlacementSpec as a hard bound rather than
        a recommendation, i.e. it will not suggest any placements outside of
        the hosts selected by label etc.

        That means we have to make sure we don't remove any daemons right after
        upgrading to the new scheduler.

        There is a potential race here:
        1. the user updates a spec to remove daemons
        2. the mgr is upgraded to the new scheduler before the old scheduler
           removed those daemons
        3. we now convert the spec to explicit placement, thus reverting (1.)
        This is considered acceptable.
        """

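        # Illustration with a hypothetical spec (names made up): a placement of
        #   {label: mgr, count: 2}
        # becomes
        #   {hosts: [host1, host2], count: 2}
        # i.e. the count is kept, but the candidate hosts are pinned to wherever
        # the daemons currently run, so the stricter scheduler cannot remove them.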
        def interesting_specs() -> Iterator[ServiceSpec]:
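            # Only managed specs that combine a count with some host restriction
            # (explicit hosts, a host_pattern or a label) can lose daemons under
            # the stricter scheduler and therefore need to be converted.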
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr._unreachable_hosts(),
                daemons=existing_daemons,
            ).place()

            # We only have to migrate if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

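            # Hosts already pinned in the old placement keep their full
            # HostPlacementSpec (network/name); daemons on any other host are
            # pinned by bare hostname only.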
            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs etc. no longer
        have a service id, which means the service names changed:

          mon.foo -> mon
          mgr.foo -> mgr

        This restores consistency in the data structures.
        """
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

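        # Save a spec under the proper service_name (the existing one if
        # present, otherwise the badly-named one), mark it unmanaged
        # (presumably so cephadm does not start reconciling it mid-migration),
        # and drop the entry stored under the old name.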
        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                spec = old_spec
            else:
                spec = self.mgr.spec_store.all_specs[new]
            spec.unmanaged = True
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
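        # Each queue entry is a [service_id, pool, namespace] triple queued by
        # queue_migrate_nfs_spec() below; entries whose pool is already '.nfs'
        # are skipped, presumably because they already live in the new NFS pool
        # (NFS_POOL_NAME) and need no conversion.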
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        renamed = False
        if service_id.startswith('ganesha-'):
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

        # read exports
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        object_iterator = ioctx.list_objects()
        exports = []
        while True:
            try:
                obj = object_iterator.__next__()
                if obj.key.startswith('export-'):
                    self.mgr.log.debug(f'reading {obj.key}')
                    exports.append(obj.read().decode())
            except StopIteration:
                break
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
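        # (only needed when the new namespace, i.e. the service_id, differs from
        # the legacy namespace the grace object currently lives in)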
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons
            # and deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)
        else:
            # redeploy all ganesha daemons to ensure that the daemon cephx keys
            # are correct AND the container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

        # re-save service spec (without pool and namespace properties!)
        spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
        self.mgr.spec_store.save(spec)

        # import exports
        for export in exports:
            ex = ''
            for line in export.splitlines():
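                # Drop credential lines from the export; presumably these are
                # filled in again by the nfs module when the export is applied
                # below.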
                if (
                    line.startswith(' secret_access_key =')
                    or line.startswith(' user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')


def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is
    available and so on.
    """
    service_id = spec_dict['spec']['service_id']
    args = spec_dict['spec'].get('spec', {})
    pool = args.pop('pool', 'nfs-ganesha')
    ns = args.pop('namespace', service_id)
    queued = mgr.get_store('nfs_migration_queue') or '[]'
    ls = json.loads(queued)
    ls.append([service_id, pool, ns])
    mgr.set_store('nfs_migration_queue', json.dumps(ls))
    mgr.log.info(f'Queued nfs.{service_id} for migration')