"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import time

from mgr_module import MgrModule, CommandResult
from threading import Event

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs replaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'

class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in self.pg_up_by_poolid[poolid].iteritems():
                self.pg_up[a] = b
    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.iteritems():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
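    # For example (hypothetical values): if other_ms.pg_up is
    # {'1.0': [0, 1], '1.1': [1, 2]} and self.pg_up is
    # {'1.0': [0, 1], '1.1': [2, 1]}, only pg 1.1 has a changed up set,
    # so the returned misplaced ratio is 1 / 2 = 0.5.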

class Plan:
    def __init__(self, name, ms):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)
    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.iteritems():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.iteritems():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)
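    # show() renders a plan as the equivalent CLI commands, e.g.
    # (hypothetical output):
    #   # starting osdmap epoch 142
    #   # starting crush version 17
    #   # mode crush-compat
    #   ceph osd crush weight-set reweight-compat 0 1.023000
    #   ceph osd reweight osd.3 0.950000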

class Eval:
    root_ids = {}        # root name -> id
    pool_name = {}       # pool id -> pool name
    pool_id = {}         # pool name -> id
    pool_roots = {}      # pool name -> root name
    root_pools = {}      # root name -> pools
    target_by_root = {}  # root name -> target weight map
    count_by_pool = {}
    count_by_root = {}
    actual_by_pool = {}  # pool -> by_* -> actual weight map
    actual_by_root = {}  # root -> by_* -> actual weight map
    total_by_pool = {}   # pool -> by_* -> total
    total_by_root = {}   # root -> by_* -> total
    stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
    stats_by_root = {}   # root -> by_* -> stddev or avg -> value

    score_by_pool = {}
    score_by_root = {}

    score = 0.0
    def __init__(self, ms):
        self.ms = ms

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r
    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].iteritems():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors to
                # calculate reweight_urgency. One 10% underfilled device with
                # five 2% overfilled devices is arguably a better situation
                # than one 10% overfilled device with five 2% underfilled
                # devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the
                    standard normal distribution and x = (adjusted - avg)/avg.
                    Since we consider only over-weighted devices, x >= 0, so
                    phi(x) lies in [0.5, 1); the modification above brings the
                    range of F(x) into [0, 1).

                    In general, we need a function F(x) of x = (adjusted - avg)/avg that
                    1. is bounded between 0 and 1, so that reweight_urgency is also bounded;
                    2. implies more urgency to reweight for larger values of x;
                    3. has only minimal differences between F(x) values when x is large;
                    4. approaches 1 (highest urgency to reweight) steeply.

                    F(x) = 1 - e^(-x) would also work, but it converges to 1
                    more slowly than the erf form used here.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
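    # Worked example (hypothetical values): two OSDs with target weight 0.5
    # each and pg counts {0: 3, 1: 1}. Then num = 2, avg = 4/2 = 2, and the
    # adjusted values are 3/0.5/2 = 3 and 1/0.5/2 = 1. Only osd.0 exceeds
    # avg, with x = (3-2)/2 = 0.5, so F(x) = erf(0.5/sqrt(2)) ~= 0.38 and
    # the 'pgs' score is 0.5 * 0.38 / max(0.5, 1) ~= 0.19.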

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "rw",
        },
    ]

    active = False
    run = True
    plans = {}
    mode = ''

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': self.plans.keys(),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            if 'plan' in command:
                plan = self.plans.get(command['plan'])
                if not plan:
                    return (-errno.ENOENT, '', 'plan %s not found' %
                            command['plan'])
                ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get('pg_dump'),
                                  'current cluster')
            return (0, self.evaluate(ms, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            plan = self.plan_create(command['plan'])
            self.optimize(plan)
            return (0, '', '')
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            self.execute(plan)
            self.plan_rm(command['plan'])
            return (0, '', '')
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))
    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end
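    # For example, time_in_interval('2330', '2300', '0600') and
    # time_in_interval('0130', '2300', '0600') are both True: when
    # begin > end the allowed window wraps past midnight.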
    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name)
                if self.optimize(plan):
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()
    def plan_create(self, name):
        plan = Plan(name, MappingState(self.get_osdmap(),
                                       self.get('pg_dump'),
                                       'plan %s initial' % name))
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]
    def calc_eval(self, ms):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        pools = pe.pool_id.keys()
        if len(pools) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds', []) }
        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            root = ms.crush.get_item_name(rootid)
            pe.root_ids[root] = rootid
            roots.append(root)
            ls = ms.osdmap.get_pools_by_take(rootid)
            pe.root_pools[root] = []
            for poolid in ls:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight.get(osd, 1.0)
                for osd, cw in weight_map.iteritems()
            }
            sum_w = sum(adjusted_map.values()) or 1.0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd, w in adjusted_map.iteritems() }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root].iterkeys():
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)
        # pool and root actual
        for pool, pi in pool_info.iteritems():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root].iterkeys():
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in pm.iteritems():
                for osd in [int(osd) for osd in up]:
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.iteritems()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.iteritems()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root, m in pe.total_by_root.iteritems():
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].iteritems()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].iteritems()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].iteritems()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)
        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.iteritems()
        }

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in pe.score_by_root.iteritems():
            for k, v in vs.iteritems():
                pe.score += v
        pe.score /= 3 * len(roots)
        return pe
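    # For example, with two roots each scoring a hypothetical
    # {'pgs': 0.12, 'objects': 0.10, 'bytes': 0.14}, pe.score is
    # (0.12 + 0.10 + 0.14) * 2 / (3 * 2) = 0.12.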
    def evaluate(self, ms, verbose=False):
        pe = self.calc_eval(ms)
        return pe.show(verbose=verbose)
    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            self.log.info('Some PGs (%f) are unknown; waiting', unknown)
        elif degraded > 0.0:
            self.log.info('Some objects (%f) are degraded; waiting', degraded)
        elif inactive > 0.0:
            self.log.info('Some PGs (%f) are inactive; waiting', inactive)
        elif misplaced >= max_misplaced:
            self.log.info('Too many objects (%f > %f) are misplaced; waiting',
                          misplaced, max_misplaced)
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                self.log.info('Idle')
            else:
                self.log.info('Unrecognized mode %s' % plan.mode)
        return False
    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = self.get_config('upmap_max_iterations', 10)
        max_deviation = self.get_config('upmap_max_deviation', .01)

        ms = plan.initial
        pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            self.log.info('no pools, nothing to do')
            return False
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        return True
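    # For example, with max_iterations = 10 and three pools (hypothetical
    # numbers), if the first pool consumes 4 upmap changes then only a
    # remaining budget of 6 is offered to the later pools, and the loop
    # stops early once the budget is exhausted.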
    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = self.get_config('crush_compat_max_iterations', 25)
        if max_iterations < 1:
            return False
        step = self.get_config('crush_compat_step', .5)
        if step <= 0 or step >= 1.0:
            return False
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms)
        if pe.score == 0:
            self.log.info('Distribution is already perfect')
            return False

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds', []) }
        reweighted_osds = [ a for a, b in orig_osd_weight.iteritems()
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights()
        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }
        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        for root, wm in pe.target_by_root.iteritems():
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            self.log.error('error: some osds belong to multiple subtrees: %s' %
                           overlap)
            return False
        key = 'pgs'  # pgs objects or bytes

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                pgs = len(best_pe.target_by_root[root])
                min_pgs = pgs * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools, pgs, min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                        continue
                    deviation = target[osd] - actual[osd]
                    if deviation == 0:
                        break
                    self.log.debug('osd.%d deviation %f', osd, deviation)
                    weight = best_ws[osd]
                    ow = orig_osd_weight[osd]
                    if actual[osd] > 0:
                        calc_weight = target[osd] / actual[osd] * weight * ow
                    else:
                        # not enough to go on here... keep orig weight
                        calc_weight = weight / orig_osd_weight[osd]
                    new_weight = weight * (1.0 - step) + calc_weight * step
                    self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                   new_weight)
                    next_ws[osd] = new_weight
                    if ow < 1.0:
                        new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                              ow + .005))
                        self.log.debug('Reweight osd.%d reweight %f -> %f',
                                       osd, ow, new_ow)
                        next_ow[osd] = new_ow
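                    # For example (hypothetical values): with step = 0.5,
                    # target[osd] = 0.25, actual[osd] = 0.20, weight = 1.0
                    # and ow = 1.0, calc_weight = 0.25/0.20 = 1.25 and
                    # new_weight = 1.0*0.5 + 1.25*0.5 = 1.125; each iteration
                    # therefore moves the weight-set entry only part way
                    # toward the analytically ideal weight.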
                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in next_ws.iteritems()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor
            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                        bad_steps += 1
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                        bad_steps = 0
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0.0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.iteritems():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return True
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return False
    def get_compat_weight_set_weights(self):
        # enable compat weight-set
        self.log.debug('ceph osd crush weight-set create-compat')
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush weight-set create-compat',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error creating compat weight-set')
            return

        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd crush dump',
            'format': 'json',
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.error('Error dumping crush map')
            return

        try:
            crushmap = json.loads(outb)
        except:
            raise RuntimeError('unable to parse crush map')

        raw = crushmap.get('choose_args', {}).get('-1', [])
        weight_set = {}
        for b in raw:
            # map each weight-set entry back to its crush bucket
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
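    # For reference, a sketch of the crush-dump fields consumed above
    # (hypothetical values): choose_args {"-1": [{"bucket_id": -2,
    # "weight_set": [[1.0, 0.968]]}]} paired with a bucket
    # {"id": -2, "items": [{"id": 0}, {"id": 1}], ...} would yield
    # weight_set == {0: 1.0, 1: 0.968}.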
    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')
    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

        for osd, weight in plan.compat_ws.iteritems():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), 'foo')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in plan.osd_weights.iteritems():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), 'foo')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands to complete
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error on command')
                return
        self.log.debug('done')
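    # For reference, the JSON sent to the mon for a single upmap item above
    # looks like (hypothetical values):
    #   {"prefix": "osd pg-upmap-items", "format": "json",
    #    "pgid": "1.7", "id": [3, 5]}
    # i.e. remap pg 1.7's replica from osd.3 to osd.5.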