# ceph/src/pybind/mgr/cephadm/services/osd.py
import datetime
import json
import logging
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection

import orchestrator
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmService


logger = logging.getLogger(__name__)


class OSDService(CephadmService):
    def create(self, drive_group: DriveGroupSpec) -> str:
        """Create OSDs on every host matched by the DriveGroupSpec.

        Returns a comma-separated string of per-host result messages.
        """
        logger.debug(f"Processing DriveGroup {drive_group}")
        ret = []
        drive_group.osd_id_claims = self.find_destroyed_osds()
        logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
        for host, drive_selection in self.prepare_drivegroup(drive_group):
            logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
                                                     drive_group.osd_id_claims.get(host, []))
            if not cmd:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
                continue
            # env_vars = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            # disable this until https://github.com/ceph/ceph/pull/34835 is merged
            env_vars: List[str] = []
            ret_msg = self.create_single_host(
                host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars
            )
            ret.append(ret_msg)
        return ", ".join(ret)

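    # Illustrative only: the kind of spec that drives create() above. The
    # host and device names here are hypothetical; the YAML shape follows
    # the OSD service spec format.
    #
    #   import yaml
    #   spec = DriveGroupSpec.from_json(yaml.safe_load(
    #       'service_type: osd\n'
    #       'service_id: example_dg\n'
    #       'placement:\n'
    #       '  hosts: [host1]\n'
    #       'data_devices:\n'
    #       '  paths: [/dev/sdb]\n'))
    #   osd_service.create(spec)  # -> "Created osd(s) 0 on host 'host1'"
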
    def create_single_host(self, host: str, cmd: str,
                           replace_osd_ids: Optional[List[str]] = None,
                           env_vars: Optional[List[str]] = None) -> str:
        replace_osd_ids = replace_osd_ids or []
        out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)

        if code == 1 and ', it is already prepared' in '\n'.join(err):
            # HACK: when we create against an existing LV, ceph-volume
            # returns an error and the above message. To make this
            # command idempotent, tolerate this "error" and continue.
            logger.debug('the device was already prepared; continuing')
            code = 0
        if code:
            raise RuntimeError(
                'cephadm exited with an error code: %d, stderr:%s' % (
                    code, '\n'.join(err)))

        # check result
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        osds_elems = json.loads('\n'.join(out))
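        # Abridged, illustrative shape of the `ceph-volume lvm list
        # --format json` output parsed above -- only the fields read below:
        #
        #   {"0": [{"tags": {"ceph.cluster_fsid": "<cluster fsid>",
        #                    "ceph.osd_fsid": "<osd uuid>"}}]}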
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # the osd was already up before this call and is not part
                    # of the replacement operation, so there is nothing to do
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                self.mgr._create_daemon(
                    'osd', osd_id, host,
                    osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hosts(self.mgr._get_hosts)
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        def _find_inv_for_host(hostname: str, inventory_dict: dict):
            # TODO: this lookup should be loaded together with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_hosts and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {host}: {inventory_for_host}")
            drive_selection = DriveSelection(drive_group, inventory_for_host)
            logger.debug(f"Found drive selection {drive_selection}")
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
                                      drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> Optional[str]:
        logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
                                                      osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmd: {cmd}")
        return cmd
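
    # Illustrative only: for a plain data_devices spec the translated
    # command typically looks like "lvm batch --no-auto /dev/sdb /dev/sdc"
    # (exact flags depend on the spec); _run_ceph_volume_command() below
    # wraps it in the actual cephadm ceph-volume invocation.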

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.mgr.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """
        The return value looks like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>
           },

          {'data': ...,
           'osdspec': ...,
           'host': ...
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osdspec.osd_id_claims = self.find_destroyed_osds()

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmd = self.driveselection_to_ceph_volume(osdspec,
                                                         ds,
                                                         osdspec.osd_id_claims.get(host, []),
                                                         preview=True)
                if not cmd:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                out, err, code = self._run_ceph_volume_command(host, cmd)
                if out:
                    concat_out: Dict[str, Any] = json.loads(" ".join(out))
                    ret_all.append({'data': concat_out,
                                    'osdspec': osdspec.service_id,
                                    'host': host})
        return ret_all

    def _run_ceph_volume_command(self, host: str,
                                 cmd: str, env_vars: Optional[List[str]] = None
                                 ) -> Tuple[List[str], List[str], int]:
        """Run a ceph-volume command on a host via cephadm, feeding it a
        minimal config and the bootstrap-osd keyring on stdin."""
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        # generate config
        ret, config, err = self.mgr.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })

        j = json.dumps({
            'config': config,
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
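        # e.g. for cmd == "lvm batch --no-auto /dev/sdb" the argv becomes:
        #   ['--config-json', '-', '--', 'lvm', 'batch', '--no-auto', '/dev/sdb']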
        out, err, code = self.mgr._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def get_osdspec_affinity(self, osd_id: str) -> str:
        return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')

    def find_destroyed_osds(self) -> Dict[str, List[str]]:
        """Map each host to the ids of its 'destroyed' OSDs, whose ids can be
        reclaimed by a replacement operation."""
        osd_host_map: Dict[str, List[str]] = dict()
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except json.decoder.JSONDecodeError:
            logger.exception(f"Could not decode json -> {out}")
            return osd_host_map

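        # Abridged, illustrative shape of the `osd tree destroyed` JSON read
        # below -- host nodes list the ids of their destroyed OSDs in
        # 'children':
        #
        #   {"nodes": [{"id": -3, "name": "host1", "type": "host",
        #               "children": [0, 3]}, ...]}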
        nodes = tree.get('nodes', [])
        for node in nodes:
            if node.get('type') == 'host':
                osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        self.mgr.log.info(
            f"Found osd claims -> {osd_host_map}")
        return osd_host_map


class OSDRemoval(object):
    def __init__(self,
                 osd_id: str,
                 replace: bool,
                 force: bool,
                 nodename: str,
                 fullname: str,
                 start_at: datetime.datetime,
                 pg_count: int):
        self.osd_id = osd_id
        self.replace = replace
        self.force = force
        self.nodename = nodename
        self.fullname = fullname
        self.started_at = start_at
        self.pg_count = pg_count

    # needed due to changing 'started_at' attr
    def __eq__(self, other):
        return self.osd_id == other.osd_id

    def __hash__(self):
        return hash(self.osd_id)

    def __repr__(self):
        return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
                ', fullname={}, started_at={}, pg_count={})').format(
                    self.osd_id, self.replace, self.force, self.nodename,
                    self.fullname, self.started_at, self.pg_count)

    @property
    def pg_count_str(self) -> str:
        return 'n/a' if self.pg_count < 0 else str(self.pg_count)


class RemoveUtil(object):
    def __init__(self, mgr):
        self.mgr = mgr
        self.to_remove_osds: Set[OSDRemoval] = set()
        self.osd_removal_report: Dict[OSDRemoval, Union[int, str]] = dict()

    @property
    def report(self) -> Set[OSDRemoval]:
        return self.to_remove_osds.copy()

    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
        self.to_remove_osds.update(osds)

    def _remove_osds_bg(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when the criteria are met.
        """
        logger.debug(
            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
        self._update_osd_removal_status()
        remove_osds: Set[OSDRemoval] = self.to_remove_osds.copy()
        for osd in remove_osds:
            if not osd.force:
                self.drain_osd(osd.osd_id)
                # skip criteria
                if not self.is_empty(osd.osd_id):
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    continue

            if not self.ok_to_destroy([osd.osd_id]):
                logger.info(
                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                continue

            # abort criteria
            if not self.down_osd([osd.osd_id]):
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not self.destroy_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not self.purge_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")
            self.to_remove_osds.remove(osd)

    def _update_osd_removal_status(self):
        """
        Generate an OSD report that can be printed to the CLI
        """
        logger.debug("Update OSD removal status")
        for osd in self.to_remove_osds:
            osd.pg_count = self.get_pg_count(str(osd.osd_id))
        logger.debug(f"OSD removal status: {self.to_remove_osds}")

    def drain_osd(self, osd_id: str) -> bool:
        """
        Uses the `osd_support` module to schedule a drain operation of an OSD
        """
        cmd_args = {
            'prefix': 'osd drain',
            'osd_ids': [int(osd_id)]
        }
        return self._run_mon_cmd(cmd_args)

    def get_pg_count(self, osd_id: str) -> int:
        """ Queries the PG count of an OSD """
        self.mgr.log.debug("Querying for drain status")
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd drain status',
        })
        if ret != 0:
            self.mgr.log.error(f"Calling osd drain status failed with {err}")
            raise OrchestratorError("Could not query `osd drain status`")
        out = json.loads(out)
        for o in out:
            if str(o.get('osd_id', '')) == str(osd_id):
                return int(o.get('pgs', -1))
        return -1

    def is_empty(self, osd_id: str) -> bool:
        """ Checks if an OSD is empty """
        return self.get_pg_count(osd_id) == 0

    def ok_to_destroy(self, osd_ids: List[str]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': osd_ids}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def down_osd(self, osd_ids: List[str]) -> bool:
        """ Sets the `down` flag on OSDs """
        cmd_args = {
            'prefix': 'osd down',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def out_osd(self, osd_ids: List[str]) -> bool:
        """ Sets the `out` flag on OSDs """
        cmd_args = {
            'prefix': 'osd out',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic helper to run a mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True
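

# Illustrative only -- how a manager module might drive RemoveUtil from its
# serve() loop; the mgr object and the scheduling cadence are assumptions:
#
#   util = RemoveUtil(mgr)
#   util.queue_osds_for_removal({
#       OSDRemoval('0', replace=False, force=False, nodename='host1',
#                  fullname='osd.0', start_at=datetime.datetime.utcnow(),
#                  pg_count=-1)})
#   util._remove_osds_bg()  # called periodically until the queue drains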