]>
Commit | Line | Data |
---|---|---|
3efd9988 FG |
1 | |
2 | """ | |
3 | Balance PG distribution across OSDs. | |
4 | """ | |
5 | ||
6 | import copy | |
7 | import errno | |
8 | import json | |
9 | import math | |
10 | import random | |
11 | import time | |
12 | from mgr_module import MgrModule, CommandResult | |
13 | from threading import Event | |
b32b8144 | 14 | from mgr_module import CRUSHMap |
3efd9988 FG |
15 | |
# available modes: 'none', 'crush', 'crush-compat', 'upmap', 'osd_weight'
default_mode = 'none'
default_sleep_interval = 60   # seconds
default_max_misplaced = .05   # max ratio of pgs replaced at a time

# timestamp format used to name automatically-generated plans in serve()
TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
22 | ||
3efd9988 FG |
class MappingState:
    """Snapshot of an osdmap and pg dump, plus derived lookup tables.

    Captures everything needed to evaluate a PG distribution at a point
    in time: the raw dumps, per-pg stats, and the computed 'up' osd set
    for every pg of every pool.
    """
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        # pgid -> stat_sum dict (num_objects, num_bytes, ...)
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        self.poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            # .items() instead of py2-only .iteritems() so this also runs
            # on python 3
            for pgid, up in self.pg_up_by_poolid[poolid].items():
                self.pg_up[pgid] = up

    def calc_misplaced_from(self, other_ms):
        """Return the fraction of other_ms's pgs whose up set differs here.

        Used to estimate how much data an incremental would move.
        Returns 0.0 when other_ms has no pgs.
        """
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.items():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
51 | ||
class Plan:
    """A named set of proposed osdmap changes computed from a snapshot.

    Holds an incremental (upmaps), proposed compat weight-set weights and
    osd reweights; can render itself as equivalent CLI commands or
    produce the MappingState that applying it would yield.
    """
    def __init__(self, name, ms):
        self.mode = 'unknown'   # set by Module.optimize() from config
        self.name = name
        self.initial = ms       # MappingState the plan was computed against

        self.osd_weights = {}   # osd id -> new 'osd reweight' value
        self.compat_ws = {}     # osd id -> compat weight-set weight
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        """Return the MappingState resulting from applying this plan."""
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        """Return the raw incremental as pretty-printed JSON."""
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        """Render the plan as the 'ceph' CLI commands that would apply it."""
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        # only emit create-compat if the map does not already have the
        # compat ('-1') weight-set
        if len(self.compat_ws) and \
           '-1' not in self.initial.crush_dump.get('choose_args', {}):
            ls.append('ceph osd crush weight-set create-compat')
        # .items() (not py2-only .iteritems()) for python 3 compatibility
        for osd, weight in self.compat_ws.items():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.items():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)
96 | ||
97 | ||
class Eval:
    """Evaluation of a MappingState: distribution stats and a score.

    All maps are keyed by pool name / crush root name and broken down by
    the three balanced dimensions: 'pgs', 'objects' and 'bytes'.
    """
    def __init__(self, ms):
        self.ms = ms
        # NOTE: these were previously *class* attributes; being mutable
        # dicts they were shared by every Eval instance, so repeated
        # evaluations accumulated stale pools/roots.  Per-instance now.
        self.root_ids = {}        # root name -> id
        self.pool_name = {}       # pool id -> pool name
        self.pool_id = {}         # pool name -> id
        self.pool_roots = {}      # pool name -> root name
        self.root_pools = {}      # root name -> pools
        self.target_by_root = {}  # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {}  # pool -> by_* -> actual weight map
        self.actual_by_root = {}  # pool -> by_* -> actual weight map
        self.total_by_pool = {}   # pool -> by_* -> total
        self.total_by_root = {}   # root -> by_* -> total
        self.stats_by_pool = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        # overall score; 0.0 means perfectly balanced
        self.score = 0.0

    def show(self, verbose=False):
        """Render a report: full tables if verbose, one line otherwise."""
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
            r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        """Compute avg/stddev/score for each of pgs/objects/bytes.

        count:  dimension -> {osd: count}
        target: {osd: target weight fraction}
        total:  dimension -> total count
        Returns dimension -> {'avg', 'stddev', 'sum_weight', 'score'}.
        """
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            # .items() (not py2-only .iteritems()) for python 3 compatibility
            for k, v in count[t].items():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Only *over*weighted devices contribute to the urgency
                # score: one 10% underfilled device with five 2% overfilled
                # devices is arguably better than the reverse.
                if adjusted > avg:
                    # F(x) = 2*phi(x) - 1 = erf(x / sqrt(2)), with
                    # x = (adjusted - avg)/avg and phi the standard normal
                    # cdf.  F is bounded in [0, 1), increases with x, and
                    # converges to 1 quickly (faster than 1 - e^-x), which
                    # is exactly the shape wanted for reweight urgency.
                    # cdf reference: https://stackoverflow.com/a/29273201
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            # normalize by the weight that actually contributed
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
191 | ||
class Module(MgrModule):
    """ceph-mgr module that balances PG distribution across OSDs."""

    # Command table advertised to the mon/CLI; each entry maps a command
    # prefix to its argument spec and the capability it requires.
    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=plan,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "r",
        },
    ]
    # Runtime state.  Declared at class level; the mgr instantiates this
    # module once, so the sharing inherent in class attributes is benign.
    active = False   # mirrors the persisted 'active' config option
    run = True       # cleared by shutdown() to stop serve()
    plans = {}       # plan name -> Plan (in-memory only)
    mode = ''        # NOTE(review): appears unused in this module — verify

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # wakes serve() early on on/off/shutdown instead of waiting out
        # the full sleep_interval
        self.event = Event()
263 | ||
264 | def handle_command(self, command): | |
265 | self.log.warn("Handling command: '%s'" % str(command)) | |
266 | if command['prefix'] == 'balancer status': | |
267 | s = { | |
268 | 'plans': self.plans.keys(), | |
269 | 'active': self.active, | |
270 | 'mode': self.get_config('mode', default_mode), | |
271 | } | |
272 | return (0, json.dumps(s, indent=4), '') | |
273 | elif command['prefix'] == 'balancer mode': | |
274 | self.set_config('mode', command['mode']) | |
275 | return (0, '', '') | |
276 | elif command['prefix'] == 'balancer on': | |
277 | if not self.active: | |
278 | self.set_config('active', '1') | |
279 | self.active = True | |
280 | self.event.set() | |
281 | return (0, '', '') | |
282 | elif command['prefix'] == 'balancer off': | |
283 | if self.active: | |
284 | self.set_config('active', '') | |
285 | self.active = False | |
286 | self.event.set() | |
287 | return (0, '', '') | |
288 | elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose': | |
289 | verbose = command['prefix'] == 'balancer eval-verbose' | |
290 | if 'plan' in command: | |
291 | plan = self.plans.get(command['plan']) | |
292 | if not plan: | |
293 | return (-errno.ENOENT, '', 'plan %s not found' % | |
294 | command['plan']) | |
295 | ms = plan.final_state() | |
296 | else: | |
297 | ms = MappingState(self.get_osdmap(), | |
298 | self.get("pg_dump"), | |
299 | 'current cluster') | |
300 | return (0, self.evaluate(ms, verbose=verbose), '') | |
301 | elif command['prefix'] == 'balancer optimize': | |
302 | plan = self.plan_create(command['plan']) | |
303 | self.optimize(plan) | |
304 | return (0, '', '') | |
305 | elif command['prefix'] == 'balancer rm': | |
b32b8144 | 306 | self.plan_rm(command['plan']) |
3efd9988 FG |
307 | return (0, '', '') |
308 | elif command['prefix'] == 'balancer reset': | |
309 | self.plans = {} | |
310 | return (0, '', '') | |
311 | elif command['prefix'] == 'balancer dump': | |
312 | plan = self.plans.get(command['plan']) | |
313 | if not plan: | |
314 | return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) | |
315 | return (0, plan.dump(), '') | |
316 | elif command['prefix'] == 'balancer show': | |
317 | plan = self.plans.get(command['plan']) | |
318 | if not plan: | |
319 | return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) | |
320 | return (0, plan.show(), '') | |
321 | elif command['prefix'] == 'balancer execute': | |
322 | plan = self.plans.get(command['plan']) | |
323 | if not plan: | |
324 | return (-errno.ENOENT, '', 'plan %s not found' % command['plan']) | |
325 | self.execute(plan) | |
326 | self.plan_rm(plan) | |
327 | return (0, '', '') | |
328 | else: | |
329 | return (-errno.EINVAL, '', | |
330 | "Command not found '{0}'".format(command['prefix'])) | |
331 | ||
    def shutdown(self):
        """Stop the module: clear the run flag, then wake serve()."""
        self.log.info('Stopping')
        self.run = False
        # set after clearing run so the woken loop observes run == False
        self.event.set()
336 | ||
337 | def time_in_interval(self, tod, begin, end): | |
338 | if begin <= end: | |
339 | return tod >= begin and tod < end | |
340 | else: | |
341 | return tod >= begin or tod < end | |
342 | ||
343 | def serve(self): | |
344 | self.log.info('Starting') | |
345 | while self.run: | |
346 | self.active = self.get_config('active', '') is not '' | |
347 | begin_time = self.get_config('begin_time') or '0000' | |
348 | end_time = self.get_config('end_time') or '2400' | |
349 | timeofday = time.strftime('%H%M', time.localtime()) | |
350 | self.log.debug('Waking up [%s, scheduled for %s-%s, now %s]', | |
351 | "active" if self.active else "inactive", | |
352 | begin_time, end_time, timeofday) | |
353 | sleep_interval = float(self.get_config('sleep_interval', | |
354 | default_sleep_interval)) | |
355 | if self.active and self.time_in_interval(timeofday, begin_time, end_time): | |
356 | self.log.debug('Running') | |
357 | name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime()) | |
358 | plan = self.plan_create(name) | |
359 | if self.optimize(plan): | |
360 | self.execute(plan) | |
361 | self.plan_rm(name) | |
362 | self.log.debug('Sleeping for %d', sleep_interval) | |
363 | self.event.wait(sleep_interval) | |
364 | self.event.clear() | |
365 | ||
366 | def plan_create(self, name): | |
367 | plan = Plan(name, MappingState(self.get_osdmap(), | |
368 | self.get("pg_dump"), | |
369 | 'plan %s initial' % name)) | |
370 | self.plans[name] = plan | |
371 | return plan | |
372 | ||
373 | def plan_rm(self, name): | |
374 | if name in self.plans: | |
375 | del self.plans[name] | |
376 | ||
377 | def calc_eval(self, ms): | |
378 | pe = Eval(ms) | |
379 | pool_rule = {} | |
380 | pool_info = {} | |
381 | for p in ms.osdmap_dump.get('pools',[]): | |
382 | pe.pool_name[p['pool']] = p['pool_name'] | |
383 | pe.pool_id[p['pool_name']] = p['pool'] | |
384 | pool_rule[p['pool_name']] = p['crush_rule'] | |
385 | pe.pool_roots[p['pool_name']] = [] | |
386 | pool_info[p['pool_name']] = p | |
387 | pools = pe.pool_id.keys() | |
388 | if len(pools) == 0: | |
389 | return pe | |
390 | self.log.debug('pool_name %s' % pe.pool_name) | |
391 | self.log.debug('pool_id %s' % pe.pool_id) | |
392 | self.log.debug('pools %s' % pools) | |
393 | self.log.debug('pool_rule %s' % pool_rule) | |
394 | ||
395 | osd_weight = { a['osd']: a['weight'] | |
396 | for a in ms.osdmap_dump.get('osds',[]) } | |
397 | ||
398 | # get expected distributions by root | |
399 | actual_by_root = {} | |
400 | rootids = ms.crush.find_takes() | |
401 | roots = [] | |
402 | for rootid in rootids: | |
403 | root = ms.crush.get_item_name(rootid) | |
404 | pe.root_ids[root] = rootid | |
405 | roots.append(root) | |
406 | ls = ms.osdmap.get_pools_by_take(rootid) | |
407 | pe.root_pools[root] = [] | |
408 | for poolid in ls: | |
409 | pe.pool_roots[pe.pool_name[poolid]].append(root) | |
410 | pe.root_pools[root].append(pe.pool_name[poolid]) | |
411 | weight_map = ms.crush.get_take_weight_osd_map(rootid) | |
412 | adjusted_map = { | |
413 | osd: cw * osd_weight.get(osd, 1.0) | |
414 | for osd,cw in weight_map.iteritems() | |
415 | } | |
416 | sum_w = sum(adjusted_map.values()) or 1.0 | |
417 | pe.target_by_root[root] = { osd: w / sum_w | |
418 | for osd,w in adjusted_map.iteritems() } | |
419 | actual_by_root[root] = { | |
420 | 'pgs': {}, | |
421 | 'objects': {}, | |
422 | 'bytes': {}, | |
423 | } | |
424 | for osd in pe.target_by_root[root].iterkeys(): | |
425 | actual_by_root[root]['pgs'][osd] = 0 | |
426 | actual_by_root[root]['objects'][osd] = 0 | |
427 | actual_by_root[root]['bytes'][osd] = 0 | |
428 | pe.total_by_root[root] = { | |
429 | 'pgs': 0, | |
430 | 'objects': 0, | |
431 | 'bytes': 0, | |
432 | } | |
433 | self.log.debug('pool_roots %s' % pe.pool_roots) | |
434 | self.log.debug('root_pools %s' % pe.root_pools) | |
435 | self.log.debug('target_by_root %s' % pe.target_by_root) | |
436 | ||
437 | # pool and root actual | |
438 | for pool, pi in pool_info.iteritems(): | |
439 | poolid = pi['pool'] | |
440 | pm = ms.pg_up_by_poolid[poolid] | |
441 | pgs = 0 | |
442 | objects = 0 | |
443 | bytes = 0 | |
444 | pgs_by_osd = {} | |
445 | objects_by_osd = {} | |
446 | bytes_by_osd = {} | |
447 | for root in pe.pool_roots[pool]: | |
448 | for osd in pe.target_by_root[root].iterkeys(): | |
449 | pgs_by_osd[osd] = 0 | |
450 | objects_by_osd[osd] = 0 | |
451 | bytes_by_osd[osd] = 0 | |
452 | for pgid, up in pm.iteritems(): | |
453 | for osd in [int(osd) for osd in up]: | |
b32b8144 FG |
454 | if osd == CRUSHMap.ITEM_NONE: |
455 | continue | |
3efd9988 FG |
456 | pgs_by_osd[osd] += 1 |
457 | objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects'] | |
458 | bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes'] | |
459 | # pick a root to associate this pg instance with. | |
460 | # note that this is imprecise if the roots have | |
461 | # overlapping children. | |
462 | # FIXME: divide bytes by k for EC pools. | |
463 | for root in pe.pool_roots[pool]: | |
464 | if osd in pe.target_by_root[root]: | |
465 | actual_by_root[root]['pgs'][osd] += 1 | |
466 | actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects'] | |
467 | actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes'] | |
468 | pgs += 1 | |
469 | objects += ms.pg_stat[pgid]['num_objects'] | |
470 | bytes += ms.pg_stat[pgid]['num_bytes'] | |
471 | pe.total_by_root[root]['pgs'] += 1 | |
472 | pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects'] | |
473 | pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes'] | |
474 | break | |
475 | pe.count_by_pool[pool] = { | |
476 | 'pgs': { | |
477 | k: v | |
478 | for k, v in pgs_by_osd.iteritems() | |
479 | }, | |
480 | 'objects': { | |
481 | k: v | |
482 | for k, v in objects_by_osd.iteritems() | |
483 | }, | |
484 | 'bytes': { | |
485 | k: v | |
486 | for k, v in bytes_by_osd.iteritems() | |
487 | }, | |
488 | } | |
489 | pe.actual_by_pool[pool] = { | |
490 | 'pgs': { | |
491 | k: float(v) / float(max(pgs, 1)) | |
492 | for k, v in pgs_by_osd.iteritems() | |
493 | }, | |
494 | 'objects': { | |
495 | k: float(v) / float(max(objects, 1)) | |
496 | for k, v in objects_by_osd.iteritems() | |
497 | }, | |
498 | 'bytes': { | |
499 | k: float(v) / float(max(bytes, 1)) | |
500 | for k, v in bytes_by_osd.iteritems() | |
501 | }, | |
502 | } | |
503 | pe.total_by_pool[pool] = { | |
504 | 'pgs': pgs, | |
505 | 'objects': objects, | |
506 | 'bytes': bytes, | |
507 | } | |
508 | for root, m in pe.total_by_root.iteritems(): | |
509 | pe.count_by_root[root] = { | |
510 | 'pgs': { | |
511 | k: float(v) | |
512 | for k, v in actual_by_root[root]['pgs'].iteritems() | |
513 | }, | |
514 | 'objects': { | |
515 | k: float(v) | |
516 | for k, v in actual_by_root[root]['objects'].iteritems() | |
517 | }, | |
518 | 'bytes': { | |
519 | k: float(v) | |
520 | for k, v in actual_by_root[root]['bytes'].iteritems() | |
521 | }, | |
522 | } | |
523 | pe.actual_by_root[root] = { | |
524 | 'pgs': { | |
525 | k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1)) | |
526 | for k, v in actual_by_root[root]['pgs'].iteritems() | |
527 | }, | |
528 | 'objects': { | |
529 | k: float(v) / float(max(pe.total_by_root[root]['objects'], 1)) | |
530 | for k, v in actual_by_root[root]['objects'].iteritems() | |
531 | }, | |
532 | 'bytes': { | |
533 | k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1)) | |
534 | for k, v in actual_by_root[root]['bytes'].iteritems() | |
535 | }, | |
536 | } | |
537 | self.log.debug('actual_by_pool %s' % pe.actual_by_pool) | |
538 | self.log.debug('actual_by_root %s' % pe.actual_by_root) | |
539 | ||
540 | # average and stddev and score | |
541 | pe.stats_by_root = { | |
542 | a: pe.calc_stats( | |
543 | b, | |
544 | pe.target_by_root[a], | |
545 | pe.total_by_root[a] | |
546 | ) for a, b in pe.count_by_root.iteritems() | |
547 | } | |
548 | ||
549 | # the scores are already normalized | |
550 | pe.score_by_root = { | |
551 | r: { | |
552 | 'pgs': pe.stats_by_root[r]['pgs']['score'], | |
553 | 'objects': pe.stats_by_root[r]['objects']['score'], | |
554 | 'bytes': pe.stats_by_root[r]['bytes']['score'], | |
555 | } for r in pe.total_by_root.keys() | |
556 | } | |
557 | ||
558 | # total score is just average of normalized stddevs | |
559 | pe.score = 0.0 | |
560 | for r, vs in pe.score_by_root.iteritems(): | |
561 | for k, v in vs.iteritems(): | |
562 | pe.score += v | |
563 | pe.score /= 3 * len(roots) | |
564 | return pe | |
565 | ||
566 | def evaluate(self, ms, verbose=False): | |
567 | pe = self.calc_eval(ms) | |
568 | return pe.show(verbose=verbose) | |
569 | ||
570 | def optimize(self, plan): | |
571 | self.log.info('Optimize plan %s' % plan.name) | |
572 | plan.mode = self.get_config('mode', default_mode) | |
573 | max_misplaced = float(self.get_config('max_misplaced', | |
574 | default_max_misplaced)) | |
575 | self.log.info('Mode %s, max misplaced %f' % | |
576 | (plan.mode, max_misplaced)) | |
577 | ||
578 | info = self.get('pg_status') | |
579 | unknown = info.get('unknown_pgs_ratio', 0.0) | |
580 | degraded = info.get('degraded_ratio', 0.0) | |
581 | inactive = info.get('inactive_pgs_ratio', 0.0) | |
582 | misplaced = info.get('misplaced_ratio', 0.0) | |
583 | self.log.debug('unknown %f degraded %f inactive %f misplaced %g', | |
584 | unknown, degraded, inactive, misplaced) | |
585 | if unknown > 0.0: | |
586 | self.log.info('Some PGs (%f) are unknown; waiting', unknown) | |
587 | elif degraded > 0.0: | |
588 | self.log.info('Some objects (%f) are degraded; waiting', degraded) | |
589 | elif inactive > 0.0: | |
590 | self.log.info('Some PGs (%f) are inactive; waiting', inactive) | |
591 | elif misplaced >= max_misplaced: | |
592 | self.log.info('Too many objects (%f > %f) are misplaced; waiting', | |
593 | misplaced, max_misplaced) | |
594 | else: | |
595 | if plan.mode == 'upmap': | |
596 | return self.do_upmap(plan) | |
597 | elif plan.mode == 'crush-compat': | |
598 | return self.do_crush_compat(plan) | |
599 | elif plan.mode == 'none': | |
600 | self.log.info('Idle') | |
601 | else: | |
602 | self.log.info('Unrecognized mode %s' % plan.mode) | |
603 | return False | |
604 | ||
605 | ## | |
606 | ||
607 | def do_upmap(self, plan): | |
608 | self.log.info('do_upmap') | |
b32b8144 FG |
609 | max_iterations = int(self.get_config('upmap_max_iterations', 10)) |
610 | max_deviation = float(self.get_config('upmap_max_deviation', .01)) | |
3efd9988 FG |
611 | |
612 | ms = plan.initial | |
613 | pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])] | |
614 | if len(pools) == 0: | |
615 | self.log.info('no pools, nothing to do') | |
616 | return False | |
617 | # shuffle pool list so they all get equal (in)attention | |
618 | random.shuffle(pools) | |
619 | self.log.info('pools %s' % pools) | |
620 | ||
621 | inc = plan.inc | |
622 | total_did = 0 | |
623 | left = max_iterations | |
624 | for pool in pools: | |
625 | did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool]) | |
626 | total_did += did | |
627 | left -= did | |
628 | if left <= 0: | |
629 | break | |
630 | self.log.info('prepared %d/%d changes' % (total_did, max_iterations)) | |
631 | return True | |
632 | ||
    def do_crush_compat(self, plan):
        """Iteratively adjust compat weight-set weights to balance PGs.

        Hill-climbing loop: nudge per-osd weights toward their target
        share, re-evaluate the resulting mapping, and keep the best
        scoring state.  The step size is halved whenever a step moves too
        much data or worsens the score.  Also phases out legacy
        'osd reweight' values toward 1.0 as it goes.  Returns True and
        fills plan.compat_ws/plan.osd_weights on improvement, else False.
        """
        self.log.info('do_crush_compat')
        max_iterations = int(self.get_config('crush_compat_max_iterations', 25))
        if max_iterations < 1:
            return False
        step = float(self.get_config('crush_compat_step', .5))
        # step is a blend factor; must be strictly inside (0, 1)
        if step <= 0 or step >= 1.0:
            return False
        max_misplaced = float(self.get_config('max_misplaced',
                                              default_max_misplaced))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms)
        if pe.score == 0:
            self.log.info('Distribution is already perfect')
            return False

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds',[]) }
        reweighted_osds = [ a for a,b in orig_osd_weight.iteritems()
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights()
        if orig_ws is None:
            return False
        # keep only actual devices (osds have non-negative ids)
        orig_ws = { a: b for a, b in orig_ws.iteritems() if a >= 0 }

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = pe.target_by_root.keys()
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}  # NOTE(review): populated nowhere; appears unused
        for root, wm in pe.target_by_root.iteritems():
            for osd in wm.iterkeys():
                if osd in visited:
                    overlap[osd] = 1
                visited[osd] = 1
        if len(overlap) > 0:
            self.log.error('error: some osds belong to multiple subtrees: %s' %
                           overlap)
            return False

        key = 'pgs'  # pgs objects or bytes

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                pgs = len(best_pe.target_by_root[root])
                min_pgs = pgs * min_pg_per_osd
                # too few pgs per osd makes the stats meaningless
                if best_pe.total_by_root[root] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools, pgs, min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                # worst-deviating osds first
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # not enough to go on here... keep orig weight
                            calc_weight = weight / orig_osd_weight[osd]
                        # blend current and computed weight by step
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            # phase legacy reweight back toward 1.0, at
                            # least .005 per step
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a,b in next_ws.iteritems()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    # already improved; stop rather than move more data
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    # score regressed: occasionally push through (to escape
                    # local minima), otherwise back off and halve the step
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = next_ws
                    best_ow = next_ow
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if next_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.iteritems():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return True
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            return False
804 | ||
805 | def get_compat_weight_set_weights(self): | |
806 | # enable compat weight-set | |
807 | self.log.debug('ceph osd crush weight-set create-compat') | |
808 | result = CommandResult('') | |
809 | self.send_command(result, 'mon', '', json.dumps({ | |
810 | 'prefix': 'osd crush weight-set create-compat', | |
811 | 'format': 'json', | |
812 | }), '') | |
813 | r, outb, outs = result.wait() | |
814 | if r != 0: | |
815 | self.log.error('Error creating compat weight-set') | |
816 | return | |
817 | ||
818 | result = CommandResult('') | |
819 | self.send_command(result, 'mon', '', json.dumps({ | |
820 | 'prefix': 'osd crush dump', | |
821 | 'format': 'json', | |
822 | }), '') | |
823 | r, outb, outs = result.wait() | |
824 | if r != 0: | |
825 | self.log.error('Error dumping crush map') | |
826 | return | |
827 | try: | |
828 | crushmap = json.loads(outb) | |
829 | except: | |
830 | raise RuntimeError('unable to parse crush map') | |
831 | ||
832 | raw = crushmap.get('choose_args',{}).get('-1', []) | |
833 | weight_set = {} | |
834 | for b in raw: | |
835 | bucket = None | |
836 | for t in crushmap['buckets']: | |
837 | if t['id'] == b['bucket_id']: | |
838 | bucket = t | |
839 | break | |
840 | if not bucket: | |
841 | raise RuntimeError('could not find bucket %s' % b['bucket_id']) | |
842 | self.log.debug('bucket items %s' % bucket['items']) | |
843 | self.log.debug('weight set %s' % b['weight_set'][0]) | |
844 | if len(bucket['items']) != len(b['weight_set'][0]): | |
845 | raise RuntimeError('weight-set size does not match bucket items') | |
846 | for pos in range(len(bucket['items'])): | |
847 | weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos] | |
848 | ||
849 | self.log.debug('weight_set weights %s' % weight_set) | |
850 | return weight_set | |
851 | ||
    def do_crush(self):
        """'crush' mode optimizer — stub, not yet implemented."""
        self.log.info('do_crush (not yet implemented)')
854 | ||
    def do_osd_weight(self):
        """'osd_weight' mode optimizer — stub, not yet implemented."""
        self.log.info('do_osd_weight (not yet implemented)')
857 | ||
    def execute(self, plan):
        """Apply a plan to the cluster by issuing the equivalent mon
        commands (weight-set reweights, osd reweightn, pg-upmap-items).

        Commands are sent asynchronously and collected, then waited on at
        the end; the method returns early (without waiting for the rest)
        on the first failure.
        """
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set: create it first if the map lacks the '-1'
        # choose_args entry; this one is waited on synchronously since
        # the reweights below depend on it
        if len(plan.compat_ws) and \
           '-1' not in plan.initial.crush_dump.get('choose_args', {}):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

        for osd, weight in plan.compat_ws.iteritems():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight: legacy osd reweights, encoded as 16.16 fixed point
        # (0x10000 == weight 1.0) as 'osd reweightn' expects
        reweightn = {}
        for osd, weight in plan.osd_weights.iteritems():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap: removals first, then new/updated pg-upmap-items
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            # flatten mappings to the [from, to, from, to, ...] id list
            # the command expects
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error on command')
                return
        self.log.debug('done')