1 """
2 Balance PG distribution across OSDs.
3 """
4
5 import copy
6 import enum
7 import errno
8 import json
9 import math
10 import random
11 import time
12 from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
13 from threading import Event
14 from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
15 from mgr_module import CRUSHMap
16 import datetime
17
18 TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
19
20
21 class MappingState:
22 def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
23 self.desc = desc
24 self.osdmap = osdmap
25 self.osdmap_dump = self.osdmap.dump()
26 self.crush = osdmap.get_crush()
27 self.crush_dump = self.crush.dump()
28 self.raw_pg_stats = raw_pg_stats
29 self.raw_pool_stats = raw_pool_stats
30 self.pg_stat = {
31 i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
32 }
33 osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
34 pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
35 self.poolids = set(osd_poolids) & set(pg_poolids)
36 self.pg_up = {}
37 self.pg_up_by_poolid = {}
38 for poolid in self.poolids:
39 self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
40 for a, b in self.pg_up_by_poolid[poolid].items():
41 self.pg_up[a] = b
42
43 def calc_misplaced_from(self, other_ms):
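# Returns the fraction of PGs whose up mapping changed relative to other_ms.
# Illustrative (made-up) values: if other_ms.pg_up is {'1.0': [0, 1], '1.1': [1, 2]}
# and this state maps '1.1' to [1, 3] instead, 1 of 2 PGs moved, so 0.5 is returned.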
44 num = len(other_ms.pg_up)
45 misplaced = 0
46 for pgid, before in other_ms.pg_up.items():
47 if before != self.pg_up.get(pgid, []):
48 misplaced += 1
49 if num > 0:
50 return float(misplaced) / float(num)
51 return 0.0
52
53
54 class Mode(enum.Enum):
55 none = 'none'
56 crush_compat = 'crush-compat'
57 upmap = 'upmap'
58
59
60 class Plan(object):
61 def __init__(self, name, mode, osdmap, pools):
62 self.name = name
63 self.mode = mode
64 self.osdmap = osdmap
65 self.osdmap_dump = osdmap.dump()
66 self.pools = pools
67 self.osd_weights = {}
68 self.compat_ws = {}
69 self.inc = osdmap.new_incremental()
70 self.pg_status = {}
71
72 def dump(self) -> str:
73 return json.dumps(self.inc.dump(), indent=4, sort_keys=True)
74
75 def show(self) -> str:
76 return 'upmap plan'
77
78
79 class MsPlan(Plan):
80 """
81 Plan with a preloaded MappingState member.
82 """
83
84 def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
85 super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
86 self.initial = ms
87
88 def final_state(self) -> MappingState:
89 self.inc.set_osd_reweights(self.osd_weights)
90 self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
91 return MappingState(self.initial.osdmap.apply_incremental(self.inc),
92 self.initial.raw_pg_stats,
93 self.initial.raw_pool_stats,
94 'plan %s final' % self.name)
95
96 def show(self) -> str:
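# Emits a shell-pastable plan; an illustrative (made-up) example of the output:
#   # starting osdmap epoch 1234
#   # starting crush version 56
#   # mode crush-compat
#   ceph osd crush weight-set reweight-compat 0 1.000000
#   ceph osd reweight osd.3 0.950000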
97 ls = []
98 ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
99 ls.append('# starting crush version %d' %
100 self.initial.osdmap.get_crush_version())
101 ls.append('# mode %s' % self.mode)
102 if len(self.compat_ws) and \
103 not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
104 ls.append('ceph osd crush weight-set create-compat')
105 for osd, weight in self.compat_ws.items():
106 ls.append('ceph osd crush weight-set reweight-compat %s %f' %
107 (osd, weight))
108 for osd, weight in self.osd_weights.items():
109 ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
110 incdump = self.inc.dump()
111 for pgid in incdump.get('old_pg_upmap_items', []):
112 ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
113 for item in incdump.get('new_pg_upmap_items', []):
114 osdlist = []
115 for m in item['mappings']:
116 osdlist += [m['from'], m['to']]
117 ls.append('ceph osd pg-upmap-items %s %s' %
118 (item['pgid'], ' '.join([str(a) for a in osdlist])))
119 return '\n'.join(ls)
120
121
122 class Eval:
123 def __init__(self, ms: MappingState):
124 self.ms = ms
125 self.root_ids: Dict[str, int] = {} # root name -> id
126 self.pool_name: Dict[str, str] = {} # pool id -> pool name
127 self.pool_id: Dict[str, int] = {} # pool name -> id
128 self.pool_roots: Dict[str, List[str]] = {} # pool name -> root name
129 self.root_pools: Dict[str, List[str]] = {} # root name -> pools
130 self.target_by_root: Dict[str, Dict[int, float]] = {} # root name -> target weight map
131 self.count_by_pool: Dict[str, dict] = {}
132 self.count_by_root: Dict[str, dict] = {}
133 self.actual_by_pool: Dict[str, dict] = {} # pool -> by_* -> actual weight map
134 self.actual_by_root: Dict[str, dict] = {} # pool -> by_* -> actual weight map
135 self.total_by_pool: Dict[str, dict] = {} # pool -> by_* -> total
136 self.total_by_root: Dict[str, dict] = {} # root -> by_* -> total
137 self.stats_by_pool: Dict[str, dict] = {} # pool -> by_* -> stddev or avg -> value
138 self.stats_by_root: Dict[str, dict] = {} # root -> by_* -> stddev or avg -> value
139
140 self.score_by_pool: Dict[str, float] = {}
141 self.score_by_root: Dict[str, Dict[str, float]] = {}
142
143 self.score = 0.0
144
145 def show(self, verbose: bool = False) -> str:
146 if verbose:
147 r = self.ms.desc + '\n'
148 r += 'target_by_root %s\n' % self.target_by_root
149 r += 'actual_by_pool %s\n' % self.actual_by_pool
150 r += 'actual_by_root %s\n' % self.actual_by_root
151 r += 'count_by_pool %s\n' % self.count_by_pool
152 r += 'count_by_root %s\n' % self.count_by_root
153 r += 'total_by_pool %s\n' % self.total_by_pool
154 r += 'total_by_root %s\n' % self.total_by_root
155 r += 'stats_by_root %s\n' % self.stats_by_root
156 r += 'score_by_pool %s\n' % self.score_by_pool
157 r += 'score_by_root %s\n' % self.score_by_root
158 else:
159 r = self.ms.desc + ' '
160 r += 'score %f (lower is better)\n' % self.score
161 return r
162
163 def calc_stats(self, count, target, total):
164 num = max(len(target), 1)
165 r: Dict[str, Dict[str, Union[int, float]]] = {}
166 for t in ('pgs', 'objects', 'bytes'):
167 if total[t] == 0:
168 r[t] = {
169 'max': 0,
170 'min': 0,
171 'avg': 0,
172 'stddev': 0,
173 'sum_weight': 0,
174 'score': 0,
175 }
176 continue
177
178 avg = float(total[t]) / float(num)
179 dev = 0.0
180
181 # score is a measure of how uneven the data distribution is.
182 # score lies in [0, 1); 0 means a perfect distribution.
183 score = 0.0
184 sum_weight = 0.0
185
186 for k, v in count[t].items():
187 # adjust/normalize by weight
188 if target[k]:
189 adjusted = float(v) / target[k] / float(num)
190 else:
191 adjusted = 0.0
192
193 # Overweighted devices and their weights are factors in calculating reweight_urgency.
194 # One 10% underfilled device with five 2% overfilled devices is arguably a better
195 # situation than one 10% overfilled device with five 2% underfilled devices.
196 if adjusted > avg:
197 '''
198 F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
199 x = (adjusted - avg)/avg.
200 Since we consider only over-weighted devices, x >= 0, so phi(x) lies in [0.5, 1).
201 The 2*phi(x) - 1 form rescales F(x) into the range [0, 1).
202
203 In general, we need a function F(x), with x = (adjusted - avg)/avg, such that:
204 1. F(x) is bounded between 0 and 1, so that reweight_urgency is also bounded.
205 2. A larger value of x implies more urgency to reweight.
206 3. The difference between values of F(x) for large x should be minimal.
207 4. F(x) should approach 1 (the highest urgency to reweight) steeply.
208
209 F(x) = 1 - e^(-x) would also work, but it converges to 1 more slowly than the function currently in use.
210
211 cdf of standard normal distribution: https://stackoverflow.com/a/29273201
212 '''
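# A worked example with illustrative numbers: for avg = 100 and adjusted = 120
# (20% overfilled), x = 0.2 and erf(0.2 / sqrt(2)) is roughly 0.16; for
# adjusted = 300, erf(2 / sqrt(2)) is roughly 0.95, so badly overfilled
# devices dominate the target-weighted sum computed below.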
213 score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
214 sum_weight += target[k]
215 dev += (avg - adjusted) * (avg - adjusted)
216 stddev = math.sqrt(dev / float(max(num - 1, 1)))
217 score = score / max(sum_weight, 1)
218 r[t] = {
219 'max': max(count[t].values()),
220 'min': min(count[t].values()),
221 'avg': avg,
222 'stddev': stddev,
223 'sum_weight': sum_weight,
224 'score': score,
225 }
226 return r
227
228
229 class Module(MgrModule):
230 MODULE_OPTIONS = [
231 Option(name='active',
232 type='bool',
233 default=True,
234 desc='automatically balance PGs across cluster',
235 runtime=True),
236 Option(name='begin_time',
237 type='str',
238 default='0000',
239 desc='beginning time of day to automatically balance',
240 long_desc='This is a time of day in the format HHMM.',
241 runtime=True),
242 Option(name='end_time',
243 type='str',
244 default='2359',
245 desc='ending time of day to automatically balance',
246 long_desc='This is a time of day in the format HHMM.',
247 runtime=True),
248 Option(name='begin_weekday',
249 type='uint',
250 default=0,
251 min=0,
252 max=6,
253 desc='Restrict automatic balancing to this day of the week or later',
254 long_desc='0 = Sunday, 1 = Monday, etc.',
255 runtime=True),
256 Option(name='end_weekday',
257 type='uint',
258 default=0,
259 min=0,
260 max=6,
261 desc='Restrict automatic balancing to days of the week earlier than this',
262 long_desc='0 = Sunday, 1 = Monday, etc.',
263 runtime=True),
264 Option(name='crush_compat_max_iterations',
265 type='uint',
266 default=25,
267 min=1,
268 max=250,
269 desc='maximum number of iterations to attempt optimization',
270 runtime=True),
271 Option(name='crush_compat_metrics',
272 type='str',
273 default='pgs,objects,bytes',
274 desc='metrics with which to calculate OSD utilization',
275 long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
276 runtime=True),
277 Option(name='crush_compat_step',
278 type='float',
279 default=.5,
280 min=.001,
281 max=.999,
282 desc='aggressiveness of optimization',
283 long_desc='.99 is very aggressive, .01 is less aggressive',
284 runtime=True),
285 Option(name='min_score',
286 type='float',
287 default=0,
288 desc='minimum score, below which no optimization is attempted',
289 runtime=True),
290 Option(name='mode',
291 desc='Balancer mode',
292 default='upmap',
293 enum_allowed=['none', 'crush-compat', 'upmap'],
294 runtime=True),
295 Option(name='sleep_interval',
296 type='secs',
297 default=60,
298 desc='how frequently to wake up and attempt optimization',
299 runtime=True),
300 Option(name='upmap_max_optimizations',
301 type='uint',
302 default=10,
303 desc='maximum upmap optimizations to make per attempt',
304 runtime=True),
305 Option(name='upmap_max_deviation',
306 type='int',
307 default=5,
308 min=1,
309 desc='deviation below which no optimization is attempted',
310 long_desc='If the number of PGs is within this count then no optimization is attempted',
311 runtime=True),
312 Option(name='pool_ids',
313 type='str',
314 default='',
315 desc='pools which the automatic balancing will be limited to',
316 runtime=True)
317 ]
318
319 active = False
320 run = True
321 plans: Dict[str, Plan] = {}
322 mode = ''
323 optimizing = False
324 last_optimize_started = ''
325 last_optimize_duration = ''
326 optimize_result = ''
327 no_optimization_needed = False
328 success_string = 'Optimization plan created successfully'
329 in_progress_string = 'in progress'
330
331 def __init__(self, *args: Any, **kwargs: Any) -> None:
332 super(Module, self).__init__(*args, **kwargs)
333 self.event = Event()
334
335 @CLIReadCommand('balancer status')
336 def show_status(self) -> Tuple[int, str, str]:
337 """
338 Show balancer status
339 """
340 s = {
341 'plans': list(self.plans.keys()),
342 'active': self.active,
343 'last_optimize_started': self.last_optimize_started,
344 'last_optimize_duration': self.last_optimize_duration,
345 'optimize_result': self.optimize_result,
346 'no_optimization_needed': self.no_optimization_needed,
347 'mode': self.get_module_option('mode'),
348 }
349 return (0, json.dumps(s, indent=4, sort_keys=True), '')
350
351 @CLICommand('balancer mode')
352 def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
353 """
354 Set balancer mode
355 """
356 if mode == Mode.upmap:
357 min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
357 if min_compat_client < 'luminous': # string compare works because release names sort alphabetically
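# e.g. 'jewel' < 'kraken' < 'luminous' < 'mimic' < 'nautilus', so any
# pre-luminous release name compares as less than 'luminous' here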
359 warn = ('min_compat_client "%s" '
360 '< "luminous", which is required for pg-upmap. '
361 'Try "ceph osd set-require-min-compat-client luminous" '
362 'before enabling this mode' % min_compat_client)
363 return (-errno.EPERM, '', warn)
364 elif mode == Mode.crush_compat:
365 ms = MappingState(self.get_osdmap(),
366 self.get("pg_stats"),
367 self.get("pool_stats"),
368 'initialize compat weight-set')
369 self.get_compat_weight_set_weights(ms) # ignore error
370 self.set_module_option('mode', mode.value)
371 return (0, '', '')
372
373 @CLICommand('balancer on')
374 def on(self) -> Tuple[int, str, str]:
375 """
376 Enable automatic balancing
377 """
378 if not self.active:
379 self.set_module_option('active', 'true')
380 self.active = True
381 self.event.set()
382 return (0, '', '')
383
384 @CLICommand('balancer off')
385 def off(self) -> Tuple[int, str, str]:
386 """
387 Disable automatic balancing
388 """
389 if self.active:
390 self.set_module_option('active', 'false')
391 self.active = False
392 self.event.set()
393 return (0, '', '')
394
395 @CLIReadCommand('balancer pool ls')
396 def pool_ls(self) -> Tuple[int, str, str]:
397 """
398 List automatic balancing pools
399
400 Note that an empty list means all existing pools will be automatic balancing targets,
401 which is the default behaviour of the balancer.
402 """
403 pool_ids = cast(str, self.get_module_option('pool_ids'))
404 if pool_ids == '':
405 return (0, '', '')
406 pool_ids = [int(p) for p in pool_ids.split(',')]
407 pool_name_by_id = dict((p['pool'], p['pool_name'])
408 for p in self.get_osdmap().dump().get('pools', []))
409 should_prune = False
410 final_ids: List[int] = []
411 final_names = []
412 for p in pool_ids:
413 if p in pool_name_by_id:
414 final_ids.append(p)
415 final_names.append(pool_name_by_id[p])
416 else:
417 should_prune = True
418 if should_prune: # some pools were gone, prune
419 self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
420 return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
421
422 @CLICommand('balancer pool add')
423 def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
424 """
425 Enable automatic balancing for specific pools
426 """
427 raw_names = pools
428 pool_id_by_name = dict((p['pool_name'], p['pool'])
429 for p in self.get_osdmap().dump().get('pools', []))
430 invalid_names = [p for p in raw_names if p not in pool_id_by_name]
431 if invalid_names:
432 return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
433 to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
434 pool_ids = cast(str, self.get_module_option('pool_ids'))
435 existing = set(pool_ids.split(',') if pool_ids else [])
436 final = to_add | existing
437 self.set_module_option('pool_ids', ','.join(final))
438 return (0, '', '')
439
440 @CLICommand('balancer pool rm')
441 def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
442 """
443 Disable automatic balancing for specific pools
444 """
445 raw_names = pools
446 existing = cast(str, self.get_module_option('pool_ids'))
447 if existing == '': # for idempotence
448 return (0, '', '')
449 existing = existing.split(',')
450 osdmap = self.get_osdmap()
451 pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
452 pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
453 final = [p for p in existing if p in pool_ids]
454 to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
455 final = set(final) - set(to_delete)
456 self.set_module_option('pool_ids', ','.join(final))
457 return (0, '', '')
458
459 def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
460 pools = []
461 if option is None:
462 ms = MappingState(self.get_osdmap(),
463 self.get("pg_stats"),
464 self.get("pool_stats"),
465 'current cluster')
466 elif option in self.plans:
467 plan = self.plans.get(option)
468 assert plan
469 pools = plan.pools
470 if plan.mode == 'upmap':
471 # Note that for upmap, to improve efficiency,
472 # we use a basic version of Plan that does not keep the
473 # *redundant* MappingState member.
474 # Hence ms might not be fully accurate here, since we pair an
475 # old snapshotted osdmap with a fresh copy of pg_stats.
476 # The discrepancy should not be a big deal in practice.
477 ms = MappingState(plan.osdmap,
478 self.get("pg_stats"),
479 self.get("pool_stats"),
480 f'plan "{plan.name}"')
481 else:
482 ms = cast(MsPlan, plan).final_state()
483 else:
484 # not a plan, does it look like a pool?
485 osdmap = self.get_osdmap()
486 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
487 if option not in valid_pool_names:
488 raise ValueError(f'option "{option}" not a plan or a pool')
489 pools.append(option)
490 ms = MappingState(osdmap,
491 self.get("pg_stats"),
492 self.get("pool_stats"),
493 f'pool "{option}"')
494 return ms, pools
495
496 @CLIReadCommand('balancer eval-verbose')
497 def plan_eval_verbose(self, option: Optional[str] = None):
498 """
499 Evaluate data distribution for the current cluster or specific pool or specific
500 plan (verbosely)
501 """
502 try:
503 ms, pools = self._state_from_option(option)
504 return (0, self.evaluate(ms, pools, verbose=True), '')
505 except ValueError as e:
506 return (-errno.EINVAL, '', str(e))
507
508 @CLIReadCommand('balancer eval')
509 def plan_eval_brief(self, option: Optional[str] = None):
510 """
511 Evaluate data distribution for the current cluster or specific pool or specific plan
512 """
513 try:
514 ms, pools = self._state_from_option(option)
515 return (0, self.evaluate(ms, pools, verbose=False), '')
516 except ValueError as e:
517 return (-errno.EINVAL, '', str(e))
518
519 @CLIReadCommand('balancer optimize')
520 def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
521 """
522 Run optimizer to create a new plan
523 """
524 # The GIL can be released by the active balancer, so disallow this while the balancer is active
525 if self.active:
526 return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
527 if self.optimizing:
528 return (-errno.EINVAL, '', 'Balancer finishing up... try again')
529 osdmap = self.get_osdmap()
530 valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
531 invalid_pool_names = []
532 for p in pools:
533 if p not in valid_pool_names:
534 invalid_pool_names.append(p)
535 if len(invalid_pool_names):
536 return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
537 plan_ = self.plan_create(plan, osdmap, pools)
538 self.last_optimize_started = time.asctime(time.localtime())
539 self.optimize_result = self.in_progress_string
540 start = time.time()
541 r, detail = self.optimize(plan_)
542 end = time.time()
543 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
544 if r == 0:
545 # Add plan if an optimization was created
546 self.optimize_result = self.success_string
547 self.plans[plan] = plan_
548 else:
549 self.optimize_result = detail
550 return (r, '', detail)
551
552 @CLIReadCommand('balancer show')
553 def plan_show(self, plan: str) -> Tuple[int, str, str]:
554 """
555 Show details of an optimization plan
556 """
557 plan_ = self.plans.get(plan)
558 if not plan_:
559 return (-errno.ENOENT, '', f'plan {plan} not found')
560 return (0, plan_.show(), '')
561
562 @CLICommand('balancer rm')
563 def plan_rm(self, plan: str) -> Tuple[int, str, str]:
564 """
565 Discard an optimization plan
566 """
567 if plan in self.plans:
568 del self.plans[plan]
569 return (0, '', '')
570
571 @CLICommand('balancer reset')
572 def plan_reset(self) -> Tuple[int, str, str]:
573 """
574 Discard all optimization plans
575 """
576 self.plans = {}
577 return (0, '', '')
578
579 @CLIReadCommand('balancer dump')
580 def plan_dump(self, plan: str) -> Tuple[int, str, str]:
581 """
582 Show an optimization plan
583 """
584 plan_ = self.plans.get(plan)
585 if not plan_:
586 return -errno.ENOENT, '', f'plan {plan} not found'
587 else:
588 return (0, plan_.dump(), '')
589
590 @CLIReadCommand('balancer ls')
591 def plan_ls(self) -> Tuple[int, str, str]:
592 """
593 List all plans
594 """
595 return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')
596
597 @CLIReadCommand('balancer execute')
598 def plan_execute(self, plan: str) -> Tuple[int, str, str]:
599 """
600 Execute an optimization plan
601 """
602 # The GIL can be released by the active balancer, so disallow this while the balancer is active
603 if self.active:
604 return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
605 if self.optimizing:
606 return (-errno.EINVAL, '', 'Balancer finishing up... try again')
607 plan_ = self.plans.get(plan)
608 if not plan_:
609 return (-errno.ENOENT, '', f'plan {plan} not found')
610 r, detail = self.execute(plan_)
611 self.plan_rm(plan)
612 return (r, '', detail)
613
614 def shutdown(self) -> None:
615 self.log.info('Stopping')
616 self.run = False
617 self.event.set()
618
619 def time_permit(self) -> bool:
620 local_time = time.localtime()
621 time_of_day = time.strftime('%H%M', local_time)
622 weekday = (local_time.tm_wday + 1) % 7 # be compatible with C
623 permit = False
624
625 def check_time(time: str, option: str):
626 if len(time) != 4:
627 self.log.error('invalid time for %s - expected HHMM format', option)
628 try:
629 datetime.time(int(time[:2]), int(time[2:]))
630 except ValueError as err:
631 self.log.error('invalid time for %s - %s', option, err)
632
633 begin_time = cast(str, self.get_module_option('begin_time'))
634 check_time(begin_time, 'begin_time')
635 end_time = cast(str, self.get_module_option('end_time'))
636 check_time(end_time, 'end_time')
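# Illustrative values: begin_time='2300' with end_time='0600' takes the
# wrap-around branch below and permits runs from 23:00 through 05:59;
# identical begin and end values permit balancing at any time of day.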
637 if begin_time < end_time:
638 permit = begin_time <= time_of_day < end_time
639 elif begin_time == end_time:
640 permit = True
641 else:
642 permit = time_of_day >= begin_time or time_of_day < end_time
643 if not permit:
644 self.log.debug("should run between %s - %s, now %s, skipping",
645 begin_time, end_time, time_of_day)
646 return False
647
648 begin_weekday = cast(int, self.get_module_option('begin_weekday'))
649 end_weekday = cast(int, self.get_module_option('end_weekday'))
650 if begin_weekday < end_weekday:
651 permit = begin_weekday <= weekday <= end_weekday
652 elif begin_weekday == end_weekday:
653 permit = True
654 else:
655 permit = weekday >= begin_weekday or weekday < end_weekday
656 if not permit:
657 self.log.debug("should run between weekday %d - %d, now %d, skipping",
658 begin_weekday, end_weekday, weekday)
659 return False
660
661 return True
662
663 def serve(self) -> None:
664 self.log.info('Starting')
665 while self.run:
666 self.active = cast(bool, self.get_module_option('active'))
667 sleep_interval = cast(float, self.get_module_option('sleep_interval'))
668 self.log.debug('Waking up [%s, now %s]',
669 "active" if self.active else "inactive",
670 time.strftime(TIME_FORMAT, time.localtime()))
671 if self.active and self.time_permit():
672 self.log.debug('Running')
673 name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
674 osdmap = self.get_osdmap()
675 pool_ids = cast(str, self.get_module_option('pool_ids'))
676 if pool_ids:
677 allow = [int(p) for p in pool_ids.split(',')]
678 else:
679 allow = []
680 final: List[str] = []
681 if allow:
682 pools = osdmap.dump().get('pools', [])
683 valid = [p['pool'] for p in pools]
684 ids = set(allow) & set(valid)
685 if set(allow) - set(valid): # some pools were gone, prune
686 self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
687 pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
688 final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
689 plan = self.plan_create(name, osdmap, final)
690 self.optimizing = True
691 self.last_optimize_started = time.asctime(time.localtime())
692 self.optimize_result = self.in_progress_string
693 start = time.time()
694 r, detail = self.optimize(plan)
695 end = time.time()
696 self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
697 if r == 0:
698 self.optimize_result = self.success_string
699 self.execute(plan)
700 else:
701 self.optimize_result = detail
702 self.optimizing = False
703 self.log.debug('Sleeping for %d', sleep_interval)
704 self.event.wait(sleep_interval)
705 self.event.clear()
706
707 def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
708 mode = cast(str, self.get_module_option('mode'))
709 if mode == 'upmap':
710 # drop the unnecessary MappingState member for upmap mode.
711 # this way we avoid pulling in a complete copy of
712 # pg_stats, which can become horribly inefficient
713 # as pg_num grows.
714 plan = Plan(name, mode, osdmap, pools)
715 else:
716 plan = MsPlan(name,
717 mode,
718 MappingState(osdmap,
719 self.get("pg_stats"),
720 self.get("pool_stats"),
721 'plan %s initial' % name),
722 pools)
723 return plan
724
725 def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
726 pe = Eval(ms)
727 pool_rule = {}
728 pool_info = {}
729 for p in ms.osdmap_dump.get('pools', []):
730 if len(pools) and p['pool_name'] not in pools:
731 continue
732 # skip dead or not-yet-ready pools too
733 if p['pool'] not in ms.poolids:
734 continue
735 pe.pool_name[p['pool']] = p['pool_name']
736 pe.pool_id[p['pool_name']] = p['pool']
737 pool_rule[p['pool_name']] = p['crush_rule']
738 pe.pool_roots[p['pool_name']] = []
739 pool_info[p['pool_name']] = p
740 if len(pool_info) == 0:
741 return pe
742 self.log.debug('pool_name %s' % pe.pool_name)
743 self.log.debug('pool_id %s' % pe.pool_id)
744 self.log.debug('pools %s' % pools)
745 self.log.debug('pool_rule %s' % pool_rule)
746
747 osd_weight = {a['osd']: a['weight']
748 for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}
749
750 # get expected distributions by root
751 actual_by_root: Dict[str, Dict[str, dict]] = {}
752 rootids = ms.crush.find_takes()
753 roots = []
754 for rootid in rootids:
755 ls = ms.osdmap.get_pools_by_take(rootid)
756 want = []
757 # find the roots associated with the pools we were passed
758 for candidate in ls:
759 if candidate in pe.pool_name:
760 want.append(candidate)
761 if len(want) == 0:
762 continue
763 root = ms.crush.get_item_name(rootid)
764 pe.root_pools[root] = []
765 for poolid in want:
766 pe.pool_roots[pe.pool_name[poolid]].append(root)
767 pe.root_pools[root].append(pe.pool_name[poolid])
768 pe.root_ids[root] = rootid
769 roots.append(root)
770 weight_map = ms.crush.get_take_weight_osd_map(rootid)
771 adjusted_map = {
772 osd: cw * osd_weight[osd]
773 for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
774 }
775 sum_w = sum(adjusted_map.values())
776 assert len(adjusted_map) == 0 or sum_w > 0
777 pe.target_by_root[root] = {osd: w / sum_w
778 for osd, w in adjusted_map.items()}
779 actual_by_root[root] = {
780 'pgs': {},
781 'objects': {},
782 'bytes': {},
783 }
784 for osd in pe.target_by_root[root]:
785 actual_by_root[root]['pgs'][osd] = 0
786 actual_by_root[root]['objects'][osd] = 0
787 actual_by_root[root]['bytes'][osd] = 0
788 pe.total_by_root[root] = {
789 'pgs': 0,
790 'objects': 0,
791 'bytes': 0,
792 }
793 self.log.debug('pool_roots %s' % pe.pool_roots)
794 self.log.debug('root_pools %s' % pe.root_pools)
795 self.log.debug('target_by_root %s' % pe.target_by_root)
796
797 # pool and root actual
798 for pool, pi in pool_info.items():
799 poolid = pi['pool']
800 pm = ms.pg_up_by_poolid[poolid]
801 pgs = 0
802 objects = 0
803 bytes = 0
804 pgs_by_osd = {}
805 objects_by_osd = {}
806 bytes_by_osd = {}
807 for pgid, up in pm.items():
808 for osd in [int(osd) for osd in up]:
809 if osd == CRUSHMap.ITEM_NONE:
810 continue
811 if osd not in pgs_by_osd:
812 pgs_by_osd[osd] = 0
813 objects_by_osd[osd] = 0
814 bytes_by_osd[osd] = 0
815 pgs_by_osd[osd] += 1
816 objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
817 bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
818 # pick a root to associate this pg instance with.
819 # note that this is imprecise if the roots have
820 # overlapping children.
821 # FIXME: divide bytes by k for EC pools.
822 for root in pe.pool_roots[pool]:
823 if osd in pe.target_by_root[root]:
824 actual_by_root[root]['pgs'][osd] += 1
825 actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
826 actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
827 pgs += 1
828 objects += ms.pg_stat[pgid]['num_objects']
829 bytes += ms.pg_stat[pgid]['num_bytes']
830 pe.total_by_root[root]['pgs'] += 1
831 pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
832 pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
833 break
834 pe.count_by_pool[pool] = {
835 'pgs': {
836 k: v
837 for k, v in pgs_by_osd.items()
838 },
839 'objects': {
840 k: v
841 for k, v in objects_by_osd.items()
842 },
843 'bytes': {
844 k: v
845 for k, v in bytes_by_osd.items()
846 },
847 }
848 pe.actual_by_pool[pool] = {
849 'pgs': {
850 k: float(v) / float(max(pgs, 1))
851 for k, v in pgs_by_osd.items()
852 },
853 'objects': {
854 k: float(v) / float(max(objects, 1))
855 for k, v in objects_by_osd.items()
856 },
857 'bytes': {
858 k: float(v) / float(max(bytes, 1))
859 for k, v in bytes_by_osd.items()
860 },
861 }
862 pe.total_by_pool[pool] = {
863 'pgs': pgs,
864 'objects': objects,
865 'bytes': bytes,
866 }
867 for root in pe.total_by_root:
868 pe.count_by_root[root] = {
869 'pgs': {
870 k: float(v)
871 for k, v in actual_by_root[root]['pgs'].items()
872 },
873 'objects': {
874 k: float(v)
875 for k, v in actual_by_root[root]['objects'].items()
876 },
877 'bytes': {
878 k: float(v)
879 for k, v in actual_by_root[root]['bytes'].items()
880 },
881 }
882 pe.actual_by_root[root] = {
883 'pgs': {
884 k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
885 for k, v in actual_by_root[root]['pgs'].items()
886 },
887 'objects': {
888 k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
889 for k, v in actual_by_root[root]['objects'].items()
890 },
891 'bytes': {
892 k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
893 for k, v in actual_by_root[root]['bytes'].items()
894 },
895 }
896 self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
897 self.log.debug('actual_by_root %s' % pe.actual_by_root)
898
899 # average and stddev and score
900 pe.stats_by_root = {
901 a: pe.calc_stats(
902 b,
903 pe.target_by_root[a],
904 pe.total_by_root[a]
905 ) for a, b in pe.count_by_root.items()
906 }
907 self.log.debug('stats_by_root %s' % pe.stats_by_root)
908
909 # the scores are already normalized
910 pe.score_by_root = {
911 r: {
912 'pgs': pe.stats_by_root[r]['pgs']['score'],
913 'objects': pe.stats_by_root[r]['objects']['score'],
914 'bytes': pe.stats_by_root[r]['bytes']['score'],
915 } for r in pe.total_by_root.keys()
916 }
917 self.log.debug('score_by_root %s' % pe.score_by_root)
918
919 # get the list of score metrics, comma separated
920 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
921
922 # total score is just average of normalized stddevs
923 pe.score = 0.0
924 for r, vs in pe.score_by_root.items():
925 for k, v in vs.items():
926 if k in metrics:
927 pe.score += v
928 pe.score /= len(metrics) * len(roots)
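# e.g. with the default crush_compat_metrics of 'pgs,objects,bytes' and a
# single CRUSH root, the final score is simply the mean of that root's
# three per-metric scores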
929 return pe
930
931 def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
932 pe = self.calc_eval(ms, pools)
933 return pe.show(verbose=verbose)
934
935 def optimize(self, plan: Plan) -> Tuple[int, str]:
936 self.log.info('Optimize plan %s' % plan.name)
937 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
938 self.log.info('Mode %s, max misplaced %f' %
939 (plan.mode, max_misplaced))
940
941 info = self.get('pg_status')
942 unknown = info.get('unknown_pgs_ratio', 0.0)
943 degraded = info.get('degraded_ratio', 0.0)
944 inactive = info.get('inactive_pgs_ratio', 0.0)
945 misplaced = info.get('misplaced_ratio', 0.0)
946 plan.pg_status = info
947 self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
948 unknown, degraded, inactive, misplaced)
949 if unknown > 0.0:
950 detail = 'Some PGs (%f) are unknown; try again later' % unknown
951 self.log.info(detail)
952 return -errno.EAGAIN, detail
953 elif degraded > 0.0:
954 detail = 'Some objects (%f) are degraded; try again later' % degraded
955 self.log.info(detail)
956 return -errno.EAGAIN, detail
957 elif inactive > 0.0:
958 detail = 'Some PGs (%f) are inactive; try again later' % inactive
959 self.log.info(detail)
960 return -errno.EAGAIN, detail
961 elif misplaced >= max_misplaced:
962 detail = 'Too many objects (%f > %f) are misplaced; ' \
963 'try again later' % (misplaced, max_misplaced)
964 self.log.info(detail)
965 return -errno.EAGAIN, detail
966 else:
967 if plan.mode == 'upmap':
968 return self.do_upmap(plan)
969 elif plan.mode == 'crush-compat':
970 return self.do_crush_compat(cast(MsPlan, plan))
971 elif plan.mode == 'none':
972 detail = 'Please run "ceph balancer mode" to choose a valid mode first'
973 self.log.info('Idle')
974 return -errno.ENOEXEC, detail
975 else:
976 detail = 'Unrecognized mode %s' % plan.mode
977 self.log.info(detail)
978 return -errno.EINVAL, detail
979
980 def do_upmap(self, plan: Plan) -> Tuple[int, str]:
981 self.log.info('do_upmap')
982 max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
983 max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
984 osdmap_dump = plan.osdmap_dump
985
986 if len(plan.pools):
987 pools = plan.pools
988 else: # all
989 pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
990 if len(pools) == 0:
991 detail = 'No pools available'
992 self.log.info(detail)
993 return -errno.ENOENT, detail
994 # shuffle pool list so they all get equal (in)attention
995 random.shuffle(pools)
996 self.log.info('pools %s' % pools)
997
998 adjusted_pools = []
999 inc = plan.inc
1000 total_did = 0
1001 left = max_optimizations
1002 pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
1003 if p['pg_num'] > p['pg_num_target']]
1004 crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
1005 for p in osdmap_dump.get('pools', []))
1006 for pool in pools:
1007 if pool not in crush_rule_by_pool_name:
1008 self.log.info('pool %s does not exist' % pool)
1009 continue
1010 if pool in pools_with_pg_merge:
1011 self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
1012 continue
1013 adjusted_pools.append(pool)
1014 # shuffle so all pools get equal (in)attention
1015 random.shuffle(adjusted_pools)
1016 pool_dump = osdmap_dump.get('pools', [])
1017 for pool in adjusted_pools:
1018 for p in pool_dump:
1019 if p['pool_name'] == pool:
1020 pool_id = p['pool']
1021 break
1022
1023 # note that here we deliberately exclude any scrubbing pgs too,
1024 # since scrubbing activity has a significant impact on performance
1025 num_pg_active_clean = 0
1026 for p in plan.pg_status.get('pgs_by_pool_state', []):
1027 pgs_pool_id = p['pool_id']
1028 if pgs_pool_id != pool_id:
1029 continue
1030 for s in p['pg_state_counts']:
1031 if s['state_name'] == 'active+clean':
1032 num_pg_active_clean += s['count']
1033 break
1034 available = min(left, num_pg_active_clean)
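# calc_pg_upmaps() (an OSDMap binding) appends pg-upmap-items entries for
# this pool to the incremental and returns how many changes it prepared,
# capped here by both the remaining budget and the pool's active+clean PGs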
1035 did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
1036 total_did += did
1037 left -= did
1038 if left <= 0:
1039 break
1040 self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
1041 if total_did == 0:
1042 self.no_optimization_needed = True
1043 return -errno.EALREADY, 'Unable to find further optimization, ' \
1044 'or pool(s) pg_num is decreasing, ' \
1045 'or distribution is already perfect'
1046 return 0, ''
1047
1048 def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
1049 self.log.info('do_crush_compat')
1050 max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
1051 if max_iterations < 1:
1052 return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
1053 step = cast(float, self.get_module_option('crush_compat_step'))
1054 if step <= 0 or step >= 1.0:
1055 return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
1056 max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
1057 min_pg_per_osd = 2
1058
1059 ms = plan.initial
1060 osdmap = ms.osdmap
1061 crush = osdmap.get_crush()
1062 pe = self.calc_eval(ms, plan.pools)
1063 min_score_to_optimize = cast(float, self.get_module_option('min_score'))
1064 if pe.score <= min_score_to_optimize:
1065 if pe.score == 0:
1066 detail = 'Distribution is already perfect'
1067 else:
1068 detail = 'score %f <= min_score %f, will not optimize' \
1069 % (pe.score, min_score_to_optimize)
1070 self.log.info(detail)
1071 return -errno.EALREADY, detail
1072
1073 # get current osd reweights
1074 orig_osd_weight = {a['osd']: a['weight']
1075 for a in ms.osdmap_dump.get('osds', [])}
1076
1077 # get current compat weight-set weights
1078 orig_ws = self.get_compat_weight_set_weights(ms)
1079 if not orig_ws:
1080 return -errno.EAGAIN, 'compat weight-set not available'
1081 orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}
1082
1083 # Make sure roots don't overlap their devices. If so, we
1084 # can't proceed.
1085 roots = list(pe.target_by_root.keys())
1086 self.log.debug('roots %s', roots)
1087 visited: Dict[int, str] = {}
1088 overlap: Dict[int, List[str]] = {}
1089 for root, wm in pe.target_by_root.items():
1090 for osd in wm:
1091 if osd in visited:
1092 if osd not in overlap:
1093 overlap[osd] = [visited[osd]]
1094 overlap[osd].append(root)
1095 visited[osd] = root
1096 if len(overlap) > 0:
1097 detail = 'Some osds belong to multiple subtrees: %s' % \
1098 overlap
1099 self.log.error(detail)
1100 return -errno.EOPNOTSUPP, detail
1101
1102 # rebalance by pgs, objects, or bytes
1103 metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
1104 key = metrics[0] # balancing using the first score metric
1105 if key not in ['pgs', 'bytes', 'objects']:
1106 self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
1107 key = 'pgs'
1108
1109 # go
1110 best_ws = copy.deepcopy(orig_ws)
1111 best_ow = copy.deepcopy(orig_osd_weight)
1112 best_pe = pe
1113 left = max_iterations
1114 bad_steps = 0
1115 next_ws = copy.deepcopy(best_ws)
1116 next_ow = copy.deepcopy(best_ow)
1117 while left > 0:
1118 # adjust
1119 self.log.debug('best_ws %s' % best_ws)
1120 random.shuffle(roots)
1121 for root in roots:
1122 pools = best_pe.root_pools[root]
1123 osds = len(best_pe.target_by_root[root])
1124 min_pgs = osds * min_pg_per_osd
1125 if best_pe.total_by_root[root][key] < min_pgs:
1126 self.log.info('Skipping root %s (pools %s), total pgs %d '
1127 '< minimum %d (%d per osd)',
1128 root, pools,
1129 best_pe.total_by_root[root][key],
1130 min_pgs, min_pg_per_osd)
1131 continue
1132 self.log.info('Balancing root %s (pools %s) by %s' %
1133 (root, pools, key))
1134 target = best_pe.target_by_root[root]
1135 actual = best_pe.actual_by_root[root][key]
1136 queue = sorted(actual.keys(),
1137 key=lambda osd: -abs(target[osd] - actual[osd]))
1138 for osd in queue:
1139 if orig_osd_weight[osd] == 0:
1140 self.log.debug('skipping out osd.%d', osd)
1141 else:
1142 deviation = target[osd] - actual[osd]
1143 if deviation == 0:
1144 break
1145 self.log.debug('osd.%d deviation %f', osd, deviation)
1146 weight = best_ws[osd]
1147 ow = orig_osd_weight[osd]
1148 if actual[osd] > 0:
1149 calc_weight = target[osd] / actual[osd] * weight * ow
1150 else:
1151 # for newly created osds, reset calc_weight to the target value.
1152 # this way the weight-set will absorb *step* of its
1153 # target (final) value at the very beginning and slowly catch up later.
1154 # note that if this turns out to cause too many misplaced
1155 # pgs, then we'll reduce step and retry
1156 calc_weight = target[osd]
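# blend the current weight-set value with calc_weight, moving a fraction
# *step* of the way toward the target each iteration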
1157 new_weight = weight * (1.0 - step) + calc_weight * step
1158 self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
1159 new_weight)
1160 next_ws[osd] = new_weight
1161 if ow < 1.0:
1162 new_ow = min(1.0, max(step + (1.0 - step) * ow,
1163 ow + .005))
1164 self.log.debug('Reweight osd.%d reweight %f -> %f',
1165 osd, ow, new_ow)
1166 next_ow[osd] = new_ow
1167
1168 # normalize weights under this root
1169 root_weight = crush.get_item_weight(pe.root_ids[root])
1170 root_sum = sum(b for a, b in next_ws.items()
1171 if a in target.keys())
1172 if root_sum > 0 and root_weight > 0:
1173 factor = root_sum / root_weight
1174 self.log.debug('normalizing root %s %d, weight %f, '
1175 'ws sum %f, factor %f',
1176 root, pe.root_ids[root], root_weight,
1177 root_sum, factor)
1178 for osd in actual.keys():
1179 next_ws[osd] = next_ws[osd] / factor
1180
1181 # recalc
1182 plan.compat_ws = copy.deepcopy(next_ws)
1183 next_ms = plan.final_state()
1184 next_pe = self.calc_eval(next_ms, plan.pools)
1185 next_misplaced = next_ms.calc_misplaced_from(ms)
1186 self.log.debug('Step result score %f -> %f, misplacing %f',
1187 best_pe.score, next_pe.score, next_misplaced)
1188
1189 if next_misplaced > max_misplaced:
1190 if best_pe.score < pe.score:
1191 self.log.debug('Step misplaced %f > max %f, stopping',
1192 next_misplaced, max_misplaced)
1193 break
1194 step /= 2.0
1195 next_ws = copy.deepcopy(best_ws)
1196 next_ow = copy.deepcopy(best_ow)
1197 self.log.debug('Step misplaced %f > max %f, reducing step to %f',
1198 next_misplaced, max_misplaced, step)
1199 else:
1200 if next_pe.score > best_pe.score * 1.0001:
1201 bad_steps += 1
1202 if bad_steps < 5 and random.randint(0, 100) < 70:
1203 self.log.debug('Score got worse, taking another step')
1204 else:
1205 step /= 2.0
1206 next_ws = copy.deepcopy(best_ws)
1207 next_ow = copy.deepcopy(best_ow)
1208 self.log.debug('Score got worse, trying smaller step %f',
1209 step)
1210 else:
1211 bad_steps = 0
1212 best_pe = next_pe
1213 best_ws = copy.deepcopy(next_ws)
1214 best_ow = copy.deepcopy(next_ow)
1215 if best_pe.score == 0:
1216 break
1217 left -= 1
1218
1219 # allow a small regression if we are phasing out osd weights
1220 fudge = 0.0
1221 if best_ow != orig_osd_weight:
1222 fudge = .001
1223
1224 if best_pe.score < pe.score + fudge:
1225 self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
1226 plan.compat_ws = best_ws
1227 for osd, w in best_ow.items():
1228 if w != orig_osd_weight[osd]:
1229 self.log.debug('osd.%d reweight %f', osd, w)
1230 plan.osd_weights[osd] = w
1231 return 0, ''
1232 else:
1233 self.log.info('Failed to find further optimization, score %f',
1234 pe.score)
1235 plan.compat_ws = {}
1236 return -errno.EDOM, 'Unable to find further optimization, ' \
1237 'changing the balancer mode and retrying might help'
1238
1239 def get_compat_weight_set_weights(self, ms: MappingState):
1240 have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
1241 if have_choose_args:
1242 # get number of buckets in choose_args
1243 choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
1244 if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
1245 # enable compat weight-set first
1246 self.log.debug('no choose_args or not all buckets have weight-sets')
1247 self.log.debug('ceph osd crush weight-set create-compat')
1248 result = CommandResult('')
1249 self.send_command(result, 'mon', '', json.dumps({
1250 'prefix': 'osd crush weight-set create-compat',
1251 'format': 'json',
1252 }), '')
1253 r, outb, outs = result.wait()
1254 if r != 0:
1255 self.log.error('Error creating compat weight-set')
1256 return
1257
1258 result = CommandResult('')
1259 self.send_command(result, 'mon', '', json.dumps({
1260 'prefix': 'osd crush dump',
1261 'format': 'json',
1262 }), '')
1263 r, outb, outs = result.wait()
1264 if r != 0:
1265 self.log.error('Error dumping crush map')
1266 return
1267 try:
1268 crushmap = json.loads(outb)
1269 except json.JSONDecodeError:
1270 raise RuntimeError('unable to parse crush map')
1271 else:
1272 crushmap = ms.crush_dump
1273
1274 raw = CRUSHMap.get_default_choose_args(crushmap)
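# each choose_args entry pairs a bucket_id with weight_set rows whose
# positions correspond one-to-one with that bucket's items (verified below)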
1275 weight_set = {}
1276 for b in raw:
1277 bucket = None
1278 for t in crushmap['buckets']:
1279 if t['id'] == b['bucket_id']:
1280 bucket = t
1281 break
1282 if not bucket:
1283 raise RuntimeError('could not find bucket %s' % b['bucket_id'])
1284 self.log.debug('bucket items %s' % bucket['items'])
1285 self.log.debug('weight set %s' % b['weight_set'][0])
1286 if len(bucket['items']) != len(b['weight_set'][0]):
1287 raise RuntimeError('weight-set size does not match bucket items')
1288 for pos in range(len(bucket['items'])):
1289 weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]
1290
1291 self.log.debug('weight_set weights %s' % weight_set)
1292 return weight_set
1293
1294 def do_crush(self) -> None:
1295 self.log.info('do_crush (not yet implemented)')
1296
1297 def do_osd_weight(self) -> None:
1298 self.log.info('do_osd_weight (not yet implemented)')
1299
1300 def execute(self, plan: Plan) -> Tuple[int, str]:
1301 self.log.info('Executing plan %s' % plan.name)
1302
1303 commands = []
1304
1305 # compat weight-set
1306 if len(plan.compat_ws):
1307 ms_plan = cast(MsPlan, plan)
1308 if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
1309 self.log.debug('ceph osd crush weight-set create-compat')
1310 result = CommandResult('')
1311 self.send_command(result, 'mon', '', json.dumps({
1312 'prefix': 'osd crush weight-set create-compat',
1313 'format': 'json',
1314 }), '')
1315 r, outb, outs = result.wait()
1316 if r != 0:
1317 self.log.error('Error creating compat weight-set')
1318 return r, outs
1319
1320 for osd, weight in plan.compat_ws.items():
1321 self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
1322 osd, weight)
1323 result = CommandResult('')
1324 self.send_command(result, 'mon', '', json.dumps({
1325 'prefix': 'osd crush weight-set reweight-compat',
1326 'format': 'json',
1327 'item': 'osd.%d' % osd,
1328 'weight': [weight],
1329 }), '')
1330 commands.append(result)
1331
1332 # new_weight
1333 reweightn = {}
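# 'osd reweightn' expects 16.16 fixed-point integers: 0x10000 (65536)
# represents a reweight of 1.0, so e.g. 0.95 becomes 62259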
1334 for osd, weight in plan.osd_weights.items():
1335 reweightn[str(osd)] = str(int(weight * float(0x10000)))
1336 if len(reweightn):
1337 self.log.info('ceph osd reweightn %s', reweightn)
1338 result = CommandResult('')
1339 self.send_command(result, 'mon', '', json.dumps({
1340 'prefix': 'osd reweightn',
1341 'format': 'json',
1342 'weights': json.dumps(reweightn),
1343 }), '')
1344 commands.append(result)
1345
1346 # upmap
1347 incdump = plan.inc.dump()
1348 for item in incdump.get('new_pg_upmap', []):
1349 self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
1350 item['osds'])
1351 result = CommandResult('foo')
1352 self.send_command(result, 'mon', '', json.dumps({
1353 'prefix': 'osd pg-upmap',
1354 'format': 'json',
1355 'pgid': item['pgid'],
1356 'id': item['osds'],
1357 }), 'foo')
1358 commands.append(result)
1359
1360 for pgid in incdump.get('old_pg_upmap', []):
1361 self.log.info('ceph osd rm-pg-upmap %s', pgid)
1362 result = CommandResult('foo')
1363 self.send_command(result, 'mon', '', json.dumps({
1364 'prefix': 'osd rm-pg-upmap',
1365 'format': 'json',
1366 'pgid': pgid,
1367 }), 'foo')
1368 commands.append(result)
1369
1370 for item in incdump.get('new_pg_upmap_items', []):
1371 self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
1372 item['mappings'])
1373 osdlist = []
1374 for m in item['mappings']:
1375 osdlist += [m['from'], m['to']]
1376 result = CommandResult('foo')
1377 self.send_command(result, 'mon', '', json.dumps({
1378 'prefix': 'osd pg-upmap-items',
1379 'format': 'json',
1380 'pgid': item['pgid'],
1381 'id': osdlist,
1382 }), 'foo')
1383 commands.append(result)
1384
1385 for pgid in incdump.get('old_pg_upmap_items', []):
1386 self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
1387 result = CommandResult('foo')
1388 self.send_command(result, 'mon', '', json.dumps({
1389 'prefix': 'osd rm-pg-upmap-items',
1390 'format': 'json',
1391 'pgid': pgid,
1392 }), 'foo')
1393 commands.append(result)
1394
1395 # wait for commands
1396 self.log.debug('commands %s' % commands)
1397 for result in commands:
1398 r, outb, outs = result.wait()
1399 if r != 0:
1400 self.log.error('execute error: r = %d, detail = %s' % (r, outs))
1401 return r, outs
1402 self.log.debug('done')
1403 return 0, ''
1404
1405 def gather_telemetry(self) -> Dict[str, Any]:
1406 return {
1407 'active': self.active,
1408 'mode': self.mode,
1409 }