1"""
2Balance PG distribution across OSDs.
3"""
4
5import copy
20effc67 6import enum
3efd9988
FG
7import errno
8import json
9import math
10import random
11import time
20effc67 12from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
3efd9988 13from threading import Event
20effc67 14from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
b32b8144 15from mgr_module import CRUSHMap
eafe8130 16import datetime
3efd9988 17
3efd9988
FG
18TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
19
20effc67 20
3efd9988 21class MappingState:
9f95a23c 22 def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
3efd9988
FG
23 self.desc = desc
24 self.osdmap = osdmap
25 self.osdmap_dump = self.osdmap.dump()
26 self.crush = osdmap.get_crush()
27 self.crush_dump = self.crush.dump()
9f95a23c
TL
28 self.raw_pg_stats = raw_pg_stats
29 self.raw_pool_stats = raw_pool_stats
3efd9988 30 self.pg_stat = {
9f95a23c 31 i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
3efd9988 32 }
94b18763 33 osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
9f95a23c 34 pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
94b18763 35 self.poolids = set(osd_poolids) & set(pg_poolids)
3efd9988
FG
36 self.pg_up = {}
37 self.pg_up_by_poolid = {}
38 for poolid in self.poolids:
39 self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
20effc67 40 for a, b in self.pg_up_by_poolid[poolid].items():
3efd9988
FG
41 self.pg_up[a] = b
42
43 def calc_misplaced_from(self, other_ms):
44 num = len(other_ms.pg_up)
45 misplaced = 0
f67539c2 46 for pgid, before in other_ms.pg_up.items():
3efd9988
FG
47 if before != self.pg_up.get(pgid, []):
48 misplaced += 1
49 if num > 0:
50 return float(misplaced) / float(num)
51 return 0.0
52
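

# A minimal sketch (hypothetical data, not part of the module's API) of what
# MappingState.calc_misplaced_from() measures: the fraction of PGs whose
# ordered up set differs between two mapping states.  Note that a mere
# reordering of the same OSDs counts as misplaced, since up sets are
# compared as lists.
def _example_misplaced_fraction() -> float:
    before = {'1.0': [0, 1], '1.1': [1, 2], '1.2': [2, 0]}
    after = {'1.0': [0, 1], '1.1': [2, 1], '1.2': [2, 0]}
    moved = sum(1 for pgid, up in before.items() if up != after.get(pgid, []))
    return float(moved) / float(max(len(before), 1))  # 1/3: only 1.1 changed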


class Mode(enum.Enum):
    none = 'none'
    crush_compat = 'crush-compat'
    upmap = 'upmap'


class Plan(object):
    def __init__(self, name, mode, osdmap, pools):
        self.name = name
        self.mode = mode
        self.osdmap = osdmap
        self.osdmap_dump = osdmap.dump()
        self.pools = pools
        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = osdmap.new_incremental()
        self.pg_status = {}

    def dump(self) -> str:
        return json.dumps(self.inc.dump(), indent=4, sort_keys=True)

    def show(self) -> str:
        return 'upmap plan'
79class MsPlan(Plan):
80 """
81 Plan with a preloaded MappingState member.
82 """
20effc67
TL
83
84 def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
9f95a23c
TL
85 super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
86 self.initial = ms
3efd9988 87
20effc67 88 def final_state(self) -> MappingState:
3efd9988
FG
89 self.inc.set_osd_reweights(self.osd_weights)
90 self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
91 return MappingState(self.initial.osdmap.apply_incremental(self.inc),
9f95a23c
TL
92 self.initial.raw_pg_stats,
93 self.initial.raw_pool_stats,
3efd9988
FG
94 'plan %s final' % self.name)
95
20effc67 96 def show(self) -> str:
3efd9988
FG
97 ls = []
98 ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
99 ls.append('# starting crush version %d' %
100 self.initial.osdmap.get_crush_version())
101 ls.append('# mode %s' % self.mode)
102 if len(self.compat_ws) and \
11fdf7f2 103 not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
3efd9988 104 ls.append('ceph osd crush weight-set create-compat')
f67539c2 105 for osd, weight in self.compat_ws.items():
3efd9988
FG
106 ls.append('ceph osd crush weight-set reweight-compat %s %f' %
107 (osd, weight))
f67539c2 108 for osd, weight in self.osd_weights.items():
3efd9988
FG
109 ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
110 incdump = self.inc.dump()
111 for pgid in incdump.get('old_pg_upmap_items', []):
112 ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
113 for item in incdump.get('new_pg_upmap_items', []):
114 osdlist = []
115 for m in item['mappings']:
116 osdlist += [m['from'], m['to']]
117 ls.append('ceph osd pg-upmap-items %s %s' %
118 (item['pgid'], ' '.join([str(a) for a in osdlist])))
119 return '\n'.join(ls)
120
121
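

# Sketch of how MsPlan.show() flattens one upmap item into a CLI line; the
# sample item in the comment below is hypothetical.
def _example_upmap_item_to_cli(item: dict) -> str:
    osdlist: List[int] = []
    for m in item['mappings']:
        osdlist += [m['from'], m['to']]
    return 'ceph osd pg-upmap-items %s %s' % (
        item['pgid'], ' '.join(str(a) for a in osdlist))
# _example_upmap_item_to_cli({'pgid': '1.7', 'mappings': [{'from': 3, 'to': 5}]})
# -> 'ceph osd pg-upmap-items 1.7 3 5'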


class Eval:
    def __init__(self, ms: MappingState):
        self.ms = ms
        self.root_ids: Dict[str, int] = {}          # root name -> id
        self.pool_name: Dict[str, str] = {}         # pool id -> pool name
        self.pool_id: Dict[str, int] = {}           # pool name -> id
        self.pool_roots: Dict[str, List[str]] = {}  # pool name -> root name
        self.root_pools: Dict[str, List[str]] = {}  # root name -> pools
        self.target_by_root: Dict[str, Dict[int, float]] = {}  # root name -> target weight map
        self.count_by_pool: Dict[str, dict] = {}
        self.count_by_root: Dict[str, dict] = {}
        self.actual_by_pool: Dict[str, dict] = {}   # pool -> by_* -> actual weight map
        self.actual_by_root: Dict[str, dict] = {}   # root -> by_* -> actual weight map
        self.total_by_pool: Dict[str, dict] = {}    # pool -> by_* -> total
        self.total_by_root: Dict[str, dict] = {}    # root -> by_* -> total
        self.stats_by_pool: Dict[str, dict] = {}    # pool -> by_* -> stddev or avg -> value
        self.stats_by_root: Dict[str, dict] = {}    # root -> by_* -> stddev or avg -> value

        self.score_by_pool: Dict[str, float] = {}
        self.score_by_root: Dict[str, Dict[str, float]] = {}

        self.score = 0.0

    def show(self, verbose: bool = False) -> str:
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
            r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r: Dict[str, Dict[str, Union[int, float]]] = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'max': 0,
                    'min': 0,
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies in [0, 1); 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].items():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors to calculate reweight_urgency.
                # One 10% underfilled device with 5 2% overfilled devices is arguably a better
                # situation than one 10% overfilled with 5 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
                    x = (adjusted - avg)/avg.
                    Since we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
                    To bring the range of F(x) into [0, 1), we need the above modification.

                    In general, we need a function F(x), where x = (adjusted - avg)/avg, such that:
                    1. F(x) is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
                    2. A larger value of x implies more urgency to reweight.
                    3. The differences between F(x) values for large x should be minimal.
                    4. F(x) should approach 1 (highest urgency to reweight) steeply.

                    We could have used F(x) = (1 - e^(-x)), but that converges to 1 more slowly than the form in use.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'max': max(count[t].values()),
                'min': min(count[t].values()),
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
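

# A standalone sketch of the urgency term used in Eval.calc_stats(), under
# the same definitions: x = (adjusted - avg) / avg and F(x) = erf(x / sqrt(2)),
# i.e. 2*phi(x) - 1 for the standard normal cdf phi.  The sample numbers in
# the comment are hypothetical.
def _example_urgency(adjusted: float, avg: float) -> float:
    return math.erf(((adjusted - avg) / avg) / math.sqrt(2.0))
# A device 50% over its fair share scores erf(0.5 / sqrt(2)) ~= 0.38, while
# one 10% over scores only ~0.08, so urgency rises steeply with deviation.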


class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(name='active',
               type='bool',
               default=True,
               desc='automatically balance PGs across cluster',
               runtime=True),
        Option(name='begin_time',
               type='str',
               default='0000',
               desc='beginning time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='end_time',
               type='str',
               default='2400',
               desc='ending time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='begin_weekday',
               type='uint',
               default=0,
               min=0,
               max=7,
               desc='Restrict automatic balancing to this day of the week or later',
               long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='end_weekday',
               type='uint',
               default=7,
               min=0,
               max=7,
               desc='Restrict automatic balancing to days of the week earlier than this',
               long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='crush_compat_max_iterations',
               type='uint',
               default=25,
               min=1,
               max=250,
               desc='maximum number of iterations to attempt optimization',
               runtime=True),
        Option(name='crush_compat_metrics',
               type='str',
               default='pgs,objects,bytes',
               desc='metrics with which to calculate OSD utilization',
               long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
               runtime=True),
        Option(name='crush_compat_step',
               type='float',
               default=.5,
               min=.001,
               max=.999,
               desc='aggressiveness of optimization',
               long_desc='.99 is very aggressive, .01 is less aggressive',
               runtime=True),
        Option(name='min_score',
               type='float',
               default=0,
               desc='minimum score, below which no optimization is attempted',
               runtime=True),
        Option(name='mode',
               desc='Balancer mode',
               default='upmap',
               enum_allowed=['none', 'crush-compat', 'upmap'],
               runtime=True),
        Option(name='sleep_interval',
               type='secs',
               default=60,
               desc='how frequently to wake up and attempt optimization',
               runtime=True),
        Option(name='upmap_max_optimizations',
               type='uint',
               default=10,
               desc='maximum upmap optimizations to make per attempt',
               runtime=True),
        Option(name='upmap_max_deviation',
               type='int',
               default=5,
               min=1,
               desc='deviation below which no optimization is attempted',
               long_desc='If the number of PGs is within this count then no optimization is attempted',
               runtime=True),
        Option(name='pool_ids',
               type='str',
               default='',
               desc='pools which the automatic balancing will be limited to',
               runtime=True)
    ]

    active = False
    run = True
    plans: Dict[str, Plan] = {}
    mode = ''
    optimizing = False
    last_optimize_started = ''
    last_optimize_duration = ''
    optimize_result = ''
    success_string = 'Optimization plan created successfully'
    in_progress_string = 'in progress'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    @CLIReadCommand('balancer status')
    def show_status(self) -> Tuple[int, str, str]:
        """
        Show balancer status
        """
        s = {
            'plans': list(self.plans.keys()),
            'active': self.active,
            'last_optimize_started': self.last_optimize_started,
            'last_optimize_duration': self.last_optimize_duration,
            'optimize_result': self.optimize_result,
            'mode': self.get_module_option('mode'),
        }
        return (0, json.dumps(s, indent=4, sort_keys=True), '')

    @CLICommand('balancer mode')
    def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
        """
        Set balancer mode
        """
        if mode == Mode.upmap:
            min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
            if min_compat_client < 'luminous':  # works well because version is alphabetized..
                warn = ('min_compat_client "%s" '
                        '< "luminous", which is required for pg-upmap. '
                        'Try "ceph osd set-require-min-compat-client luminous" '
                        'before enabling this mode' % min_compat_client)
                return (-errno.EPERM, '', warn)
        elif mode == Mode.crush_compat:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'initialize compat weight-set')
            self.get_compat_weight_set_weights(ms)  # ignore error
        self.set_module_option('mode', mode.value)
        return (0, '', '')

    @CLICommand('balancer on')
    def on(self) -> Tuple[int, str, str]:
        """
        Enable automatic balancing
        """
        if not self.active:
            self.set_module_option('active', 'true')
            self.active = True
        self.event.set()
        return (0, '', '')

    @CLICommand('balancer off')
    def off(self) -> Tuple[int, str, str]:
        """
        Disable automatic balancing
        """
        if self.active:
            self.set_module_option('active', 'false')
            self.active = False
        self.event.set()
        return (0, '', '')

    @CLIReadCommand('balancer pool ls')
    def pool_ls(self) -> Tuple[int, str, str]:
        """
        List automatic balancing pools

        Note that an empty list means all existing pools will be automatic
        balancing targets, which is the default behaviour of the balancer.
        """
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        if pool_ids == '':
            return (0, '', '')
        pool_ids = [int(p) for p in pool_ids.split(',')]
        pool_name_by_id = dict((p['pool'], p['pool_name'])
                               for p in self.get_osdmap().dump().get('pools', []))
        should_prune = False
        final_ids: List[int] = []
        final_names = []
        for p in pool_ids:
            if p in pool_name_by_id:
                final_ids.append(p)
                final_names.append(pool_name_by_id[p])
            else:
                should_prune = True
        if should_prune:  # some pools were gone, prune
            self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
        return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')

    @CLICommand('balancer pool add')
    def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Enable automatic balancing for specific pools
        """
        raw_names = pools
        pool_id_by_name = dict((p['pool_name'], p['pool'])
                               for p in self.get_osdmap().dump().get('pools', []))
        invalid_names = [p for p in raw_names if p not in pool_id_by_name]
        if invalid_names:
            return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
        to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        existing = set(pool_ids.split(',') if pool_ids else [])
        final = to_add | existing
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')

    @CLICommand('balancer pool rm')
    def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Disable automatic balancing for specific pools
        """
        raw_names = pools
        existing = cast(str, self.get_module_option('pool_ids'))
        if existing == '':  # for idempotence
            return (0, '', '')
        existing = existing.split(',')
        osdmap = self.get_osdmap()
        pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
        pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
        final = [p for p in existing if p in pool_ids]
        to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
        final = set(final) - set(to_delete)
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')
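
    # Sketch (hypothetical helper, not part of the module) of the pool_ids
    # bookkeeping that "balancer pool add/rm" perform above: the option is a
    # comma-separated string of pool ids manipulated as sets.
    @staticmethod
    def _example_pool_ids_update(existing: str,
                                 to_add: List[int],
                                 to_del: List[int]) -> str:
        ids = set(existing.split(',')) if existing else set()
        ids |= {str(i) for i in to_add}
        ids -= {str(i) for i in to_del}
        return ','.join(sorted(ids))
    # _example_pool_ids_update('1,3', [5], [3]) -> '1,5'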

    def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
        pools = []
        if option is None:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'current cluster')
        elif option in self.plans:
            plan = self.plans.get(option)
            assert plan
            pools = plan.pools
            if plan.mode == 'upmap':
                # Note that for upmap, to improve efficiency,
                # we use a basic version of Plan without keeping the obviously
                # *redundant* MS member.
                # Hence ms might not be accurate here, since we are basically
                # using an old snapshotted osdmap vs a fresh copy of pg_stats.
                # It should not be a big deal though..
                ms = MappingState(plan.osdmap,
                                  self.get("pg_stats"),
                                  self.get("pool_stats"),
                                  f'plan "{plan.name}"')
            else:
                ms = cast(MsPlan, plan).final_state()
        else:
            # not a plan, does it look like a pool?
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            if option not in valid_pool_names:
                raise ValueError(f'option "{option}" not a plan or a pool')
            pools.append(option)
            ms = MappingState(osdmap,
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              f'pool "{option}"')
        return ms, pools

    @CLIReadCommand('balancer eval-verbose')
    def plan_eval_verbose(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or a specific pool
        or plan (verbosely)
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=True), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))

    @CLIReadCommand('balancer eval')
    def plan_eval_brief(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or a specific pool or plan
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=False), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))

    @CLIReadCommand('balancer optimize')
    def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
        """
        Run optimizer to create a new plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up... try again')
        osdmap = self.get_osdmap()
        valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
        invalid_pool_names = []
        for p in pools:
            if p not in valid_pool_names:
                invalid_pool_names.append(p)
        if len(invalid_pool_names):
            return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
        plan_ = self.plan_create(plan, osdmap, pools)
        self.last_optimize_started = time.asctime(time.localtime())
        self.optimize_result = self.in_progress_string
        start = time.time()
        r, detail = self.optimize(plan_)
        end = time.time()
        self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
        if r == 0:
            # Add plan if an optimization was created
            self.optimize_result = self.success_string
            self.plans[plan] = plan_
        else:
            self.optimize_result = detail
        return (r, '', detail)

    @CLIReadCommand('balancer show')
    def plan_show(self, plan: str) -> Tuple[int, str, str]:
        """
        Show details of an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        return (0, plan_.show(), '')

    @CLICommand('balancer rm')
    def plan_rm(self, plan: str) -> Tuple[int, str, str]:
        """
        Discard an optimization plan
        """
        if plan in self.plans:
            del self.plans[plan]
        return (0, '', '')

    @CLICommand('balancer reset')
    def plan_reset(self) -> Tuple[int, str, str]:
        """
        Discard all optimization plans
        """
        self.plans = {}
        return (0, '', '')

    @CLIReadCommand('balancer dump')
    def plan_dump(self, plan: str) -> Tuple[int, str, str]:
        """
        Show an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return -errno.ENOENT, '', f'plan {plan} not found'
        else:
            return (0, plan_.dump(), '')

    @CLIReadCommand('balancer ls')
    def plan_ls(self) -> Tuple[int, str, str]:
        """
        List all plans
        """
        return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')

    @CLIReadCommand('balancer execute')
    def plan_execute(self, plan: str) -> Tuple[int, str, str]:
        """
        Execute an optimization plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up... try again')
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        r, detail = self.execute(plan_)
        self.plan_rm(plan)
        return (r, '', detail)

    def shutdown(self) -> None:
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_permit(self) -> bool:
        local_time = time.localtime()
        time_of_day = time.strftime('%H%M', local_time)
        weekday = (local_time.tm_wday + 1) % 7  # be compatible with C
        permit = False

        begin_time = cast(str, self.get_module_option('begin_time'))
        end_time = cast(str, self.get_module_option('end_time'))
        if begin_time <= end_time:
            permit = begin_time <= time_of_day < end_time
        else:
            permit = time_of_day >= begin_time or time_of_day < end_time
        if not permit:
            self.log.debug("should run between %s - %s, now %s, skipping",
                           begin_time, end_time, time_of_day)
            return False

        begin_weekday = cast(int, self.get_module_option('begin_weekday'))
        end_weekday = cast(int, self.get_module_option('end_weekday'))
        if begin_weekday <= end_weekday:
            permit = begin_weekday <= weekday < end_weekday
        else:
            permit = weekday >= begin_weekday or weekday < end_weekday
        if not permit:
            self.log.debug("should run between weekday %d - %d, now %d, skipping",
                           begin_weekday, end_weekday, weekday)
            return False

        return True
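
    # Minimal sketch (hypothetical helper) of the wrap-around window test
    # time_permit() applies to both HHMM strings and weekday numbers: when
    # begin > end the window spans midnight (or the week boundary).
    @staticmethod
    def _example_in_window(now: str, begin: str, end: str) -> bool:
        if begin <= end:
            return begin <= now < end
        return now >= begin or now < end
    # _example_in_window('2330', '2300', '0500') -> True
    # _example_in_window('1200', '2300', '0500') -> False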

    def serve(self) -> None:
        self.log.info('Starting')
        while self.run:
            self.active = cast(bool, self.get_module_option('active'))
            sleep_interval = cast(float, self.get_module_option('sleep_interval'))
            self.log.debug('Waking up [%s, now %s]',
                           "active" if self.active else "inactive",
                           time.strftime(TIME_FORMAT, time.localtime()))
            if self.active and self.time_permit():
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                osdmap = self.get_osdmap()
                pool_ids = cast(str, self.get_module_option('pool_ids'))
                if pool_ids:
                    allow = [int(p) for p in pool_ids.split(',')]
                else:
                    allow = []
                final: List[str] = []
                if allow:
                    pools = osdmap.dump().get('pools', [])
                    valid = [p['pool'] for p in pools]
                    ids = set(allow) & set(valid)
                    if set(allow) - set(valid):  # some pools were gone, prune
                        self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
                    pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
                    final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
                plan = self.plan_create(name, osdmap, final)
                self.optimizing = True
                self.last_optimize_started = time.asctime(time.localtime())
                self.optimize_result = self.in_progress_string
                start = time.time()
                r, detail = self.optimize(plan)
                end = time.time()
                self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
                if r == 0:
                    self.optimize_result = self.success_string
                    self.execute(plan)
                else:
                    self.optimize_result = detail
                self.optimizing = False
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()
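
    # Sketch of the interruptible sleep pattern serve() relies on: waiting
    # on the Event instead of time.sleep() lets "balancer on/off" and
    # shutdown() wake the loop immediately via self.event.set().
    @staticmethod
    def _example_interruptible_sleep(event: Event, seconds: float) -> None:
        event.wait(seconds)  # returns early if event.set() was called
        event.clear()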

    def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
        mode = cast(str, self.get_module_option('mode'))
        if mode == 'upmap':
            # drop unnecessary MS member for upmap mode.
            # this way we could effectively eliminate the usage of a
            # complete pg_stats, which can become horribly inefficient
            # as pg_num grows..
            plan = Plan(name, mode, osdmap, pools)
        else:
            plan = MsPlan(name,
                          mode,
                          MappingState(osdmap,
                                       self.get("pg_stats"),
                                       self.get("pool_stats"),
                                       'plan %s initial' % name),
                          pools)
        return plan

    def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = {a['osd']: a['weight']
                      for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}

        # get expected distributions by root
        actual_by_root: Dict[str, Dict[str, dict]] = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find the roots associated with the pools we were passed
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = {osd: w / sum_w
                                       for osd, w in adjusted_map.items()}
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.items():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for pgid, up in pm.items():
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    if osd not in pgs_by_osd:
                        pgs_by_osd[osd] = 0
                        objects_by_osd[osd] = 0
                        bytes_by_osd[osd] = 0
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.items()
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')

        # total score is just the average of normalized stddevs
        pe.score = 0.0
        for r, vs in pe.score_by_root.items():
            for k, v in vs.items():
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe
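
    # Sketch (hypothetical helper and weights) of how calc_eval() derives
    # target_by_root above: CRUSH weights are scaled by each OSD's reweight
    # value, out OSDs are dropped, and the result is normalized to sum to 1.
    @staticmethod
    def _example_target_weights(crush_w: Dict[int, float],
                                reweight: Dict[int, float]) -> Dict[int, float]:
        adjusted = {osd: cw * reweight.get(osd, 0.0)
                    for osd, cw in crush_w.items() if reweight.get(osd, 0.0) > 0}
        total = sum(adjusted.values())
        return {osd: w / total for osd, w in adjusted.items()} if total else {}
    # _example_target_weights({0: 1.0, 1: 1.0}, {0: 1.0, 1: 0.5})
    # -> {0: 0.666..., 1: 0.333...}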

    def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Optimize plan %s' % plan.name)
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        plan.pg_status = info
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(cast(MsPlan, plan))
            elif plan.mode == 'none':
                detail = 'Please run "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail

    def do_upmap(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('do_upmap')
        max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
        max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
        osdmap_dump = plan.osdmap_dump

        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        adjusted_pools = []
        inc = plan.inc
        total_did = 0
        left = max_optimizations
        pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
                               if p['pg_num'] > p['pg_num_target']]
        crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
                                       for p in osdmap_dump.get('pools', []))
        for pool in pools:
            if pool not in crush_rule_by_pool_name:
                self.log.info('pool %s does not exist' % pool)
                continue
            if pool in pools_with_pg_merge:
                self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
                continue
            adjusted_pools.append(pool)
        # shuffle so all pools get equal (in)attention
        random.shuffle(adjusted_pools)
        pool_dump = osdmap_dump.get('pools', [])
        for pool in adjusted_pools:
            for p in pool_dump:
                if p['pool_name'] == pool:
                    pool_id = p['pool']
                    break

            # note that here we deliberately exclude any scrubbing pgs too,
            # since scrubbing activities have significant impacts on performance
            num_pg_active_clean = 0
            for p in plan.pg_status.get('pgs_by_pool_state', []):
                pgs_pool_id = p['pool_id']
                if pgs_pool_id != pool_id:
                    continue
                for s in p['pg_state_counts']:
                    if s['state_name'] == 'active+clean':
                        num_pg_active_clean += s['count']
                        break
            available = min(left, num_pg_active_clean)
            did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or pool(s) pg_num is decreasing, ' \
                                    'or distribution is already perfect'
        return 0, ''

    def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
        self.log.info('do_crush_compat')
        max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = cast(float, self.get_module_option('crush_compat_step'))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = cast(float, self.get_module_option('min_score'))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = {a['osd']: a['weight']
                           for a in ms.osdmap_dump.get('osds', [])}

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited: Dict[int, str] = {}
        overlap: Dict[int, List[str]] = {}
        for root, wm in pe.target_by_root.items():
            for osd in wm:
                if osd in visited:
                    if osd not in overlap:
                        overlap[osd] = [visited[osd]]
                    overlap[osd].append(root)
                visited[osd] = root
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
        key = metrics[0]  # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # for newly created osds, reset calc_weight to the target value.
                            # this way the weight-set will absorb *step* of its
                            # target (final) value at the very beginning and slowly catch up later.
                            # note that if this turns out to cause too many misplaced
                            # pgs, then we'll reduce step and retry
                            calc_weight = target[osd]
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in next_ws.items()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0.0
        if best_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.items():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization; ' \
                                'changing the balancer mode and retrying might help'
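
    # Simplified sketch of the weight-set update rule in do_crush_compat()
    # (ignoring the reweight factor `ow` that the real code also applies):
    # each iteration moves the weight a fraction `step` toward the fully
    # corrected weight target/actual * weight, converging geometrically.
    # The numbers in the comment are hypothetical.
    @staticmethod
    def _example_weight_step(weight: float, target: float,
                             actual: float, step: float) -> float:
        calc_weight = target / actual * weight if actual > 0 else target
        return weight * (1.0 - step) + calc_weight * step
    # With weight=1.0, target=0.10, actual=0.20, step=0.5 the result is 0.75,
    # i.e. half-way toward the fully corrected weight of 0.5.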

    def get_compat_weight_set_weights(self, ms: MappingState):
        have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
        if have_choose_args:
            # get number of buckets in choose_args
            choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
        if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
            # enable compat weight-set first
            self.log.debug('no choose_args or not all buckets have weight-sets')
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except json.JSONDecodeError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = CRUSHMap.get_default_choose_args(crushmap)
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
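
    # Sketch (hypothetical bucket) of the position-wise pairing done above:
    # choose_args weight_set values line up with bucket items by index.
    @staticmethod
    def _example_pair_weight_set(bucket: dict,
                                 weight_set_row: List[float]) -> Dict[int, float]:
        assert len(bucket['items']) == len(weight_set_row)
        return {item['id']: w for item, w in zip(bucket['items'], weight_set_row)}
    # _example_pair_weight_set({'items': [{'id': 0}, {'id': 1}]}, [1.0, 0.5])
    # -> {0: 1.0, 1: 0.5}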

    def do_crush(self) -> None:
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self) -> None:
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws):
            ms_plan = cast(MsPlan, plan)
            if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
                self.log.debug('ceph osd crush weight-set create-compat')
                result = CommandResult('')
                self.send_command(result, 'mon', '', json.dumps({
                    'prefix': 'osd crush weight-set create-compat',
                    'format': 'json',
                }), '')
                r, outb, outs = result.wait()
                if r != 0:
                    self.log.error('Error creating compat weight-set')
                    return r, outs

        for osd, weight in plan.compat_ws.items():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in plan.osd_weights.items():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for item in incdump.get('new_pg_upmap', []):
            self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
                          item['osds'])
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap',
                'format': 'json',
                'pgid': item['pgid'],
                'id': item['osds'],
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap', []):
            self.log.info('ceph osd rm-pg-upmap %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''
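
    # Sketch of the fixed-point conversion execute() uses for "osd
    # reweightn": OSD reweight values are 16.16 fixed point, so 1.0 encodes
    # as 0x10000 (65536).  The sample weight is hypothetical.
    @staticmethod
    def _example_reweightn_encode(weights: Dict[int, float]) -> Dict[str, str]:
        return {str(osd): str(int(w * float(0x10000)))
                for osd, w in weights.items()}
    # _example_reweightn_encode({3: 0.85}) -> {'3': '55705'}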

    def gather_telemetry(self) -> Dict[str, Any]:
        return {
            'active': self.active,
            'mode': self.mode,
        }