ceph/src/pybind/mgr/balancer/module.py
1 """
2 Balance PG distribution across OSDs.
3 """
4
5 import copy
6 import enum
7 import errno
8 import json
9 import math
10 import random
11 import time
12 from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
13 from threading import Event
14 from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
15 from mgr_module import CRUSHMap
16 import datetime
17
18 TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
19
20
21 class MappingState:
22 def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
23 self.desc = desc
24 self.osdmap = osdmap
25 self.osdmap_dump = self.osdmap.dump()
26 self.crush = osdmap.get_crush()
27 self.crush_dump = self.crush.dump()
28 self.raw_pg_stats = raw_pg_stats
29 self.raw_pool_stats = raw_pool_stats
30 self.pg_stat = {
31 i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
32 }
33 osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
34 pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
35 self.poolids = set(osd_poolids) & set(pg_poolids)
36 self.pg_up = {}
37 self.pg_up_by_poolid = {}
38 for poolid in self.poolids:
39 self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
40 for a, b in self.pg_up_by_poolid[poolid].items():
41 self.pg_up[a] = b
42
43 def calc_misplaced_from(self, other_ms):
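        # Return the fraction of PGs whose up set differs from other_ms;
        # e.g. if 3 of 100 PGs changed their up mapping, this returns 0.03.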
44 num = len(other_ms.pg_up)
45 misplaced = 0
46 for pgid, before in other_ms.pg_up.items():
47 if before != self.pg_up.get(pgid, []):
48 misplaced += 1
49 if num > 0:
50 return float(misplaced) / float(num)
51 return 0.0
52
53
54 class Mode(enum.Enum):
55 none = 'none'
56 crush_compat = 'crush-compat'
57 upmap = 'upmap'
58
59
60 class Plan(object):
61 def __init__(self, name, mode, osdmap, pools):
62 self.name = name
63 self.mode = mode
64 self.osdmap = osdmap
65 self.osdmap_dump = osdmap.dump()
66 self.pools = pools
67 self.osd_weights = {}
68 self.compat_ws = {}
69 self.inc = osdmap.new_incremental()
70 self.pg_status = {}
71
72 def dump(self) -> str:
73 return json.dumps(self.inc.dump(), indent=4, sort_keys=True)
74
75 def show(self) -> str:
76 return 'upmap plan'
77
78
79 class MsPlan(Plan):
80 """
81 Plan with a preloaded MappingState member.
82 """
83
84 def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
85 super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
86 self.initial = ms
87
88 def final_state(self) -> MappingState:
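        # Apply the accumulated OSD reweights, compat weight-set changes and
        # incremental osdmap to produce the MappingState this plan would yield.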
89 self.inc.set_osd_reweights(self.osd_weights)
90 self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
91 return MappingState(self.initial.osdmap.apply_incremental(self.inc),
92 self.initial.raw_pg_stats,
93 self.initial.raw_pool_stats,
94 'plan %s final' % self.name)
95
96 def show(self) -> str:
97 ls = []
98 ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
99 ls.append('# starting crush version %d' %
100 self.initial.osdmap.get_crush_version())
101 ls.append('# mode %s' % self.mode)
102 if len(self.compat_ws) and \
103 not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
104 ls.append('ceph osd crush weight-set create-compat')
105 for osd, weight in self.compat_ws.items():
106 ls.append('ceph osd crush weight-set reweight-compat %s %f' %
107 (osd, weight))
108 for osd, weight in self.osd_weights.items():
109 ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
110 incdump = self.inc.dump()
111 for pgid in incdump.get('old_pg_upmap_items', []):
112 ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
113 for item in incdump.get('new_pg_upmap_items', []):
114 osdlist = []
115 for m in item['mappings']:
116 osdlist += [m['from'], m['to']]
117 ls.append('ceph osd pg-upmap-items %s %s' %
118 (item['pgid'], ' '.join([str(a) for a in osdlist])))
119 return '\n'.join(ls)
120
121
122 class Eval:
123 def __init__(self, ms: MappingState):
124 self.ms = ms
125 self.root_ids: Dict[str, int] = {} # root name -> id
126 self.pool_name: Dict[str, str] = {} # pool id -> pool name
127 self.pool_id: Dict[str, int] = {} # pool name -> id
128 self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name
129 self.root_pools: Dict[str, List[str]] = {} # root name -> pools
130 self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map
131 self.count_by_pool: Dict[str, dict] = {}
132 self.count_by_root: Dict[str, dict] = {}
133 self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map
134 self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map
135 self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total
136 self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total
137 self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value
138 self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value
139
140 self.score_by_pool: Dict[str, float] = {}
141 self.score_by_root: Dict[str, Dict[str, float]] = {}
142
143 self.score = 0.0
144
145 def show(self, verbose: bool = False) -> str:
146 if verbose:
147 r = self.ms.desc + '\n'
148 r += 'target_by_root %s\n' % self.target_by_root
149 r += 'actual_by_pool %s\n' % self.actual_by_pool
150 r += 'actual_by_root %s\n' % self.actual_by_root
151 r += 'count_by_pool %s\n' % self.count_by_pool
152 r += 'count_by_root %s\n' % self.count_by_root
153 r += 'total_by_pool %s\n' % self.total_by_pool
154 r += 'total_by_root %s\n' % self.total_by_root
155 r += 'stats_by_root %s\n' % self.stats_by_root
156 r += 'score_by_pool %s\n' % self.score_by_pool
157 r += 'score_by_root %s\n' % self.score_by_root
158 else:
159 r = self.ms.desc + ' '
160 r += 'score %f (lower is better)\n' % self.score
161 return r
162
163 def calc_stats(self, count, target, total):
164 num = max(len(target), 1)
165 r: Dict[str, Dict[str, Union[int, float]]] = {}
166 for t in ('pgs', 'objects', 'bytes'):
167 if total[t] == 0:
168 r[t] = {
169 'max': 0,
170 'min': 0,
171 'avg': 0,
172 'stddev': 0,
173 'sum_weight': 0,
174 'score': 0,
175 }
176 continue
177
178 avg = float(total[t]) / float(num)
179 dev = 0.0
180
181                 # score is a measure of how uneven the data distribution is.
182                 # score lies in [0, 1); 0 means a perfect distribution.
183 score = 0.0
184 sum_weight = 0.0
185
186 for k, v in count[t].items():
187 # adjust/normalize by weight
188 if target[k]:
189 adjusted = float(v) / target[k] / float(num)
190 else:
191 adjusted = 0.0
192
193                     # Overweighted devices and their weights factor into reweight_urgency.
194                     # One 10% underfilled device plus five 2% overfilled devices is arguably a
195                     # better situation than one 10% overfilled device plus five 2% underfilled ones.
196 if adjusted > avg:
197                         '''
198                         F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the standard normal distribution
199                         and x = (adjusted - avg)/avg.
200                         Since we only consider over-weighted devices, x >= 0, so phi(x) lies in [0.5, 1).
201                         The "2*phi(x) - 1" rescaling maps F(x) into the range [0, 1).
202 
203                         In general, we need a function F(x) of x = (adjusted - avg)/avg that:
204                         1. is bounded between 0 and 1, so that reweight_urgency is also bounded;
205                         2. implies more urgency to reweight for larger values of x;
206                         3. changes little between large values of x (it saturates); and
207                         4. approaches 1 (highest urgency to reweight) steeply.
208 
209                         F(x) = 1 - e^(-x) would also work, but it converges to 1 more slowly than the erf-based form used here.
210 
211                         cdf of the standard normal distribution: https://stackoverflow.com/a/29273201
212                         '''
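                        # A rough illustration of F(x) = 2*phi(x) - 1 = erf(x / sqrt(2))
                        # for a single device (ignoring the target[k] weighting):
                        #   x = 0.1 (10% over the average)  -> F(x) ~= 0.08
                        #   x = 1.0 (100% over the average) -> F(x) ~= 0.68
                        #   x = 3.0                         -> F(x) ~= 0.997
                        # so mild imbalance adds little to the score, while gross
                        # imbalance quickly saturates towards 1.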
213 score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
214 sum_weight += target[k]
215 dev += (avg - adjusted) * (avg - adjusted)
216 stddev = math.sqrt(dev / float(max(num - 1, 1)))
217 score = score / max(sum_weight, 1)
218 r[t] = {
219 'max': max(count[t].values()),
220 'min': min(count[t].values()),
221 'avg': avg,
222 'stddev': stddev,
223 'sum_weight': sum_weight,
224 'score': score,
225 }
226 return r
227
228
229 class Module(MgrModule):
230 MODULE_OPTIONS = [
231 Option(name='active',
232 type='bool',
233 default=True,
234 desc='automatically balance PGs across cluster',
235 runtime=True),
236 Option(name='begin_time',
237 type='str',
238 default='0000',
239 desc='beginning time of day to automatically balance',
240 long_desc='This is a time of day in the format HHMM.',
241 runtime=True),
242 Option(name='end_time',
243 type='str',
244 default='2400',
245 desc='ending time of day to automatically balance',
246 long_desc='This is a time of day in the format HHMM.',
247 runtime=True),
248 Option(name='begin_weekday',
249 type='uint',
250 default=0,
251 min=0,
252 max=7,
253 desc='Restrict automatic balancing to this day of the week or later',
254 long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
255 runtime=True),
256 Option(name='end_weekday',
257 type='uint',
258 default=7,
259 min=0,
260 max=7,
261 desc='Restrict automatic balancing to days of the week earlier than this',
262 long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
263 runtime=True),
264 Option(name='crush_compat_max_iterations',
265 type='uint',
266 default=25,
267 min=1,
268 max=250,
269 desc='maximum number of iterations to attempt optimization',
270 runtime=True),
271 Option(name='crush_compat_metrics',
272 type='str',
273 default='pgs,objects,bytes',
274 desc='metrics with which to calculate OSD utilization',
275 long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
276 runtime=True),
277 Option(name='crush_compat_step',
278 type='float',
279 default=.5,
280 min=.001,
281 max=.999,
282 desc='aggressiveness of optimization',
283 long_desc='.99 is very aggressive, .01 is less aggressive',
284 runtime=True),
285 Option(name='min_score',
286 type='float',
287 default=0,
288 desc='minimum score, below which no optimization is attempted',
289 runtime=True),
290 Option(name='mode',
291 desc='Balancer mode',
292 default='upmap',
293 enum_allowed=['none', 'crush-compat', 'upmap'],
294 runtime=True),
295 Option(name='sleep_interval',
296 type='secs',
297 default=60,
298 desc='how frequently to wake up and attempt optimization',
299 runtime=True),
300 Option(name='upmap_max_optimizations',
301 type='uint',
302 default=10,
303 desc='maximum upmap optimizations to make per attempt',
304 runtime=True),
305 Option(name='upmap_max_deviation',
306 type='int',
307 default=5,
308 min=1,
309 desc='deviation below which no optimization is attempted',
310                long_desc='If the number of PGs is within this count then no optimization is attempted',
311 runtime=True),
312 Option(name='pool_ids',
313 type='str',
314 default='',
315 desc='pools which the automatic balancing will be limited to',
316 runtime=True)
317 ]
318
319 active = False
320 run = True
321 plans: Dict[str, Plan] = {}
322 mode = ''
323 optimizing = False
324 last_optimize_started = ''
325 last_optimize_duration = ''
326 optimize_result = ''
327 no_optimization_needed = False
328 success_string = 'Optimization plan created successfully'
329 in_progress_string = 'in progress'
330
331 def __init__(self, *args: Any, **kwargs: Any) -> None:
332 super(Module, self).__init__(*args, **kwargs)
333 self.event = Event()
334
335 @CLIReadCommand('balancer status')
336 def show_status(self) -> Tuple[int, str, str]:
337 """
338 Show balancer status
339 """
340 s = {
341 'plans': list(self.plans.keys()),
342 'active': self.active,
343 'last_optimize_started': self.last_optimize_started,
344 'last_optimize_duration': self.last_optimize_duration,
345 'optimize_result': self.optimize_result,
346 'no_optimization_needed': self.no_optimization_needed,
347 'mode': self.get_module_option('mode'),
348 }
349 return (0, json.dumps(s, indent=4, sort_keys=True), '')
350
351 @CLICommand('balancer mode')
352 def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
353 """
354 Set balancer mode
355 """
356 if mode == Mode.upmap:
357 min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
358             if min_compat_client < 'luminous':  # string comparison works because Ceph release names are alphabetical
359 warn = ('min_compat_client "%s" '
360 '< "luminous", which is required for pg-upmap. '
361 'Try "ceph osd set-require-min-compat-client luminous" '
362 'before enabling this mode' % min_compat_client)
363 return (-errno.EPERM, '', warn)
364 elif mode == Mode.crush_compat:
365 ms = MappingState(self.get_osdmap(),
366 self.get("pg_stats"),
367 self.get("pool_stats"),
368 'initialize compat weight-set')
369 self.get_compat_weight_set_weights(ms) # ignore error
370 self.set_module_option('mode', mode.value)
371 return (0, '', '')
372
373 @CLICommand('balancer on')
374 def on(self) -> Tuple[int, str, str]:
375 """
376 Enable automatic balancing
377 """
378 if not self.active:
379 self.set_module_option('active', 'true')
380 self.active = True
381 self.event.set()
382 return (0, '', '')
383
384 @CLICommand('balancer off')
385 def off(self) -> Tuple[int, str, str]:
386 """
387 Disable automatic balancing
388 """
389 if self.active:
390 self.set_module_option('active', 'false')
391 self.active = False
392 self.event.set()
393 return (0, '', '')
394
395 @CLIReadCommand('balancer pool ls')
396 def pool_ls(self) -> Tuple[int, str, str]:
397 """
398 List automatic balancing pools
399
400         Note that an empty list means all existing pools are targets of automatic balancing,
401         which is the default behaviour of the balancer.
402 """
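        # For example (assuming a pool named "rbd" exists):
        #   ceph balancer pool add rbd   -> restrict automatic balancing to "rbd"
        #   ceph balancer pool ls        -> lists the selected pool names as JSON
        #   ceph balancer pool rm rbd    -> empty selection again, i.e. all pools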
403 pool_ids = cast(str, self.get_module_option('pool_ids'))
404 if pool_ids == '':
405 return (0, '', '')
406 pool_ids = [int(p) for p in pool_ids.split(',')]
407 pool_name_by_id = dict((p['pool'], p['pool_name'])
408 for p in self.get_osdmap().dump().get('pools', []))
409 should_prune = False
410 final_ids: List[int] = []
411 final_names = []
412 for p in pool_ids:
413 if p in pool_name_by_id:
414 final_ids.append(p)
415 final_names.append(pool_name_by_id[p])
416 else:
417 should_prune = True
418 if should_prune: # some pools were gone, prune
419 self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
420 return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
421
422 @CLICommand('balancer pool add')
423 def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
424 """
425 Enable automatic balancing for specific pools
426 """
427 raw_names = pools
428 pool_id_by_name = dict((p['pool_name'], p['pool'])
429 for p in self.get_osdmap().dump().get('pools', []))
430 invalid_names = [p for p in raw_names if p not in pool_id_by_name]
431 if invalid_names:
432 return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
433 to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
434 pool_ids = cast(str, self.get_module_option('pool_ids'))
435 existing = set(pool_ids.split(',') if pool_ids else [])
436 final = to_add | existing
437 self.set_module_option('pool_ids', ','.join(final))
438 return (0, '', '')
439
440 @CLICommand('balancer pool rm')
441 def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
442 """
443 Disable automatic balancing for specific pools
444 """
445 raw_names = pools
446 existing = cast(str, self.get_module_option('pool_ids'))
447 if existing == '': # for idempotence
448 return (0, '', '')
449 existing = existing.split(',')
450 osdmap = self.get_osdmap()
451 pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
452 pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
453 final = [p for p in existing if p in pool_ids]
454 to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
455 final = set(final) - set(to_delete)
456 self.set_module_option('pool_ids', ','.join(final))
457 return (0, '', '')
458
459 def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
460 pools = []
461 if option is None:
462 ms = MappingState(self.get_osdmap(),
463 self.get("pg_stats"),
464 self.get("pool_stats"),
465 'current cluster')
466 elif option in self.plans:
467 plan = self.plans.get(option)
468 assert plan
469 pools = plan.pools
470 if plan.mode == 'upmap':
471             # Note that for upmap, to improve efficiency, we use a basic
472             # version of Plan that does not keep the (redundant)
473             # MappingState member.
474             # Hence ms may not be fully accurate here, since we combine an
475             # old snapshotted osdmap with a fresh copy of pg_stats.
476             # That mismatch should not matter much in practice.
477 ms = MappingState(plan.osdmap,
478 self.get("pg_stats"),
479 self.get("pool_stats"),
480 f'plan "{plan.name}"')
481 else:
482 ms = cast(MsPlan, plan).final_state()
483 else:
484 # not a plan, does it look like a pool?
485 osdmap = self.get_osdmap()
486 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
487 if option not in valid_pool_names:
488 raise ValueError(f'option "{option}" not a plan or a pool')
489 pools.append(option)
490 ms = MappingState(osdmap,
491 self.get("pg_stats"),
492 self.get("pool_stats"),
493 f'pool "{option}"')
494 return ms, pools
495
496 @CLIReadCommand('balancer eval-verbose')
497 def plan_eval_verbose(self, option: Optional[str] = None):
498 """
499 Evaluate data distribution for the current cluster or specific pool or specific
500 plan (verbosely)
501 """
502 try:
503 ms, pools = self._state_from_option(option)
504 return (0, self.evaluate(ms, pools, verbose=True), '')
505 except ValueError as e:
506 return (-errno.EINVAL, '', str(e))
507
508 @CLIReadCommand('balancer eval')
509 def plan_eval_brief(self, option: Optional[str] = None):
510 """
511 Evaluate data distribution for the current cluster or specific pool or specific plan
512 """
513 try:
514 ms, pools = self._state_from_option(option)
515 return (0, self.evaluate(ms, pools, verbose=False), '')
516 except ValueError as e:
517 return (-errno.EINVAL, '', str(e))
518
519 @CLIReadCommand('balancer optimize')
520 def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
521 """
522 Run optimizer to create a new plan
523 """
524         # The GIL can be released by the active balancer, so disallow this when active
525 if self.active:
526 return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
527 if self.optimizing:
528 return (-errno.EINVAL, '', 'Balancer finishing up....try again')
529 osdmap = self.get_osdmap()
530 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
531 invalid_pool_names = []
532 for p in pools:
533 if p not in valid_pool_names:
534 invalid_pool_names.append(p)
535 if len(invalid_pool_names):
536 return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
537 plan_ = self.plan_create(plan, osdmap, pools)
538 self.last_optimize_started = time.asctime(time.localtime())
539 self.optimize_result = self.in_progress_string
540 start = time.time()
541 r, detail = self.optimize(plan_)
542 end = time.time()
543 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
544 if r == 0:
545 # Add plan if an optimization was created
546 self.optimize_result = self.success_string
547 self.plans[plan] = plan_
548 else:
549 self.optimize_result = detail
550 return (r, '', detail)
551
552 @CLIReadCommand('balancer show')
553 def plan_show(self, plan: str) -> Tuple[int, str, str]:
554 """
555 Show details of an optimization plan
556 """
557 plan_ = self.plans.get(plan)
558 if not plan_:
559 return (-errno.ENOENT, '', f'plan {plan} not found')
560 return (0, plan_.show(), '')
561
562 @CLICommand('balancer rm')
563 def plan_rm(self, plan: str) -> Tuple[int, str, str]:
564 """
565 Discard an optimization plan
566 """
567 if plan in self.plans:
568 del self.plans[plan]
569 return (0, '', '')
570
571 @CLICommand('balancer reset')
572 def plan_reset(self) -> Tuple[int, str, str]:
573 """
574 Discard all optimization plans
575 """
576 self.plans = {}
577 return (0, '', '')
578
579 @CLIReadCommand('balancer dump')
580 def plan_dump(self, plan: str) -> Tuple[int, str, str]:
581 """
582 Show an optimization plan
583 """
584 plan_ = self.plans.get(plan)
585 if not plan_:
586 return -errno.ENOENT, '', f'plan {plan} not found'
587 else:
588 return (0, plan_.dump(), '')
589
590 @CLIReadCommand('balancer ls')
591 def plan_ls(self) -> Tuple[int, str, str]:
592 """
593 List all plans
594 """
595 return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
596
597 @CLIReadCommand('balancer execute')
598 def plan_execute(self, plan: str) -> Tuple[int, str, str]:
599 """
600 Execute an optimization plan
601 """
602         # The GIL can be released by the active balancer, so disallow this when active
603 if self.active:
604 return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
605 if self.optimizing:
606 return (-errno.EINVAL, '', 'Balancer finishing up....try again')
607 plan_ = self.plans.get(plan)
608 if not plan_:
609 return (-errno.ENOENT, '', f'plan {plan} not found')
610 r, detail = self.execute(plan_)
611 self.plan_rm(plan)
612 return (r, '', detail)
613
614 def shutdown(self) -> None:
615 self.log.info('Stopping')
616 self.run = False
617 self.event.set()
618
619 def time_permit(self) -> bool:
620 local_time = time.localtime()
621 time_of_day = time.strftime('%H%M', local_time)
622         weekday = (local_time.tm_wday + 1) % 7  # convert to C-style tm_wday (0 = Sunday)
623 permit = False
624
625 begin_time = cast(str, self.get_module_option('begin_time'))
626 end_time = cast(str, self.get_module_option('end_time'))
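        # For example, begin_time='2300' with end_time='0600' permits the
        # overnight window 23:00-06:00; the comparison below wraps past midnight.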
627 if begin_time <= end_time:
628 permit = begin_time <= time_of_day < end_time
629 else:
630 permit = time_of_day >= begin_time or time_of_day < end_time
631 if not permit:
632 self.log.debug("should run between %s - %s, now %s, skipping",
633 begin_time, end_time, time_of_day)
634 return False
635
636 begin_weekday = cast(int, self.get_module_option('begin_weekday'))
637 end_weekday = cast(int, self.get_module_option('end_weekday'))
638 if begin_weekday <= end_weekday:
639 permit = begin_weekday <= weekday < end_weekday
640 else:
641 permit = weekday >= begin_weekday or weekday < end_weekday
642 if not permit:
643 self.log.debug("should run between weekday %d - %d, now %d, skipping",
644 begin_weekday, end_weekday, weekday)
645 return False
646
647 return True
648
649 def serve(self) -> None:
650 self.log.info('Starting')
651 while self.run:
652 self.active = cast(bool, self.get_module_option('active'))
653 sleep_interval = cast(float, self.get_module_option('sleep_interval'))
654 self.log.debug('Waking up [%s, now %s]',
655 "active" if self.active else "inactive",
656 time.strftime(TIME_FORMAT, time.localtime()))
657 if self.active and self.time_permit():
658 self.log.debug('Running')
659 name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
660 osdmap = self.get_osdmap()
661 pool_ids = cast(str, self.get_module_option('pool_ids'))
662 if pool_ids:
663 allow = [int(p) for p in pool_ids.split(',')]
664 else:
665 allow = []
666 final: List[str] = []
667 if allow:
668 pools = osdmap.dump().get('pools', [])
669 valid = [p['pool'] for p in pools]
670 ids = set(allow) & set(valid)
671 if set(allow) - set(valid): # some pools were gone, prune
672 self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
673 pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
674 final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
675 plan = self.plan_create(name, osdmap, final)
676 self.optimizing = True
677 self.last_optimize_started = time.asctime(time.localtime())
678 self.optimize_result = self.in_progress_string
679 start = time.time()
680 r, detail = self.optimize(plan)
681 end = time.time()
682 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
683 if r == 0:
684 self.optimize_result = self.success_string
685 self.execute(plan)
686 else:
687 self.optimize_result = detail
688 self.optimizing = False
689 self.log.debug('Sleeping for %d', sleep_interval)
690 self.event.wait(sleep_interval)
691 self.event.clear()
692
693 def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
694 mode = cast(str, self.get_module_option('mode'))
695 if mode == 'upmap':
696             # Drop the unnecessary MappingState member for upmap mode.
697             # This effectively eliminates the need to keep a complete
698             # copy of pg_stats, which becomes very expensive as pg_num
699             # grows.
700 plan = Plan(name, mode, osdmap, pools)
701 else:
702 plan = MsPlan(name,
703 mode,
704 MappingState(osdmap,
705 self.get("pg_stats"),
706 self.get("pool_stats"),
707 'plan %s initial' % name),
708 pools)
709 return plan
710
711 def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
712 pe = Eval(ms)
713 pool_rule = {}
714 pool_info = {}
715 for p in ms.osdmap_dump.get('pools', []):
716 if len(pools) and p['pool_name'] not in pools:
717 continue
718 # skip dead or not-yet-ready pools too
719 if p['pool'] not in ms.poolids:
720 continue
721 pe.pool_name[p['pool']] = p['pool_name']
722 pe.pool_id[p['pool_name']] = p['pool']
723 pool_rule[p['pool_name']] = p['crush_rule']
724 pe.pool_roots[p['pool_name']] = []
725 pool_info[p['pool_name']] = p
726 if len(pool_info) == 0:
727 return pe
728 self.log.debug('pool_name %s' % pe.pool_name)
729 self.log.debug('pool_id %s' % pe.pool_id)
730 self.log.debug('pools %s' % pools)
731 self.log.debug('pool_rule %s' % pool_rule)
732
733 osd_weight = {a['osd']: a['weight']
734 for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}
735
736 # get expected distributions by root
737 actual_by_root: Dict[str, Dict[str, dict]] = {}
738 rootids = ms.crush.find_takes()
739 roots = []
740 for rootid in rootids:
741 ls = ms.osdmap.get_pools_by_take(rootid)
742 want = []
743 # find out roots associating with pools we are passed in
744 for candidate in ls:
745 if candidate in pe.pool_name:
746 want.append(candidate)
747 if len(want) == 0:
748 continue
749 root = ms.crush.get_item_name(rootid)
750 pe.root_pools[root] = []
751 for poolid in want:
752 pe.pool_roots[pe.pool_name[poolid]].append(root)
753 pe.root_pools[root].append(pe.pool_name[poolid])
754 pe.root_ids[root] = rootid
755 roots.append(root)
756 weight_map = ms.crush.get_take_weight_osd_map(rootid)
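            # scale each OSD's crush weight by its reweight value, then
            # normalize so the per-root target weights below sum to 1.0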
757 adjusted_map = {
758 osd: cw * osd_weight[osd]
759 for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
760 }
761 sum_w = sum(adjusted_map.values())
762 assert len(adjusted_map) == 0 or sum_w > 0
763 pe.target_by_root[root] = {osd: w / sum_w
764 for osd, w in adjusted_map.items()}
765 actual_by_root[root] = {
766 'pgs': {},
767 'objects': {},
768 'bytes': {},
769 }
770 for osd in pe.target_by_root[root]:
771 actual_by_root[root]['pgs'][osd] = 0
772 actual_by_root[root]['objects'][osd] = 0
773 actual_by_root[root]['bytes'][osd] = 0
774 pe.total_by_root[root] = {
775 'pgs': 0,
776 'objects': 0,
777 'bytes': 0,
778 }
779 self.log.debug('pool_roots %s' % pe.pool_roots)
780 self.log.debug('root_pools %s' % pe.root_pools)
781 self.log.debug('target_by_root %s' % pe.target_by_root)
782
783 # pool and root actual
784 for pool, pi in pool_info.items():
785 poolid = pi['pool']
786 pm = ms.pg_up_by_poolid[poolid]
787 pgs = 0
788 objects = 0
789 bytes = 0
790 pgs_by_osd = {}
791 objects_by_osd = {}
792 bytes_by_osd = {}
793 for pgid, up in pm.items():
794 for osd in [int(osd) for osd in up]:
795 if osd == CRUSHMap.ITEM_NONE:
796 continue
797 if osd not in pgs_by_osd:
798 pgs_by_osd[osd] = 0
799 objects_by_osd[osd] = 0
800 bytes_by_osd[osd] = 0
801 pgs_by_osd[osd] += 1
802 objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
803 bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
804 # pick a root to associate this pg instance with.
805 # note that this is imprecise if the roots have
806 # overlapping children.
807 # FIXME: divide bytes by k for EC pools.
808 for root in pe.pool_roots[pool]:
809 if osd in pe.target_by_root[root]:
810 actual_by_root[root]['pgs'][osd] += 1
811 actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
812 actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
813 pgs += 1
814 objects += ms.pg_stat[pgid]['num_objects']
815 bytes += ms.pg_stat[pgid]['num_bytes']
816 pe.total_by_root[root]['pgs'] += 1
817 pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
818 pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
819 break
820 pe.count_by_pool[pool] = {
821 'pgs': {
822 k: v
823 for k, v in pgs_by_osd.items()
824 },
825 'objects': {
826 k: v
827 for k, v in objects_by_osd.items()
828 },
829 'bytes': {
830 k: v
831 for k, v in bytes_by_osd.items()
832 },
833 }
834 pe.actual_by_pool[pool] = {
835 'pgs': {
836 k: float(v) / float(max(pgs, 1))
837 for k, v in pgs_by_osd.items()
838 },
839 'objects': {
840 k: float(v) / float(max(objects, 1))
841 for k, v in objects_by_osd.items()
842 },
843 'bytes': {
844 k: float(v) / float(max(bytes, 1))
845 for k, v in bytes_by_osd.items()
846 },
847 }
848 pe.total_by_pool[pool] = {
849 'pgs': pgs,
850 'objects': objects,
851 'bytes': bytes,
852 }
853 for root in pe.total_by_root:
854 pe.count_by_root[root] = {
855 'pgs': {
856 k: float(v)
857 for k, v in actual_by_root[root]['pgs'].items()
858 },
859 'objects': {
860 k: float(v)
861 for k, v in actual_by_root[root]['objects'].items()
862 },
863 'bytes': {
864 k: float(v)
865 for k, v in actual_by_root[root]['bytes'].items()
866 },
867 }
868 pe.actual_by_root[root] = {
869 'pgs': {
870 k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
871 for k, v in actual_by_root[root]['pgs'].items()
872 },
873 'objects': {
874 k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
875 for k, v in actual_by_root[root]['objects'].items()
876 },
877 'bytes': {
878 k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
879 for k, v in actual_by_root[root]['bytes'].items()
880 },
881 }
882 self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
883 self.log.debug('actual_by_root %s' % pe.actual_by_root)
884
885 # average and stddev and score
886 pe.stats_by_root = {
887 a: pe.calc_stats(
888 b,
889 pe.target_by_root[a],
890 pe.total_by_root[a]
891 ) for a, b in pe.count_by_root.items()
892 }
893 self.log.debug('stats_by_root %s' % pe.stats_by_root)
894
895 # the scores are already normalized
896 pe.score_by_root = {
897 r: {
898 'pgs': pe.stats_by_root[r]['pgs']['score'],
899 'objects': pe.stats_by_root[r]['objects']['score'],
900 'bytes': pe.stats_by_root[r]['bytes']['score'],
901 } for r in pe.total_by_root.keys()
902 }
903 self.log.debug('score_by_root %s' % pe.score_by_root)
904
905 # get the list of score metrics, comma separated
906 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
907
908 # total score is just average of normalized stddevs
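        # e.g. with metrics = ['pgs', 'bytes'] and two roots scoring 0.10 and
        # 0.20 on each metric, pe.score = (0.10 + 0.20) * 2 / (2 * 2) = 0.15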
909 pe.score = 0.0
910 for r, vs in pe.score_by_root.items():
911 for k, v in vs.items():
912 if k in metrics:
913 pe.score += v
914 pe.score /= len(metrics) * len(roots)
915 return pe
916
917 def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
918 pe = self.calc_eval(ms, pools)
919 return pe.show(verbose=verbose)
920
921 def optimize(self, plan: Plan) -> Tuple[int, str]:
922 self.log.info('Optimize plan %s' % plan.name)
923 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
924 self.log.info('Mode %s, max misplaced %f' %
925 (plan.mode, max_misplaced))
926
927 info = self.get('pg_status')
928 unknown = info.get('unknown_pgs_ratio', 0.0)
929 degraded = info.get('degraded_ratio', 0.0)
930 inactive = info.get('inactive_pgs_ratio', 0.0)
931 misplaced = info.get('misplaced_ratio', 0.0)
932 plan.pg_status = info
933 self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
934 unknown, degraded, inactive, misplaced)
935 if unknown > 0.0:
936 detail = 'Some PGs (%f) are unknown; try again later' % unknown
937 self.log.info(detail)
938 return -errno.EAGAIN, detail
939 elif degraded > 0.0:
940 detail = 'Some objects (%f) are degraded; try again later' % degraded
941 self.log.info(detail)
942 return -errno.EAGAIN, detail
943 elif inactive > 0.0:
944 detail = 'Some PGs (%f) are inactive; try again later' % inactive
945 self.log.info(detail)
946 return -errno.EAGAIN, detail
947 elif misplaced >= max_misplaced:
948 detail = 'Too many objects (%f > %f) are misplaced; ' \
949 'try again later' % (misplaced, max_misplaced)
950 self.log.info(detail)
951 return -errno.EAGAIN, detail
952 else:
953 if plan.mode == 'upmap':
954 return self.do_upmap(plan)
955 elif plan.mode == 'crush-compat':
956 return self.do_crush_compat(cast(MsPlan, plan))
957 elif plan.mode == 'none':
958 detail = 'Please do "ceph balancer mode" to choose a valid mode first'
959 self.log.info('Idle')
960 return -errno.ENOEXEC, detail
961 else:
962 detail = 'Unrecognized mode %s' % plan.mode
963 self.log.info(detail)
964 return -errno.EINVAL, detail
965
966 def do_upmap(self, plan: Plan) -> Tuple[int, str]:
967 self.log.info('do_upmap')
968 max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
969 max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
970 osdmap_dump = plan.osdmap_dump
971
972 if len(plan.pools):
973 pools = plan.pools
974 else: # all
975 pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
976 if len(pools) == 0:
977 detail = 'No pools available'
978 self.log.info(detail)
979 return -errno.ENOENT, detail
980 # shuffle pool list so they all get equal (in)attention
981 random.shuffle(pools)
982 self.log.info('pools %s' % pools)
983
984 adjusted_pools = []
985 inc = plan.inc
986 total_did = 0
987 left = max_optimizations
988 pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
989 if p['pg_num'] > p['pg_num_target']]
990 crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
991 for p in osdmap_dump.get('pools', []))
992 for pool in pools:
993 if pool not in crush_rule_by_pool_name:
994 self.log.info('pool %s does not exist' % pool)
995 continue
996 if pool in pools_with_pg_merge:
997 self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
998 continue
999 adjusted_pools.append(pool)
1000 # shuffle so all pools get equal (in)attention
1001 random.shuffle(adjusted_pools)
1002 pool_dump = osdmap_dump.get('pools', [])
1003 for pool in adjusted_pools:
1004 for p in pool_dump:
1005 if p['pool_name'] == pool:
1006 pool_id = p['pool']
1007 break
1008
1009             # note that we also deliberately exclude any scrubbing pgs,
1010             # since scrubbing has a significant impact on performance
1011 num_pg_active_clean = 0
1012 for p in plan.pg_status.get('pgs_by_pool_state', []):
1013 pgs_pool_id = p['pool_id']
1014 if pgs_pool_id != pool_id:
1015 continue
1016 for s in p['pg_state_counts']:
1017 if s['state_name'] == 'active+clean':
1018 num_pg_active_clean += s['count']
1019 break
1020 available = min(left, num_pg_active_clean)
1021 did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
1022 total_did += did
1023 left -= did
1024 if left <= 0:
1025 break
1026 self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
1027 if total_did == 0:
1028 self.no_optimization_needed = True
1029 return -errno.EALREADY, 'Unable to find further optimization, ' \
1030 'or pool(s) pg_num is decreasing, ' \
1031 'or distribution is already perfect'
1032 return 0, ''
1033
1034 def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
1035 self.log.info('do_crush_compat')
1036 max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
1037 if max_iterations < 1:
1038 return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
1039 step = cast(float, self.get_module_option('crush_compat_step'))
1040 if step <= 0 or step >= 1.0:
1041 return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
1042 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
1043 min_pg_per_osd = 2
1044
1045 ms = plan.initial
1046 osdmap = ms.osdmap
1047 crush = osdmap.get_crush()
1048 pe = self.calc_eval(ms, plan.pools)
1049 min_score_to_optimize = cast(float, self.get_module_option('min_score'))
1050 if pe.score <= min_score_to_optimize:
1051 if pe.score == 0:
1052 detail = 'Distribution is already perfect'
1053 else:
1054 detail = 'score %f <= min_score %f, will not optimize' \
1055 % (pe.score, min_score_to_optimize)
1056 self.log.info(detail)
1057 return -errno.EALREADY, detail
1058
1059 # get current osd reweights
1060 orig_osd_weight = {a['osd']: a['weight']
1061 for a in ms.osdmap_dump.get('osds', [])}
1062
1063 # get current compat weight-set weights
1064 orig_ws = self.get_compat_weight_set_weights(ms)
1065 if not orig_ws:
1066 return -errno.EAGAIN, 'compat weight-set not available'
1067 orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}
1068
1069 # Make sure roots don't overlap their devices. If so, we
1070 # can't proceed.
1071 roots = list(pe.target_by_root.keys())
1072 self.log.debug('roots %s', roots)
1073 visited: Dict[int, str] = {}
1074 overlap: Dict[int, List[str]] = {}
1075 for root, wm in pe.target_by_root.items():
1076 for osd in wm:
1077 if osd in visited:
1078 if osd not in overlap:
1079 overlap[osd] = [visited[osd]]
1080 overlap[osd].append(root)
1081 visited[osd] = root
1082 if len(overlap) > 0:
1083 detail = 'Some osds belong to multiple subtrees: %s' % \
1084 overlap
1085 self.log.error(detail)
1086 return -errno.EOPNOTSUPP, detail
1087
1088 # rebalance by pgs, objects, or bytes
1089 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
1090 key = metrics[0] # balancing using the first score metric
1091 if key not in ['pgs', 'bytes', 'objects']:
1092 self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
1093 key = 'pgs'
1094
1095 # go
1096 best_ws = copy.deepcopy(orig_ws)
1097 best_ow = copy.deepcopy(orig_osd_weight)
1098 best_pe = pe
1099 left = max_iterations
1100 bad_steps = 0
1101 next_ws = copy.deepcopy(best_ws)
1102 next_ow = copy.deepcopy(best_ow)
1103 while left > 0:
1104 # adjust
1105 self.log.debug('best_ws %s' % best_ws)
1106 random.shuffle(roots)
1107 for root in roots:
1108 pools = best_pe.root_pools[root]
1109 osds = len(best_pe.target_by_root[root])
1110 min_pgs = osds * min_pg_per_osd
1111 if best_pe.total_by_root[root][key] < min_pgs:
1112 self.log.info('Skipping root %s (pools %s), total pgs %d '
1113 '< minimum %d (%d per osd)',
1114 root, pools,
1115 best_pe.total_by_root[root][key],
1116 min_pgs, min_pg_per_osd)
1117 continue
1118 self.log.info('Balancing root %s (pools %s) by %s' %
1119 (root, pools, key))
1120 target = best_pe.target_by_root[root]
1121 actual = best_pe.actual_by_root[root][key]
1122 queue = sorted(actual.keys(),
1123 key=lambda osd: -abs(target[osd] - actual[osd]))
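                # visit OSDs in order of decreasing |target - actual| so the
                # most-deviant devices are adjusted first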
1124 for osd in queue:
1125 if orig_osd_weight[osd] == 0:
1126 self.log.debug('skipping out osd.%d', osd)
1127 else:
1128 deviation = target[osd] - actual[osd]
1129 if deviation == 0:
1130 break
1131 self.log.debug('osd.%d deviation %f', osd, deviation)
1132 weight = best_ws[osd]
1133 ow = orig_osd_weight[osd]
1134 if actual[osd] > 0:
1135 calc_weight = target[osd] / actual[osd] * weight * ow
1136 else:
1137                             # for newly created osds, reset calc_weight to the target value.
1138                             # this way the weight-set absorbs *step* of its target (final)
1139                             # value at the very beginning and slowly catches up later.
1140                             # note that if this turns out to cause too many misplaced
1141                             # pgs, we reduce step and retry
1142 calc_weight = target[osd]
1143 new_weight = weight * (1.0 - step) + calc_weight * step
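                        # e.g. with step = 0.5, weight = 1.0 and calc_weight = 0.8,
                        # new_weight = 0.5 * 1.0 + 0.5 * 0.8 = 0.9; each iteration moves
                        # the weight-set a fraction *step* towards its computed target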
1144 self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
1145 new_weight)
1146 next_ws[osd] = new_weight
1147 if ow < 1.0:
1148 new_ow = min(1.0, max(step + (1.0 - step) * ow,
1149 ow + .005))
1150 self.log.debug('Reweight osd.%d reweight %f -> %f',
1151 osd, ow, new_ow)
1152 next_ow[osd] = new_ow
1153
1154 # normalize weights under this root
1155 root_weight = crush.get_item_weight(pe.root_ids[root])
1156 root_sum = sum(b for a, b in next_ws.items()
1157 if a in target.keys())
1158 if root_sum > 0 and root_weight > 0:
1159 factor = root_sum / root_weight
1160 self.log.debug('normalizing root %s %d, weight %f, '
1161 'ws sum %f, factor %f',
1162 root, pe.root_ids[root], root_weight,
1163 root_sum, factor)
1164 for osd in actual.keys():
1165 next_ws[osd] = next_ws[osd] / factor
1166
1167 # recalc
1168 plan.compat_ws = copy.deepcopy(next_ws)
1169 next_ms = plan.final_state()
1170 next_pe = self.calc_eval(next_ms, plan.pools)
1171 next_misplaced = next_ms.calc_misplaced_from(ms)
1172 self.log.debug('Step result score %f -> %f, misplacing %f',
1173 best_pe.score, next_pe.score, next_misplaced)
1174
1175 if next_misplaced > max_misplaced:
1176 if best_pe.score < pe.score:
1177 self.log.debug('Step misplaced %f > max %f, stopping',
1178 next_misplaced, max_misplaced)
1179 break
1180 step /= 2.0
1181 next_ws = copy.deepcopy(best_ws)
1182 next_ow = copy.deepcopy(best_ow)
1183 self.log.debug('Step misplaced %f > max %f, reducing step to %f',
1184 next_misplaced, max_misplaced, step)
1185 else:
1186 if next_pe.score > best_pe.score * 1.0001:
1187 bad_steps += 1
1188 if bad_steps < 5 and random.randint(0, 100) < 70:
1189 self.log.debug('Score got worse, taking another step')
1190 else:
1191 step /= 2.0
1192 next_ws = copy.deepcopy(best_ws)
1193 next_ow = copy.deepcopy(best_ow)
1194 self.log.debug('Score got worse, trying smaller step %f',
1195 step)
1196 else:
1197 bad_steps = 0
1198 best_pe = next_pe
1199 best_ws = copy.deepcopy(next_ws)
1200 best_ow = copy.deepcopy(next_ow)
1201 if best_pe.score == 0:
1202 break
1203 left -= 1
1204
1205 # allow a small regression if we are phasing out osd weights
1206 fudge = 0.0
1207 if best_ow != orig_osd_weight:
1208 fudge = .001
1209
1210 if best_pe.score < pe.score + fudge:
1211 self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
1212 plan.compat_ws = best_ws
1213 for osd, w in best_ow.items():
1214 if w != orig_osd_weight[osd]:
1215 self.log.debug('osd.%d reweight %f', osd, w)
1216 plan.osd_weights[osd] = w
1217 return 0, ''
1218 else:
1219 self.log.info('Failed to find further optimization, score %f',
1220 pe.score)
1221 plan.compat_ws = {}
1222             return -errno.EDOM, 'Unable to find further optimization, ' \
1223                 'changing the balancer mode and retrying might help'
1224
1225 def get_compat_weight_set_weights(self, ms: MappingState):
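        # Return a map of crush item id -> compat weight-set weight, creating
        # the compat weight-set first if it is missing or incomplete.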
1226 have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
1227 if have_choose_args:
1228 # get number of buckets in choose_args
1229 choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
1230 if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
1231 # enable compat weight-set first
1232                 self.log.debug('no choose_args or not all buckets have weight-sets')
1233 self.log.debug('ceph osd crush weight-set create-compat')
1234 result = CommandResult('')
1235 self.send_command(result, 'mon', '', json.dumps({
1236 'prefix': 'osd crush weight-set create-compat',
1237 'format': 'json',
1238 }), '')
1239 r, outb, outs = result.wait()
1240 if r != 0:
1241 self.log.error('Error creating compat weight-set')
1242 return
1243
1244 result = CommandResult('')
1245 self.send_command(result, 'mon', '', json.dumps({
1246 'prefix': 'osd crush dump',
1247 'format': 'json',
1248 }), '')
1249 r, outb, outs = result.wait()
1250 if r != 0:
1251 self.log.error('Error dumping crush map')
1252 return
1253 try:
1254 crushmap = json.loads(outb)
1255 except json.JSONDecodeError:
1256 raise RuntimeError('unable to parse crush map')
1257 else:
1258 crushmap = ms.crush_dump
1259
1260 raw = CRUSHMap.get_default_choose_args(crushmap)
1261 weight_set = {}
1262 for b in raw:
1263 bucket = None
1264 for t in crushmap['buckets']:
1265 if t['id'] == b['bucket_id']:
1266 bucket = t
1267 break
1268 if not bucket:
1269 raise RuntimeError('could not find bucket %s' % b['bucket_id'])
1270 self.log.debug('bucket items %s' % bucket['items'])
1271 self.log.debug('weight set %s' % b['weight_set'][0])
1272 if len(bucket['items']) != len(b['weight_set'][0]):
1273 raise RuntimeError('weight-set size does not match bucket items')
1274 for pos in range(len(bucket['items'])):
1275 weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
1276
1277 self.log.debug('weight_set weights %s' % weight_set)
1278 return weight_set
1279
1280 def do_crush(self) -> None:
1281 self.log.info('do_crush (not yet implemented)')
1282
1283 def do_osd_weight(self) -> None:
1284 self.log.info('do_osd_weight (not yet implemented)')
1285
1286 def execute(self, plan: Plan) -> Tuple[int, str]:
1287 self.log.info('Executing plan %s' % plan.name)
1288
1289 commands = []
1290
1291 # compat weight-set
1292 if len(plan.compat_ws):
1293 ms_plan = cast(MsPlan, plan)
1294 if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
1295 self.log.debug('ceph osd crush weight-set create-compat')
1296 result = CommandResult('')
1297 self.send_command(result, 'mon', '', json.dumps({
1298 'prefix': 'osd crush weight-set create-compat',
1299 'format': 'json',
1300 }), '')
1301 r, outb, outs = result.wait()
1302 if r != 0:
1303 self.log.error('Error creating compat weight-set')
1304 return r, outs
1305
1306 for osd, weight in plan.compat_ws.items():
1307 self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
1308 osd, weight)
1309 result = CommandResult('')
1310 self.send_command(result, 'mon', '', json.dumps({
1311 'prefix': 'osd crush weight-set reweight-compat',
1312 'format': 'json',
1313 'item': 'osd.%d' % osd,
1314 'weight': [weight],
1315 }), '')
1316 commands.append(result)
1317
1318 # new_weight
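        # convert the float reweight values into the 0x10000-scaled integers
        # expected by 'osd reweightn' (1.0 -> 65536, 0.5 -> 32768)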
1319 reweightn = {}
1320 for osd, weight in plan.osd_weights.items():
1321 reweightn[str(osd)] = str(int(weight * float(0x10000)))
1322 if len(reweightn):
1323 self.log.info('ceph osd reweightn %s', reweightn)
1324 result = CommandResult('')
1325 self.send_command(result, 'mon', '', json.dumps({
1326 'prefix': 'osd reweightn',
1327 'format': 'json',
1328 'weights': json.dumps(reweightn),
1329 }), '')
1330 commands.append(result)
1331
1332 # upmap
1333 incdump = plan.inc.dump()
1334 for item in incdump.get('new_pg_upmap', []):
1335 self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
1336 item['osds'])
1337 result = CommandResult('foo')
1338 self.send_command(result, 'mon', '', json.dumps({
1339 'prefix': 'osd pg-upmap',
1340 'format': 'json',
1341 'pgid': item['pgid'],
1342 'id': item['osds'],
1343 }), 'foo')
1344 commands.append(result)
1345
1346 for pgid in incdump.get('old_pg_upmap', []):
1347 self.log.info('ceph osd rm-pg-upmap %s', pgid)
1348 result = CommandResult('foo')
1349 self.send_command(result, 'mon', '', json.dumps({
1350 'prefix': 'osd rm-pg-upmap',
1351 'format': 'json',
1352 'pgid': pgid,
1353 }), 'foo')
1354 commands.append(result)
1355
1356 for item in incdump.get('new_pg_upmap_items', []):
1357 self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
1358 item['mappings'])
1359 osdlist = []
1360 for m in item['mappings']:
1361 osdlist += [m['from'], m['to']]
1362 result = CommandResult('foo')
1363 self.send_command(result, 'mon', '', json.dumps({
1364 'prefix': 'osd pg-upmap-items',
1365 'format': 'json',
1366 'pgid': item['pgid'],
1367 'id': osdlist,
1368 }), 'foo')
1369 commands.append(result)
1370
1371 for pgid in incdump.get('old_pg_upmap_items', []):
1372 self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
1373 result = CommandResult('foo')
1374 self.send_command(result, 'mon', '', json.dumps({
1375 'prefix': 'osd rm-pg-upmap-items',
1376 'format': 'json',
1377 'pgid': pgid,
1378 }), 'foo')
1379 commands.append(result)
1380
1381 # wait for commands
1382 self.log.debug('commands %s' % commands)
1383 for result in commands:
1384 r, outb, outs = result.wait()
1385 if r != 0:
1386 self.log.error('execute error: r = %d, detail = %s' % (r, outs))
1387 return r, outs
1388 self.log.debug('done')
1389 return 0, ''
1390
1391 def gather_telemetry(self) -> Dict[str, Any]:
1392 return {
1393 'active': self.active,
1394 'mode': self.mode,
1395 }