# ceph/src/pybind/mgr/osd_support/module.py
from typing import List, Set, Optional
from mgr_module import MgrModule, HandleCommandResult
from threading import Event
import json
import errno


class OSDSupport(MgrModule):
    # these are CLI commands we implement
    COMMANDS = [
        {
            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
            "desc": "drain osd ids",
            "perm": "r"
        },
        {
            "cmd": "osd drain status",
            "desc": "show status",
            "perm": "r"
        },
        {
            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
            "desc": "stop draining osds. Stopping all if osd_ids are omitted",
            "perm": "r"
        },
    ]
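
    # Example CLI invocations (a sketch; the osd ids are placeholders and the
    # module is assumed to be enabled via `ceph mgr module enable osd_support`):
    #
    #   ceph osd drain 1 2 3    # queue OSDs 1, 2 and 3 for draining
    #   ceph osd drain status   # JSON report with the remaining PGs per OSD
    #   ceph osd drain stop 2   # stop draining OSD 2 only
    #   ceph osd drain stop     # stop all pending drain operations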

    MODULE_OPTIONS: List[dict] = []

    # These are "native" Ceph options that this module cares about.
    NATIVE_OPTIONS: List[str] = []

    # OSDs queued for draining, not yet confirmed ok-to-stop
    osd_ids: Set[int] = set()
    # OSDs that are ok-to-stop and are currently being drained
    emptying_osds: Set[int] = set()
    # scratch set used to assemble the 'osd drain status' report
    check_osds: Set[int] = set()
    # OSDs that have finished draining (no PGs left)
    empty: Set[int] = set()

    def __init__(self, *args, **kwargs):
        super(OSDSupport, self).__init__(*args, **kwargs)

        # set up some members to enable the serve() method and shutdown()
        self.run = True
        self.event = Event()

        # ensure config options members are initialized; see config_notify()
        self.config_notify()

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.
        """
        # This is some boilerplate that stores MODULE_OPTIONS in a class
        # member, so that, for instance, the 'emphatic' option is always
        # available as 'self.emphatic'.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))
        # Do the same for the native options.
        for _opt in self.NATIVE_OPTIONS:
            setattr(self,
                    _opt,
                    self.get_ceph_option(_opt))
            self.log.debug('native option %s = %s', _opt, getattr(self, _opt))

    def handle_command(self, inbuf, cmd):
        """
        Dispatch the 'osd drain ...' commands defined in COMMANDS.
        """
        ret = 0
        err = ''
        _ = inbuf
        cmd_prefix = cmd.get('prefix', '')
        osd_ids: List[int] = cmd.get('osd_ids', list())
        not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
        if cmd_prefix == 'osd drain':
            if not_found_osds:
                return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
            # add osd_ids to set
            for osd_id in osd_ids:
                if osd_id not in self.emptying_osds:
                    self.osd_ids.add(osd_id)
            self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'

        elif cmd_prefix == 'osd drain status':
            # re-initialize it with an empty set on invocation (long running processes)
            self.check_osds = set()
            # assemble a set of emptying osds and to_be_emptied osds
            self.check_osds.update(self.emptying_osds)
            self.check_osds.update(self.osd_ids)
            self.check_osds.update(self.empty)

            report = list()
            for osd_id in self.check_osds:
                pgs = self.get_pg_count(osd_id)
                report.append(dict(osd_id=osd_id, pgs=pgs))
            out = f"{json.dumps(report)}"

        elif cmd_prefix == 'osd drain stop':
            if not osd_ids:
                self.log.debug("No osd_ids provided, stopping all pending drain operations")
                self.osd_ids = set()
                self.emptying_osds = set()

                # this is just a poor-man's solution as it will not really stop draining
                # the osds. It will just stop the queries and also prevent any scheduled OSDs
                # from getting drained at a later point in time.
                out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"

            else:
                if not_found_osds:
                    return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"

                self.osd_ids = self.osd_ids.difference(osd_ids)
                self.emptying_osds = self.emptying_osds.difference(osd_ids)
                out = f"Stopped draining operations for OSD(s): {osd_ids}"

        else:
            return HandleCommandResult(
                retval=-errno.EINVAL,
                stdout='',
                stderr=f"Command not found <{cmd.get('prefix', '')}>")
        return HandleCommandResult(
            retval=ret,  # exit code
            stdout=out,  # stdout
            stderr=err)

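    # Background loop: serve() runs in the module's own thread and wakes up
    # every sleep_interval seconds, or immediately when self.event is set
    # (e.g. on shutdown). Each iteration moves OSDs through the queues:
    # osd_ids (scheduled) -> emptying_osds (ok-to-stop, crush weight set to 0)
    # -> empty (no PGs left).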
    def serve(self):
        """
        This method is called by the mgr when the module starts and can be
        used for any background activity.
        """
        self.log.info("Starting mgr/osd_support")
        while self.run:

            self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
            self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
            # the state should be saved to the mon store in the actual call and
            # then retrieved in serve() probably

            # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
            for x in self.find_osd_stop_threshold(self.osd_ids):
                self.emptying_osds.add(x)

            # remove the emptying osds from the osd_ids since they don't need to be checked again.
            self.osd_ids = self.osd_ids.difference(self.emptying_osds)

            # 2) reweight the ok-to-stop osds, ONCE
            self.reweight_osds(self.emptying_osds)

            # 3) check for osds to be empty
            empty_osds = self.empty_osds(self.emptying_osds)

            # remove osds that are marked as empty
            self.emptying_osds = self.emptying_osds.difference(empty_osds)

            # move empty osds into the done queue until they disappear from ceph's view
            # other modules need to know when OSDs are empty
            for osd in empty_osds:
                self.log.debug(f"Adding {osd} to list of empty OSDs")
                self.empty.add(osd)

            # remove from queue if no longer part of ceph cluster
            self.cleanup()

            # fixed sleep interval of 10 seconds
            sleep_interval = 10
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def cleanup(self):
        """
        Remove OSDs that are no longer in the ceph cluster from the
        'done' list.
        :return:
        """
        for osd in self.osds_not_in_cluster(list(self.empty)):
            self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing")
            self.empty.remove(osd)

    def shutdown(self):
        """
        This method is called by the mgr when the module needs to shut
        down (i.e., when the serve() function needs to exit).
        """
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
        self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
        osd_map = self.get_osdmap()
        cluster_osds = [x.get('osd') for x in osd_map.dump().get('osds', [])]
        not_in_cluster = set()
        for osd_id in osd_ids:
            if int(osd_id) not in cluster_osds:
                self.log.error(f"Could not find {osd_id} in cluster")
                not_in_cluster.add(osd_id)
        return not_in_cluster

    def empty_osds(self, osd_ids: Set[int]) -> List[int]:
        if not osd_ids:
            return list()
        osd_df_data = self.osd_df()
        empty_osds = list()
        for osd_id in osd_ids:
            if self.is_empty(osd_id, osd_df=osd_df_data):
                empty_osds.append(osd_id)
        return empty_osds

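    # Note (assumed output shape): the `osd df` mon command in json format
    # returns a dict whose 'nodes' list has one entry per OSD; get_pg_count()
    # and get_osd_weight() below read the 'id', 'pgs' and 'crush_weight' keys
    # from those entries.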
    def osd_df(self) -> dict:
        # TODO: this should be cached I think
        base_cmd = 'osd df'
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        return json.loads(out)

    def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
        pgs = self.get_pg_count(osd_id, osd_df=osd_df)
        if pgs != 0:
            self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
            return False
        self.log.info(f"osd: {osd_id} has no PGs anymore")
        return True

    def reweight_osds(self, osd_ids: Set[int]) -> bool:
        results = [self.reweight_osd(osd_id) for osd_id in osd_ids]
        return all(results)

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id', None) == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def get_osd_weight(self, osd_id: int) -> float:
        osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id', None) == int(osd_id):
                return float(osd_node.get('crush_weight'))
        return -1.0

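    # Draining works by setting the OSD's CRUSH weight to 0 (the default
    # `weight` below) via `osd crush reweight`, which makes the cluster remap
    # all PGs away from the OSD; serve() then waits for is_empty() to report
    # that no PGs remain.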
    def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
        if self.get_osd_weight(osd_id) == weight:
            self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
            return True
        base_cmd = 'osd crush reweight'
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            'name': f'osd.{osd_id}',
            'weight': weight
        })
        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
        self.log.debug(f"running cmd: {cmd}")
        if ret != 0:
            self.log.error(f"command: {cmd} failed with: {err}")
            return False
        self.log.info(f"command: {cmd} succeeded with: {out}")
        return True

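    # Shrinking example: with 8 queued OSDs that are not ok-to-stop as a whole,
    # the candidate list is cut to the last 4, then 2, then 1 entries before
    # giving up and retrying on the next serve() iteration.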
    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
        """
        Cut the osd_id list in half until it's ok-to-stop

        :param osd_ids: set of osd_ids
        :return: set of osd_ids that can be stopped at once
        """
        if not osd_ids:
            return set()
        _osds: List[int] = list(osd_ids.copy())
        while not self.ok_to_stop(_osds):
            if len(_osds) <= 1:
                # can't even stop one OSD, aborting
                self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return set()
            self.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            _osds = _osds[len(_osds)//2:]
        return set(_osds)

    def ok_to_stop(self, osd_ids: List[int]) -> bool:
        base_cmd = "osd ok-to-stop"
        self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
        ret, out, err = self.mon_command({
            'prefix': base_cmd,
            # apparently ok-to-stop allows strings only
            'ids': [str(x) for x in osd_ids]
        })
        if ret != 0:
            self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
            return False
        self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
        return True