]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import datetime |
2 | import json | |
3 | import logging | |
4 | from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional | |
5 | ||
6 | from ceph.deployment import translate | |
7 | from ceph.deployment.drive_group import DriveGroupSpec | |
8 | from ceph.deployment.drive_selection import DriveSelection | |
9 | ||
10 | import orchestrator | |
11 | from orchestrator import OrchestratorError | |
12 | from mgr_module import MonCommandFailed | |
13 | ||
14 | from cephadm.services.cephadmservice import CephadmService | |
15 | ||
16 | ||
17 | logger = logging.getLogger(__name__) | |
18 | ||
19 | ||
class OSDService(CephadmService):
    """Creates OSD daemons from DriveGroupSpecs and reconciles them via ceph-volume."""

    def create(self, drive_group: DriveGroupSpec) -> str:
        """Apply *drive_group* on every matching host.

        Destroyed-OSD id claims are resolved first so that replacement OSDs
        re-use their previous ids. Returns a comma-separated summary of the
        per-host results.
        """
        logger.debug(f"Processing DriveGroup {drive_group}")
        ret = []
        drive_group.osd_id_claims = self.find_destroyed_osds()
        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
        for host, drive_selection in self.prepare_drivegroup(drive_group):
            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
                                                     drive_group.osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
                continue
            # env_vars = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            # disable this until https://github.com/ceph/ceph/pull/34835 is merged
            env_vars: List[str] = []
            ret_msg = self.create_single_host(
                host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars
            )
            ret.append(ret_msg)
        return ", ".join(ret)

    def create_single_host(self, host: str, cmd: str,
                           replace_osd_ids: Optional[List[str]] = None,
                           env_vars: Optional[List[str]] = None) -> str:
        """Run a prepared ceph-volume *cmd* on *host* and start any resulting OSDs.

        :param host: host to run ceph-volume on
        :param cmd: ceph-volume command line (without the cephadm wrapper)
        :param replace_osd_ids: osd ids being replaced on this host; those are
            deployed even if they already appear in the up map
        :param env_vars: extra environment variables for the cephadm call
        :raises RuntimeError: when ceph-volume exits non-zero (other than the
            tolerated "already prepared" case)
        """
        # BUG FIX: the default used to stay None, but the membership test
        # `osd_id not in replace_osd_ids` below would then raise TypeError.
        if replace_osd_ids is None:
            replace_osd_ids = []
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))

        # check result: list what ceph-volume actually created on the host
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        osds_elems = json.loads('\n'.join(out))
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                self.mgr._create_daemon(
                    'osd', osd_id, host,
                    osd_uuid_map=osd_uuid_map)

        if created:
            # device list changed; force a refresh on next inventory read
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        """Resolve *drive_group* to (host, DriveSelection) pairs for matching hosts."""
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hosts(self.mgr._get_hosts)
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict):
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")
            drive_selection = DriveSelection(drive_group, inventory_for_host)
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
                                      drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        """Translate a DriveSelection into a runnable ceph-volume command string.

        Returns None when the selection contains no data devices.
        """
        logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd

    def get_previews(self, host) -> List[Dict[str, Any]]:
        """Return OSDSpec previews for all specs that match *host*."""
        # Find OSDSpecs that match host.
        osdspecs = self.mgr.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

           {'data': ...,
            'osdspec': ..,
            'host': ..
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osdspec.osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(osdspec,
                                                         ds,
                                                         osdspec.osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    concat_out: Dict[str, Any] = json.loads(" ".join(out))
                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        """Run *cmd* through `cephadm ceph-volume` on *host*, feeding it a
        minimal config + bootstrap-osd keyring on stdin.

        Returns the (out, err, exit_code) triple from cephadm; errors are not
        raised here (error_ok=True) so callers can inspect the code.
        """
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        # generate config
        ret, config, err = self.mgr.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })

        j = json.dumps({
            'config': config,
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        """Return the `osdspec_affinity` recorded in the OSD's metadata ('' if unset)."""
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        """Map hostname -> list of destroyed osd ids (as strings) from `osd tree`.

        Returns an empty mapping when the tree output cannot be decoded.
        :raises OrchestratorError: when the mon command itself fails
        """
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except json.decoder.JSONDecodeError:
            logger.exception(f"Could not decode json -> {out}")
            return osd_host_map

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        self.mgr.log.info(
            f"Found osd claims -> {osd_host_map}")
        return osd_host_map
251 | ||
252 | ||
class OSDRemoval(object):
    """Bookkeeping record for one OSD that is scheduled for removal."""

    def __init__(self,
                 osd_id: str,
                 replace: bool,
                 force: bool,
                 nodename: str,
                 fullname: str,
                 start_at: datetime.datetime,
                 pg_count: int):
        # NOTE: the constructor argument is `start_at`, but the stored
        # attribute is `started_at`.
        self.osd_id = osd_id
        self.replace = replace
        self.force = force
        self.nodename = nodename
        self.fullname = fullname
        self.started_at = start_at
        self.pg_count = pg_count

    # needed due to changing 'started_at' attr
    def __eq__(self, other):
        # Identity is the osd id alone; mutable fields are ignored.
        return other.osd_id == self.osd_id

    def __hash__(self):
        return hash(self.osd_id)

    def __repr__(self):
        fmt = ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
               ', fullname={}, started_at={}, pg_count={})')
        return fmt.format(self.osd_id, self.replace, self.force,
                          self.nodename, self.fullname, self.started_at,
                          self.pg_count)

    @property
    def pg_count_str(self) -> str:
        """Human-readable PG count; a negative count means 'unknown'."""
        if self.pg_count < 0:
            return 'n/a'
        return str(self.pg_count)
286 | ||
287 | ||
class RemoveUtil(object):
    """Drives the background removal of queued OSDs: drain, safety checks,
    down + destroy/purge, then daemon removal."""

    def __init__(self, mgr):
        self.mgr = mgr
        # OSDs currently queued for removal.
        self.to_remove_osds: Set[OSDRemoval] = set()
        self.osd_removal_report: Dict[OSDRemoval, Union[int, str]] = dict()

    @property
    def report(self) -> Set[OSDRemoval]:
        """Snapshot (copy) of the current removal queue."""
        return self.to_remove_osds.copy()

    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
        """Add *osds* to the removal queue processed by _remove_osds_bg()."""
        self.to_remove_osds.update(osds)

    def _remove_osds_bg(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.
        """
        logger.debug(
            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
        self._update_osd_removal_status()
        # iterate over a copy; finished OSDs are removed from the live set below
        remove_osds: set = self.to_remove_osds.copy()
        for osd in remove_osds:
            if not osd.force:
                self.drain_osd(osd.osd_id)
                # skip criteria
                if not self.is_empty(osd.osd_id):
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    continue

                if not self.ok_to_destroy([osd.osd_id]):
                    logger.info(
                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                    continue

            # abort criteria
            if not self.down_osd([osd.osd_id]):
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not self.destroy_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not self.purge_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")
            self.to_remove_osds.remove(osd)

    def _update_osd_removal_status(self):
        """
        Generate a OSD report that can be printed to the CLI
        """
        logger.debug("Update OSD removal status")
        for osd in self.to_remove_osds:
            osd.pg_count = self.get_pg_count(str(osd.osd_id))
        logger.debug(f"OSD removal status: {self.to_remove_osds}")

    def drain_osd(self, osd_id: str) -> bool:
        """
        Uses `osd_support` module to schedule a drain operation of an OSD
        """
        cmd_args = {
            'prefix': 'osd drain',
            'osd_ids': [int(osd_id)]
        }
        return self._run_mon_cmd(cmd_args)

    def get_pg_count(self, osd_id: str) -> int:
        """ Queries for PG count of an OSD; -1 when the OSD is not reported """
        self.mgr.log.debug("Querying for drain status")
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd drain status',
        })
        if ret != 0:
            self.mgr.log.error(f"Calling osd drain status failed with {err}")
            raise OrchestratorError("Could not query `osd drain status`")
        out = json.loads(out)
        for o in out:
            if str(o.get('osd_id', '')) == str(osd_id):
                return int(o.get('pgs', -1))
        return -1

    def is_empty(self, osd_id: str) -> bool:
        """ Checks if an OSD is empty """
        return self.get_pg_count(osd_id) == 0

    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': osd_ids}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def down_osd(self, osd_ids: List[int]) -> bool:
        # BUG FIX: docstring previously said `out`; this issues `osd down`.
        """ Sets `down` flag to OSDs """
        cmd_args = {
            'prefix': 'osd down',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def out_osd(self, osd_ids: List[int]) -> bool:
        # BUG FIX: docstring previously said `down`; this issues `osd out`.
        """ Sets `out` flag to OSDs """
        cmd_args = {
            'prefix': 'osd out',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True