]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/osd.py
2ead0cb8077f6508f82c8866e2e22397cfe93818
[ceph.git] / ceph / src / pybind / mgr / cephadm / osd.py
1 import datetime
2 import json
3 import logging
4 import time
5
6 from typing import List, Dict, Any, Set, Union
7
8 import orchestrator
9 from orchestrator import OrchestratorError
10
11 logger = logging.getLogger(__name__)
12
13
class OSDRemoval(object):
    """
    Bookkeeping entry for a single OSD that is queued for removal.

    Equality and hashing are intentionally based on ``osd_id`` only:
    mutable fields (``pg_count``, ``started_at``) change while the entry
    sits in a set-based removal queue, and must not affect membership.
    """

    def __init__(self,
                 osd_id: str,
                 replace: bool,
                 force: bool,
                 nodename: str,
                 fullname: str,
                 start_at: datetime.datetime,
                 pg_count: int):
        # ID of the OSD as used by `ceph osd ...` commands
        self.osd_id = osd_id
        # True -> destroy (keep the ID reserved for a replacement disk),
        # False -> purge the OSD from the cluster entirely
        self.replace = replace
        # True -> skip the drain / is_empty safety checks
        self.force = force
        # host the OSD daemon runs on
        self.nodename = nodename
        # daemon name, e.g. 'osd.1'
        self.fullname = fullname
        # when the removal was scheduled (note: param is 'start_at')
        self.started_at = start_at
        # last observed PG count; -1 means unknown / not draining
        self.pg_count = pg_count

    # needed due to changing 'started_at' attr
    def __eq__(self, other):
        # Fix: return NotImplemented for foreign types instead of raising
        # AttributeError on a missing 'osd_id' attribute, so comparisons
        # against arbitrary objects degrade to False as Python intends.
        if not isinstance(other, OSDRemoval):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __hash__(self):
        return hash(self.osd_id)

    def __repr__(self):
        return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
                ', fullname={}, started_at={}, pg_count={})').format(
                    self.osd_id, self.replace, self.force, self.nodename,
                    self.fullname, self.started_at, self.pg_count)

    @property
    def pg_count_str(self) -> str:
        """ Human-readable PG count ('n/a' while the count is unknown) """
        return 'n/a' if self.pg_count < 0 else str(self.pg_count)
47
48
class RemoveUtil(object):
    """
    Drives scheduled OSD removals from the cephadm serve loop.

    OSDs queued via queue_osds_for_removal() are repeatedly processed by
    _remove_osds_bg(): drain -> wait until empty -> safe-to-destroy check
    -> mark down -> destroy (replace=True) or purge (replace=False) ->
    remove the daemon from its host.
    """

    def __init__(self, mgr):
        self.mgr = mgr
        # OSDs currently scheduled for removal.
        self.to_remove_osds: Set["OSDRemoval"] = set()
        # NOTE(review): not referenced anywhere in this file; presumably
        # populated/read elsewhere -- verify before removing.
        self.osd_removal_report: Dict["OSDRemoval", Union[int, str]] = dict()

    @property
    def report(self) -> Set["OSDRemoval"]:
        """ A snapshot (copy) of the current removal queue """
        return self.to_remove_osds.copy()

    def queue_osds_for_removal(self, osds: Set["OSDRemoval"]):
        """ Schedules OSDs for removal by the background loop """
        self.to_remove_osds.update(osds)

    def _remove_osds_bg(self) -> None:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.
        """
        logger.debug(
            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
        self._update_osd_removal_status()
        # Iterate over a copy so entries can be removed from the live queue.
        remove_osds: Set["OSDRemoval"] = self.to_remove_osds.copy()
        for osd in remove_osds:
            if not osd.force:
                self.drain_osd(osd.osd_id)
                # skip criteria
                if not self.is_empty(osd.osd_id):
                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
                    continue

            if not self.ok_to_destroy([osd.osd_id]):
                logger.info(
                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
                continue

            # abort criteria
            if not self.down_osd([osd.osd_id]):
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not set OSD <{osd.osd_id}> to 'down'")

            if osd.replace:
                if not self.destroy_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy OSD <{osd.osd_id}>")
            else:
                if not self.purge_osd(osd.osd_id):
                    # also remove it from the remove_osd list and set a health_check warning?
                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")

            self.mgr._remove_daemon(osd.fullname, osd.nodename)
            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
            logger.debug(f"Removing {osd.osd_id} from the queue.")
            self.to_remove_osds.remove(osd)

    def _update_osd_removal_status(self):
        """
        Generate a OSD report that can be printed to the CLI
        """
        logger.debug("Update OSD removal status")
        # Refresh each queued OSD's PG count so `report` reflects drain progress.
        for osd in self.to_remove_osds:
            osd.pg_count = self.get_pg_count(str(osd.osd_id))
        logger.debug(f"OSD removal status: {self.to_remove_osds}")

    def drain_osd(self, osd_id: str) -> bool:
        """
        Uses `osd_support` module to schedule a drain operation of an OSD
        """
        cmd_args = {
            'prefix': 'osd drain',
            'osd_ids': [int(osd_id)]
        }
        return self._run_mon_cmd(cmd_args)

    def get_pg_count(self, osd_id: str) -> int:
        """ Queries for PG count of an OSD; returns -1 when not found """
        self.mgr.log.debug("Querying for drain status")
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd drain status',
        })
        if ret != 0:
            self.mgr.log.error(f"Calling osd drain status failed with {err}")
            raise OrchestratorError("Could not query `osd drain status`")
        out = json.loads(out)
        for o in out:
            # IDs are compared as strings since the report may carry either form.
            if str(o.get('osd_id', '')) == str(osd_id):
                return int(o.get('pgs', -1))
        return -1

    def is_empty(self, osd_id: str) -> bool:
        """ Checks if an OSD is empty (carries zero PGs) """
        return self.get_pg_count(osd_id) == 0

    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': osd_ids}
        return self._run_mon_cmd(cmd_args)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully); keeps the OSD ID for replacement """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def down_osd(self, osd_ids: List[int]) -> bool:
        """ Sets `down` flag to OSDs """
        # Docstring fix: this runs `osd down` (was mislabeled as `out`).
        cmd_args = {
            'prefix': 'osd down',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def out_osd(self, osd_ids: List[int]) -> bool:
        """ Sets `out` flag to OSDs """
        # Docstring fix: this runs `osd out` (was mislabeled as `down`).
        cmd_args = {
            'prefix': 'osd out',
            'ids': osd_ids,
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True