
"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import time
from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs misplaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'

class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
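        # only consider pools present in both the osdmap and the pg stats;
        # pools that are still being created (or were just removed) may be
        # missing from one of the two and are skipped below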
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in pg_dump.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a,b in self.pg_up_by_poolid[poolid].iteritems():
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
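        # return the fraction of PGs whose up set in this state differs from
        # the up set recorded in other_ms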
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.iteritems():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0

class Plan:
    def __init__(self, name, ms, pools):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms
        self.pools = pools

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()
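        # proposed changes accumulate in this incremental osdmap;
        # final_state() applies it to preview the result, and execute()
        # later turns the plan into monitor commands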

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.iteritems():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.iteritems():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root name
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # root -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].iteritems():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors in computing reweight_urgency.
                # One 10% underfilled device with five 2% overfilled devices is arguably a
                # better situation than one 10% overfilled device with five 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the standard normal
                    distribution and x = (adjusted - avg)/avg.
                    Since we only consider over-weighted devices, x >= 0, so phi(x) lies in
                    [0.5, 1); the 2*phi(x) - 1 form rescales that into the range [0, 1).

                    In general we want a function F(x), with x = (adjusted - avg)/avg, that
                    1. is bounded between 0 and 1, so that reweight_urgency is also bounded;
                    2. grows with x, so a larger x implies more urgency to reweight;
                    3. saturates: once x is large, further increases change F(x) only minimally;
                    4. approaches 1 (highest urgency to reweight) steeply.

                    F(x) = 1 - e^(-x) would also satisfy these, but it converges to 1 more
                    slowly than the function used here.

                    cdf of the standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
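                    # Example: a device whose normalized load is 20% above the
                    # weighted average (x = 0.2) contributes roughly
                    # 0.16 * target[k] to the raw score, while one 100% above
                    # average (x = 1.0) contributes roughly 0.68 * target[k],
                    # since erf(x / sqrt(2)) == 2*phi(x) - 1.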
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "r",
        },
    ]
    active = False
    run = True
    plans = {}
    mode = ''

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': self.plans.keys(),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            pools = []
            if 'option' in command:
                plan = self.plans.get(command['option'])
                if not plan:
                    # not a plan, does it look like a pool?
                    osdmap = self.get_osdmap()
                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
                    option = command['option']
                    if option not in valid_pool_names:
                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
                    pools.append(option)
                    ms = MappingState(osdmap, self.get("pg_dump"), 'pool "%s"' % option)
                else:
                    pools = plan.pools
                    ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, pools, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            pools = []
            if 'pools' in command:
                pools = command['pools']
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            invalid_pool_names = []
            for p in pools:
                if p not in valid_pool_names:
                    invalid_pool_names.append(p)
            if len(invalid_pool_names):
                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
            plan = self.plan_create(command['plan'], osdmap, pools)
            r, detail = self.optimize(plan)
            # remove plan if we are currently unable to find an optimization
            # or distribution is already perfect
            if r:
                self.plan_rm(command['plan'])
            return (r, '', detail)
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            r, detail = self.execute(plan)
            self.plan_rm(command['plan'])
            return (r, '', detail)
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

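    # tod, begin and end are 'HHMM' strings; when begin > end the allowed
    # window wraps past midnight (e.g. begin='2300', end='0600')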
    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name, self.get_osdmap(), [])
                r, detail = self.optimize(plan)
                if r == 0:
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name, osdmap, pools):
        plan = Plan(name,
                    MappingState(osdmap,
                                 self.get("pg_dump"),
                                 'plan %s initial' % name),
                    pools)
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms, pools):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools',[]):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds',[]) if a['weight'] > 0 }

        # get expected distributions by root
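        # find_takes() returns the CRUSH items named by rule 'take' steps; for
        # each such root we build a per-OSD target weight map (CRUSH weight
        # times reweight, normalized so the weights under the root sum to 1)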
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find the roots associated with the pools we were passed
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd,cw in weight_map.iteritems() if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd,w in adjusted_map.iteritems() }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root].iterkeys():
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.iteritems():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root].iterkeys():
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in pm.iteritems():
                for osd in [int(osd) for osd in up]:
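                    # CRUSHMap.ITEM_NONE marks an up-set slot with no OSD
                    # mapped (e.g. an unfilled EC shard); skip it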
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root.iterkeys():
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.iteritems()
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # the total score is the average of the normalized per-metric scores
        # (pgs, objects, bytes) across all roots
        pe.score = 0.0
        for r, vs in pe.score_by_root.iteritems():
            for k, v in vs.iteritems():
                pe.score += v
        pe.score /= 3 * len(roots)
        return pe

    def evaluate(self, ms, pools, verbose=False):
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                detail = 'Please run "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail
        ##

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = int(self.get_config('upmap_max_iterations', 10))
        max_deviation = float(self.get_config('upmap_max_deviation', .01))
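        # max_iterations caps the total number of upmap changes in one plan;
        # max_deviation is passed straight through to calc_pg_upmaps below as
        # its deviation threshold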

        ms = plan.initial
        if len(plan.pools):
            pools = plan.pools
        else: # all
            pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                'or distribution is already perfect'
        return 0, ''

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = float(self.get_config('crush_compat_step', .5))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = float(self.get_config('min_score', 0))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds',[]) }
        reweighted_osds = [ a for a,b in orig_osd_weight.iteritems()
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }

        # Make sure the roots don't overlap in the devices they contain;
        # if they do, we can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}
        for root, wm in pe.target_by_root.iteritems():
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap.keys()
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        key = 'pgs' # pgs objects or bytes

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # not enough to go on here... keep orig weight
                            calc_weight = weight / orig_osd_weight[osd]
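                        # move only `step` of the way from the current weight
                        # toward the proportionally corrected one, to damp
                        # oscillation between iterations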
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
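                # scale the weight-set weights so their sum matches the root's
                # CRUSH weight; only the relative weights within the root change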
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a,b in next_ws.iteritems()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = next_ws
                    best_ow = next_ow
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.iteritems():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization; ' \
                'changing the balancer mode and retrying might help'

    def get_compat_weight_set_weights(self, ms):
        if '-1' not in ms.crush_dump.get('choose_args', {}):
            # enable compat weight-set first
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

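        # the compat weight-set is stored in the crush map under choose_args
        # id '-1'; each entry carries one weight per bucket item, and only the
        # first weight_set position is used here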
        raw = crushmap.get('choose_args',{}).get('-1', [])
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return r, outs

        for osd, weight in plan.compat_ws.iteritems():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
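        # 'osd reweightn' expects weights in 16.16 fixed point, so a weight of
        # 1.0 is encoded as 0x10000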
        reweightn = {}
        for osd, weight in plan.osd_weights.iteritems():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''