"""
Balance PG distribution across OSDs.
"""
import copy
import datetime
import errno
import json
import math
import random
import time

import six

from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'


class MappingState:
    def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.raw_pg_stats = raw_pg_stats
        self.raw_pool_stats = raw_pool_stats
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in six.iteritems(self.pg_up_by_poolid[poolid]):
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in six.iteritems(other_ms.pg_up):
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
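
# The sketch below is not part of the original module: it is a minimal,
# hypothetical illustration of the misplaced-ratio arithmetic used by
# MappingState.calc_misplaced_from, applied to plain pgid -> up-set dicts.
def _example_misplaced_fraction():
    # up-sets before and after a hypothetical change; 1 of 3 PGs moved
    before = {'1.0': [0, 1], '1.1': [1, 2], '1.2': [2, 0]}
    after = {'1.0': [0, 1], '1.1': [1, 3], '1.2': [2, 0]}
    moved = sum(1 for pgid, up in before.items() if after.get(pgid, []) != up)
    # 1 / 3 ~= 0.333, the same fraction calc_misplaced_from would report
    return float(moved) / float(max(len(before), 1))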


class Plan(object):
    def __init__(self, name, mode, osdmap, pools):
        self.name = name
        self.mode = mode
        self.osdmap = osdmap
        self.osdmap_dump = osdmap.dump()
        self.pools = pools
        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = osdmap.new_incremental()
        self.pg_status = {}


class MsPlan(Plan):
    """
    Plan with a preloaded MappingState member.
    """
    def __init__(self, name, mode, ms, pools):
        super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
        self.initial = ms

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.raw_pg_stats,
                            self.initial.raw_pool_stats,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4, sort_keys=True)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in six.iteritems(self.compat_ws):
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in six.iteritems(self.osd_weights):
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root name
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # pool -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value
        self.score_by_pool = {}
        self.score_by_root = {}
        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'max': 0,
                    'min': 0,
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in six.iteritems(count[t]):
                # adjust/normalize by weight
                adjusted = float(v) / target[k] / float(num)

                # Overweighted devices and their weights are factors to calculate reweight_urgency.
                # One 10% underfilled device with five 2% overfilled devices is arguably a better
                # situation than one 10% overfilled device with five 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the standard
                    normal distribution and x = (adjusted - avg)/avg.
                    Since we're considering only over-weighted devices, x >= 0,
                    so phi(x) lies in [0.5, 1). The modification above brings the
                    range of F(x) to [0, 1).

                    In general, we need a function F(x), where x = (adjusted - avg)/avg, that:
                    1. is bounded between 0 and 1, so that reweight_urgency is also bounded;
                    2. assigns a larger F(x) to a larger x (more urgency to reweight);
                    3. keeps the difference between F(x) values minimal when x is large;
                    4. approaches 1 (highest urgency to reweight) steeply.

                    We could have used F(x) = 1 - e^(-x), but it converges to 1
                    more slowly than the function currently in use.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'max': max(count[t].values()),
                'min': min(count[t].values()),
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
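
# The sketch below is not part of the original module: it is a small,
# self-contained demonstration of the urgency function used in calc_stats,
# F(x) = 2*phi(x) - 1 = erf(x / sqrt(2)), evaluated for a few sample
# relative deviations x = (adjusted - avg)/avg.
def _example_reweight_urgency():
    # e.g. x=0.02 -> ~0.016, x=1.0 -> ~0.683, x=3.0 -> ~0.997:
    # urgency rises steeply with x and saturates just below 1.
    samples = [0.02, 0.1, 0.5, 1.0, 3.0]
    return {x: math.erf(x / math.sqrt(2.0)) for x in samples}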


class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'active',
            'desc': 'automatically balance PGs across cluster',
        },
        {
            'name': 'begin_time',
            'desc': 'beginning time of day to automatically balance',
            'long_desc': 'This is a time of day in the format HHMM.',
        },
        {
            'name': 'end_time',
            'desc': 'ending time of day to automatically balance',
            'long_desc': 'This is a time of day in the format HHMM.',
        },
        {
            'name': 'begin_weekday',
            'desc': 'Restrict automatic balancing to this day of the week or later',
            'long_desc': '0 or 7 = Sunday, 1 = Monday, etc.',
        },
        {
            'name': 'end_weekday',
            'desc': 'Restrict automatic balancing to days of the week earlier than this',
            'long_desc': '0 or 7 = Sunday, 1 = Monday, etc.',
        },
        {
            'name': 'crush_compat_max_iterations',
            'desc': 'maximum number of iterations to attempt optimization',
        },
        {
            'name': 'crush_compat_metrics',
            'default': 'pgs,objects,bytes',
            'desc': 'metrics with which to calculate OSD utilization',
            'long_desc': 'Value is a list of one or more of "pgs", "objects", '
                         'or "bytes", and indicates which metrics to use to '
                         'balance utilization.',
        },
        {
            'name': 'crush_compat_step',
            'desc': 'aggressiveness of optimization',
            'long_desc': '.99 is very aggressive, .01 is less aggressive',
        },
        {
            'name': 'min_score',
            'desc': 'minimum score, below which no optimization is attempted',
        },
        {
            'name': 'mode',
            'desc': 'Balancer mode',
            'enum_allowed': ['none', 'crush-compat', 'upmap'],
        },
        {
            'name': 'sleep_interval',
            'desc': 'how frequently to wake up and attempt optimization',
        },
        {
            'name': 'upmap_max_optimizations',
            'desc': 'maximum upmap optimizations to make per attempt',
        },
        {
            'name': 'upmap_max_deviation',
            'desc': 'deviation below which no optimization is attempted',
            'long_desc': 'If the number of PGs is within this count, then no '
                         'optimization is attempted',
        },
        {
            'name': 'pool_ids',
            'desc': 'pools to which automatic balancing will be limited',
        },
    ]

    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
        },
        {
            "cmd": "balancer pool ls",
            "desc": "List automatic balancing pools. "
                    "Note that an empty list means all existing pools will be "
                    "automatic balancing targets, which is the default "
                    "behaviour of the balancer.",
        },
        {
            "cmd": "balancer pool add name=pools,type=CephString,n=N",
            "desc": "Enable automatic balancing for specific pools",
        },
        {
            "cmd": "balancer pool rm name=pools,type=CephString,n=N",
            "desc": "Disable automatic balancing for specific pools",
        },
        {
            "cmd": "balancer eval name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
        },
        {
            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
            "desc": "Run optimizer to create a new plan",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
        },
        {
            "cmd": "balancer ls",
            "desc": "List all plans",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
        },
    ]

    active = False
    run = True
    plans = {}
    optimizing = False
    optimize_result = ''
    last_optimize_started = ''
    last_optimize_duration = ''

    success_string = 'Optimization plan created successfully'
    in_progress_string = 'in progress'

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, inbuf, command):
        self.log.warning("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': list(self.plans.keys()),
                'active': self.active,
                'last_optimize_started': self.last_optimize_started,
                'last_optimize_duration': self.last_optimize_duration,
                'optimize_result': self.optimize_result,
                'mode': self.get_module_option('mode'),
            }
            return (0, json.dumps(s, indent=4, sort_keys=True), '')
        elif command['prefix'] == 'balancer mode':
            if command['mode'] == 'upmap':
                min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
                if min_compat_client < 'luminous':  # works well because version is alphabetized..
                    warn = 'min_compat_client "%s" ' \
                           '< "luminous", which is required for pg-upmap. ' \
                           'Try "ceph osd set-require-min-compat-client luminous" ' \
                           'before enabling this mode' % min_compat_client
                    return (-errno.EPERM, '', warn)
            elif command['mode'] == 'crush-compat':
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_stats"),
                                  self.get("pool_stats"),
                                  'initialize compat weight-set')
                self.get_compat_weight_set_weights(ms)  # ignore error
            self.set_module_option('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_module_option('active', 'true')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_module_option('active', 'false')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer pool ls':
            pool_ids = self.get_module_option('pool_ids')
            if pool_ids == '':
                return (0, '', '')
            pool_ids = pool_ids.split(',')
            pool_ids = [int(p) for p in pool_ids]
            pool_name_by_id = dict((p['pool'], p['pool_name'])
                                   for p in self.get_osdmap().dump().get('pools', []))
            should_prune = False
            final_ids = []
            final_names = []
            for p in pool_ids:
                if p in pool_name_by_id:
                    final_ids.append(str(p))
                    final_names.append(pool_name_by_id[p])
                else:
                    should_prune = True
            if should_prune:  # some pools were gone, prune
                self.set_module_option('pool_ids', ','.join(final_ids))
            return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
        elif command['prefix'] == 'balancer pool add':
            raw_names = command['pools']
            pool_id_by_name = dict((p['pool_name'], p['pool'])
                                   for p in self.get_osdmap().dump().get('pools', []))
            invalid_names = [p for p in raw_names if p not in pool_id_by_name]
            if invalid_names:
                return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
            to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
            existing = self.get_module_option('pool_ids')
            if existing != '':
                existing = existing.split(',')
            else:
                existing = []
            final = set(to_add) | set(existing)
            self.set_module_option('pool_ids', ','.join(final))
            return (0, '', '')
        elif command['prefix'] == 'balancer pool rm':
            raw_names = command['pools']
            existing = self.get_module_option('pool_ids')
            if existing == '':  # for idempotence
                return (0, '', '')
            existing = existing.split(',')
            osdmap = self.get_osdmap()
            pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
            pool_id_by_name = dict((p['pool_name'], p['pool'])
                                   for p in osdmap.dump().get('pools', []))
            final = [p for p in existing if p in pool_ids]
            to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
            final = set(final) - set(to_delete)
            self.set_module_option('pool_ids', ','.join(final))
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            pools = []
            if 'option' in command:
                plan = self.plans.get(command['option'])
                if not plan:
                    # not a plan, does it look like a pool?
                    osdmap = self.get_osdmap()
                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
                    option = command['option']
                    if option not in valid_pool_names:
                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
                    pools.append(option)
                    ms = MappingState(osdmap, self.get("pg_stats"), self.get("pool_stats"),
                                      'pool "%s"' % option)
                else:
                    pools = plan.pools
                    if plan.mode == 'upmap':
                        # Note that for upmap, to improve the efficiency,
                        # we use a basic version of Plan without keeping the obvious
                        # *redundant* MS member.
                        # Hence ms might not be accurate here since we are basically
                        # using an old snapshotted osdmap vs a fresh copy of pg_stats.
                        # It should not be a big deal though..
                        ms = MappingState(plan.osdmap,
                                          self.get("pg_stats"),
                                          self.get("pool_stats"),
                                          'plan "%s"' % plan.name)
                    else:
                        ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_stats"),
                                  self.get("pool_stats"),
                                  'current cluster')
            return (0, self.evaluate(ms, pools, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            # The GIL can be released by the active balancer, so disallow when active
            if self.active:
                return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
            if self.optimizing:
                return (-errno.EINVAL, '', 'Balancer finishing up....try again')
            pools = []
            if 'pools' in command:
                pools = command['pools']
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            invalid_pool_names = []
            for p in pools:
                if p not in valid_pool_names:
                    invalid_pool_names.append(p)
            if len(invalid_pool_names):
                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
            plan = self.plan_create(command['plan'], osdmap, pools)
            self.last_optimize_started = time.asctime(time.localtime())
            self.optimize_result = self.in_progress_string
            start = time.time()
            r, detail = self.optimize(plan)
            end = time.time()
            self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
            if r == 0:
                # Add plan if an optimization was created
                self.optimize_result = self.success_string
                self.plans[command['plan']] = plan
            else:
                self.optimize_result = detail
            return (r, '', detail)
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer ls':
            return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            # The GIL can be released by the active balancer, so disallow when active
            if self.active:
                return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
            if self.optimizing:
                return (-errno.EINVAL, '', 'Balancer finishing up....try again')
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            r, detail = self.execute(plan)
            self.plan_rm(command['plan'])
            return (r, '', detail)
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_permit(self):
        local_time = time.localtime()
        time_of_day = time.strftime('%H%M', local_time)
        weekday = (local_time.tm_wday + 1) % 7  # be compatible with C
        permit = False

        begin_time = self.get_module_option('begin_time')
        end_time = self.get_module_option('end_time')
        if begin_time <= end_time:
            permit = begin_time <= time_of_day < end_time
        else:
            permit = time_of_day >= begin_time or time_of_day < end_time
        if not permit:
            self.log.debug("should run between %s - %s, now %s, skipping",
                           begin_time, end_time, time_of_day)
            return False

        begin_weekday = self.get_module_option('begin_weekday')
        end_weekday = self.get_module_option('end_weekday')
        if begin_weekday <= end_weekday:
            permit = begin_weekday <= weekday < end_weekday
        else:
            permit = weekday >= begin_weekday or weekday < end_weekday
        if not permit:
            self.log.debug("should run between weekday %d - %d, now %d, skipping",
                           begin_weekday, end_weekday, weekday)
            return False

        return True
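
    # The helper below is not part of the original module: it is a minimal
    # sketch of the wrap-around window test used by time_permit, with the
    # window passed in explicitly as 'HHMM' strings instead of read from
    # module options.
    @staticmethod
    def _example_time_window(now, begin, end):
        # a window like 2300 -> 0600 crosses midnight, so the test inverts;
        # e.g. _example_time_window('0130', '2300', '0600') is True
        if begin <= end:
            return begin <= now < end
        return now >= begin or now < end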

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_module_option('active')
            sleep_interval = self.get_module_option('sleep_interval')
            self.log.debug('Waking up [%s, now %s]',
                           "active" if self.active else "inactive",
                           time.strftime(TIME_FORMAT, time.localtime()))
            if self.active and self.time_permit():
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                osdmap = self.get_osdmap()
                allow = self.get_module_option('pool_ids')
                final = []
                if allow:
                    allow = allow.split(',')
                    valid = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
                    final = set(allow) & set(valid)
                    if set(allow) - set(valid):  # some pools were gone, prune
                        self.set_module_option('pool_ids', ','.join(final))
                    pool_name_by_id = dict((p['pool'], p['pool_name'])
                                           for p in osdmap.dump().get('pools', []))
                    final = [int(p) for p in final]
                    final = [pool_name_by_id[p] for p in final if p in pool_name_by_id]
                plan = self.plan_create(name, osdmap, final)
                self.optimizing = True
                self.last_optimize_started = time.asctime(time.localtime())
                self.optimize_result = self.in_progress_string
                start = time.time()
                r, detail = self.optimize(plan)
                end = time.time()
                self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
                if r == 0:
                    self.optimize_result = self.success_string
                    self.execute(plan)
                else:
                    self.optimize_result = detail
                self.optimizing = False
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name, osdmap, pools):
        mode = self.get_module_option('mode')
        if mode == 'upmap':
            # drop unnecessary MS member for upmap mode.
            # this way we could effectively eliminate the usage of a
            # complete pg_stats, which can become horribly inefficient
            # as the number of pgs grows
            plan = Plan(name, mode, osdmap, pools)
        else:
            plan = MsPlan(name,
                          mode,
                          MappingState(osdmap,
                                       self.get("pg_stats"),
                                       self.get("pool_stats"),
                                       'plan %s initial' % name),
                          pools)
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms, pools):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0 }

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find out roots associating with pools we are passed in
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if not want:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in six.iteritems(weight_map) if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd, w in six.iteritems(adjusted_map) }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in six.iteritems(pool_info):
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for pgid, up in six.iteritems(pm):
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    if osd not in pgs_by_osd:
                        pgs_by_osd[osd] = 0
                        objects_by_osd[osd] = 0
                        bytes_by_osd[osd] = 0
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: v
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: v
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in six.iteritems(pe.count_by_root)
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = self.get_module_option('crush_compat_metrics').split(',')

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in six.iteritems(pe.score_by_root):
            for k, v in six.iteritems(vs):
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe
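
    # The helper below is not part of the original module: it is a toy
    # illustration of how calc_eval turns raw CRUSH weights and OSD
    # reweights into the normalized target fractions stored in
    # target_by_root (the weights here are made-up sample values).
    @staticmethod
    def _example_target_fractions():
        crush_weight = {0: 1.0, 1: 1.0, 2: 2.0}   # hypothetical crush weights
        reweight = {0: 1.0, 1: 0.5, 2: 1.0}       # hypothetical osd reweights
        adjusted = {osd: cw * reweight[osd]
                    for osd, cw in crush_weight.items() if reweight[osd] > 0}
        sum_w = sum(adjusted.values())
        # fractions sum to 1.0: {0: ~0.286, 1: ~0.143, 2: ~0.571}
        return {osd: w / sum_w for osd, w in adjusted.items()}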

    def evaluate(self, ms, pools, verbose=False):
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        max_misplaced = self.get_ceph_option('target_max_misplaced_ratio')
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        plan.pg_status = info
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                detail = 'Please do "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_optimizations = self.get_module_option('upmap_max_optimizations')
        max_deviation = self.get_module_option('upmap_max_deviation')
        osdmap_dump = plan.osdmap_dump

        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        adjusted_pools = []
        inc = plan.inc
        total_did = 0
        left = max_optimizations
        pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
                               if p['pg_num'] > p['pg_num_target']]
        crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
                                       for p in osdmap_dump.get('pools', []))
        for pool in pools:
            if pool not in crush_rule_by_pool_name:
                self.log.info('pool %s does not exist' % pool)
                continue
            if pool in pools_with_pg_merge:
                self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
                continue
            adjusted_pools.append(pool)
        # shuffle so all pools get equal (in)attention
        random.shuffle(adjusted_pools)
        pool_dump = osdmap_dump.get('pools', [])
        for pool in adjusted_pools:
            num_pg = 0
            pool_id = None
            for p in pool_dump:
                if p['pool_name'] == pool:
                    num_pg = p['pg_num']
                    pool_id = p['pool']
                    break

            # note that here we deliberately exclude any scrubbing pgs too
            # since scrubbing activities have significant impacts on performance
            num_pg_active_clean = 0
            for p in plan.pg_status.get('pgs_by_pool_state', []):
                pgs_pool_id = p['pool_id']
                if pgs_pool_id != pool_id:
                    continue
                for s in p['pg_state_counts']:
                    if s['state_name'] == 'active+clean':
                        num_pg_active_clean += s['count']
                        break
                break
            available = left - (num_pg - num_pg_active_clean)
            did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or pool(s) pg_num is decreasing, ' \
                                    'or distribution is already perfect'
        return 0, ''
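
    # The helper below is not part of the original module: it is a minimal
    # sketch of the per-pool optimization budget used by do_upmap, where PGs
    # that are not yet active+clean are charged against the remaining budget
    # before calc_pg_upmaps is invoked (the sample numbers are made up).
    @staticmethod
    def _example_upmap_budget(left=10, num_pg=128, num_pg_active_clean=120):
        # 8 PGs are still settling, so only 2 of the 10 remaining
        # optimizations may be spent on this pool
        return left - (num_pg - num_pg_active_clean)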

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = self.get_module_option('crush_compat_max_iterations')
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = self.get_module_option('crush_compat_step')
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = self.get_ceph_option('target_max_misplaced_ratio')
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = self.get_module_option('min_score')
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds', []) }
        reweighted_osds = [ a for a, b in six.iteritems(orig_osd_weight)
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if orig_ws is None:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = { a: b for a, b in six.iteritems(orig_ws) if a >= 0 }

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        for root, wm in six.iteritems(pe.target_by_root):
            for osd in wm:
                if osd in visited:
                    if osd not in overlap:
                        overlap[osd] = [ visited[osd] ]
                    overlap[osd].append(root)
                visited[osd] = root
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = self.get_module_option('crush_compat_metrics').split(',')
        key = metrics[0]  # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                        continue
                    deviation = target[osd] - actual[osd]
                    if deviation == 0:
                        break
                    self.log.debug('osd.%d deviation %f', osd, deviation)
                    weight = best_ws[osd]
                    ow = orig_osd_weight[osd]
                    if actual[osd] > 0:
                        calc_weight = target[osd] / actual[osd] * weight * ow
                    else:
                        # for newly created osds, reset calc_weight at target value
                        # this way weight-set will end up absorbing *step* of its
                        # target (final) value at the very beginning and slowly catch up later.
                        # note that if this turns out causing too many misplaced
                        # pgs, then we'll reduce step and retry
                        calc_weight = target[osd]
                    new_weight = weight * (1.0 - step) + calc_weight * step
                    self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                   new_weight)
                    next_ws[osd] = new_weight
                    if ow < 1.0:
                        new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                              ow + .005))
                        self.log.debug('Reweight osd.%d reweight %f -> %f',
                                       osd, ow, new_ow)
                        next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in six.iteritems(next_ws)
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0.0
        if best_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in six.iteritems(best_ow):
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization, ' \
                                'change balancer mode and retry might help'
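
    # The helper below is not part of the original module: it demonstrates
    # the exponentially-smoothed weight update used by do_crush_compat,
    # new = old*(1 - step) + target*step, converging toward the computed
    # target weight over successive iterations (sample values are made up).
    @staticmethod
    def _example_compat_step(old=1.0, target=2.0, step=0.25, iterations=4):
        w = old
        trace = []
        for _ in range(iterations):
            w = w * (1.0 - step) + target * step
            trace.append(w)
        # with step=0.25: 1.25, 1.4375, 1.578125, 1.68359375 -> approaches 2.0
        return trace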

    def get_compat_weight_set_weights(self, ms):
        if not CRUSHMap.have_default_choose_args(ms.crush_dump):
            # enable compat weight-set first
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except ValueError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = CRUSHMap.get_default_choose_args(crushmap)
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           not CRUSHMap.have_default_choose_args(plan.initial.crush_dump):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return r, outs

        for osd, weight in six.iteritems(plan.compat_ws):
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in six.iteritems(plan.osd_weights):
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for item in incdump.get('new_pg_upmap', []):
            self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
                          item['osds'])
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap',
                'format': 'json',
                'pgid': item['pgid'],
                'id': item['osds'],
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap', []):
            self.log.info('ceph osd rm-pg-upmap %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        # wait for commands to finish
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''
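
    # The helper below is not part of the original module: it shows the
    # 16.16 fixed-point encoding that execute() uses when it translates a
    # float OSD weight into the integer string expected by
    # "ceph osd reweightn" (the 0.8 sample weight is made up).
    @staticmethod
    def _example_reweightn_encoding(weight=0.8):
        # 0x10000 == 65536; a weight of 1.0 encodes as '65536',
        # 0.8 encodes as '52428'
        return str(int(weight * float(0x10000)))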

    def gather_telemetry(self):
        return {
            'active': self.active,
        }
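

# The block below is not part of the original module: it is a hedged usage
# sketch. Based on the COMMANDS table above, the typical operator flow
# against this module (via the ceph CLI) is roughly:
#
#   ceph balancer mode upmap
#   ceph balancer optimize myplan
#   ceph balancer show myplan
#   ceph balancer execute myplan
#   ceph balancer on
#
# Running this file directly (assuming the mgr bindings are importable)
# only exercises the standalone example helpers defined above:
if __name__ == '__main__':
    print(_example_misplaced_fraction())
    print(_example_reweight_urgency())
    print(Module._example_time_window('0130', '2300', '0600'))
    print(Module._example_target_fractions())
    print(Module._example_upmap_budget())
    print(Module._example_compat_step())
    print(Module._example_reweightn_encoding())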