1"""
2Balance PG distribution across OSDs.
3"""
4
5import copy
20effc67 6import enum
3efd9988
FG
7import errno
8import json
9import math
10import random
11import time
20effc67 12from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
3efd9988 13from threading import Event
20effc67 14from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
b32b8144 15from mgr_module import CRUSHMap
eafe8130 16import datetime
3efd9988 17
3efd9988
FG
18TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
19
20effc67 20
class MappingState:
    def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.raw_pg_stats = raw_pg_stats
        self.raw_pool_stats = raw_pool_stats
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
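        # only pools present in both the osdmap and the PG stats are
        # considered; pools that are still being created or were just
        # deleted drop out of this intersection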
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in self.pg_up_by_poolid[poolid].items():
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.items():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0


class Mode(enum.Enum):
    none = 'none'
    crush_compat = 'crush-compat'
    upmap = 'upmap'
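
# The enum values mirror what `ceph balancer mode` accepts on the CLI,
# e.g. `ceph balancer mode upmap` (see Module.set_mode below).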


class Plan(object):
    def __init__(self, name, mode, osdmap, pools):
        self.name = name
        self.mode = mode
        self.osdmap = osdmap
        self.osdmap_dump = osdmap.dump()
        self.pools = pools
        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = osdmap.new_incremental()
        self.pg_status = {}

    def dump(self) -> str:
        return json.dumps(self.inc.dump(), indent=4, sort_keys=True)

    def show(self) -> str:
        return 'upmap plan'


class MsPlan(Plan):
    """
    Plan with a preloaded MappingState member.
    """

    def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
        super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
        self.initial = ms

    def final_state(self) -> MappingState:
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.raw_pg_stats,
                            self.initial.raw_pool_stats,
                            'plan %s final' % self.name)

    def show(self) -> str:
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.items():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.items():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    def __init__(self, ms: MappingState):
        self.ms = ms
        self.root_ids: Dict[str, int] = {}   # root name -> id
        self.pool_name: Dict[str, str] = {}  # pool id -> pool name
        self.pool_id: Dict[str, int] = {}    # pool name -> id
        self.pool_roots: Dict[str, List[str]] = {}  # pool name -> root name
        self.root_pools: Dict[str, List[str]] = {}  # root name -> pools
        self.target_by_root: Dict[str, Dict[int, float]] = {}  # root name -> target weight map
        self.count_by_pool: Dict[str, dict] = {}
        self.count_by_root: Dict[str, dict] = {}
        self.actual_by_pool: Dict[str, dict] = {}  # pool -> by_* -> actual weight map
        self.actual_by_root: Dict[str, dict] = {}  # root -> by_* -> actual weight map
        self.total_by_pool: Dict[str, dict] = {}   # pool -> by_* -> total
        self.total_by_root: Dict[str, dict] = {}   # root -> by_* -> total
        self.stats_by_pool: Dict[str, dict] = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root: Dict[str, dict] = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool: Dict[str, float] = {}
        self.score_by_root: Dict[str, Dict[str, float]] = {}

        self.score = 0.0

    def show(self, verbose: bool = False) -> str:
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r: Dict[str, Dict[str, Union[int, float]]] = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'max': 0,
                    'min': 0,
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies in [0, 1), where 0 means a perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].items():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors in calculating
                # reweight_urgency. One 10% underfilled device with five 2% overfilled
                # devices is arguably a better situation than one 10% overfilled device
                # with five 2% underfilled devices.
                if adjusted > avg:
                    '''
                    Use F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the standard
                    normal distribution and x = (adjusted - avg)/avg.
                    Since we consider only over-weighted devices, x >= 0, so phi(x)
                    lies in [0.5, 1); the 2*phi(x) - 1 modification brings the range
                    of F(x) to [0, 1).

                    In general, we need a function F(x), with x = (adjusted - avg)/avg,
                    that:
                    1. is bounded between 0 and 1, so that reweight_urgency is
                       ultimately bounded too;
                    2. assigns more urgency to reweight for larger values of x;
                    3. shows minimal difference between F(x) values when x is large;
                    4. approaches 1 (highest urgency to reweight) steeply.

                    F(x) = 1 - e^(-x) would also work, but it converges to 1 more
                    slowly than the form used here.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
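                    # Illustrative numbers (not from the original source): an OSD
                    # whose adjusted share is 50% above average has x = 0.5 and
                    # F(0.5) = erf(0.5 / sqrt(2)) ~= 0.38, while x = 2.0 gives
                    # F(2.0) ~= 0.95, already close to the cap of 1.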
                    score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'max': max(count[t].values()),
                'min': min(count[t].values()),
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r


class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(name='active',
               type='bool',
               default=True,
               desc='automatically balance PGs across cluster',
               runtime=True),
        Option(name='begin_time',
               type='str',
               default='0000',
               desc='beginning time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='end_time',
               type='str',
               default='2359',
               desc='ending time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='begin_weekday',
               type='uint',
               default=0,
               min=0,
               max=6,
               desc='Restrict automatic balancing to this day of the week or later',
               long_desc='0 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='end_weekday',
               type='uint',
               default=0,
               min=0,
               max=6,
               desc='Restrict automatic balancing to days of the week earlier than this',
               long_desc='0 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='crush_compat_max_iterations',
               type='uint',
               default=25,
               min=1,
               max=250,
               desc='maximum number of iterations to attempt optimization',
               runtime=True),
        Option(name='crush_compat_metrics',
               type='str',
               default='pgs,objects,bytes',
               desc='metrics with which to calculate OSD utilization',
               long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
               runtime=True),
        Option(name='crush_compat_step',
               type='float',
               default=.5,
               min=.001,
               max=.999,
               desc='aggressiveness of optimization',
               long_desc='.99 is very aggressive, .01 is less aggressive',
               runtime=True),
        Option(name='min_score',
               type='float',
               default=0,
               desc='minimum score, below which no optimization is attempted',
               runtime=True),
        Option(name='mode',
               desc='Balancer mode',
               default='upmap',
               enum_allowed=['none', 'crush-compat', 'upmap'],
               runtime=True),
        Option(name='sleep_interval',
               type='secs',
               default=60,
               desc='how frequently to wake up and attempt optimization',
               runtime=True),
        Option(name='upmap_max_optimizations',
               type='uint',
               default=10,
               desc='maximum upmap optimizations to make per attempt',
               runtime=True),
        Option(name='upmap_max_deviation',
               type='int',
               default=5,
               min=1,
               desc='deviation below which no optimization is attempted',
               long_desc='If the number of PGs is within this count then no optimization is attempted',
               runtime=True),
        Option(name='pool_ids',
               type='str',
               default='',
               desc='pools which automatic balancing will be limited to',
               runtime=True)
    ]
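
    # These options follow the standard mgr module config path; for example,
    # `ceph config set mgr mgr/balancer/sleep_interval 300` adjusts how often
    # the balancer wakes up (an illustrative value, not a recommendation).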

    active = False
    run = True
    plans: Dict[str, Plan] = {}
    mode = ''
    optimizing = False
    last_optimize_started = ''
    last_optimize_duration = ''
    optimize_result = ''
    no_optimization_needed = False
    success_string = 'Optimization plan created successfully'
    in_progress_string = 'in progress'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    @CLIReadCommand('balancer status')
    def show_status(self) -> Tuple[int, str, str]:
        """
        Show balancer status
        """
        s = {
            'plans': list(self.plans.keys()),
            'active': self.active,
            'last_optimize_started': self.last_optimize_started,
            'last_optimize_duration': self.last_optimize_duration,
            'optimize_result': self.optimize_result,
            'no_optimization_needed': self.no_optimization_needed,
            'mode': self.get_module_option('mode'),
        }
        return (0, json.dumps(s, indent=4, sort_keys=True), '')

    @CLICommand('balancer mode')
    def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
        """
        Set balancer mode
        """
        if mode == Mode.upmap:
            min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
            if min_compat_client < 'luminous':  # works because release names sort alphabetically
                warn = ('min_compat_client "%s" '
                        '< "luminous", which is required for pg-upmap. '
                        'Try "ceph osd set-require-min-compat-client luminous" '
                        'before enabling this mode' % min_compat_client)
                return (-errno.EPERM, '', warn)
        elif mode == Mode.crush_compat:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'initialize compat weight-set')
            self.get_compat_weight_set_weights(ms)  # ignore error
        self.set_module_option('mode', mode.value)
        return (0, '', '')

    @CLICommand('balancer on')
    def on(self) -> Tuple[int, str, str]:
        """
        Enable automatic balancing
        """
        if not self.active:
            self.set_module_option('active', 'true')
            self.active = True
        self.event.set()
        return (0, '', '')

    @CLICommand('balancer off')
    def off(self) -> Tuple[int, str, str]:
        """
        Disable automatic balancing
        """
        if self.active:
            self.set_module_option('active', 'false')
            self.active = False
        self.event.set()
        return (0, '', '')

    @CLIReadCommand('balancer pool ls')
    def pool_ls(self) -> Tuple[int, str, str]:
        """
        List automatic balancing pools

        Note that an empty list means all existing pools will be automatic
        balancing targets, which is the default behaviour of the balancer.
        """
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        if pool_ids == '':
            return (0, '', '')
        pool_ids = [int(p) for p in pool_ids.split(',')]
        pool_name_by_id = dict((p['pool'], p['pool_name'])
                               for p in self.get_osdmap().dump().get('pools', []))
        should_prune = False
        final_ids: List[int] = []
        final_names = []
        for p in pool_ids:
            if p in pool_name_by_id:
                final_ids.append(p)
                final_names.append(pool_name_by_id[p])
            else:
                should_prune = True
        if should_prune:  # some pools were gone, prune
            self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
        return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')

    @CLICommand('balancer pool add')
    def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Enable automatic balancing for specific pools
        """
        raw_names = pools
        pool_id_by_name = dict((p['pool_name'], p['pool'])
                               for p in self.get_osdmap().dump().get('pools', []))
        invalid_names = [p for p in raw_names if p not in pool_id_by_name]
        if invalid_names:
            return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
        to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        existing = set(pool_ids.split(',') if pool_ids else [])
        final = to_add | existing
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')

    @CLICommand('balancer pool rm')
    def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Disable automatic balancing for specific pools
        """
        raw_names = pools
        existing = cast(str, self.get_module_option('pool_ids'))
        if existing == '':  # for idempotence
            return (0, '', '')
        existing = existing.split(',')
        osdmap = self.get_osdmap()
        pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
        pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
        final = [p for p in existing if p in pool_ids]
        to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
        final = set(final) - set(to_delete)
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')

    def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
        pools = []
        if option is None:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'current cluster')
        elif option in self.plans:
            plan = self.plans.get(option)
            assert plan
            pools = plan.pools
            if plan.mode == 'upmap':
                # Note that for upmap, to improve efficiency, we use a basic
                # version of Plan without keeping the (redundant) MappingState
                # member. Hence ms might not be fully accurate here, since we
                # are pairing an old snapshotted osdmap with a fresh copy of
                # pg_stats. It should not be a big deal though.
                ms = MappingState(plan.osdmap,
                                  self.get("pg_stats"),
                                  self.get("pool_stats"),
                                  f'plan "{plan.name}"')
            else:
                ms = cast(MsPlan, plan).final_state()
        else:
            # not a plan, does it look like a pool?
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            if option not in valid_pool_names:
                raise ValueError(f'option "{option}" not a plan or a pool')
            pools.append(option)
            ms = MappingState(osdmap,
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              f'pool "{option}"')
        return ms, pools

    @CLIReadCommand('balancer eval-verbose')
    def plan_eval_verbose(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or specific pool or specific
        plan (verbosely)
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=True), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))

    @CLIReadCommand('balancer eval')
    def plan_eval_brief(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or specific pool or specific plan
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=False), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))

    @CLIReadCommand('balancer optimize')
    def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
        """
        Run optimizer to create a new plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up... try again')
        osdmap = self.get_osdmap()
        valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
        invalid_pool_names = []
        for p in pools:
            if p not in valid_pool_names:
                invalid_pool_names.append(p)
        if len(invalid_pool_names):
            return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
        plan_ = self.plan_create(plan, osdmap, pools)
        self.last_optimize_started = time.asctime(time.localtime())
        self.optimize_result = self.in_progress_string
        start = time.time()
        r, detail = self.optimize(plan_)
        end = time.time()
        self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
        if r == 0:
            # Add plan if an optimization was created
            self.optimize_result = self.success_string
            self.plans[plan] = plan_
        else:
            self.optimize_result = detail
        return (r, '', detail)

    @CLIReadCommand('balancer show')
    def plan_show(self, plan: str) -> Tuple[int, str, str]:
        """
        Show details of an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        return (0, plan_.show(), '')

    @CLICommand('balancer rm')
    def plan_rm(self, plan: str) -> Tuple[int, str, str]:
        """
        Discard an optimization plan
        """
        if plan in self.plans:
            del self.plans[plan]
        return (0, '', '')

    @CLICommand('balancer reset')
    def plan_reset(self) -> Tuple[int, str, str]:
        """
        Discard all optimization plans
        """
        self.plans = {}
        return (0, '', '')

    @CLIReadCommand('balancer dump')
    def plan_dump(self, plan: str) -> Tuple[int, str, str]:
        """
        Show an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return -errno.ENOENT, '', f'plan {plan} not found'
        else:
            return (0, plan_.dump(), '')

    @CLIReadCommand('balancer ls')
    def plan_ls(self) -> Tuple[int, str, str]:
        """
        List all plans
        """
        return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')

    @CLIReadCommand('balancer execute')
    def plan_execute(self, plan: str) -> Tuple[int, str, str]:
        """
        Execute an optimization plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up... try again')
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        r, detail = self.execute(plan_)
        self.plan_rm(plan)
        return (r, '', detail)

    def shutdown(self) -> None:
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_permit(self) -> bool:
        local_time = time.localtime()
        time_of_day = time.strftime('%H%M', local_time)
        weekday = (local_time.tm_wday + 1) % 7  # convert Monday == 0 (Python) to Sunday == 0 (C style)
        permit = False

        def check_time(time: str, option: str):
            if len(time) != 4:
                self.log.error('invalid time for %s - expected HHMM format', option)
            try:
                datetime.time(int(time[:2]), int(time[2:]))
            except ValueError as err:
                self.log.error('invalid time for %s - %s', option, err)

        begin_time = cast(str, self.get_module_option('begin_time'))
        check_time(begin_time, 'begin_time')
        end_time = cast(str, self.get_module_option('end_time'))
        check_time(end_time, 'end_time')
        if begin_time < end_time:
            permit = begin_time <= time_of_day < end_time
        elif begin_time == end_time:
            permit = True
        else:
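            # the window wraps around midnight; e.g. begin_time='2300' and
            # end_time='0600' permits 23:00-23:59 and 00:00-05:59
            # (illustrative values)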
            permit = time_of_day >= begin_time or time_of_day < end_time
        if not permit:
            self.log.debug("should run between %s - %s, now %s, skipping",
                           begin_time, end_time, time_of_day)
            return False

        begin_weekday = cast(int, self.get_module_option('begin_weekday'))
        end_weekday = cast(int, self.get_module_option('end_weekday'))
        if begin_weekday < end_weekday:
            permit = begin_weekday <= weekday <= end_weekday
        elif begin_weekday == end_weekday:
            permit = True
        else:
            permit = weekday >= begin_weekday or weekday < end_weekday
        if not permit:
            self.log.debug("should run between weekday %d - %d, now %d, skipping",
                           begin_weekday, end_weekday, weekday)
            return False

        return True

    def serve(self) -> None:
        self.log.info('Starting')
        while self.run:
            self.active = cast(bool, self.get_module_option('active'))
            sleep_interval = cast(float, self.get_module_option('sleep_interval'))
            self.log.debug('Waking up [%s, now %s]',
                           "active" if self.active else "inactive",
                           time.strftime(TIME_FORMAT, time.localtime()))
            if self.active and self.time_permit():
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                osdmap = self.get_osdmap()
                pool_ids = cast(str, self.get_module_option('pool_ids'))
                if pool_ids:
                    allow = [int(p) for p in pool_ids.split(',')]
                else:
                    allow = []
                final: List[str] = []
                if allow:
                    pools = osdmap.dump().get('pools', [])
                    valid = [p['pool'] for p in pools]
                    ids = set(allow) & set(valid)
                    if set(allow) - set(valid):  # some pools were gone, prune
                        self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
                    pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
                    final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
                plan = self.plan_create(name, osdmap, final)
                self.optimizing = True
                self.last_optimize_started = time.asctime(time.localtime())
                self.optimize_result = self.in_progress_string
                start = time.time()
                r, detail = self.optimize(plan)
                end = time.time()
                self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
                if r == 0:
                    self.optimize_result = self.success_string
                    self.execute(plan)
                else:
                    self.optimize_result = detail
                self.optimizing = False
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
        mode = cast(str, self.get_module_option('mode'))
        if mode == 'upmap':
            # drop the unnecessary MappingState member for upmap mode.
            # this way we effectively avoid carrying around a complete
            # copy of pg_stats, which can become horribly inefficient
            # as pg_num grows..
            plan = Plan(name, mode, osdmap, pools)
        else:
            plan = MsPlan(name,
                          mode,
                          MappingState(osdmap,
                                       self.get("pg_stats"),
                                       self.get("pool_stats"),
                                       'plan %s initial' % name),
                          pools)
        return plan

    def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = {a['osd']: a['weight']
                      for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}

        # get expected distributions by root
        actual_by_root: Dict[str, Dict[str, dict]] = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find out which roots are associated with the pools we were passed
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = {osd: w / sum_w
                                       for osd, w in adjusted_map.items()}
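            # target_by_root[root] is each OSD's normalized share of the root's
            # weight, summing to 1.0; e.g. two OSDs with adjusted weights 1.0
            # and 3.0 get targets 0.25 and 0.75 (illustrative numbers)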
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.items():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for pgid, up in pm.items():
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    if osd not in pgs_by_osd:
                        pgs_by_osd[osd] = 0
                        objects_by_osd[osd] = 0
                        bytes_by_osd[osd] = 0
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.items()
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)
3efd9988 908
20effc67 909 # the scores are already normalized
3efd9988
FG
910 pe.score_by_root = {
911 r: {
912 'pgs': pe.stats_by_root[r]['pgs']['score'],
913 'objects': pe.stats_by_root[r]['objects']['score'],
914 'bytes': pe.stats_by_root[r]['bytes']['score'],
915 } for r in pe.total_by_root.keys()
916 }
94b18763 917 self.log.debug('score_by_root %s' % pe.score_by_root)
3efd9988 918
f64942e4 919 # get the list of score metrics, comma separated
20effc67 920 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
f64942e4 921
3efd9988
FG
922 # total score is just average of normalized stddevs
923 pe.score = 0.0
f67539c2
TL
924 for r, vs in pe.score_by_root.items():
925 for k, v in vs.items():
f64942e4
AA
926 if k in metrics:
927 pe.score += v
928 pe.score /= len(metrics) * len(roots)
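        # e.g. with metrics == ['pgs'] and two roots scoring 0.10 and 0.30,
        # pe.score works out to (0.10 + 0.30) / (1 * 2) = 0.20
        # (illustrative numbers)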
        return pe

    def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Optimize plan %s' % plan.name)
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        plan.pg_status = info
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f >= %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(cast(MsPlan, plan))
            elif plan.mode == 'none':
                detail = 'Please run "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail

    def do_upmap(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('do_upmap')
        max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
        max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
        osdmap_dump = plan.osdmap_dump

        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle the pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        adjusted_pools = []
        inc = plan.inc
        total_did = 0
        left = max_optimizations
        pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
                               if p['pg_num'] > p['pg_num_target']]
        crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
                                       for p in osdmap_dump.get('pools', []))
        for pool in pools:
            if pool not in crush_rule_by_pool_name:
                self.log.info('pool %s does not exist' % pool)
                continue
            if pool in pools_with_pg_merge:
                self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
                continue
            adjusted_pools.append(pool)
        # shuffle so all pools get equal (in)attention
        random.shuffle(adjusted_pools)
        pool_dump = osdmap_dump.get('pools', [])
        for pool in adjusted_pools:
            for p in pool_dump:
                if p['pool_name'] == pool:
                    pool_id = p['pool']
                    break

            # note that we deliberately exclude any scrubbing PGs too,
            # since scrubbing activity has a significant impact on performance
            num_pg_active_clean = 0
            for p in plan.pg_status.get('pgs_by_pool_state', []):
                pgs_pool_id = p['pool_id']
                if pgs_pool_id != pool_id:
                    continue
                for s in p['pg_state_counts']:
                    if s['state_name'] == 'active+clean':
                        num_pg_active_clean += s['count']
                        break
            available = min(left, num_pg_active_clean)
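            # calc_pg_upmaps() writes up to `available` new upmap entries for
            # this pool into `inc` and returns how many it actually produced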
            did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
        if total_did == 0:
            self.no_optimization_needed = True
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or pool(s) pg_num is decreasing, ' \
                                    'or distribution is already perfect'
        return 0, ''

    def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
        self.log.info('do_crush_compat')
        max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = cast(float, self.get_module_option('crush_compat_step'))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = cast(float, self.get_module_option('min_score'))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = {a['osd']: a['weight']
                           for a in ms.osdmap_dump.get('osds', [])}

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited: Dict[int, str] = {}
        overlap: Dict[int, List[str]] = {}
        for root, wm in pe.target_by_root.items():
            for osd in wm:
                if osd in visited:
                    if osd not in overlap:
                        overlap[osd] = [visited[osd]]
                    overlap[osd].append(root)
                visited[osd] = root
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
        key = metrics[0]  # balance using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # for newly created osds, reset calc_weight to the target
                            # value. this way the weight-set will absorb *step* of its
                            # target (final) value at the very beginning and slowly
                            # catch up later. note that if this turns out to cause too
                            # many misplaced pgs, we'll reduce step and retry
                            calc_weight = target[osd]
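                        # blend the current weight toward calc_weight by `step`
                        # (exponential smoothing); e.g. step=0.5 moves halfway
                        # toward the computed weight each iteration
                        # (illustrative value)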
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in next_ws.items()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0.0
        if best_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.items():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization, ' \
                                'changing the balancer mode and retrying might help'

    def get_compat_weight_set_weights(self, ms: MappingState):
        have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
        if have_choose_args:
            # get number of buckets in choose_args
            choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
        if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
            # enable compat weight-set first
            self.log.debug('no choose_args or not all buckets have weight-sets')
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except json.JSONDecodeError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = CRUSHMap.get_default_choose_args(crushmap)
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set

    def do_crush(self) -> None:
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self) -> None:
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws):
            ms_plan = cast(MsPlan, plan)
            if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
                self.log.debug('ceph osd crush weight-set create-compat')
                result = CommandResult('')
                self.send_command(result, 'mon', '', json.dumps({
                    'prefix': 'osd crush weight-set create-compat',
                    'format': 'json',
                }), '')
                r, outb, outs = result.wait()
                if r != 0:
                    self.log.error('Error creating compat weight-set')
                    return r, outs

        for osd, weight in plan.compat_ws.items():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
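        # OSD reweight values travel over the wire as fixed point, where
        # 0x10000 (65536) represents 1.0; e.g. a weight of 0.5 is sent
        # as 32768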
        reweightn = {}
        for osd, weight in plan.osd_weights.items():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for item in incdump.get('new_pg_upmap', []):
            self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
                          item['osds'])
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap',
                'format': 'json',
                'pgid': item['pgid'],
                'id': item['osds'],
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap', []):
            self.log.info('ceph osd rm-pg-upmap %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''

    def gather_telemetry(self) -> Dict[str, Any]:
        return {
            'active': self.active,
            'mode': self.mode,
        }