]>
Commit | Line | Data |
---|---|---|
a4b75251 | 1 | import json |
f6b5b4d7 | 2 | import logging |
a4b75251 | 3 | from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any |
f6b5b4d7 TL |
4 | |
5 | from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec | |
6 | from cephadm.schedule import HostAssignment | |
a4b75251 | 7 | import rados |
f6b5b4d7 | 8 | |
a4b75251 | 9 | from mgr_module import NFS_POOL_NAME |
f67539c2 | 10 | from orchestrator import OrchestratorError, DaemonDescription |
f6b5b4d7 TL |
11 | |
12 | if TYPE_CHECKING: | |
13 | from .module import CephadmOrchestrator | |
14 | ||
# Newest migration step implemented by Migrations below.  A cluster whose
# stored `migration_current` is unset is initialized straight to this value
# (see Migrations.__init__), so only upgraded clusters run the migrations.
LAST_MIGRATION = 3

logger = logging.getLogger(__name__)
19 | ||
class Migrations:
    """One-shot migrations of cephadm's persisted state across upgrades.

    ``mgr.migration_current`` is a single global counter recording how far
    this cluster's state has been migrated; ``LAST_MIGRATION`` is the newest
    step.  Each ``migrate_N_M()`` method moves the state from version N to M
    and returns True on success, after which ``migrate()`` advances the
    counter via ``set()``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

        # Why having a global counter, instead of spec versions?
        #
        # for the first migration:
        # The specs don't change in (this) migration. but the scheduler here.
        # Adding the version to the specs at this time just felt wrong to me.
        #
        # And the specs are only another part of cephadm which needs potential upgrades.
        # We have the cache, the inventory, the config store, the upgrade (imagine changing the
        # upgrade code, while an old upgrade is still in progress), naming of daemons,
        # fs-layout of the daemons, etc.
        if self.mgr.migration_current is None:
            # No counter stored yet: fresh deployment, nothing to migrate —
            # start out at the latest version.
            self.set(LAST_MIGRATION)

        # Queue of [service_id, pool, namespace] entries recorded by
        # queue_migrate_nfs_spec(); drained later by migrate_2_3().
        v = mgr.get_store('nfs_migration_queue')
        self.nfs_migration_queue = json.loads(v) if v else []

        # for some migrations, we don't need to do anything except for
        # setting migration_current = 1.
        # let's try to shortcut things here.
        self.migrate(True)

    def set(self, val: int) -> None:
        """Persist the migration counter and mirror it on the mgr object."""
        self.mgr.set_module_option('migration_current', val)
        self.mgr.migration_current = val

    def is_migration_ongoing(self) -> bool:
        # True until every migration up to LAST_MIGRATION has completed.
        return self.mgr.migration_current != LAST_MIGRATION

    def verify_no_migration(self) -> None:
        """Raise OrchestratorError while any migration is still pending."""
        if self.is_migration_ongoing():
            # this is raised in module.serve()
            raise OrchestratorError(
                "cephadm migration still ongoing. Please wait, until the migration is complete.")

    def migrate(self, startup: bool = False) -> None:
        """Run all outstanding migrations, in order, advancing the counter
        after each successful step.

        :param startup: True when called from ``__init__``.  migrate_2_3 is
            skipped in that case and re-attempted later — presumably because
            it needs RADOS/pool access that isn't safe during module init
            (see migrate_nfs_spec); confirm before changing the gating.
        """
        if self.mgr.migration_current == 0:
            if self.migrate_0_1():
                self.set(1)

        if self.mgr.migration_current == 1:
            if self.migrate_1_2():
                self.set(2)

        if self.mgr.migration_current == 2 and not startup:
            if self.migrate_2_3():
                self.set(3)

    def migrate_0_1(self) -> bool:
        """
        Migration 0 -> 1
        New scheduler that takes PlacementSpec as the bound and not as recommendation.
        I.e. the new scheduler won't suggest any new placements outside of the hosts
        specified by label etc.

        Which means, we have to make sure, we're not removing any daemons directly after
        upgrading to the new scheduler.

        There is a potential race here:
        1. user updates his spec to remove daemons
        2. mgr gets upgraded to new scheduler, before the old scheduler removed the daemon
        3. now, we're converting the spec to explicit placement, thus reverting (1.)
        I think this is ok.

        Returns True when the migration is complete (or unnecessary), False
        when it must be retried later (daemon cache not yet filled).
        """

        def interesting_specs() -> Iterator[ServiceSpec]:
            # Only managed specs with a count AND a host/pattern/label
            # restriction can lose daemons under the new scheduler; everything
            # else is unaffected and can be skipped.
            for s in self.mgr.spec_store.all_specs.values():
                if s.unmanaged:
                    continue
                p = s.placement
                if p is None:
                    continue
                if p.count is None:
                    continue
                if not p.hosts and not p.host_pattern and not p.label:
                    continue
                yield s

        def convert_to_explicit(spec: ServiceSpec) -> None:
            # Re-run scheduling for this spec under the new scheduler and,
            # if it would shrink the daemon set, pin the spec to the hosts
            # the existing daemons already run on.
            existing_daemons = self.mgr.cache.get_daemons_by_service(spec.service_name())
            placements, to_add, to_remove = HostAssignment(
                spec=spec,
                hosts=self.mgr.inventory.all_specs(),
                unreachable_hosts=self.mgr._unreachable_hosts(),
                daemons=existing_daemons,
            ).place()

            # We have to migrate, only if the new scheduler would remove daemons
            if len(placements) >= len(existing_daemons):
                return

            def to_hostname(d: DaemonDescription) -> HostPlacementSpec:
                # Prefer the spec's original HostPlacementSpec (keeps any
                # network/name fields); fall back to a bare hostname entry.
                if d.hostname in old_hosts:
                    return old_hosts[d.hostname]
                else:
                    assert d.hostname
                    return HostPlacementSpec(d.hostname, '', '')

            old_hosts = {h.hostname: h for h in spec.placement.hosts}
            new_hosts = [to_hostname(d) for d in existing_daemons]

            new_placement = PlacementSpec(
                hosts=new_hosts,
                count=spec.placement.count
            )

            # Round-trip through JSON to get an independent copy of the spec
            # before swapping in the explicit placement.
            new_spec = ServiceSpec.from_json(spec.to_json())
            new_spec.placement = new_placement

            logger.info(f"Migrating {spec.one_line_str()} to explicit placement")

            self.mgr.spec_store.save(new_spec)

        specs = list(interesting_specs())
        if not specs:
            return True  # nothing to do. shortcut

        if not self.mgr.cache.daemon_cache_filled():
            logger.info("Unable to migrate yet. Daemon Cache still incomplete.")
            return False

        for spec in specs:
            convert_to_explicit(spec)

        return True

    def migrate_1_2(self) -> bool:
        """
        After 15.2.4, we unified some service IDs: MONs, MGRs etc no longer have a service id.
        Which means, the service names changed:

        mon.foo -> mon
        mgr.foo -> mgr

        This fixes the data structure consistency
        """
        # Specs stored under a key that no longer matches their computed
        # service_name(), mapped to (correct name, stored spec).
        bad_specs = {}
        for name, spec in self.mgr.spec_store.all_specs.items():
            if name != spec.service_name():
                bad_specs[name] = (spec.service_name(), spec)

        for old, (new, old_spec) in bad_specs.items():
            if new not in self.mgr.spec_store.all_specs:
                # No spec under the new name yet: re-save the old one there.
                spec = old_spec
            else:
                # A spec already exists under the new name; mark it unmanaged
                # — presumably to avoid scheduling churn while the duplicate
                # is cleaned up. TODO(review): confirm intent.
                spec = self.mgr.spec_store.all_specs[new]
                spec.unmanaged = True
            # Save under the new name before removing the old key.
            self.mgr.spec_store.save(spec)
            self.mgr.spec_store.finally_rm(old)

        return True

    def migrate_2_3(self) -> bool:
        """Migration 2 -> 3: move queued legacy NFS services (recorded by
        queue_migrate_nfs_spec) into the common NFS pool."""
        if self.nfs_migration_queue:
            from nfs.cluster import create_ganesha_pool

            create_ganesha_pool(self.mgr)
            for service_id, pool, ns in self.nfs_migration_queue:
                # Entries already on the '.nfs' pool need no data migration.
                if pool != '.nfs':
                    self.migrate_nfs_spec(service_id, pool, ns)
            self.nfs_migration_queue = []
            self.mgr.log.info('Done migrating all NFS services')
        return True

    def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
        """Migrate one legacy NFS service from its per-cluster pool/namespace
        into NFS_POOL_NAME: copy the grace file, re-save/rename the spec
        (redeploying daemons), and re-apply its exports via 'nfs export apply'.

        :param service_id: legacy service id, possibly 'ganesha-' prefixed
        :param pool: legacy RADOS pool holding the config objects
        :param ns: legacy RADOS namespace (None for the pool default)
        """
        renamed = False
        if service_id.startswith('ganesha-'):
            # Strip the legacy 'ganesha-' prefix; remember to rename the spec.
            service_id = service_id[8:]
            renamed = True

        self.mgr.log.info(
            f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
        )

        # read exports
        ioctx = self.mgr.rados.open_ioctx(pool)
        if ns is not None:
            ioctx.set_namespace(ns)
        object_iterator = ioctx.list_objects()
        exports = []
        while True:
            try:
                obj = object_iterator.__next__()
                if obj.key.startswith('export-'):
                    self.mgr.log.debug(f'reading {obj.key}')
                    exports.append(obj.read().decode())
            except StopIteration:
                break
        self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')

        # copy grace file
        if service_id != ns:
            try:
                grace = ioctx.read("grace")
                new_ioctx = self.mgr.rados.open_ioctx(NFS_POOL_NAME)
                new_ioctx.set_namespace(service_id)
                new_ioctx.write_full("grace", grace)
                self.mgr.log.info('Migrated nfs-ganesha grace file')
            except rados.ObjectNotFound:
                # Best-effort: a missing grace file is not fatal.
                self.mgr.log.debug('failed to read old grace file; skipping')

        if renamed and f'nfs.ganesha-{service_id}' in self.mgr.spec_store:
            # rename from nfs.ganesha-* to nfs.*. This will destroy old daemons and
            # deploy new ones.
            self.mgr.log.info(f'Replacing nfs.ganesha-{service_id} with nfs.{service_id}')
            spec = self.mgr.spec_store[f'nfs.ganesha-{service_id}'].spec
            self.mgr.spec_store.rm(f'nfs.ganesha-{service_id}')
            spec.service_id = service_id
            self.mgr.spec_store.save(spec, True)
        else:
            # redeploy all ganesha daemons to ensures that the daemon
            # cephx are correct AND container configs are set up properly
            daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
            self.mgr.log.info(f'Removing old nfs.{service_id} daemons {daemons}')
            self.mgr.remove_daemons(daemons)

            # re-save service spec (without pool and namespace properties!)
            spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
            self.mgr.spec_store.save(spec)

        # import exports
        for export in exports:
            ex = ''
            for line in export.splitlines():
                # Drop embedded credential lines (user_id / secret_access_key)
                # before re-applying — presumably they are regenerated by
                # 'nfs export apply'; verify against the nfs module.
                if (
                    line.startswith(' secret_access_key =')
                    or line.startswith(' user_id =')
                ):
                    continue
                ex += line + '\n'
            self.mgr.log.debug(f'importing export: {ex}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'nfs export apply',
                'cluster_id': service_id
            }, inbuf=ex)
            if ret:
                # Log-and-continue: one bad export must not abort the rest.
                self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
        self.mgr.log.info(f'Done migrating nfs.{service_id}')
261 | ||
262 | ||
def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
    """
    After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
    Queue up a migration to process later, once we are sure that RADOS is available
    and so on.
    """
    cluster_id = spec_dict['spec']['service_id']
    # NOTE: pop() deliberately mutates the caller's dict, stripping the
    # dropped properties out of the spec while capturing their legacy values.
    spec_args = spec_dict['spec'].get('spec', {})
    legacy_pool = spec_args.pop('pool', 'nfs-ganesha')
    legacy_ns = spec_args.pop('namespace', cluster_id)

    # Append to the persisted queue (JSON list of [id, pool, namespace]).
    pending = json.loads(mgr.get_store('nfs_migration_queue') or '[]')
    pending.append([cluster_id, legacy_pool, legacy_ns])
    mgr.set_store('nfs_migration_queue', json.dumps(pending))
    mgr.log.info(f'Queued nfs.{cluster_id} for migration')