
"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import time
from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs misplaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'

class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in self.pg_up_by_poolid[poolid].iteritems():
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.iteritems():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
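
    # Worked example (illustrative numbers): if the other state had 100 PGs
    # and 7 of them now map to a different up set, calc_misplaced_from
    # returns 7 / 100 = 0.07, i.e. moving to this state would misplace
    # about 7% of the PGs.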

class Plan:
    def __init__(self, name, ms):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.iteritems():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.iteritems():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)
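
    # Sample show() output (a hedged sketch for a small crush-compat plan;
    # epochs, ids, and weights are illustrative):
    #
    #   # starting osdmap epoch 110
    #   # starting crush version 7
    #   # mode crush-compat
    #   ceph osd crush weight-set create-compat
    #   ceph osd crush weight-set reweight-compat 0 1.062000
    #   ceph osd crush weight-set reweight-compat 1 0.938000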


class Eval:
    # Keep all mutable state on the instance (not the class) so that two
    # Eval objects being compared do not share their dicts.
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root names
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # root -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies in [0, 1); 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].iteritems():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors in
                # calculating reweight_urgency.  One 10% underfilled device
                # with five 2% overfilled devices is arguably a better
                # situation than one 10% overfilled device with five 2%
                # underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the
                    standard normal distribution and x = (adjusted - avg)/avg.
                    Since we consider only over-weighted devices, x >= 0, so
                    phi(x) lies in [0.5, 1); the 2*phi(x) - 1 transform maps
                    the range of F(x) onto [0, 1).

                    In general, we need a function F(x), x = (adjusted - avg)/avg,
                    that:
                    1. is bounded between 0 and 1, so that reweight_urgency is
                       also bounded;
                    2. increases with x, so a larger deviation implies more
                       urgency to reweight;
                    3. flattens out for large x, so the difference between two
                       already-bad scores is minimal; and
                    4. approaches 1 (highest urgency to reweight) steeply.

                    F(x) = 1 - e^(-x) would also work, but it converges to 1
                    more slowly than the erf-based form used here.

                    cdf of the standard normal distribution:
                    https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
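
    # Worked example (illustrative numbers): two OSDs with normalized target
    # weights {0: 0.5, 1: 0.5}, count['pgs'] = {0: 75, 1: 25}, total 100.
    # Then num = 2, avg = 50, and the weight-adjusted values are
    # 75/0.5/2 = 75 and 25/0.5/2 = 25.  Only osd.0 is above the average, so
    # score = 0.5 * erf((25/50)/sqrt(2)) ~= 0.19 (normalized targets sum to
    # 1, so the over-weighted subset has sum_weight < 1 and max(sum_weight, 1)
    # leaves the score unchanged), and stddev = sqrt((25^2 + 25^2)/1) ~= 35.36.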

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "r",
        },
    ]
    active = False
    run = True
    plans = {}
    mode = ''

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': self.plans.keys(),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
                self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
                self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            if 'plan' in command:
                plan = self.plans.get(command['plan'])
                if not plan:
                    return (-errno.ENOENT, '', 'plan %s not found' %
                            command['plan'])
                ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            plan = self.plan_create(command['plan'])
            self.optimize(plan)
            return (0, '', '')
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            self.execute(plan)
            self.plan_rm(plan.name)
            return (0, '', '')
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

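    # Typical CLI flow through the commands above, in manual mode ('myplan'
    # is just an example plan name):
    #
    #   ceph balancer mode crush-compat   # or 'upmap'
    #   ceph balancer eval                # score the current distribution
    #   ceph balancer optimize myplan     # create and optimize a plan
    #   ceph balancer show myplan         # equivalent CLI commands
    #   ceph balancer eval myplan         # score the projected outcome
    #   ceph balancer execute myplan      # apply and discard the plan
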
    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end
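
    # Examples: zero-padded HHMM strings compare correctly as strings.
    #   time_in_interval('1230', '0900', '1700') -> True
    #   time_in_interval('0130', '2300', '0600') -> True   (window wraps midnight)
    #   time_in_interval('1200', '2300', '0600') -> False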

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name)
                if self.optimize(plan):
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name):
        plan = Plan(name, MappingState(self.get_osdmap(),
                                       self.get("pg_dump"),
                                       'plan %s initial' % name))
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        pools = pe.pool_id.keys()
        if len(pools) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds', []) }

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            root = ms.crush.get_item_name(rootid)
            pe.root_ids[root] = rootid
            roots.append(root)
            ls = ms.osdmap.get_pools_by_take(rootid)
            pe.root_pools[root] = []
            for poolid in ls:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight.get(osd, 1.0)
                for osd, cw in weight_map.iteritems()
            }
            sum_w = sum(adjusted_map.values()) or 1.0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd, w in adjusted_map.iteritems() }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root].iterkeys():
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.iteritems():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root].iterkeys():
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in pm.iteritems():
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root, m in pe.total_by_root.iteritems():
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.iteritems()
        }

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }

        # total score is the average of the normalized scores, over the
        # three metrics and all roots
        pe.score = 0.0
        for r, vs in pe.score_by_root.iteritems():
            for k, v in vs.iteritems():
                pe.score += v
        pe.score /= 3 * len(roots)
        return pe

    def evaluate(self, ms, verbose=False):
        pe = self.calc_eval(ms)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            self.log.info('Some PGs (%f) are unknown; waiting', unknown)
        elif degraded > 0.0:
            self.log.info('Some objects (%f) are degraded; waiting', degraded)
        elif inactive > 0.0:
            self.log.info('Some PGs (%f) are inactive; waiting', inactive)
        elif misplaced >= max_misplaced:
            self.log.info('Too many objects (%f >= %f) are misplaced; waiting',
                          misplaced, max_misplaced)
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                self.log.info('Idle')
            else:
                self.log.info('Unrecognized mode %s' % plan.mode)
        return False

    ##

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = int(self.get_config('upmap_max_iterations', 10))
        max_deviation = float(self.get_config('upmap_max_deviation', .01))

        ms = plan.initial
        pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            self.log.info('no pools, nothing to do')
            return False
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
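        # 'left' is a shared iteration budget across all pools: each pool's
        # upmap calculation consumes part of it, so no single pool can use
        # more than what remains (the shuffle above spreads the budget
        # fairly over repeated runs).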
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        return True

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return False
        step = float(self.get_config('crush_compat_step', .5))
        if step <= 0 or step >= 1.0:
            return False
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms)
        if pe.score == 0:
            self.log.info('Distribution is already perfect')
            return False

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds', []) }
        reweighted_osds = [ a for a, b in orig_osd_weight.iteritems()
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights()
        if orig_ws is None:
            return False
        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }

        # Make sure roots don't overlap their devices.  If they do, we
        # can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}
        for root, wm in pe.target_by_root.iteritems():
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            self.log.error('error: some osds belong to multiple subtrees: %s' %
                           overlap)
            return False

        key = 'pgs'  # pgs, objects, or bytes

        # go
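        # In outline: hill-climb on the eval score.  Each pass nudges every
        # compat weight toward target/actual, re-evaluates the projected
        # mapping, and keeps the step only if the score improves without
        # exceeding max_misplaced; otherwise the step size is halved and the
        # best state so far is restored.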
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root]['pgs'] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root]['pgs'],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # not enough to go on here... keep orig weight
                            calc_weight = weight / orig_osd_weight[osd]
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
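                            # phase the legacy reweight back toward 1.0: move
                            # a 'step' fraction of the remaining distance, and
                            # at least .005, letting the compat weight-set
                            # absorb the adjustment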
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

            # normalize weights under this root
            root_weight = crush.get_item_weight(pe.root_ids[root])
            root_sum = sum(b for a, b in next_ws.iteritems()
                           if a in target.keys())
            if root_sum > 0 and root_weight > 0:
                factor = root_sum / root_weight
                self.log.debug('normalizing root %s %d, weight %f, '
                               'ws sum %f, factor %f',
                               root, pe.root_ids[root], root_weight,
                               root_sum, factor)
                for osd in actual.keys():
                    next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = next_ws
                    best_ow = next_ow
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.iteritems():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return True
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            return False

    def get_compat_weight_set_weights(self):
        # enable compat weight-set
        self.log.debug('ceph osd crush weight-set create-compat')
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush weight-set create-compat',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error creating compat weight-set')
            return

        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush dump',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error dumping crush map')
            return
        try:
            crushmap = json.loads(outb)
        except ValueError:
            raise RuntimeError('unable to parse crush map')

        raw = crushmap.get('choose_args', {}).get('-1', [])
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
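
    # Shape of the crush dump fields parsed above (a hedged sketch; ids and
    # weights are illustrative):
    #
    #   'buckets':     [{'id': -2, 'items': [{'id': 0, ...}, {'id': 1, ...}]}]
    #   'choose_args': {'-1': [{'bucket_id': -2, 'weight_set': [[1.0, 0.968]]}]}
    #
    # which yields weight_set = {0: 1.0, 1: 0.968}.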

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

        for osd, weight in plan.compat_ws.iteritems():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in plan.osd_weights.iteritems():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
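        # 'osd reweightn' takes 16.16 fixed-point weights, where 0x10000
        # (65536) represents 1.0; e.g. a weight of 0.95 is sent as
        # str(int(0.95 * 65536)) == '62259'.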
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error on command')
                return
        self.log.debug('done')