
"""
Balance PG distribution across OSDs.
"""

import copy
import errno
import json
import math
import random
import six
import time
from mgr_module import MgrModule, CommandResult
from threading import Event
from mgr_module import CRUSHMap
import datetime

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'

class MappingState:
    def __init__(self, osdmap, pg_dump, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.pg_dump = pg_dump
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in pg_dump.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in pg_dump.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a,b in six.iteritems(self.pg_up_by_poolid[poolid]):
                self.pg_up[a] = b

    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in six.iteritems(other_ms.pg_up):
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0

class Plan:
    def __init__(self, name, ms, pools):
        self.mode = 'unknown'
        self.name = name
        self.initial = ms
        self.pools = pools

        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = ms.osdmap.new_incremental()

    def final_state(self):
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.pg_dump,
                            'plan %s final' % self.name)

    def dump(self):
        return json.dumps(self.inc.dump(), indent=4)

    def show(self):
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in six.iteritems(self.compat_ws):
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in six.iteritems(self.osd_weights):
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)


class Eval:
    def __init__(self, ms):
        self.ms = ms
        self.root_ids = {} # root name -> id
        self.pool_name = {} # pool id -> pool name
        self.pool_id = {} # pool name -> id
        self.pool_roots = {} # pool name -> root name
        self.root_pools = {} # root name -> pools
        self.target_by_root = {} # root name -> target weight map
        self.count_by_pool = {}
        self.count_by_root = {}
        self.actual_by_pool = {} # pool -> by_* -> actual weight map
        self.actual_by_root = {} # pool -> by_* -> actual weight map
        self.total_by_pool = {} # pool -> by_* -> total
        self.total_by_root = {} # root -> by_* -> total
        self.stats_by_pool = {} # pool -> by_* -> stddev or avg -> value
        self.stats_by_root = {} # root -> by_* -> stddev or avg -> value

        self.score_by_pool = {}
        self.score_by_root = {}

        self.score = 0.0

    def show(self, verbose=False):
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
            r += 'score %f (lower is better)\n' % self.score
        return r

    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score measures how uneven the data distribution is.
            # It lies in [0, 1); 0 means a perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in six.iteritems(count[t]):
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are the factors used to calculate reweight_urgency.
                # One 10% underfilled device with five 2% overfilled devices is arguably a better
                # situation than one 10% overfilled device with five 2% underfilled devices.
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) = cdf of the standard normal distribution
                    x = (adjusted - avg)/avg.
                    Since we're considering only over-weighted devices, x >= 0, so phi(x) lies in [0.5, 1).
                    To bring the range of F(x) into [0, 1), we apply the above modification.

                    In general, we need a function F(x), where x = (adjusted - avg)/avg, such that:
                    1. It is bounded between 0 and 1, so that reweight_urgency is also bounded.
                    2. A larger value of x implies more urgency to reweight.
                    3. The difference between F(x) values for large x should be minimal.
                    4. F(x) should approach 1 (highest urgency to reweight) steeply.

                    We could have used F(x) = (1 - e^(-x)), but it converges to 1 more slowly than the function in use.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
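                    # Worked example (illustrative only, not part of the algorithm):
                    # a device 20% over its fair share has x = 0.2 and
                    # F(x) = erf(0.2/sqrt(2)) ~= 0.16, while x = 2.0 gives
                    # F(x) = erf(2/sqrt(2)) ~= 0.95, i.e. urgency climbs
                    # steeply toward 1 for badly overfilled devices.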
                    score += target[k] * (math.erf(((adjusted - avg)/avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r

class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'active',
            'type': 'bool',
            'default': False,
            'desc': 'automatically balance PGs across cluster',
            'runtime': True,
        },
        {
            'name': 'begin_time',
            'type': 'str',
            'default': '0000',
            'desc': 'beginning time of day to automatically balance',
            'long_desc': 'This is a time of day in the format HHMM.',
            'runtime': True,
        },
        {
            'name': 'end_time',
            'type': 'str',
            'default': '2400',
            'desc': 'ending time of day to automatically balance',
            'long_desc': 'This is a time of day in the format HHMM.',
            'runtime': True,
        },
        {
            'name': 'begin_weekday',
            'type': 'uint',
            'default': 0,
            'min': 0,
            'max': 7,
            'desc': 'Restrict automatic balancing to this day of the week or later',
            'long_desc': '0 or 7 = Sunday, 1 = Monday, etc.',
            'runtime': True,
        },
        {
            'name': 'end_weekday',
            'type': 'uint',
            'default': 7,
            'min': 0,
            'max': 7,
            'desc': 'Restrict automatic balancing to days of the week earlier than this',
            'long_desc': '0 or 7 = Sunday, 1 = Monday, etc.',
            'runtime': True,
        },
        {
            'name': 'crush_compat_max_iterations',
            'type': 'uint',
            'default': 25,
            'min': 1,
            'max': 250,
            'desc': 'maximum number of iterations to attempt optimization',
            'runtime': True,
        },
        {
            'name': 'crush_compat_metrics',
            'type': 'str',
            'default': 'pgs,objects,bytes',
            'desc': 'metrics with which to calculate OSD utilization',
            'long_desc': 'Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
            'runtime': True,
        },
        {
            'name': 'crush_compat_step',
            'type': 'float',
            'default': .5,
            'min': .001,
            'max': .999,
            'desc': 'aggressiveness of optimization',
            'long_desc': '.99 is very aggressive, .01 is less aggressive',
            'runtime': True,
        },
        {
            'name': 'min_score',
            'type': 'float',
            'default': 0,
            'desc': 'minimum score, below which no optimization is attempted',
            'runtime': True,
        },
        {
            'name': 'mode',
            'desc': 'Balancer mode',
            'default': 'none',
            'enum_allowed': ['none', 'crush-compat', 'upmap'],
            'runtime': True,
        },
        {
            'name': 'sleep_interval',
            'type': 'secs',
            'default': 60,
            'desc': 'how frequently to wake up and attempt optimization',
            'runtime': True,
        },
        {
            'name': 'upmap_max_iterations',
            'type': 'uint',
            'default': 10,
            'desc': 'maximum upmap optimization iterations',
            'runtime': True,
        },
        {
            'name': 'upmap_max_deviation',
            'type': 'int',
            'default': 5,
            'min': 1,
            'desc': 'deviation below which no optimization is attempted',
            'long_desc': 'If the number of PGs is within this count then no optimization is attempted',
            'runtime': True,
        },
        {
            'name': 'pool_ids',
            'type': 'str',
            'default': '',
            'desc': 'pools which the automatic balancing will be limited to',
            'runtime': True,
        },
    ]

    COMMANDS = [
        {
            "cmd": "balancer status",
            "desc": "Show balancer status",
            "perm": "r",
        },
        {
            "cmd": "balancer mode name=mode,type=CephChoices,strings=none|crush-compat|upmap",
            "desc": "Set balancer mode",
            "perm": "rw",
        },
        {
            "cmd": "balancer on",
            "desc": "Enable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer off",
            "desc": "Disable automatic balancing",
            "perm": "rw",
        },
        {
            "cmd": "balancer pool ls",
            "desc": "List automatic balancing pools. "
                    "Note that an empty list means all existing pools will be automatic balancing targets, "
                    "which is the balancer's default behaviour.",
            "perm": "r",
        },
        {
            "cmd": "balancer pool add name=pools,type=CephString,n=N",
            "desc": "Enable automatic balancing for specific pools",
            "perm": "rw",
        },
        {
            "cmd": "balancer pool rm name=pools,type=CephString,n=N",
            "desc": "Disable automatic balancing for specific pools",
            "perm": "rw",
        },
        {
            "cmd": "balancer eval name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan",
            "perm": "r",
        },
        {
            "cmd": "balancer eval-verbose name=option,type=CephString,req=false",
            "desc": "Evaluate data distribution for the current cluster or specific pool or specific plan (verbosely)",
            "perm": "r",
        },
        {
            "cmd": "balancer optimize name=plan,type=CephString name=pools,type=CephString,n=N,req=false",
            "desc": "Run optimizer to create a new plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer show name=plan,type=CephString",
            "desc": "Show details of an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer rm name=plan,type=CephString",
            "desc": "Discard an optimization plan",
            "perm": "rw",
        },
        {
            "cmd": "balancer reset",
            "desc": "Discard all optimization plans",
            "perm": "rw",
        },
        {
            "cmd": "balancer dump name=plan,type=CephString",
            "desc": "Show an optimization plan",
            "perm": "r",
        },
        {
            "cmd": "balancer ls",
            "desc": "List all plans",
            "perm": "r",
        },
        {
            "cmd": "balancer execute name=plan,type=CephString",
            "desc": "Execute an optimization plan",
            "perm": "rw",
        },
    ]
    active = False
    run = True
    plans = {}
    mode = ''
    optimizing = False
    last_optimize_started = ''
    last_optimize_duration = ''
    optimize_result = ''
    success_string = 'Optimization plan created successfully'
    in_progress_string = 'in progress'

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()

    def handle_command(self, inbuf, command):
        self.log.warn("Handling command: '%s'" % str(command))
        if command['prefix'] == 'balancer status':
            s = {
                'plans': list(self.plans.keys()),
                'active': self.active,
                'last_optimize_started': self.last_optimize_started,
                'last_optimize_duration': self.last_optimize_duration,
                'optimize_result': self.optimize_result,
                'mode': self.get_module_option('mode'),
            }
            return (0, json.dumps(s, indent=4), '')
        elif command['prefix'] == 'balancer mode':
            if command['mode'] == 'upmap':
                min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
                if min_compat_client < 'luminous': # works because release names happen to sort alphabetically
                    warn = 'min_compat_client "%s" ' \
                           '< "luminous", which is required for pg-upmap. ' \
                           'Try "ceph osd set-require-min-compat-client luminous" ' \
                           'before enabling this mode' % min_compat_client
                    return (-errno.EPERM, '', warn)
            elif command['mode'] == 'crush-compat':
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'initialize compat weight-set')
                self.get_compat_weight_set_weights(ms) # ignore error
            self.set_module_option('mode', command['mode'])
            return (0, '', '')
        elif command['prefix'] == 'balancer on':
            if not self.active:
                self.set_module_option('active', 'true')
                self.active = True
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer off':
            if self.active:
                self.set_module_option('active', 'false')
                self.active = False
            self.event.set()
            return (0, '', '')
        elif command['prefix'] == 'balancer pool ls':
            pool_ids = self.get_module_option('pool_ids')
            if pool_ids == '':
                return (0, '', '')
            pool_ids = pool_ids.split(',')
            pool_ids = [int(p) for p in pool_ids]
            pool_name_by_id = dict((p['pool'], p['pool_name']) for p in self.get_osdmap().dump().get('pools', []))
            should_prune = False
            final_ids = []
            final_names = []
            for p in pool_ids:
                if p in pool_name_by_id:
                    final_ids.append(p)
                    final_names.append(pool_name_by_id[p])
                else:
                    should_prune = True
            if should_prune: # some pools were gone, prune
                self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
            return (0, json.dumps(final_names, indent=4), '')
        elif command['prefix'] == 'balancer pool add':
            raw_names = command['pools']
            pool_id_by_name = dict((p['pool_name'], p['pool']) for p in self.get_osdmap().dump().get('pools', []))
            invalid_names = [p for p in raw_names if p not in pool_id_by_name]
            if invalid_names:
                return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
            to_add = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
            existing = self.get_module_option('pool_ids')
            final = to_add
            if existing != '':
                existing = existing.split(',')
                final = set(to_add) | set(existing)
            self.set_module_option('pool_ids', ','.join(final))
            return (0, '', '')
        elif command['prefix'] == 'balancer pool rm':
            raw_names = command['pools']
            existing = self.get_module_option('pool_ids')
            if existing == '': # for idempotence
                return (0, '', '')
            existing = existing.split(',')
            osdmap = self.get_osdmap()
            pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
            pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
            final = [p for p in existing if p in pool_ids]
            to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
            final = set(final) - set(to_delete)
            self.set_module_option('pool_ids', ','.join(final))
            return (0, '', '')
        elif command['prefix'] == 'balancer eval' or command['prefix'] == 'balancer eval-verbose':
            verbose = command['prefix'] == 'balancer eval-verbose'
            pools = []
            if 'option' in command:
                plan = self.plans.get(command['option'])
                if not plan:
                    # not a plan, does it look like a pool?
                    osdmap = self.get_osdmap()
                    valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
                    option = command['option']
                    if option not in valid_pool_names:
                        return (-errno.EINVAL, '', 'option "%s" not a plan or a pool' % option)
                    pools.append(option)
                    ms = MappingState(osdmap, self.get("pg_dump"), 'pool "%s"' % option)
                else:
                    pools = plan.pools
                    ms = plan.final_state()
            else:
                ms = MappingState(self.get_osdmap(),
                                  self.get("pg_dump"),
                                  'current cluster')
            return (0, self.evaluate(ms, pools, verbose=verbose), '')
        elif command['prefix'] == 'balancer optimize':
            # The GIL can be released by the active balancer, so disallow when active
            if self.active:
                return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
            if self.optimizing:
                return (-errno.EINVAL, '', 'Balancer finishing up... try again')
            pools = []
            if 'pools' in command:
                pools = command['pools']
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            invalid_pool_names = []
            for p in pools:
                if p not in valid_pool_names:
                    invalid_pool_names.append(p)
            if len(invalid_pool_names):
                return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
            plan = self.plan_create(command['plan'], osdmap, pools)
            self.last_optimize_started = time.asctime(time.localtime())
            self.optimize_result = self.in_progress_string
            start = time.time()
            r, detail = self.optimize(plan)
            end = time.time()
            self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
            if r == 0:
                # Add plan if an optimization was created
                self.optimize_result = self.success_string
                self.plans[command['plan']] = plan
            else:
                self.optimize_result = detail
            return (r, '', detail)
        elif command['prefix'] == 'balancer rm':
            self.plan_rm(command['plan'])
            return (0, '', '')
        elif command['prefix'] == 'balancer reset':
            self.plans = {}
            return (0, '', '')
        elif command['prefix'] == 'balancer ls':
            return (0, json.dumps([p for p in self.plans], indent=4), '')
        elif command['prefix'] == 'balancer dump':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.dump(), '')
        elif command['prefix'] == 'balancer show':
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            return (0, plan.show(), '')
        elif command['prefix'] == 'balancer execute':
            # The GIL can be released by the active balancer, so disallow when active
            if self.active:
                return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
            if self.optimizing:
                return (-errno.EINVAL, '', 'Balancer finishing up... try again')
            plan = self.plans.get(command['plan'])
            if not plan:
                return (-errno.ENOENT, '', 'plan %s not found' % command['plan'])
            r, detail = self.execute(plan)
            self.plan_rm(command['plan'])
            return (r, '', detail)
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def time_permit(self):
        local_time = time.localtime()
        time_of_day = time.strftime('%H%M', local_time)
        weekday = (local_time.tm_wday + 1) % 7 # convert to C's tm_wday convention, where 0 = Sunday
        permit = False

        begin_time = self.get_module_option('begin_time')
        end_time = self.get_module_option('end_time')
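        # Both bounds are zero-padded HHMM strings, so plain string comparison
        # orders them chronologically. A window that wraps past midnight (e.g.
        # begin_time '2300', end_time '0600') is handled by the 'else' branch below.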
        if begin_time <= end_time:
            permit = begin_time <= time_of_day < end_time
        else:
            permit = time_of_day >= begin_time or time_of_day < end_time
        if not permit:
            self.log.debug("should run between %s - %s, now %s, skipping",
                           begin_time, end_time, time_of_day)
            return False

        begin_weekday = self.get_module_option('begin_weekday')
        end_weekday = self.get_module_option('end_weekday')
        if begin_weekday <= end_weekday:
            permit = begin_weekday <= weekday < end_weekday
        else:
            permit = weekday >= begin_weekday or weekday < end_weekday
        if not permit:
            self.log.debug("should run between weekday %d - %d, now %d, skipping",
                           begin_weekday, end_weekday, weekday)
            return False

        return True

    def serve(self):
        self.log.info('Starting')
        while self.run:
            self.active = self.get_module_option('active')
            sleep_interval = self.get_module_option('sleep_interval')
            self.log.debug('Waking up [%s, now %s]',
                           "active" if self.active else "inactive",
                           time.strftime(TIME_FORMAT, time.localtime()))
            if self.active and self.time_permit():
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                osdmap = self.get_osdmap()
                allow = self.get_module_option('pool_ids')
                final = []
                if allow != '':
                    allow = allow.split(',')
                    valid = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
                    final = set(allow) & set(valid)
                    if set(allow) - set(valid): # some pools were gone, prune
                        self.set_module_option('pool_ids', ','.join(final))
                    pool_name_by_id = dict((p['pool'], p['pool_name']) for p in osdmap.dump().get('pools', []))
                    final = [int(p) for p in final]
                    final = [pool_name_by_id[p] for p in final if p in pool_name_by_id]
                plan = self.plan_create(name, osdmap, final)
                self.optimizing = True
                self.last_optimize_started = time.asctime(time.localtime())
                self.optimize_result = self.in_progress_string
                start = time.time()
                r, detail = self.optimize(plan)
                end = time.time()
                self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
                if r == 0:
                    self.optimize_result = self.success_string
                    self.execute(plan)
                else:
                    self.optimize_result = detail
                self.optimizing = False
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def plan_create(self, name, osdmap, pools):
        plan = Plan(name,
                    MappingState(osdmap,
                                 self.get("pg_dump"),
                                 'plan %s initial' % name),
                    pools)
        return plan

    def plan_rm(self, name):
        if name in self.plans:
            del self.plans[name]

    def calc_eval(self, ms, pools):
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools',[]):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = { a['osd']: a['weight']
                       for a in ms.osdmap_dump.get('osds',[]) if a['weight'] > 0 }

        # get expected distributions by root
        actual_by_root = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find the roots associated with the pools we were passed
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
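            # weight_map: osd id -> CRUSH weight reachable under this 'take' root;
            # scale it by the current OSD reweight so the target reflects what CRUSH
            # will actually place, then normalize to fractions below.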
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd,cw in six.iteritems(weight_map) if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = { osd: w / sum_w
                                        for osd,w in six.iteritems(adjusted_map) }
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in six.iteritems(pool_info):
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for root in pe.pool_roots[pool]:
                for osd in pe.target_by_root[root]:
                    pgs_by_osd[osd] = 0
                    objects_by_osd[osd] = 0
                    bytes_by_osd[osd] = 0
            for pgid, up in six.iteritems(pm):
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: v
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: v
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in six.iteritems(pgs_by_osd)
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in six.iteritems(objects_by_osd)
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in six.iteritems(bytes_by_osd)
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v)
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['pgs'])
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['objects'])
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in six.iteritems(actual_by_root[root]['bytes'])
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in six.iteritems(pe.count_by_root)
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = self.get_module_option('crush_compat_metrics').split(',')

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in six.iteritems(pe.score_by_root):
            for k, v in six.iteritems(vs):
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe

    def evaluate(self, ms, pools, verbose=False):
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)

    def optimize(self, plan):
        self.log.info('Optimize plan %s' % plan.name)
        plan.mode = self.get_module_option('mode')
        max_misplaced = float(self.get_ceph_option('target_max_misplaced_ratio'))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(plan)
            elif plan.mode == 'none':
                detail = 'Please do "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail
        ##

    def do_upmap(self, plan):
        self.log.info('do_upmap')
        max_iterations = self.get_module_option('upmap_max_iterations')
        max_deviation = self.get_module_option('upmap_max_deviation')

        ms = plan.initial
        if len(plan.pools):
            pools = plan.pools
        else: # all
            pools = [str(i['pool_name']) for i in ms.osdmap_dump.get('pools',[])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        adjusted_pools = []
        inc = plan.inc
        total_did = 0
        left = max_iterations
        osdmap_dump = self.get_osdmap().dump()
        pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
                               if p['pg_num'] > p['pg_num_target']]
        crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule']) for p in osdmap_dump.get('pools', []))
        for pool in pools:
            if pool not in crush_rule_by_pool_name:
                self.log.info('pool %s does not exist' % pool)
                continue
            if pool in pools_with_pg_merge:
                self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
                continue
            adjusted_pools.append(pool)
        # shuffle so all pools get equal (in)attention
        random.shuffle(adjusted_pools)
        for pool in adjusted_pools:
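            # Ask the OSDMap to compute up to 'left' pg-upmap-items entries for
            # this pool (aiming to keep per-OSD PG counts within 'max_deviation');
            # the proposed mappings are accumulated in the plan's incremental 'inc'.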
            did = ms.osdmap.calc_pg_upmaps(inc, max_deviation, left, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_iterations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or pool(s) pg_num is decreasing, ' \
                                    'or distribution is already perfect'
        return 0, ''

    def do_crush_compat(self, plan):
        self.log.info('do_crush_compat')
        max_iterations = self.get_module_option('crush_compat_max_iterations')
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = self.get_module_option('crush_compat_step')
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = float(self.get_ceph_option('target_max_misplaced_ratio'))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = self.get_module_option('min_score')
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = { a['osd']: a['weight']
                            for a in ms.osdmap_dump.get('osds',[]) }
        reweighted_osds = [ a for a,b in six.iteritems(orig_osd_weight)
                            if b < 1.0 and b > 0.0 ]

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if not orig_ws:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = { a: b for a, b in six.iteritems(orig_ws) if a >= 0 }

        # Make sure roots don't overlap their devices. If so, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited = {}
        overlap = {}
        root_ids = {}
        for root, wm in six.iteritems(pe.target_by_root):
            for osd in wm:
                if osd in visited:
                    if osd not in overlap:
                        overlap[osd] = [ visited[osd] ]
                    overlap[osd].append(root)
                visited[osd] = root
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = self.get_module_option('crush_compat_metrics').split(',')
        key = metrics[0] # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warn("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                    else:
                        deviation = target[osd] - actual[osd]
                        if deviation == 0:
                            break
                        self.log.debug('osd.%d deviation %f', osd, deviation)
                        weight = best_ws[osd]
                        ow = orig_osd_weight[osd]
                        if actual[osd] > 0:
                            calc_weight = target[osd] / actual[osd] * weight * ow
                        else:
                            # for newly created osds, reset calc_weight to the target value;
                            # this way the weight-set will absorb *step* of its
                            # target (final) value right away and slowly catch up later.
                            # note that if this turns out to cause too many misplaced
                            # pgs, we'll reduce step and retry
                            calc_weight = target[osd]
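                        # Blend: keep (1 - step) of the current weight-set value and move
                        # 'step' of the way toward the computed ideal weight; with the
                        # default step of .5 each iteration closes half of the remaining gap.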
                        new_weight = weight * (1.0 - step) + calc_weight * step
                        self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                       new_weight)
                        next_ws[osd] = new_weight
                        if ow < 1.0:
                            new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                                  ow + .005))
                            self.log.debug('Reweight osd.%d reweight %f -> %f',
                                           osd, ow, new_ow)
                            next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a,b in six.iteritems(next_ws)
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0
        if best_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in six.iteritems(best_ow):
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization, ' \
                                'changing the balancer mode and retrying might help'

    def get_compat_weight_set_weights(self, ms):
        if not CRUSHMap.have_default_choose_args(ms.crush_dump):
            # enable compat weight-set first
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except ValueError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = CRUSHMap.get_default_choose_args(crushmap)
        weight_set = {}
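        # Each entry in 'raw' describes one bucket: 'bucket_id' plus a
        # 'weight_set' whose first (and, for the compat set, only) position is
        # a list of weights parallel to that bucket's 'items'. Flatten those
        # into a single item-id -> weight map.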
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set

    def do_crush(self):
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self):
        self.log.info('do_osd_weight (not yet implemented)')

    def execute(self, plan):
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws) and \
           not CRUSHMap.have_default_choose_args(plan.initial.crush_dump):
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return r, outs

        for osd, weight in six.iteritems(plan.compat_ws):
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
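        # 'osd reweightn' expects weights as integer fractions of 0x10000
        # (so 1.0 -> 65536, 0.5 -> 32768), keyed by osd id as a string.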
        for osd, weight in six.iteritems(plan.osd_weights):
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''

    def gather_telemetry(self):
        return {
            'active': self.active,
            'mode': self.mode,
        }