"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import six
import time
from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs replaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
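
# These module-level values are only fallbacks; at runtime the balancer reads
# its settings through self.get_config() (see the Module class below).  As a
# sketch (the config-key prefix is an assumption, not defined in this file),
# they can be overridden and the balancer driven like this:
#
#   ceph config-key set mgr/balancer/sleep_interval 300
#   ceph config-key set mgr/balancer/max_misplaced .07
#   ceph balancer mode upmap
#   ceph balancer on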

class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in pg_dump.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in six.iteritems(self.pg_up_by_poolid[poolid]):
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in six.iteritems(other_ms.pg_up):
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
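
    # Worked example (illustrative only): if other_ms.pg_up were
    #   {'1.0': [0, 1], '1.1': [1, 2], '1.2': [2, 0], '1.3': [0, 2]}
    # and this state mapped '1.1' to [1, 3] with the other three PGs
    # unchanged, calc_misplaced_from(other_ms) would return 1/4 = 0.25,
    # i.e. 25% of the PGs would move when going from that state to this one.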

class Plan:
    def __init__(self, name, ms, pools):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms
        self.pools = pools

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in six.iteritems(self.compat_ws):
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in six.iteritems(self.osd_weights):
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)

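# Example of what Plan.show() produces for a hypothetical upmap plan (the
# epoch, version and pgid values below are made up):
#
#   # starting osdmap epoch 1234
#   # starting crush version 17
#   # mode upmap
#   ceph osd pg-upmap-items 1.7 0 3
#
# i.e. the plan is rendered as the CLI commands that would apply it by hand.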

class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root names
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # root -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is;
            # it lies in [0, 1), where 0 means a perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in six.iteritems(count[t]):
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors in
                # calculating reweight_urgency.  One 10% underfilled device
                # with five 2% overfilled devices is arguably a better
                # situation than one 10% overfilled device with five 2%
                # underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the
                    standard normal distribution and x = (adjusted - avg)/avg.
                    Since we only consider over-weighted devices here, x >= 0,
                    so phi(x) lies in [0.5, 1); the "2*phi(x) - 1" form rescales
                    F(x) into [0, 1).

                    In general we need a function F(x), with x = (adjusted - avg)/avg,
                    such that:
                    1. F(x) is bounded between 0 and 1, so that reweight_urgency
                       is also bounded.
                    2. A larger value of x implies more urgency to reweight.
                    3. The differences between F(x) values for large x should be
                       small.
                    4. F(x) should approach 1 (highest urgency to reweight)
                       steeply.

                    F(x) = 1 - e^(-x) would also satisfy these, but it converges
                    to 1 more slowly than the erf-based form used here.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r

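# Worked example of the urgency term above (illustrative only, not used by
# the balancer): F(x) = erf(x / sqrt(2)) = 2*phi(x) - 1 for
# x = (adjusted - avg)/avg.  A device 10% over its target (x = 0.1) scores
# about erf(0.0707) ~= 0.08, one 100% over target (x = 1.0) scores about
# erf(0.707) ~= 0.68, and F(x) keeps climbing toward 1 as the imbalance grows.
def _example_urgency(adjusted, avg):
    # Mirrors the per-device erf term in Eval.calc_stats(); provided purely
    # as a sketch of the formula discussed in the comment block above.
    x = (adjusted - avg) / avg
    return math.erf(x / math.sqrt(2.0))
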
class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer ls",
            "desc": "List all plans",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "r",
        },
    ]
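
    # Typical workflow driven by the commands above (illustrative):
    #
    #   ceph balancer mode crush-compat    # or 'upmap'
    #   ceph balancer eval                 # score the current distribution
    #   ceph balancer optimize myplan      # prepare a plan
    #   ceph balancer show myplan          # inspect the equivalent commands
    #   ceph balancer execute myplan       # apply it once
    #   ceph balancer on                   # ...or let it run automatically
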
    active = False
    run = True
    plans = {}
    mode = ''

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': list(self.plans.keys()),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
                self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
                self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            pools = []
            if 'option' in command:
                plan = self.plans.get(command['option'])
                if not plan:
                    # not a plan, does it look like a pool?
                    osdmap = self.get_osdmap()
                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
                    option = command['option']
                    if option not in valid_pool_names:
                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
                    pools.append(option)
                    ms = MappingState(osdmap, self.get("pg_dump"), 'pool "%s"' % option)
                else:
                    pools = plan.pools
                    ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, pools, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            pools = []
            if 'pools' in command:
                pools = command['pools']
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            invalid_pool_names = []
            for p in pools:
                if p not in valid_pool_names:
                    invalid_pool_names.append(p)
            if len(invalid_pool_names):
                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
            plan = self.plan_create(command['plan'], osdmap, pools)
            r, detail = self.optimize(plan)
            # remove plan if we are currently unable to find an optimization
            # or distribution is already perfect
            if r:
                self.plan_rm(command['plan'])
            return (r, '', detail)
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer ls':
            return (0, json.dumps([p for p in self.plans], indent=4), '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            r, detail = self.execute(plan)
            self.plan_rm(command['plan'])
            return (r, '', detail)
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end
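
    # time_in_interval() example: with begin='2300' and end='0600' the window
    # wraps midnight, so:
    #   time_in_interval('2330', '2300', '0600') -> True
    #   time_in_interval('0130', '2300', '0600') -> True
    #   time_in_interval('1200', '2300', '0600') -> False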

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name, self.get_osdmap(), [])
                r, detail = self.optimize(plan)
                if r == 0:
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name, osdmap, pools):
        plan = Plan(name,
                    MappingState(osdmap,
                                 self.get("pg_dump"),
                                 'plan %s initial' % name),
                    pools)
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms, pools):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = {a['osd']: a['weight']
                      for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find out roots associating with pools we are passed in
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in six.iteritems(weight_map) if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = {osd: w / sum_w
                                       for osd, w in six.iteritems(adjusted_map)}
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in six.iteritems(pool_info):
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root]:
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in six.iteritems(pm):
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: v
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: v
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in six.iteritems(pe.count_by_root)
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = self.get_config('crush_compat_metrics', 'pgs,objects,bytes').split(',')

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in six.iteritems(pe.score_by_root):
            for k, v in six.iteritems(vs):
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe
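
        # Numeric sketch of the final score computed just above (values made
        # up): with the default metrics pgs,objects,bytes and a single root
        # whose per-metric scores are 0.02, 0.03 and 0.01,
        # pe.score = (0.02 + 0.03 + 0.01) / (3 * 1) = 0.02.
        # Lower is better; 0 means a perfect distribution.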

    def evaluate(self, ms, pools, verbose=False):
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                detail = 'Please run "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail
        ##

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = int(self.get_config('upmap_max_iterations', 10))
        max_deviation = float(self.get_config('upmap_max_deviation', .01))

        ms = plan.initial
        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or distribution is already perfect'
        return 0, ''
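
    # The upmap changes prepared above live in plan.inc as pg_upmap_items
    # entries; Plan.show() renders each one as a
    # "ceph osd pg-upmap-items <pgid> <from> <to> ..." command, and execute()
    # below sends the same request directly to the monitors.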

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = float(self.get_config('crush_compat_step', .5))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = float(self.get_config('min_score', 0))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = {a['osd']: a['weight']
                           for a in ms.osdmap_dump.get('osds', [])}
        reweighted_osds = [a for a, b in six.iteritems(orig_osd_weight)
                           if b < 1.0 and b > 0.0]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = {a: b for a, b in six.iteritems(orig_ws) if a >= 0}

        # Make sure roots don't overlap their devices; if they do, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}
        for root, wm in six.iteritems(pe.target_by_root):
            for osd in wm:
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap.keys()
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = self.get_config('crush_compat_metrics', 'pgs,objects,bytes').split(',')
        key = metrics[0]  # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warn("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # not enough to go on here... keep orig weight
                            calc_weight = weight / orig_osd_weight[osd]
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in six.iteritems(next_ws)
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in six.iteritems(best_ow):
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization; ' \
                                'changing the balancer mode and retrying might help'
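
    # Worked example of the reweight step inside do_crush_compat() above
    # (values made up): with step = 0.5, an osd whose actual share of the key
    # metric is 0.05 against a target of 0.04, with compat weight 1.0 and
    # reweight 1.0, gets calc_weight = 0.04 / 0.05 * 1.0 * 1.0 = 0.8 and
    # new_weight = 1.0 * 0.5 + 0.8 * 0.5 = 0.9, i.e. each iteration moves the
    # weight a fraction 'step' of the way toward the proportional target.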

    def get_compat_weight_set_weights(self, ms):
        if '-1' not in ms.crush_dump.get('choose_args', {}):
            # enable compat weight-set first
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except ValueError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = crushmap.get('choose_args', {}).get('-1', [])
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
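
    # Sketch of the crush dump fragment parsed above (shape only; the ids and
    # weights are made up):
    #
    #   "choose_args": {
    #       "-1": [
    #           {"bucket_id": -2, "weight_set": [[1.0, 1.0, 0.9]]}
    #       ]
    #   }
    #
    # Each weight_set vector lines up positionally with the bucket's items,
    # which is why the two lengths are compared before filling weight_set{}.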

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return r, outs

        for osd, weight in six.iteritems(plan.compat_ws):
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in six.iteritems(plan.osd_weights):
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''