# mgr/osd_support -- Ceph manager module (ceph/src/pybind/mgr/osd_support/module.py)
# Drains OSDs by crush-reweighting them to zero and tracking their progress.
import errno
import json
from threading import Event
from typing import List, Optional, Set

from mgr_module import HandleCommandResult, MgrModule
class OSDSupport(MgrModule):
    """Mgr module that drains OSDs (reweights them to 0) and reports progress."""

    # these are CLI commands we implement
    # NOTE(review): the original likely also carried a "perm" key per command
    # dict (standard for MgrModule COMMANDS) -- restore it if the pristine
    # source is available; only "cmd"/"desc" are visible here.
    COMMANDS = [
        {
            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
            "desc": "drain osd ids",
        },
        {
            "cmd": "osd drain status",
            "desc": "show status",
        },
        {
            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
            "desc": "show status for osds. Stopping all if osd_ids are omitted",
        },
    ]

    MODULE_OPTIONS: List[dict] = []

    # These are "native" Ceph options that this module cares about.
    NATIVE_OPTIONS: List[str] = []

    # Draining pipeline state (class-level; the mgr instantiates one module):
    # osd_ids: scheduled for draining, emptying_osds: currently being drained,
    # check_osds: scratch set for status queries, empty: fully drained.
    osd_ids: Set[int] = set()
    emptying_osds: Set[int] = set()
    check_osds: Set[int] = set()
    empty: Set[int] = set()
38 def __init__(self
, *args
, **kwargs
):
39 super(OSDSupport
, self
).__init
__(*args
, **kwargs
)
41 # set up some members to enable the serve() method and shutdown()
45 # ensure config options members are initialized; see config_notify()
48 def config_notify(self
):
50 This method is called whenever one of our config options is changed.
52 # This is some boilerplate that stores MODULE_OPTIONS in a class
53 # member, so that, for instance, the 'emphatic' option is always
54 # available as 'self.emphatic'.
55 for opt
in self
.MODULE_OPTIONS
:
58 self
.get_module_option(opt
['name']))
59 self
.log
.debug(' mgr option %s = %s',
60 opt
['name'], getattr(self
, opt
['name']))
61 # Do the same for the native options.
62 for _opt
in self
.NATIVE_OPTIONS
:
65 self
.get_ceph_option(_opt
))
66 self
.log
.debug('native option %s = %s', _opt
, getattr(self
, _opt
))
68 def handle_command(self
, inbuf
, cmd
):
72 cmd_prefix
= cmd
.get('prefix', '')
73 osd_ids
: List
[int] = cmd
.get('osd_ids', list())
74 not_found_osds
: Set
[int] = self
.osds_not_in_cluster(osd_ids
)
75 if cmd_prefix
== 'osd drain':
77 return -errno
.EINVAL
, '', f
"OSDs <{not_found_osds}> not found in cluster"
79 for osd_id
in osd_ids
:
80 if osd_id
not in self
.emptying_osds
:
81 self
.osd_ids
.add(osd_id
)
82 self
.log
.info(f
'Found OSD(s) <{self.osd_ids}> in the queue.')
83 out
= 'Started draining OSDs. Query progress with <ceph osd drain status>'
85 elif cmd_prefix
== 'osd drain status':
86 # re-initialize it with an empty set on invocation (long running processes)
87 self
.check_osds
= set()
88 # assemble a set of emptying osds and to_be_emptied osds
89 self
.check_osds
.update(self
.emptying_osds
)
90 self
.check_osds
.update(self
.osd_ids
)
91 self
.check_osds
.update(self
.empty
)
94 for osd_id
in self
.check_osds
:
95 pgs
= self
.get_pg_count(osd_id
)
96 report
.append(dict(osd_id
=osd_id
, pgs
=pgs
))
97 out
= f
"{json.dumps(report)}"
99 elif cmd_prefix
== 'osd drain stop':
101 self
.log
.debug("No osd_ids provided, stop all pending drain operations)")
103 self
.emptying_osds
= set()
105 # this is just a poor-man's solution as it will not really stop draining
106 # the osds. It will just stop the queries and also prevent any scheduled OSDs
107 # from getting drained at a later point in time.
108 out
= "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
112 return -errno
.EINVAL
, '', f
"OSDs <{not_found_osds}> not found in cluster"
114 self
.osd_ids
= self
.osd_ids
.difference(osd_ids
)
115 self
.emptying_osds
= self
.emptying_osds
.difference(osd_ids
)
116 out
= f
"Stopped draining operations for OSD(s): {osd_ids}"
119 return HandleCommandResult(
120 retval
=-errno
.EINVAL
,
122 stderr
=f
"Command not found <{cmd.get('prefix', '')}>")
123 return HandleCommandResult(
124 retval
=ret
, # exit code
130 This method is called by the mgr when the module starts and can be
131 used for any background activity.
133 self
.log
.info("Starting mgr/osd_support")
136 self
.log
.debug(f
"Scheduled for draining: <{self.osd_ids}>")
137 self
.log
.debug(f
"Currently being drained: <{self.emptying_osds}>")
138 # the state should be saved to the mon store in the actual call and
139 # then retrieved in serve() probably
141 # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
142 for x
in self
.find_osd_stop_threshold(self
.osd_ids
):
143 self
.emptying_osds
.add(x
)
145 # remove the emptying osds from the osd_ids since they don't need to be checked again.
146 self
.osd_ids
= self
.osd_ids
.difference(self
.emptying_osds
)
148 # 2) reweight the ok-to-stop osds, ONCE
149 self
.reweight_osds(self
.emptying_osds
)
151 # 3) check for osds to be empty
152 empty_osds
= self
.empty_osds(self
.emptying_osds
)
154 # remove osds that are marked as empty
155 self
.emptying_osds
= self
.emptying_osds
.difference(empty_osds
)
157 # move empty osds in the done queue until they disappear from ceph's view
158 # other modules need to know when OSDs are empty
159 for osd
in empty_osds
:
160 self
.log
.debug(f
"Adding {osd} to list of empty OSDs")
163 # remove from queue if no longer part of ceph cluster
166 # fixed sleep interval of 10 seconds
168 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
169 self
.event
.wait(sleep_interval
)
174 Remove OSDs that are no longer in the ceph cluster from the
178 for osd
in self
.osds_not_in_cluster(list(self
.empty
)):
179 self
.log
.info(f
"OSD: {osd} is not found in the cluster anymore. Removing")
180 self
.empty
.remove(osd
)
184 This method is called by the mgr when the module needs to shut
185 down (i.e., when the serve() function needs to exit).
187 self
.log
.info('Stopping')
191 def osds_not_in_cluster(self
, osd_ids
: List
[int]) -> Set
[int]:
192 self
.log
.info(f
"Checking if provided osds <{osd_ids}> exist in the cluster")
193 osd_map
= self
.get_osdmap()
194 cluster_osds
= [x
.get('osd') for x
in osd_map
.dump().get('osds', [])]
195 not_in_cluster
= set()
196 for osd_id
in osd_ids
:
197 if int(osd_id
) not in cluster_osds
:
198 self
.log
.error(f
"Could not find {osd_id} in cluster")
199 not_in_cluster
.add(osd_id
)
200 return not_in_cluster
202 def empty_osds(self
, osd_ids
: Set
[int]) -> List
[int]:
205 osd_df_data
= self
.osd_df()
207 for osd_id
in osd_ids
:
208 if self
.is_empty(osd_id
, osd_df
=osd_df_data
):
209 empty_osds
.append(osd_id
)
212 def osd_df(self
) -> dict:
213 # TODO: this should be cached I think
215 ret
, out
, err
= self
.mon_command({
219 return json
.loads(out
)
221 def is_empty(self
, osd_id
: int, osd_df
: Optional
[dict] = None) -> bool:
222 pgs
= self
.get_pg_count(osd_id
, osd_df
=osd_df
)
224 self
.log
.info(f
"osd: {osd_id} still has {pgs} PGs.")
226 self
.log
.info(f
"osd: {osd_id} has no PGs anymore")
229 def reweight_osds(self
, osd_ids
: Set
[int]) -> bool:
230 results
= [(self
.reweight_osd(osd_id
)) for osd_id
in osd_ids
]
233 def get_pg_count(self
, osd_id
: int, osd_df
: Optional
[dict] = None) -> int:
235 osd_df
= self
.osd_df()
236 osd_nodes
= osd_df
.get('nodes', [])
237 for osd_node
in osd_nodes
:
238 if osd_node
.get('id', None) == int(osd_id
):
239 return osd_node
.get('pgs', -1)
242 def get_osd_weight(self
, osd_id
: int) -> float:
243 osd_df
= self
.osd_df()
244 osd_nodes
= osd_df
.get('nodes', [])
245 for osd_node
in osd_nodes
:
246 if osd_node
.get('id', None) == int(osd_id
):
247 return float(osd_node
.get('crush_weight'))
250 def reweight_osd(self
, osd_id
: int, weight
: float = 0.0) -> bool:
251 if self
.get_osd_weight(osd_id
) == weight
:
252 self
.log
.debug(f
"OSD <{osd_id}> is already weighted with: {weight}")
254 base_cmd
= 'osd crush reweight'
255 ret
, out
, err
= self
.mon_command({
257 'name': f
'osd.{osd_id}',
260 cmd
= f
"{base_cmd} on osd.{osd_id} to weight: {weight}"
261 self
.log
.debug(f
"running cmd: {cmd}")
263 self
.log
.error(f
"command: {cmd} failed with: {err}")
265 self
.log
.info(f
"command: {cmd} succeeded with: {out}")
268 def find_osd_stop_threshold(self
, osd_ids
: Set
[int]) -> Set
[int]:
270 Cut osd_id list in half until it's ok-to-stop
272 :param osd_ids: list of osd_ids
273 :return: list of ods_ids that can be stopped at once
277 _osds
: List
[int] = list(osd_ids
.copy())
278 while not self
.ok_to_stop(_osds
):
280 # can't even stop one OSD, aborting
281 self
.log
.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
284 # splitting osd_ids in half until ok_to_stop yields success
285 # maybe popping ids off one by one is better here..depends on the cluster size I guess..
286 # There's a lot of room for micro adjustments here
287 _osds
= _osds
[len(_osds
)//2:]
290 def ok_to_stop(self
, osd_ids
: List
[int]) -> bool:
291 base_cmd
= "osd ok-to-stop"
292 self
.log
.debug(f
"running cmd: {base_cmd} on ids {osd_ids}")
293 ret
, out
, err
= self
.mon_command({
295 # apparently ok-to-stop allows strings only
296 'ids': [str(x
) for x
in osd_ids
]
299 self
.log
.error(f
"{osd_ids} are not ok-to-stop. {err}")
301 self
.log
.info(f
"OSDs <{osd_ids}> are ok-to-stop")