1 """
2 Balance PG distribution across OSDs.
3 """
4
5 import copy
6 import enum
7 import errno
8 import json
9 import math
10 import random
11 import time
12 from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
13 from threading import Event
14 from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
15 from mgr_module import CRUSHMap
16 import datetime
17
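# format used for timestamping automatically generated plan names,
# e.g. a plan created by the active balancer is named like 'auto_2021-03-01_12:00:00'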
18 TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
19
20
21 class MappingState:
22 def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
23 self.desc = desc
24 self.osdmap = osdmap
25 self.osdmap_dump = self.osdmap.dump()
26 self.crush = osdmap.get_crush()
27 self.crush_dump = self.crush.dump()
28 self.raw_pg_stats = raw_pg_stats
29 self.raw_pool_stats = raw_pool_stats
30 self.pg_stat = {
31 i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
32 }
33 osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
34 pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
35 self.poolids = set(osd_poolids) & set(pg_poolids)
36 self.pg_up = {}
37 self.pg_up_by_poolid = {}
38 for poolid in self.poolids:
39 self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
40 for a, b in self.pg_up_by_poolid[poolid].items():
41 self.pg_up[a] = b
42
43 def calc_misplaced_from(self, other_ms):
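# fraction of PGs whose up mapping differs from other_ms; e.g. if other_ms
# mapped 100 PGs and 7 of them now have a different up set, this returns 0.07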
44 num = len(other_ms.pg_up)
45 misplaced = 0
46 for pgid, before in other_ms.pg_up.items():
47 if before != self.pg_up.get(pgid, []):
48 misplaced += 1
49 if num > 0:
50 return float(misplaced) / float(num)
51 return 0.0
52
53
54 class Mode(enum.Enum):
55 none = 'none'
56 crush_compat = 'crush-compat'
57 upmap = 'upmap'
58
59
60 class Plan(object):
61 def __init__(self, name, mode, osdmap, pools):
62 self.name = name
63 self.mode = mode
64 self.osdmap = osdmap
65 self.osdmap_dump = osdmap.dump()
66 self.pools = pools
67 self.osd_weights = {}
68 self.compat_ws = {}
69 self.inc = osdmap.new_incremental()
70 self.pg_status = {}
71
72 def dump(self) -> str:
73 return json.dumps(self.inc.dump(), indent=4, sort_keys=True)
74
75 def show(self) -> str:
76 return 'upmap plan'
77
78
79 class MsPlan(Plan):
80 """
81 Plan with a preloaded MappingState member.
82 """
83
84 def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
85 super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
86 self.initial = ms
87
88 def final_state(self) -> MappingState:
89 self.inc.set_osd_reweights(self.osd_weights)
90 self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
91 return MappingState(self.initial.osdmap.apply_incremental(self.inc),
92 self.initial.raw_pg_stats,
93 self.initial.raw_pool_stats,
94 'plan %s final' % self.name)
95
96 def show(self) -> str:
97 ls = []
98 ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
99 ls.append('# starting crush version %d' %
100 self.initial.osdmap.get_crush_version())
101 ls.append('# mode %s' % self.mode)
102 if len(self.compat_ws) and \
103 not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
104 ls.append('ceph osd crush weight-set create-compat')
105 for osd, weight in self.compat_ws.items():
106 ls.append('ceph osd crush weight-set reweight-compat %s %f' %
107 (osd, weight))
108 for osd, weight in self.osd_weights.items():
109 ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
110 incdump = self.inc.dump()
111 for pgid in incdump.get('old_pg_upmap_items', []):
112 ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
113 for item in incdump.get('new_pg_upmap_items', []):
114 osdlist = []
115 for m in item['mappings']:
116 osdlist += [m['from'], m['to']]
117 ls.append('ceph osd pg-upmap-items %s %s' %
118 (item['pgid'], ' '.join([str(a) for a in osdlist])))
119 return '\n'.join(ls)
120
121
122 class Eval:
123 def __init__(self, ms: MappingState):
124 self.ms = ms
125 self.root_ids: Dict[str, int] = {} # root name -> id
126 self.pool_name: Dict[int, str] = {} # pool id -> pool name
127 self.pool_id: Dict[str, int] = {} # pool name -> id
128 self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name
129 self.root_pools: Dict[str, List[str]] = {} # root name -> pools
130 self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map
131 self.count_by_pool: Dict[str, dict] = {}
132 self.count_by_root: Dict[str, dict] = {}
133 self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map
134 self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map
135 self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total
136 self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total
137 self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value
138 self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value
139
140 self.score_by_pool: Dict[str, float] = {}
141 self.score_by_root: Dict[str, Dict[str, float]] = {}
142
143 self.score = 0.0
144
145 def show(self, verbose: bool = False) -> str:
146 if verbose:
147 r = self.ms.desc + '\n'
148 r += 'target_by_root %s\n' % self.target_by_root
149 r += 'actual_by_pool %s\n' % self.actual_by_pool
150 r += 'actual_by_root %s\n' % self.actual_by_root
151 r += 'count_by_pool %s\n' % self.count_by_pool
152 r += 'count_by_root %s\n' % self.count_by_root
153 r += 'total_by_pool %s\n' % self.total_by_pool
154 r += 'total_by_root %s\n' % self.total_by_root
155 r += 'stats_by_root %s\n' % self.stats_by_root
156 r += 'score_by_pool %s\n' % self.score_by_pool
157 r += 'score_by_root %s\n' % self.score_by_root
158 else:
159 r = self.ms.desc + ' '
160 r += 'score %f (lower is better)\n' % self.score
161 return r
162
163 def calc_stats(self, count, target, total):
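# returns, for each of 'pgs', 'objects' and 'bytes', a dict with
# 'max', 'min', 'avg', 'stddev', 'sum_weight' and 'score' entries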
164 num = max(len(target), 1)
165 r: Dict[str, Dict[str, Union[int, float]]] = {}
166 for t in ('pgs', 'objects', 'bytes'):
167 if total[t] == 0:
168 r[t] = {
169 'max': 0,
170 'min': 0,
171 'avg': 0,
172 'stddev': 0,
173 'sum_weight': 0,
174 'score': 0,
175 }
176 continue
177
178 avg = float(total[t]) / float(num)
179 dev = 0.0
180
181 # score is a measure of how uneven the data distribution is.
182 # score lies in [0, 1); 0 means a perfect distribution.
183 score = 0.0
184 sum_weight = 0.0
185
186 for k, v in count[t].items():
187 # adjust/normalize by weight
188 if target[k]:
189 adjusted = float(v) / target[k] / float(num)
190 else:
191 adjusted = 0.0
192
193 # Overweighted devices and their weights are factors in calculating reweight_urgency.
194 # One 10% underfilled device alongside five 2% overfilled devices is arguably a better
195 # situation than one 10% overfilled device alongside five 2% underfilled devices.
196 if adjusted > avg:
197 '''
198 F(x) = 2*phi(x) - 1, where phi(x) is the cdf of the standard normal distribution
199 and x = (adjusted - avg)/avg.
200 Since we're considering only over-weighted devices, x >= 0, so phi(x) lies in [0.5, 1).
201 The 2*phi(x) - 1 form brings the range of F(x) into [0, 1).
202
203 In general, we need a function F(x), where x = (adjusted - avg)/avg, such that:
204 1. F(x) is bounded between 0 and 1, so that reweight_urgency is also bounded.
205 2. A larger value of x implies more urgency to reweight.
206 3. For large x, differences in F(x) should be minimal.
207 4. F(x) should approach 1 (the highest urgency to reweight) steeply.
208
209 We could have used F(x) = 1 - e^(-x), but it converges to 1 more slowly than the function in use.
210
211 cdf of the standard normal distribution: https://stackoverflow.com/a/29273201
212 '''
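# illustrative values of F(x) = erf(x / sqrt(2)) for x = (adjusted - avg) / avg:
# F(0.1) ~= 0.08, F(1.0) ~= 0.68, F(3.0) ~= 0.997 -- slightly overfilled devices
# contribute little, while heavily overfilled ones push the score quickly toward 1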
213 score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
214 sum_weight += target[k]
215 dev += (avg - adjusted) * (avg - adjusted)
216 stddev = math.sqrt(dev / float(max(num - 1, 1)))
217 score = score / max(sum_weight, 1)
218 r[t] = {
219 'max': max(count[t].values()),
220 'min': min(count[t].values()),
221 'avg': avg,
222 'stddev': stddev,
223 'sum_weight': sum_weight,
224 'score': score,
225 }
226 return r
227
228
229 class Module(MgrModule):
230 MODULE_OPTIONS = [
231 Option(name='active',
232 type='bool',
233 default=True,
234 desc='automatically balance PGs across cluster',
235 runtime=True),
236 Option(name='begin_time',
237 type='str',
238 default='0000',
239 desc='beginning time of day to automatically balance',
240 long_desc='This is a time of day in the format HHMM.',
241 runtime=True),
242 Option(name='end_time',
243 type='str',
244 default='2400',
245 desc='ending time of day to automatically balance',
246 long_desc='This is a time of day in the format HHMM.',
247 runtime=True),
248 Option(name='begin_weekday',
249 type='uint',
250 default=0,
251 min=0,
252 max=7,
253 desc='Restrict automatic balancing to this day of the week or later',
254 long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
255 runtime=True),
256 Option(name='end_weekday',
257 type='uint',
258 default=7,
259 min=0,
260 max=7,
261 desc='Restrict automatic balancing to days of the week earlier than this',
262 long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
263 runtime=True),
264 Option(name='crush_compat_max_iterations',
265 type='uint',
266 default=25,
267 min=1,
268 max=250,
269 desc='maximum number of iterations to attempt optimization',
270 runtime=True),
271 Option(name='crush_compat_metrics',
272 type='str',
273 default='pgs,objects,bytes',
274 desc='metrics with which to calculate OSD utilization',
275 long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
276 runtime=True),
277 Option(name='crush_compat_step',
278 type='float',
279 default=.5,
280 min=.001,
281 max=.999,
282 desc='aggressiveness of optimization',
283 long_desc='.99 is very aggressive, .01 is less aggressive',
284 runtime=True),
285 Option(name='min_score',
286 type='float',
287 default=0,
288 desc='minimum score, below which no optimization is attempted',
289 runtime=True),
290 Option(name='mode',
291 desc='Balancer mode',
292 default='upmap',
293 enum_allowed=['none', 'crush-compat', 'upmap'],
294 runtime=True),
295 Option(name='sleep_interval',
296 type='secs',
297 default=60,
298 desc='how frequently to wake up and attempt optimization',
299 runtime=True),
300 Option(name='upmap_max_optimizations',
301 type='uint',
302 default=10,
303 desc='maximum upmap optimizations to make per attempt',
304 runtime=True),
305 Option(name='upmap_max_deviation',
306 type='int',
307 default=5,
308 min=1,
309 desc='deviation below which no optimization is attempted',
310 long_desc='If the number of PGs is within this count then no optimization is attempted',
311 runtime=True),
312 Option(name='pool_ids',
313 type='str',
314 default='',
315 desc='pools which the automatic balancing will be limited to',
316 runtime=True)
317 ]
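# all of the options above can be adjusted at runtime through the mgr config
# namespace, e.g.:
#   ceph config set mgr mgr/balancer/begin_time 0830
#   ceph config set mgr mgr/balancer/upmap_max_deviation 2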
318
319 active = False
320 run = True
321 plans: Dict[str, Plan] = {}
322 mode = ''
323 optimizing = False
324 last_optimize_started = ''
325 last_optimize_duration = ''
326 optimize_result = ''
327 success_string = 'Optimization plan created successfully'
328 in_progress_string = 'in progress'
329
330 def __init__(self, *args: Any, **kwargs: Any) -> None:
331 super(Module, self).__init__(*args, **kwargs)
332 self.event = Event()
333
334 @CLIReadCommand('balancer status')
335 def show_status(self) -> Tuple[int, str, str]:
336 """
337 Show balancer status
338 """
339 s = {
340 'plans': list(self.plans.keys()),
341 'active': self.active,
342 'last_optimize_started': self.last_optimize_started,
343 'last_optimize_duration': self.last_optimize_duration,
344 'optimize_result': self.optimize_result,
345 'mode': self.get_module_option('mode'),
346 }
347 return (0, json.dumps(s, indent=4, sort_keys=True), '')
348
349 @CLICommand('balancer mode')
350 def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
351 """
352 Set balancer mode
353 """
354 if mode == Mode.upmap:
355 min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
356 if min_compat_client < 'luminous': # this comparison works because release names sort alphabetically
357 warn = ('min_compat_client "%s" '
358 '< "luminous", which is required for pg-upmap. '
359 'Try "ceph osd set-require-min-compat-client luminous" '
360 'before enabling this mode' % min_compat_client)
361 return (-errno.EPERM, '', warn)
362 elif mode == Mode.crush_compat:
363 ms = MappingState(self.get_osdmap(),
364 self.get("pg_stats"),
365 self.get("pool_stats"),
366 'initialize compat weight-set')
367 self.get_compat_weight_set_weights(ms) # ignore error
368 self.set_module_option('mode', mode.value)
369 return (0, '', '')
370
371 @CLICommand('balancer on')
372 def on(self) -> Tuple[int, str, str]:
373 """
374 Enable automatic balancing
375 """
376 if not self.active:
377 self.set_module_option('active', 'true')
378 self.active = True
379 self.event.set()
380 return (0, '', '')
381
382 @CLICommand('balancer off')
383 def off(self) -> Tuple[int, str, str]:
384 """
385 Disable automatic balancing
386 """
387 if self.active:
388 self.set_module_option('active', 'false')
389 self.active = False
390 self.event.set()
391 return (0, '', '')
392
393 @CLIReadCommand('balancer pool ls')
394 def pool_ls(self) -> Tuple[int, str, str]:
395 """
396 List automatic balancing pools
397
398 Note that an empty list means all existing pools will be automatic balancing targets,
399 which is the default behaviour of the balancer.
400 """
401 pool_ids = cast(str, self.get_module_option('pool_ids'))
402 if pool_ids == '':
403 return (0, '', '')
404 pool_ids = [int(p) for p in pool_ids.split(',')]
405 pool_name_by_id = dict((p['pool'], p['pool_name'])
406 for p in self.get_osdmap().dump().get('pools', []))
407 should_prune = False
408 final_ids: List[int] = []
409 final_names = []
410 for p in pool_ids:
411 if p in pool_name_by_id:
412 final_ids.append(p)
413 final_names.append(pool_name_by_id[p])
414 else:
415 should_prune = True
416 if should_prune: # some pools were gone, prune
417 self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
418 return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
419
420 @CLICommand('balancer pool add')
421 def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
422 """
423 Enable automatic balancing for specific pools
424 """
425 raw_names = pools
426 pool_id_by_name = dict((p['pool_name'], p['pool'])
427 for p in self.get_osdmap().dump().get('pools', []))
428 invalid_names = [p for p in raw_names if p not in pool_id_by_name]
429 if invalid_names:
430 return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
431 to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
432 pool_ids = cast(str, self.get_module_option('pool_ids'))
433 existing = set(pool_ids.split(',') if pool_ids else [])
434 final = to_add | existing
435 self.set_module_option('pool_ids', ','.join(final))
436 return (0, '', '')
437
438 @CLICommand('balancer pool rm')
439 def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
440 """
441 Disable automatic balancing for specific pools
442 """
443 raw_names = pools
444 existing = cast(str, self.get_module_option('pool_ids'))
445 if existing == '': # for idempotence
446 return (0, '', '')
447 existing = existing.split(',')
448 osdmap = self.get_osdmap()
449 pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
450 pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
451 final = [p for p in existing if p in pool_ids]
452 to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
453 final = set(final) - set(to_delete)
454 self.set_module_option('pool_ids', ','.join(final))
455 return (0, '', '')
456
457 def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
458 pools = []
459 if option is None:
460 ms = MappingState(self.get_osdmap(),
461 self.get("pg_stats"),
462 self.get("pool_stats"),
463 'current cluster')
464 elif option in self.plans:
465 plan = self.plans.get(option)
466 assert plan
467 pools = plan.pools
468 if plan.mode == 'upmap':
469 # Note that for upmap, to improve efficiency,
470 # we use a basic version of Plan without keeping the
471 # redundant MS member.
472 # Hence ms might not be accurate here, since we are
473 # combining an old snapshotted osdmap with a fresh copy of pg_stats.
474 # It should not be a big deal though.
475 ms = MappingState(plan.osdmap,
476 self.get("pg_stats"),
477 self.get("pool_stats"),
478 f'plan "{plan.name}"')
479 else:
480 ms = cast(MsPlan, plan).final_state()
481 else:
482 # not a plan, does it look like a pool?
483 osdmap = self.get_osdmap()
484 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
485 if option not in valid_pool_names:
486 raise ValueError(f'option "{option}" not a plan or a pool')
487 pools.append(option)
488 ms = MappingState(osdmap,
489 self.get("pg_stats"),
490 self.get("pool_stats"),
491 f'pool "{option}"')
492 return ms, pools
493
494 @CLIReadCommand('balancer eval-verbose')
495 def plan_eval_verbose(self, option: Optional[str] = None):
496 """
497 Evaluate data distribution for the current cluster, a specific pool, or a
498 specific plan (verbosely)
499 """
500 try:
501 ms, pools = self._state_from_option(option)
502 return (0, self.evaluate(ms, pools, verbose=True), '')
503 except ValueError as e:
504 return (-errno.EINVAL, '', str(e))
505
506 @CLIReadCommand('balancer eval')
507 def plan_eval_brief(self, option: Optional[str] = None):
508 """
509 Evaluate data distribution for the current cluster, a specific pool, or a specific plan
510 """
511 try:
512 ms, pools = self._state_from_option(option)
513 return (0, self.evaluate(ms, pools, verbose=False), '')
514 except ValueError as e:
515 return (-errno.EINVAL, '', str(e))
516
517 @CLIReadCommand('balancer optimize')
518 def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
519 """
520 Run optimizer to create a new plan
521 """
522 # The GIL can be released by the active balancer, so disallow when active
523 if self.active:
524 return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
525 if self.optimizing:
526 return (-errno.EINVAL, '', 'Balancer finishing up....try again')
527 osdmap = self.get_osdmap()
528 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
529 invalid_pool_names = []
530 for p in pools:
531 if p not in valid_pool_names:
532 invalid_pool_names.append(p)
533 if len(invalid_pool_names):
534 return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
535 plan_ = self.plan_create(plan, osdmap, pools)
536 self.last_optimize_started = time.asctime(time.localtime())
537 self.optimize_result = self.in_progress_string
538 start = time.time()
539 r, detail = self.optimize(plan_)
540 end = time.time()
541 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
542 if r == 0:
543 # Add plan if an optimization was created
544 self.optimize_result = self.success_string
545 self.plans[plan] = plan_
546 else:
547 self.optimize_result = detail
548 return (r, '', detail)
549
550 @CLIReadCommand('balancer show')
551 def plan_show(self, plan: str) -> Tuple[int, str, str]:
552 """
553 Show details of an optimization plan
554 """
555 plan_ = self.plans.get(plan)
556 if not plan_:
557 return (-errno.ENOENT, '', f'plan {plan} not found')
558 return (0, plan_.show(), '')
559
560 @CLICommand('balancer rm')
561 def plan_rm(self, plan: str) -> Tuple[int, str, str]:
562 """
563 Discard an optimization plan
564 """
565 if plan in self.plans:
566 del self.plans[plan]
567 return (0, '', '')
568
569 @CLICommand('balancer reset')
570 def plan_reset(self) -> Tuple[int, str, str]:
571 """
572 Discard all optimization plans
573 """
574 self.plans = {}
575 return (0, '', '')
576
577 @CLIReadCommand('balancer dump')
578 def plan_dump(self, plan: str) -> Tuple[int, str, str]:
579 """
580 Show an optimization plan
581 """
582 plan_ = self.plans.get(plan)
583 if not plan_:
584 return -errno.ENOENT, '', f'plan {plan} not found'
585 else:
586 return (0, plan_.dump(), '')
587
588 @CLIReadCommand('balancer ls')
589 def plan_ls(self) -> Tuple[int, str, str]:
590 """
591 List all plans
592 """
593 return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
594
595 @CLIReadCommand('balancer execute')
596 def plan_execute(self, plan: str) -> Tuple[int, str, str]:
597 """
598 Execute an optimization plan
599 """
600 # The GIL can be released by the active balancer, so disallow when active
601 if self.active:
602 return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
603 if self.optimizing:
604 return (-errno.EINVAL, '', 'Balancer finishing up....try again')
605 plan_ = self.plans.get(plan)
606 if not plan_:
607 return (-errno.ENOENT, '', f'plan {plan} not found')
608 r, detail = self.execute(plan_)
609 self.plan_rm(plan)
610 return (r, '', detail)
611
612 def shutdown(self) -> None:
613 self.log.info('Stopping')
614 self.run = False
615 self.event.set()
616
617 def time_permit(self) -> bool:
618 local_time = time.localtime()
619 time_of_day = time.strftime('%H%M', local_time)
620 weekday = (local_time.tm_wday + 1) % 7 # be compatible with C
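# time.localtime() uses Monday == 0, so this yields Sunday == 0 .. Saturday == 6,
# matching the begin_weekday/end_weekday option convention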
621 permit = False
622
623 begin_time = cast(str, self.get_module_option('begin_time'))
624 end_time = cast(str, self.get_module_option('end_time'))
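# the window may wrap past midnight, e.g. begin_time '2300' with end_time '0600'
# permits 23:30 and 05:59 but not 12:00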
625 if begin_time <= end_time:
626 permit = begin_time <= time_of_day < end_time
627 else:
628 permit = time_of_day >= begin_time or time_of_day < end_time
629 if not permit:
630 self.log.debug("should run between %s - %s, now %s, skipping",
631 begin_time, end_time, time_of_day)
632 return False
633
634 begin_weekday = cast(int, self.get_module_option('begin_weekday'))
635 end_weekday = cast(int, self.get_module_option('end_weekday'))
636 if begin_weekday <= end_weekday:
637 permit = begin_weekday <= weekday < end_weekday
638 else:
639 permit = weekday >= begin_weekday or weekday < end_weekday
640 if not permit:
641 self.log.debug("should run between weekday %d - %d, now %d, skipping",
642 begin_weekday, end_weekday, weekday)
643 return False
644
645 return True
646
647 def serve(self) -> None:
648 self.log.info('Starting')
649 while self.run:
650 self.active = cast(bool, self.get_module_option('active'))
651 sleep_interval = cast(float, self.get_module_option('sleep_interval'))
652 self.log.debug('Waking up [%s, now %s]',
653 "active" if self.active else "inactive",
654 time.strftime(TIME_FORMAT, time.localtime()))
655 if self.active and self.time_permit():
656 self.log.debug('Running')
657 name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
658 osdmap = self.get_osdmap()
659 pool_ids = cast(str, self.get_module_option('pool_ids'))
660 if pool_ids:
661 allow = [int(p) for p in pool_ids.split(',')]
662 else:
663 allow = []
664 final: List[str] = []
665 if allow:
666 pools = osdmap.dump().get('pools', [])
667 valid = [p['pool'] for p in pools]
668 ids = set(allow) & set(valid)
669 if set(allow) - set(valid): # some pools were gone, prune
670 self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
671 pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
672 final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
673 plan = self.plan_create(name, osdmap, final)
674 self.optimizing = True
675 self.last_optimize_started = time.asctime(time.localtime())
676 self.optimize_result = self.in_progress_string
677 start = time.time()
678 r, detail = self.optimize(plan)
679 end = time.time()
680 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
681 if r == 0:
682 self.optimize_result = self.success_string
683 self.execute(plan)
684 else:
685 self.optimize_result = detail
686 self.optimizing = False
687 self.log.debug('Sleeping for %d', sleep_interval)
688 self.event.wait(sleep_interval)
689 self.event.clear()
690
691 def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
692 mode = cast(str, self.get_module_option('mode'))
693 if mode == 'upmap':
694 # drop the unnecessary MS member for upmap mode.
695 # this way we effectively eliminate the use of a
696 # complete pg_stats, which can become horribly inefficient
697 # as pg_num grows.
698 plan = Plan(name, mode, osdmap, pools)
699 else:
700 plan = MsPlan(name,
701 mode,
702 MappingState(osdmap,
703 self.get("pg_stats"),
704 self.get("pool_stats"),
705 'plan %s initial' % name),
706 pools)
707 return plan
708
709 def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
710 pe = Eval(ms)
711 pool_rule = {}
712 pool_info = {}
713 for p in ms.osdmap_dump.get('pools', []):
714 if len(pools) and p['pool_name'] not in pools:
715 continue
716 # skip dead or not-yet-ready pools too
717 if p['pool'] not in ms.poolids:
718 continue
719 pe.pool_name[p['pool']] = p['pool_name']
720 pe.pool_id[p['pool_name']] = p['pool']
721 pool_rule[p['pool_name']] = p['crush_rule']
722 pe.pool_roots[p['pool_name']] = []
723 pool_info[p['pool_name']] = p
724 if len(pool_info) == 0:
725 return pe
726 self.log.debug('pool_name %s' % pe.pool_name)
727 self.log.debug('pool_id %s' % pe.pool_id)
728 self.log.debug('pools %s' % pools)
729 self.log.debug('pool_rule %s' % pool_rule)
730
731 osd_weight = {a['osd']: a['weight']
732 for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}
733
734 # get expected distributions by root
735 actual_by_root: Dict[str, Dict[str, dict]] = {}
736 rootids = ms.crush.find_takes()
737 roots = []
738 for rootid in rootids:
739 ls = ms.osdmap.get_pools_by_take(rootid)
740 want = []
741 # find the roots associated with the pools we were passed
742 for candidate in ls:
743 if candidate in pe.pool_name:
744 want.append(candidate)
745 if len(want) == 0:
746 continue
747 root = ms.crush.get_item_name(rootid)
748 pe.root_pools[root] = []
749 for poolid in want:
750 pe.pool_roots[pe.pool_name[poolid]].append(root)
751 pe.root_pools[root].append(pe.pool_name[poolid])
752 pe.root_ids[root] = rootid
753 roots.append(root)
754 weight_map = ms.crush.get_take_weight_osd_map(rootid)
755 adjusted_map = {
756 osd: cw * osd_weight[osd]
757 for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
758 }
759 sum_w = sum(adjusted_map.values())
760 assert len(adjusted_map) == 0 or sum_w > 0
761 pe.target_by_root[root] = {osd: w / sum_w
762 for osd, w in adjusted_map.items()}
763 actual_by_root[root] = {
764 'pgs': {},
765 'objects': {},
766 'bytes': {},
767 }
768 for osd in pe.target_by_root[root]:
769 actual_by_root[root]['pgs'][osd] = 0
770 actual_by_root[root]['objects'][osd] = 0
771 actual_by_root[root]['bytes'][osd] = 0
772 pe.total_by_root[root] = {
773 'pgs': 0,
774 'objects': 0,
775 'bytes': 0,
776 }
777 self.log.debug('pool_roots %s' % pe.pool_roots)
778 self.log.debug('root_pools %s' % pe.root_pools)
779 self.log.debug('target_by_root %s' % pe.target_by_root)
780
781 # pool and root actual
782 for pool, pi in pool_info.items():
783 poolid = pi['pool']
784 pm = ms.pg_up_by_poolid[poolid]
785 pgs = 0
786 objects = 0
787 bytes = 0
788 pgs_by_osd = {}
789 objects_by_osd = {}
790 bytes_by_osd = {}
791 for pgid, up in pm.items():
792 for osd in [int(osd) for osd in up]:
793 if osd == CRUSHMap.ITEM_NONE:
794 continue
795 if osd not in pgs_by_osd:
796 pgs_by_osd[osd] = 0
797 objects_by_osd[osd] = 0
798 bytes_by_osd[osd] = 0
799 pgs_by_osd[osd] += 1
800 objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
801 bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
802 # pick a root to associate this pg instance with.
803 # note that this is imprecise if the roots have
804 # overlapping children.
805 # FIXME: divide bytes by k for EC pools.
806 for root in pe.pool_roots[pool]:
807 if osd in pe.target_by_root[root]:
808 actual_by_root[root]['pgs'][osd] += 1
809 actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
810 actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
811 pgs += 1
812 objects += ms.pg_stat[pgid]['num_objects']
813 bytes += ms.pg_stat[pgid]['num_bytes']
814 pe.total_by_root[root]['pgs'] += 1
815 pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
816 pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
817 break
818 pe.count_by_pool[pool] = {
819 'pgs': {
820 k: v
821 for k, v in pgs_by_osd.items()
822 },
823 'objects': {
824 k: v
825 for k, v in objects_by_osd.items()
826 },
827 'bytes': {
828 k: v
829 for k, v in bytes_by_osd.items()
830 },
831 }
832 pe.actual_by_pool[pool] = {
833 'pgs': {
834 k: float(v) / float(max(pgs, 1))
835 for k, v in pgs_by_osd.items()
836 },
837 'objects': {
838 k: float(v) / float(max(objects, 1))
839 for k, v in objects_by_osd.items()
840 },
841 'bytes': {
842 k: float(v) / float(max(bytes, 1))
843 for k, v in bytes_by_osd.items()
844 },
845 }
846 pe.total_by_pool[pool] = {
847 'pgs': pgs,
848 'objects': objects,
849 'bytes': bytes,
850 }
851 for root in pe.total_by_root:
852 pe.count_by_root[root] = {
853 'pgs': {
854 k: float(v)
855 for k, v in actual_by_root[root]['pgs'].items()
856 },
857 'objects': {
858 k: float(v)
859 for k, v in actual_by_root[root]['objects'].items()
860 },
861 'bytes': {
862 k: float(v)
863 for k, v in actual_by_root[root]['bytes'].items()
864 },
865 }
866 pe.actual_by_root[root] = {
867 'pgs': {
868 k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
869 for k, v in actual_by_root[root]['pgs'].items()
870 },
871 'objects': {
872 k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
873 for k, v in actual_by_root[root]['objects'].items()
874 },
875 'bytes': {
876 k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
877 for k, v in actual_by_root[root]['bytes'].items()
878 },
879 }
880 self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
881 self.log.debug('actual_by_root %s' % pe.actual_by_root)
882
883 # average and stddev and score
884 pe.stats_by_root = {
885 a: pe.calc_stats(
886 b,
887 pe.target_by_root[a],
888 pe.total_by_root[a]
889 ) for a, b in pe.count_by_root.items()
890 }
891 self.log.debug('stats_by_root %s' % pe.stats_by_root)
892
893 # the scores are already normalized
894 pe.score_by_root = {
895 r: {
896 'pgs': pe.stats_by_root[r]['pgs']['score'],
897 'objects': pe.stats_by_root[r]['objects']['score'],
898 'bytes': pe.stats_by_root[r]['bytes']['score'],
899 } for r in pe.total_by_root.keys()
900 }
901 self.log.debug('score_by_root %s' % pe.score_by_root)
902
903 # get the list of score metrics, comma separated
904 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
905
906 # total score is just average of normalized stddevs
907 pe.score = 0.0
908 for r, vs in pe.score_by_root.items():
909 for k, v in vs.items():
910 if k in metrics:
911 pe.score += v
912 pe.score /= len(metrics) * len(roots)
913 return pe
914
915 def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
916 pe = self.calc_eval(ms, pools)
917 return pe.show(verbose=verbose)
918
919 def optimize(self, plan: Plan) -> Tuple[int, str]:
920 self.log.info('Optimize plan %s' % plan.name)
921 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
922 self.log.info('Mode %s, max misplaced %f' %
923 (plan.mode, max_misplaced))
924
925 info = self.get('pg_status')
926 unknown = info.get('unknown_pgs_ratio', 0.0)
927 degraded = info.get('degraded_ratio', 0.0)
928 inactive = info.get('inactive_pgs_ratio', 0.0)
929 misplaced = info.get('misplaced_ratio', 0.0)
930 plan.pg_status = info
931 self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
932 unknown, degraded, inactive, misplaced)
933 if unknown > 0.0:
934 detail = 'Some PGs (%f) are unknown; try again later' % unknown
935 self.log.info(detail)
936 return -errno.EAGAIN, detail
937 elif degraded > 0.0:
938 detail = 'Some objects (%f) are degraded; try again later' % degraded
939 self.log.info(detail)
940 return -errno.EAGAIN, detail
941 elif inactive > 0.0:
942 detail = 'Some PGs (%f) are inactive; try again later' % inactive
943 self.log.info(detail)
944 return -errno.EAGAIN, detail
945 elif misplaced >= max_misplaced:
946 detail = 'Too many objects (%f > %f) are misplaced; ' \
947 'try again later' % (misplaced, max_misplaced)
948 self.log.info(detail)
949 return -errno.EAGAIN, detail
950 else:
951 if plan.mode == 'upmap':
952 return self.do_upmap(plan)
953 elif plan.mode == 'crush-compat':
954 return self.do_crush_compat(cast(MsPlan, plan))
955 elif plan.mode == 'none':
956 detail = 'Please run "ceph balancer mode" to choose a valid mode first'
957 self.log.info('Idle')
958 return -errno.ENOEXEC, detail
959 else:
960 detail = 'Unrecognized mode %s' % plan.mode
961 self.log.info(detail)
962 return -errno.EINVAL, detail
963
964 def do_upmap(self, plan: Plan) -> Tuple[int, str]:
965 self.log.info('do_upmap')
966 max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
967 max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
968 osdmap_dump = plan.osdmap_dump
969
970 if len(plan.pools):
971 pools = plan.pools
972 else: # all
973 pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
974 if len(pools) == 0:
975 detail = 'No pools available'
976 self.log.info(detail)
977 return -errno.ENOENT, detail
978 # shuffle pool list so they all get equal (in)attention
979 random.shuffle(pools)
980 self.log.info('pools %s' % pools)
981
982 adjusted_pools = []
983 inc = plan.inc
984 total_did = 0
985 left = max_optimizations
986 pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
987 if p['pg_num'] > p['pg_num_target']]
988 crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
989 for p in osdmap_dump.get('pools', []))
990 for pool in pools:
991 if pool not in crush_rule_by_pool_name:
992 self.log.info('pool %s does not exist' % pool)
993 continue
994 if pool in pools_with_pg_merge:
995 self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
996 continue
997 adjusted_pools.append(pool)
998 # shuffle so all pools get equal (in)attention
999 random.shuffle(adjusted_pools)
1000 pool_dump = osdmap_dump.get('pools', [])
1001 for pool in adjusted_pools:
1002 for p in pool_dump:
1003 if p['pool_name'] == pool:
1004 pool_id = p['pool']
1005 break
1006
1007 # note that here we deliberately exclude any scrubbing pgs too,
1008 # since scrubbing activity has a significant impact on performance
1009 num_pg_active_clean = 0
1010 for p in plan.pg_status.get('pgs_by_pool_state', []):
1011 pgs_pool_id = p['pool_id']
1012 if pgs_pool_id != pool_id:
1013 continue
1014 for s in p['pg_state_counts']:
1015 if s['state_name'] == 'active+clean':
1016 num_pg_active_clean += s['count']
1017 break
1018 available = min(left, num_pg_active_clean)
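# as used here, calc_pg_upmaps prepares at most 'available' new upmap entries
# for this pool in inc and returns the number of changes it made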
1019 did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
1020 total_did += did
1021 left -= did
1022 if left <= 0:
1023 break
1024 self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
1025 if total_did == 0:
1026 return -errno.EALREADY, 'Unable to find further optimization, ' \
1027 'or pool(s) pg_num is decreasing, ' \
1028 'or distribution is already perfect'
1029 return 0, ''
1030
1031 def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
1032 self.log.info('do_crush_compat')
1033 max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
1034 if max_iterations < 1:
1035 return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
1036 step = cast(float, self.get_module_option('crush_compat_step'))
1037 if step <= 0 or step >= 1.0:
1038 return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
1039 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
1040 min_pg_per_osd = 2
1041
1042 ms = plan.initial
1043 osdmap = ms.osdmap
1044 crush = osdmap.get_crush()
1045 pe = self.calc_eval(ms, plan.pools)
1046 min_score_to_optimize = cast(float, self.get_module_option('min_score'))
1047 if pe.score <= min_score_to_optimize:
1048 if pe.score == 0:
1049 detail = 'Distribution is already perfect'
1050 else:
1051 detail = 'score %f <= min_score %f, will not optimize' \
1052 % (pe.score, min_score_to_optimize)
1053 self.log.info(detail)
1054 return -errno.EALREADY, detail
1055
1056 # get current osd reweights
1057 orig_osd_weight = {a['osd']: a['weight']
1058 for a in ms.osdmap_dump.get('osds', [])}
1059
1060 # get current compat weight-set weights
1061 orig_ws = self.get_compat_weight_set_weights(ms)
1062 if not orig_ws:
1063 return -errno.EAGAIN, 'compat weight-set not available'
1064 orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}
1065
1066 # Make sure roots don't overlap their devices. If they do, we
1067 # can't proceed.
1068 roots = list(pe.target_by_root.keys())
1069 self.log.debug('roots %s', roots)
1070 visited: Dict[int, str] = {}
1071 overlap: Dict[int, List[str]] = {}
1072 for root, wm in pe.target_by_root.items():
1073 for osd in wm:
1074 if osd in visited:
1075 if osd not in overlap:
1076 overlap[osd] = [visited[osd]]
1077 overlap[osd].append(root)
1078 visited[osd] = root
1079 if len(overlap) > 0:
1080 detail = 'Some osds belong to multiple subtrees: %s' % \
1081 overlap
1082 self.log.error(detail)
1083 return -errno.EOPNOTSUPP, detail
1084
1085 # rebalance by pgs, objects, or bytes
1086 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
1087 key = metrics[0] # balancing using the first score metric
1088 if key not in ['pgs', 'bytes', 'objects']:
1089 self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
1090 key = 'pgs'
1091
1092 # go
1093 best_ws = copy.deepcopy(orig_ws)
1094 best_ow = copy.deepcopy(orig_osd_weight)
1095 best_pe = pe
1096 left = max_iterations
1097 bad_steps = 0
1098 next_ws = copy.deepcopy(best_ws)
1099 next_ow = copy.deepcopy(best_ow)
1100 while left > 0:
1101 # adjust
1102 self.log.debug('best_ws %s' % best_ws)
1103 random.shuffle(roots)
1104 for root in roots:
1105 pools = best_pe.root_pools[root]
1106 osds = len(best_pe.target_by_root[root])
1107 min_pgs = osds * min_pg_per_osd
1108 if best_pe.total_by_root[root][key] < min_pgs:
1109 self.log.info('Skipping root %s (pools %s), total pgs %d '
1110 '< minimum %d (%d per osd)',
1111 root, pools,
1112 best_pe.total_by_root[root][key],
1113 min_pgs, min_pg_per_osd)
1114 continue
1115 self.log.info('Balancing root %s (pools %s) by %s' %
1116 (root, pools, key))
1117 target = best_pe.target_by_root[root]
1118 actual = best_pe.actual_by_root[root][key]
1119 queue = sorted(actual.keys(),
1120 key=lambda osd: -abs(target[osd] - actual[osd]))
1121 for osd in queue:
1122 if orig_osd_weight[osd] == 0:
1123 self.log.debug('skipping out osd.%d', osd)
1124 else:
1125 deviation = target[osd] - actual[osd]
1126 if deviation == 0:
1127 break
1128 self.log.debug('osd.%d deviation %f', osd, deviation)
1129 weight = best_ws[osd]
1130 ow = orig_osd_weight[osd]
1131 if actual[osd] > 0:
1132 calc_weight = target[osd] / actual[osd] * weight * ow
1133 else:
1134 # for newly created osds, reset calc_weight to the target value.
1135 # this way the weight-set ends up absorbing *step* of its
1136 # target (final) value at the very beginning and slowly catches up later.
1137 # note that if this turns out to cause too many misplaced
1138 # pgs, then we'll reduce step and retry
1139 calc_weight = target[osd]
1140 new_weight = weight * (1.0 - step) + calc_weight * step
1141 self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
1142 new_weight)
1143 next_ws[osd] = new_weight
1144 if ow < 1.0:
1145 new_ow = min(1.0, max(step + (1.0 - step) * ow,
1146 ow + .005))
1147 self.log.debug('Reweight osd.%d reweight %f -> %f',
1148 osd, ow, new_ow)
1149 next_ow[osd] = new_ow
1150
1151 # normalize weights under this root
1152 root_weight = crush.get_item_weight(pe.root_ids[root])
1153 root_sum = sum(b for a, b in next_ws.items()
1154 if a in target.keys())
1155 if root_sum > 0 and root_weight > 0:
1156 factor = root_sum / root_weight
1157 self.log.debug('normalizing root %s %d, weight %f, '
1158 'ws sum %f, factor %f',
1159 root, pe.root_ids[root], root_weight,
1160 root_sum, factor)
1161 for osd in actual.keys():
1162 next_ws[osd] = next_ws[osd] / factor
1163
1164 # recalc
1165 plan.compat_ws = copy.deepcopy(next_ws)
1166 next_ms = plan.final_state()
1167 next_pe = self.calc_eval(next_ms, plan.pools)
1168 next_misplaced = next_ms.calc_misplaced_from(ms)
1169 self.log.debug('Step result score %f -> %f, misplacing %f',
1170 best_pe.score, next_pe.score, next_misplaced)
1171
1172 if next_misplaced > max_misplaced:
1173 if best_pe.score < pe.score:
1174 self.log.debug('Step misplaced %f > max %f, stopping',
1175 next_misplaced, max_misplaced)
1176 break
1177 step /= 2.0
1178 next_ws = copy.deepcopy(best_ws)
1179 next_ow = copy.deepcopy(best_ow)
1180 self.log.debug('Step misplaced %f > max %f, reducing step to %f',
1181 next_misplaced, max_misplaced, step)
1182 else:
1183 if next_pe.score > best_pe.score * 1.0001:
1184 bad_steps += 1
1185 if bad_steps < 5 and random.randint(0, 100) < 70:
1186 self.log.debug('Score got worse, taking another step')
1187 else:
1188 step /= 2.0
1189 next_ws = copy.deepcopy(best_ws)
1190 next_ow = copy.deepcopy(best_ow)
1191 self.log.debug('Score got worse, trying smaller step %f',
1192 step)
1193 else:
1194 bad_steps = 0
1195 best_pe = next_pe
1196 best_ws = copy.deepcopy(next_ws)
1197 best_ow = copy.deepcopy(next_ow)
1198 if best_pe.score == 0:
1199 break
1200 left -= 1
1201
1202 # allow a small regression if we are phasing out osd weights
1203 fudge = 0.0
1204 if best_ow != orig_osd_weight:
1205 fudge = .001
1206
1207 if best_pe.score < pe.score + fudge:
1208 self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
1209 plan.compat_ws = best_ws
1210 for osd, w in best_ow.items():
1211 if w != orig_osd_weight[osd]:
1212 self.log.debug('osd.%d reweight %f', osd, w)
1213 plan.osd_weights[osd] = w
1214 return 0, ''
1215 else:
1216 self.log.info('Failed to find further optimization, score %f',
1217 pe.score)
1218 plan.compat_ws = {}
1219 return -errno.EDOM, 'Unable to find further optimization, ' \
1220 'changing the balancer mode and retrying might help'
1221
1222 def get_compat_weight_set_weights(self, ms: MappingState):
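# returns a {crush item id: weight} map taken from the compat weight-set,
# creating the weight-set via the mons first if the crush map lacks complete
# default choose_args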
1223 have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
1224 if have_choose_args:
1225 # get number of buckets in choose_args
1226 choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
1227 if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
1228 # enable compat weight-set first
1229 self.log.debug('no choose_args or not all buckets have weight-sets')
1230 self.log.debug('ceph osd crush weight-set create-compat')
1231 result = CommandResult('')
1232 self.send_command(result, 'mon', '', json.dumps({
1233 'prefix': 'osd crush weight-set create-compat',
1234 'format': 'json',
1235 }), '')
1236 r, outb, outs = result.wait()
1237 if r != 0:
1238 self.log.error('Error creating compat weight-set')
1239 return
1240
1241 result = CommandResult('')
1242 self.send_command(result, 'mon', '', json.dumps({
1243 'prefix': 'osd crush dump',
1244 'format': 'json',
1245 }), '')
1246 r, outb, outs = result.wait()
1247 if r != 0:
1248 self.log.error('Error dumping crush map')
1249 return
1250 try:
1251 crushmap = json.loads(outb)
1252 except json.JSONDecodeError:
1253 raise RuntimeError('unable to parse crush map')
1254 else:
1255 crushmap = ms.crush_dump
1256
1257 raw = CRUSHMap.get_default_choose_args(crushmap)
1258 weight_set = {}
1259 for b in raw:
1260 bucket = None
1261 for t in crushmap['buckets']:
1262 if t['id'] == b['bucket_id']:
1263 bucket = t
1264 break
1265 if not bucket:
1266 raise RuntimeError('could not find bucket %s' % b['bucket_id'])
1267 self.log.debug('bucket items %s' % bucket['items'])
1268 self.log.debug('weight set %s' % b['weight_set'][0])
1269 if len(bucket['items']) != len(b['weight_set'][0]):
1270 raise RuntimeError('weight-set size does not match bucket items')
1271 for pos in range(len(bucket['items'])):
1272 weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
1273
1274 self.log.debug('weight_set weights %s' % weight_set)
1275 return weight_set
1276
1277 def do_crush(self) -> None:
1278 self.log.info('do_crush (not yet implemented)')
1279
1280 def do_osd_weight(self) -> None:
1281 self.log.info('do_osd_weight (not yet implemented)')
1282
1283 def execute(self, plan: Plan) -> Tuple[int, str]:
1284 self.log.info('Executing plan %s' % plan.name)
1285
1286 commands = []
1287
1288 # compat weight-set
1289 if len(plan.compat_ws):
1290 ms_plan = cast(MsPlan, plan)
1291 if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
1292 self.log.debug('ceph osd crush weight-set create-compat')
1293 result = CommandResult('')
1294 self.send_command(result, 'mon', '', json.dumps({
1295 'prefix': 'osd crush weight-set create-compat',
1296 'format': 'json',
1297 }), '')
1298 r, outb, outs = result.wait()
1299 if r != 0:
1300 self.log.error('Error creating compat weight-set')
1301 return r, outs
1302
1303 for osd, weight in plan.compat_ws.items():
1304 self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
1305 osd, weight)
1306 result = CommandResult('')
1307 self.send_command(result, 'mon', '', json.dumps({
1308 'prefix': 'osd crush weight-set reweight-compat',
1309 'format': 'json',
1310 'item': 'osd.%d' % osd,
1311 'weight': [weight],
1312 }), '')
1313 commands.append(result)
1314
1315 # new_weight
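# ('osd reweightn' takes integer weights where 0x10000 == 1.0,
#  e.g. a weight of 0.95 is sent as '62259')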
1316 reweightn = {}
1317 for osd, weight in plan.osd_weights.items():
1318 reweightn[str(osd)] = str(int(weight * float(0x10000)))
1319 if len(reweightn):
1320 self.log.info('ceph osd reweightn %s', reweightn)
1321 result = CommandResult('')
1322 self.send_command(result, 'mon', '', json.dumps({
1323 'prefix': 'osd reweightn',
1324 'format': 'json',
1325 'weights': json.dumps(reweightn),
1326 }), '')
1327 commands.append(result)
1328
1329 # upmap
1330 incdump = plan.inc.dump()
1331 for item in incdump.get('new_pg_upmap', []):
1332 self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
1333 item['osds'])
1334 result = CommandResult('foo')
1335 self.send_command(result, 'mon', '', json.dumps({
1336 'prefix': 'osd pg-upmap',
1337 'format': 'json',
1338 'pgid': item['pgid'],
1339 'id': item['osds'],
1340 }), 'foo')
1341 commands.append(result)
1342
1343 for pgid in incdump.get('old_pg_upmap', []):
1344 self.log.info('ceph osd rm-pg-upmap %s', pgid)
1345 result = CommandResult('foo')
1346 self.send_command(result, 'mon', '', json.dumps({
1347 'prefix': 'osd rm-pg-upmap',
1348 'format': 'json',
1349 'pgid': pgid,
1350 }), 'foo')
1351 commands.append(result)
1352
1353 for item in incdump.get('new_pg_upmap_items', []):
1354 self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
1355 item['mappings'])
1356 osdlist = []
1357 for m in item['mappings']:
1358 osdlist += [m['from'], m['to']]
1359 result = CommandResult('foo')
1360 self.send_command(result, 'mon', '', json.dumps({
1361 'prefix': 'osd pg-upmap-items',
1362 'format': 'json',
1363 'pgid': item['pgid'],
1364 'id': osdlist,
1365 }), 'foo')
1366 commands.append(result)
1367
1368 for pgid in incdump.get('old_pg_upmap_items', []):
1369 self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
1370 result = CommandResult('foo')
1371 self.send_command(result, 'mon', '', json.dumps({
1372 'prefix': 'osd rm-pg-upmap-items',
1373 'format': 'json',
1374 'pgid': pgid,
1375 }), 'foo')
1376 commands.append(result)
1377
1378 # wait for commands
1379 self.log.debug('commands %s' % commands)
1380 for result in commands:
1381 r, outb, outs = result.wait()
1382 if r != 0:
1383 self.log.error('execute error: r = %d, detail = %s' % (r, outs))
1384 return r, outs
1385 self.log.debug('done')
1386 return 0, ''
1387
1388 def gather_telemetry(self) -> Dict[str, Any]:
1389 return {
1390 'active': self.active,
1391 'mode': self.mode,
1392 }