# ceph/src/pybind/mgr/balancer/module.py

"""
Balance PG distribution across OSDs.
"""
import copy
import errno
import json
import math
import random
import six
import time

from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap

# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs replaced at a time

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'


class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in pg_dump.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in six.iteritems(self.pg_up_by_poolid[poolid]):
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in six.iteritems(other_ms.pg_up):
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
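
    # Example (hypothetical numbers): if other_ms recorded the up sets of
    # 200 PGs and 5 of them map to a different OSD list in this state, the
    # result is 5 / 200 = 0.025, i.e. about 2.5% of PGs would move.  The
    # optimizer compares this ratio against max_misplaced (default 0.05).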


class Plan(object):
    def __init__(self, name, ms, pools):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms
        self.pools = pools
        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in six.iteritems(self.compat_ws):
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in six.iteritems(self.osd_weights):
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root name
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # root -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in six.iteritems(count[t]):
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors to calculate reweight_urgency.
                # One 10% underfilled device with 5 2% overfilled devices is arguably a better
                # situation than one 10% overfilled with 5 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
                    x = (adjusted - avg)/avg.
                    Since we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
                    To bring the range of F(x) into [0, 1), we need to make the above modification.

                    In general, we need to use a function F(x), where x = (adjusted - avg)/avg:
                    1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded;
                    2. a larger value of x should imply more urgency to reweight;
                    3. the difference between F(x) values when x is large should be minimal;
                    4. the value of F(x) should approach 1 (highest urgency to reweight) steeply.

                    Could have used F(x) = (1 - e^(-x)), but that converges to 1 more slowly than the function currently in use.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
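
    # A rough feel for the urgency curve used above (hypothetical numbers):
    # for a device whose adjusted utilization is 10% above average, x = 0.1
    # and F(x) = math.erf(0.1 / math.sqrt(2.0)) ~= 0.08; at 50% above average
    # F(0.5) ~= 0.38; as x grows, F(x) climbs steeply toward 1, which is why
    # erf was preferred over 1 - e^-x here.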


class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
        },
        {
            "cmd": "balancer eval name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
        },
        {
            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
            "desc": "Run optimizer to create a new plan",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
        },
        {
            "cmd": "balancer ls",
            "desc": "List all plans",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
        },
    ]
    active = False
    run = True
    plans = {}

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': list(self.plans.keys()),
                'active': self.active,
                'mode': self.get_config('mode', default_mode),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            self.set_config('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_config('active', '1')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_config('active', '')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            pools = []
            if 'option' in command:
                plan = self.plans.get(command['option'])
                if not plan:
                    # not a plan, does it look like a pool?
                    osdmap = self.get_osdmap()
                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
                    option = command['option']
                    if option not in valid_pool_names:
                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
                    pools.append(option)
                    ms = MappingState(osdmap, self.get("pg_dump"), 'pool "%s"' % option)
                else:
                    pools = plan.pools
                    ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, pools, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            pools = []
            if 'pools' in command:
                pools = command['pools']
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            invalid_pool_names = []
            for p in pools:
                if p not in valid_pool_names:
                    invalid_pool_names.append(p)
            if len(invalid_pool_names):
                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
            plan = self.plan_create(command['plan'], osdmap, pools)
            r, detail = self.optimize(plan)
            # remove plan if we are currently unable to find an optimization
            # or distribution is already perfect
            if r == -errno.EALREADY:
                self.plan_rm(command['plan'])
            return (r, '', detail)
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer ls':
            return (0, json.dumps([p for p in self.plans], indent=4), '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            r, detail = self.execute(plan)
            self.plan_rm(command['plan'])
            return (r, '', detail)
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_in_interval(self, tod, begin, end):
        if begin <= end:
            return tod >= begin and tod < end
        else:
            return tod >= begin or tod < end
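
    # Example (hypothetical window): with begin='2300' and end='0600' the
    # window wraps past midnight, so time_in_interval('2330', '2300', '0600')
    # and time_in_interval('0130', '2300', '0600') are both True, while
    # time_in_interval('0800', '2300', '0600') is False.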

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_config('active', '') != ''
            begin_time = self.get_config('begin_time') or '0000'
            end_time = self.get_config('end_time') or '2400'
            timeofday = time.strftime('%H%M', time.localtime())
            self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]',
                           "active" if self.active else "inactive",
                           begin_time, end_time, timeofday)
            sleep_interval = float(self.get_config('sleep_interval',
                                                   default_sleep_interval))
            if self.active and self.time_in_interval(timeofday, begin_time, end_time):
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                plan = self.plan_create(name, self.get_osdmap(), [])
                r, detail = self.optimize(plan)
                if r == 0:
                    self.execute(plan)
                self.plan_rm(name)
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name, osdmap, pools):
        plan = Plan(name,
                    MappingState(osdmap,
                                 self.get("pg_dump"),
                                 'plan %s initial' % name),
                    pools)
        self.plans[name] = plan
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms, pools):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0 }

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find out roots associating with pools we are passed in
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if not want:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in six.iteritems(weight_map) if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd, w in six.iteritems(adjusted_map) }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root].iterkeys():
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)
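
        # Example (hypothetical root with two OSDs): if osd.0 has CRUSH weight
        # 1.0 and osd.1 has CRUSH weight 3.0, both with reweight 1.0, then
        # adjusted_map is {0: 1.0, 1: 3.0} and target_by_root[root] becomes
        # {0: 0.25, 1: 0.75} -- the fraction of the root's data each OSD is
        # expected to hold.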

        # pool and root actual
        for pool, pi in six.iteritems(pool_info):
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root].iterkeys():
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in six.iteritems(pm):
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: v
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: v
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root.iterkeys():
            pe.count_by_root[root] = {
                'pgs': {
                    k: v
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: v
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: v
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in six.iteritems(pe.count_by_root)
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = self.get_config('crush_compat_metrics', 'pgs,objects,bytes').split(',')

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in six.iteritems(pe.score_by_root):
            for k, v in six.iteritems(vs):
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe

    def evaluate(self, ms, pools, verbose=False):
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_config('mode', default_mode)
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail

        if plan.mode == 'upmap':
            return self.do_upmap(plan)
        elif plan.mode == 'crush-compat':
            return self.do_crush_compat(plan)
        elif plan.mode == 'none':
            detail = 'Please do "ceph balancer mode" to choose a valid mode first'
            self.log.info('Idle')
            return -errno.ENOEXEC, detail
        else:
            detail = 'Unrecognized mode %s' % plan.mode
            self.log.info(detail)
            return -errno.EINVAL, detail
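
    # Example (hypothetical ratios): with the default max_misplaced of 0.05,
    # a reported misplaced_ratio of 0.07 makes optimize() return -errno.EAGAIN
    # ("try again later"); once the ratio falls below the threshold the plan
    # is handed to do_upmap() or do_crush_compat() according to the mode.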

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = int(self.get_config('upmap_max_iterations', 10))
        max_deviation = float(self.get_config('upmap_max_deviation', .01))

        ms = plan.initial
        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        inc = plan.inc
        total_did = 0
        left = max_iterations
        for pool in pools:
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or distribution is already perfect'
        return 0, ''

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = float(self.get_config('crush_compat_step', .5))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = float(self.get_config('min_score', 0))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds', []) }
        reweighted_osds = [ a for a, b in six.iteritems(orig_osd_weight)
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = { a: b for a, b in six.iteritems(orig_ws) if a >= 0 }

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        for root, wm in six.iteritems(pe.target_by_root):
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap.keys()
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = self.get_config('crush_compat_metrics', 'pgs,objects,bytes').split(',')
        key = metrics[0]  # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warn("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                        continue
                    deviation = target[osd] - actual[osd]
                    if deviation == 0:
                        break
                    self.log.debug('osd.%d deviation %f', osd, deviation)
                    weight = best_ws[osd]
                    ow = orig_osd_weight[osd]
                    if actual[osd] > 0:
                        calc_weight = target[osd] / actual[osd] * weight * ow
                    else:
                        # not enough to go on here... keep orig weight
                        calc_weight = weight / orig_osd_weight[osd]
                    new_weight = weight * (1.0 - step) + calc_weight * step
                    self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                   new_weight)
                    next_ws[osd] = new_weight
                    if ow < 1.0:
                        new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                              ow + .005))
                        self.log.debug('Reweight osd.%d reweight %f -> %f',
                                       osd, ow, new_ow)
                        next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in six.iteritems(next_ws)
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in six.iteritems(best_ow):
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            return -errno.EDOM, 'Unable to find further optimization, ' \
                                'change balancer mode and retry might help'

    def get_compat_weight_set_weights(self, ms):
        if '-1' not in ms.crush_dump.get('choose_args', {}):
            # enable compat weight-set first
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = crushmap.get('choose_args', {}).get('-1', [])
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
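
    # For reference, the 'choose_args' payload parsed above has roughly this
    # shape (hypothetical values):
    #
    #   'choose_args': {'-1': [{'bucket_id': -2,
    #                           'weight_set': [[1.0, 0.96]]}]}
    #
    # where bucket -2 lists two items (say osd.0 and osd.1), so the compat
    # weight-set assigns osd.0 weight 1.0 and osd.1 weight 0.96.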

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return r, outs

        for osd, weight in six.iteritems(plan.compat_ws):
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in six.iteritems(plan.osd_weights):
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
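
        # Example (hypothetical weight): an OSD weight of 0.85 becomes
        # str(int(0.85 * float(0x10000))) == '55705'; 'osd reweightn' takes
        # raw 16.16 fixed-point values, where 0x10000 represents 1.0.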
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands to finish
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''