"""
Balance PG distribution across OSDs.
"""
import copy
import enum
import errno
import json
import math
import random
import time

from mgr_module import CLIReadCommand, CLICommand, CommandResult, MgrModule, Option, OSDMap
from threading import Event
from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, Union
from mgr_module import CRUSHMap
import datetime

TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
class MappingState:
    def __init__(self, osdmap, raw_pg_stats, raw_pool_stats, desc=''):
        self.desc = desc
        self.osdmap = osdmap
        self.osdmap_dump = self.osdmap.dump()
        self.crush = osdmap.get_crush()
        self.crush_dump = self.crush.dump()
        self.raw_pg_stats = raw_pg_stats
        self.raw_pool_stats = raw_pool_stats
        self.pg_stat = {
            i['pgid']: i['stat_sum'] for i in raw_pg_stats.get('pg_stats', [])
        }
        osd_poolids = [p['pool'] for p in self.osdmap_dump.get('pools', [])]
        pg_poolids = [p['poolid'] for p in raw_pool_stats.get('pool_stats', [])]
        self.poolids = set(osd_poolids) & set(pg_poolids)
        self.pg_up = {}
        self.pg_up_by_poolid = {}
        for poolid in self.poolids:
            self.pg_up_by_poolid[poolid] = osdmap.map_pool_pgs_up(poolid)
            for a, b in self.pg_up_by_poolid[poolid].items():
                self.pg_up[a] = b
    def calc_misplaced_from(self, other_ms):
        num = len(other_ms.pg_up)
        misplaced = 0
        for pgid, before in other_ms.pg_up.items():
            if before != self.pg_up.get(pgid, []):
                misplaced += 1
        if num > 0:
            return float(misplaced) / float(num)
        return 0.0
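    # For example, if other_ms mapped 1024 PGs and 32 of them now have a
    # different up set, the ratio above is 32/1024 = 0.03125, i.e. roughly
    # 3% of PGs would move between the two mapping states.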
class Mode(enum.Enum):
    none = 'none'
    crush_compat = 'crush-compat'
    upmap = 'upmap'
class Plan(object):
    def __init__(self, name, mode, osdmap, pools):
        self.name = name
        self.mode = mode
        self.osdmap = osdmap
        self.osdmap_dump = osdmap.dump()
        self.pools = pools
        self.osd_weights = {}
        self.compat_ws = {}
        self.inc = osdmap.new_incremental()
        self.pg_status = {}

    def dump(self) -> str:
        return json.dumps(self.inc.dump(), indent=4, sort_keys=True)

    def show(self) -> str:
        return 'upmap plan'
class MsPlan(Plan):
    """
    Plan with a preloaded MappingState member.
    """
    def __init__(self, name: str, mode: str, ms: MappingState, pools: List[str]) -> None:
        super(MsPlan, self).__init__(name, mode, ms.osdmap, pools)
        self.initial = ms
    def final_state(self) -> MappingState:
        self.inc.set_osd_reweights(self.osd_weights)
        self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
        return MappingState(self.initial.osdmap.apply_incremental(self.inc),
                            self.initial.raw_pg_stats,
                            self.initial.raw_pool_stats,
                            'plan %s final' % self.name)
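    # final_state() applies the staged incremental (reweights plus compat
    # weight-set changes) to the starting osdmap, yielding the hypothetical
    # post-plan MappingState that calc_eval() and calc_misplaced_from() can
    # score before anything is committed to the cluster.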
    def show(self) -> str:
        ls = []
        ls.append('# starting osdmap epoch %d' % self.initial.osdmap.get_epoch())
        ls.append('# starting crush version %d' %
                  self.initial.osdmap.get_crush_version())
        ls.append('# mode %s' % self.mode)
        if len(self.compat_ws) and \
           not CRUSHMap.have_default_choose_args(self.initial.crush_dump):
            ls.append('ceph osd crush weight-set create-compat')
        for osd, weight in self.compat_ws.items():
            ls.append('ceph osd crush weight-set reweight-compat %s %f' %
                      (osd, weight))
        for osd, weight in self.osd_weights.items():
            ls.append('ceph osd reweight osd.%d %f' % (osd, weight))
        incdump = self.inc.dump()
        for pgid in incdump.get('old_pg_upmap_items', []):
            ls.append('ceph osd rm-pg-upmap-items %s' % pgid)
        for item in incdump.get('new_pg_upmap_items', []):
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            ls.append('ceph osd pg-upmap-items %s %s' %
                      (item['pgid'], ' '.join([str(a) for a in osdlist])))
        return '\n'.join(ls)
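# The output of show() is the plan expressed as equivalent CLI commands,
# e.g. (illustrative values only):
#   # starting osdmap epoch 1234
#   # starting crush version 42
#   # mode crush-compat
#   ceph osd crush weight-set reweight-compat 0 1.050000
#   ceph osd reweight osd.3 0.950000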
class Eval:
    def __init__(self, ms: MappingState):
        self.ms = ms
        self.root_ids: Dict[str, int] = {}        # root name -> id
        self.pool_name: Dict[str, str] = {}       # pool id -> pool name
        self.pool_id: Dict[str, int] = {}         # pool name -> id
        self.pool_roots: Dict[str, List[str]] = {}       # pool name -> root name
        self.root_pools: Dict[str, List[str]] = {}       # root name -> pools
        self.target_by_root: Dict[str, Dict[int, float]] = {}  # root name -> target weight map
        self.count_by_pool: Dict[str, dict] = {}
        self.count_by_root: Dict[str, dict] = {}
        self.actual_by_pool: Dict[str, dict] = {}  # pool -> by_* -> actual weight map
        self.actual_by_root: Dict[str, dict] = {}  # pool -> by_* -> actual weight map
        self.total_by_pool: Dict[str, dict] = {}   # pool -> by_* -> total
        self.total_by_root: Dict[str, dict] = {}   # root -> by_* -> total
        self.stats_by_pool: Dict[str, dict] = {}   # pool -> by_* -> stddev or avg -> value
        self.stats_by_root: Dict[str, dict] = {}   # root -> by_* -> stddev or avg -> value

        self.score_by_pool: Dict[str, float] = {}
        self.score_by_root: Dict[str, Dict[str, float]] = {}

        self.score = 0.0
    def show(self, verbose: bool = False) -> str:
        if verbose:
            r = self.ms.desc + '\n'
            r += 'target_by_root %s\n' % self.target_by_root
            r += 'actual_by_pool %s\n' % self.actual_by_pool
            r += 'actual_by_root %s\n' % self.actual_by_root
            r += 'count_by_pool %s\n' % self.count_by_pool
            r += 'count_by_root %s\n' % self.count_by_root
            r += 'total_by_pool %s\n' % self.total_by_pool
            r += 'total_by_root %s\n' % self.total_by_root
            r += 'stats_by_root %s\n' % self.stats_by_root
            r += 'score_by_pool %s\n' % self.score_by_pool
            r += 'score_by_root %s\n' % self.score_by_root
        else:
            r = self.ms.desc + ' '
        r += 'score %f (lower is better)\n' % self.score
        return r
    def calc_stats(self, count, target, total):
        num = max(len(target), 1)
        r: Dict[str, Dict[str, Union[int, float]]] = {}
        for t in ('pgs', 'objects', 'bytes'):
            if total[t] == 0:
                r[t] = {
                    'max': 0,
                    'min': 0,
                    'avg': 0,
                    'stddev': 0,
                    'sum_weight': 0,
                    'score': 0,
                }
                continue

            avg = float(total[t]) / float(num)
            dev = 0.0

            # score is a measure of how uneven the data distribution is.
            # score lies between [0, 1), 0 means perfect distribution.
            score = 0.0
            sum_weight = 0.0

            for k, v in count[t].items():
                # adjust/normalize by weight
                if target[k]:
                    adjusted = float(v) / target[k] / float(num)
                else:
                    adjusted = 0.0

                # Overweighted devices and their weights are factors to calculate reweight_urgency.
                # One 10% underfilled device with 5 2% overfilled devices, is arguably a better
                # situation than one 10% overfilled with 5 2% underfilled devices
                if adjusted > avg:
                    '''
                    F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
                    x = (adjusted - avg)/avg.
                    Since we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
                    To bring the range of F(x) into [0, 1), we need to make the above modification.

                    In general, we need to use a function F(x), where x = (adjusted - avg)/avg
                    1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
                    2. A larger value of x should imply more urgency to reweight.
                    3. Also, the difference between F(x) values when x is large should be minimal.
                    4. The value of F(x) should approach 1 (highest urgency to reweight) steeply.

                    We could have used F(x) = (1 - e^(-x)), but that converges to 1 more slowly than the function currently in use.

                    cdf of standard normal distribution: https://stackoverflow.com/a/29273201
                    '''
                    score += target[k] * (math.erf(((adjusted - avg) / avg) / math.sqrt(2.0)))
                    sum_weight += target[k]
                dev += (avg - adjusted) * (avg - adjusted)
            stddev = math.sqrt(dev / float(max(num - 1, 1)))
            score = score / max(sum_weight, 1)
            r[t] = {
                'max': max(count[t].values()),
                'min': min(count[t].values()),
                'avg': avg,
                'stddev': stddev,
                'sum_weight': sum_weight,
                'score': score,
            }
        return r
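    # A rough sense of scale for the urgency function above, with
    # F(x) = erf(x / sqrt(2)) and x = (adjusted - avg)/avg:
    #   x = 0.1 (10% overfilled)  -> F ~= 0.080
    #   x = 0.5 (50% overfilled)  -> F ~= 0.383
    #   x = 1.0 (100% overfilled) -> F ~= 0.683
    # so urgency ramps up steeply as a device drifts further above average.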
class Module(MgrModule):
    MODULE_OPTIONS = [
        Option(name='active',
               type='bool',
               default=True,
               desc='automatically balance PGs across cluster',
               runtime=True),
        Option(name='begin_time',
               type='str',
               default='0000',
               desc='beginning time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='end_time',
               type='str',
               default='2359',
               desc='ending time of day to automatically balance',
               long_desc='This is a time of day in the format HHMM.',
               runtime=True),
        Option(name='begin_weekday',
               type='uint',
               default=0,
               min=0,
               max=7,
               desc='Restrict automatic balancing to this day of the week or later',
               long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='end_weekday',
               type='uint',
               default=0,
               min=0,
               max=7,
               desc='Restrict automatic balancing to days of the week earlier than this',
               long_desc='0 or 7 = Sunday, 1 = Monday, etc.',
               runtime=True),
        Option(name='crush_compat_max_iterations',
               type='uint',
               default=25,
               min=1,
               max=250,
               desc='maximum number of iterations to attempt optimization',
               runtime=True),
        Option(name='crush_compat_metrics',
               type='str',
               default='pgs,objects,bytes',
               desc='metrics with which to calculate OSD utilization',
               long_desc='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
               runtime=True),
        Option(name='crush_compat_step',
               type='float',
               default=.5,
               min=.001,
               max=.999,
               desc='aggressiveness of optimization',
               long_desc='.99 is very aggressive, .01 is less aggressive',
               runtime=True),
        Option(name='min_score',
               type='float',
               default=0,
               desc='minimum score, below which no optimization is attempted',
               runtime=True),
        Option(name='mode',
               desc='Balancer mode',
               default='upmap',
               enum_allowed=['none', 'crush-compat', 'upmap'],
               runtime=True),
        Option(name='sleep_interval',
               type='secs',
               default=60,
               desc='how frequently to wake up and attempt optimization',
               runtime=True),
        Option(name='upmap_max_optimizations',
               type='uint',
               default=10,
               desc='maximum upmap optimizations to make per attempt',
               runtime=True),
        Option(name='upmap_max_deviation',
               type='int',
               default=5,
               min=1,
               desc='deviation below which no optimization is attempted',
               long_desc='If the number of PGs are within this count then no optimization is attempted',
               runtime=True),
        Option(name='pool_ids',
               type='str',
               default='',
               desc='pools which the automatic balancing will be limited to',
               runtime=True),
    ]
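    # These options are runtime-changeable via the usual mgr option path,
    # e.g. `ceph config set mgr mgr/balancer/sleep_interval 300`.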
    active = False
    run = True
    plans: Dict[str, Plan] = {}
    mode = ''
    optimizing = False
    last_optimize_started = ''
    last_optimize_duration = ''
    optimize_result = ''
    success_string = 'Optimization plan created successfully'
    in_progress_string = 'in progress'

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super(Module, self).__init__(*args, **kwargs)
        self.event = Event()
    @CLIReadCommand('balancer status')
    def show_status(self) -> Tuple[int, str, str]:
        """
        Show balancer status
        """
        s = {
            'plans': list(self.plans.keys()),
            'active': self.active,
            'last_optimize_started': self.last_optimize_started,
            'last_optimize_duration': self.last_optimize_duration,
            'optimize_result': self.optimize_result,
            'mode': self.get_module_option('mode'),
        }
        return (0, json.dumps(s, indent=4, sort_keys=True), '')
    @CLICommand('balancer mode')
    def set_mode(self, mode: Mode) -> Tuple[int, str, str]:
        """
        Set balancer mode
        """
        if mode == Mode.upmap:
            min_compat_client = self.get_osdmap().dump().get('require_min_compat_client', '')
            if min_compat_client < 'luminous':  # works well because version is alphabetized..
                warn = ('min_compat_client "%s" '
                        '< "luminous", which is required for pg-upmap. '
                        'Try "ceph osd set-require-min-compat-client luminous" '
                        'before enabling this mode' % min_compat_client)
                return (-errno.EPERM, '', warn)
        elif mode == Mode.crush_compat:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'initialize compat weight-set')
            self.get_compat_weight_set_weights(ms)  # ignore error
        self.set_module_option('mode', mode.value)
        return (0, '', '')
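    # The lexical comparison above works because Ceph release names happen to
    # sort alphabetically in release order, e.g.
    # 'jewel' < 'kraken' < 'luminous' < 'mimic' < 'nautilus' < 'octopus'.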
    @CLICommand('balancer on')
    def on(self) -> Tuple[int, str, str]:
        """
        Enable automatic balancing
        """
        if not self.active:
            self.set_module_option('active', 'true')
            self.active = True
        self.event.set()
        return (0, '', '')

    @CLICommand('balancer off')
    def off(self) -> Tuple[int, str, str]:
        """
        Disable automatic balancing
        """
        if self.active:
            self.set_module_option('active', 'false')
            self.active = False
        self.event.set()
        return (0, '', '')
    @CLIReadCommand('balancer pool ls')
    def pool_ls(self) -> Tuple[int, str, str]:
        """
        List automatic balancing pools

        Note that an empty list means all existing pools will be automatic balancing targets,
        which is the default behaviour of balancer.
        """
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        if pool_ids == '':
            return (0, '', '')
        pool_ids = [int(p) for p in pool_ids.split(',')]
        pool_name_by_id = dict((p['pool'], p['pool_name'])
                               for p in self.get_osdmap().dump().get('pools', []))
        should_prune = False
        final_ids: List[int] = []
        final_names = []
        for p in pool_ids:
            if p in pool_name_by_id:
                final_ids.append(p)
                final_names.append(pool_name_by_id[p])
            else:
                should_prune = True
        if should_prune:  # some pools were gone, prune
            self.set_module_option('pool_ids', ','.join(str(p) for p in final_ids))
        return (0, json.dumps(sorted(final_names), indent=4, sort_keys=True), '')
    @CLICommand('balancer pool add')
    def pool_add(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Enable automatic balancing for specific pools
        """
        raw_names = pools
        pool_id_by_name = dict((p['pool_name'], p['pool'])
                               for p in self.get_osdmap().dump().get('pools', []))
        invalid_names = [p for p in raw_names if p not in pool_id_by_name]
        if invalid_names:
            return (-errno.EINVAL, '', 'pool(s) %s not found' % invalid_names)
        to_add = set(str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name)
        pool_ids = cast(str, self.get_module_option('pool_ids'))
        existing = set(pool_ids.split(',') if pool_ids else [])
        final = to_add | existing
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')
    @CLICommand('balancer pool rm')
    def pool_rm(self, pools: Sequence[str]) -> Tuple[int, str, str]:
        """
        Disable automatic balancing for specific pools
        """
        raw_names = pools
        existing = cast(str, self.get_module_option('pool_ids'))
        if existing == '':  # for idempotence
            return (0, '', '')
        existing = existing.split(',')
        osdmap = self.get_osdmap()
        pool_ids = [str(p['pool']) for p in osdmap.dump().get('pools', [])]
        pool_id_by_name = dict((p['pool_name'], p['pool']) for p in osdmap.dump().get('pools', []))
        final = [p for p in existing if p in pool_ids]
        to_delete = [str(pool_id_by_name[p]) for p in raw_names if p in pool_id_by_name]
        final = set(final) - set(to_delete)
        self.set_module_option('pool_ids', ','.join(final))
        return (0, '', '')
    def _state_from_option(self, option: Optional[str] = None) -> Tuple[MappingState, List[str]]:
        pools = []
        if option is None:
            ms = MappingState(self.get_osdmap(),
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              'current cluster')
        elif option in self.plans:
            plan = self.plans.get(option)
            assert plan
            pools = plan.pools
            if plan.mode == 'upmap':
                # Note that for upmap, to improve the efficiency,
                # we use a basic version of Plan without keeping the obvious
                # *redundant* MS member.
                # Hence ms might not be accurate here since we are basically
                # using an old snapshotted osdmap vs a fresh copy of pg_stats.
                # It should not be a big deal though..
                ms = MappingState(plan.osdmap,
                                  self.get("pg_stats"),
                                  self.get("pool_stats"),
                                  f'plan "{plan.name}"')
            else:
                ms = cast(MsPlan, plan).final_state()
        else:
            # not a plan, does it look like a pool?
            osdmap = self.get_osdmap()
            valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
            if option not in valid_pool_names:
                raise ValueError(f'option "{option}" not a plan or a pool')
            pools.append(option)
            ms = MappingState(osdmap,
                              self.get("pg_stats"),
                              self.get("pool_stats"),
                              f'pool "{option}"')
        return ms, pools
    @CLIReadCommand('balancer eval-verbose')
    def plan_eval_verbose(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or specific pool or specific
        plan (verbosely)
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=True), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))

    @CLIReadCommand('balancer eval')
    def plan_eval_brief(self, option: Optional[str] = None):
        """
        Evaluate data distribution for the current cluster or specific pool or specific plan
        """
        try:
            ms, pools = self._state_from_option(option)
            return (0, self.evaluate(ms, pools, verbose=False), '')
        except ValueError as e:
            return (-errno.EINVAL, '', str(e))
    @CLIReadCommand('balancer optimize')
    def plan_optimize(self, plan: str, pools: List[str] = []) -> Tuple[int, str, str]:
        """
        Run optimizer to create a new plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to optimize manually')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up....try again')
        osdmap = self.get_osdmap()
        valid_pool_names = [p['pool_name'] for p in osdmap.dump().get('pools', [])]
        invalid_pool_names = []
        for p in pools:
            if p not in valid_pool_names:
                invalid_pool_names.append(p)
        if len(invalid_pool_names):
            return (-errno.EINVAL, '', 'pools %s not found' % invalid_pool_names)
        plan_ = self.plan_create(plan, osdmap, pools)
        self.last_optimize_started = time.asctime(time.localtime())
        self.optimize_result = self.in_progress_string
        start = time.time()
        r, detail = self.optimize(plan_)
        end = time.time()
        self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
        if r == 0:
            # Add plan if an optimization was created
            self.optimize_result = self.success_string
            self.plans[plan] = plan_
        else:
            self.optimize_result = detail
        return (r, '', detail)
    @CLIReadCommand('balancer show')
    def plan_show(self, plan: str) -> Tuple[int, str, str]:
        """
        Show details of an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        return (0, plan_.show(), '')

    @CLICommand('balancer rm')
    def plan_rm(self, plan: str) -> Tuple[int, str, str]:
        """
        Discard an optimization plan
        """
        if plan in self.plans:
            del self.plans[plan]
        return (0, '', '')

    @CLICommand('balancer reset')
    def plan_reset(self) -> Tuple[int, str, str]:
        """
        Discard all optimization plans
        """
        self.plans = {}
        return (0, '', '')

    @CLIReadCommand('balancer dump')
    def plan_dump(self, plan: str) -> Tuple[int, str, str]:
        """
        Show an optimization plan
        """
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        return (0, plan_.dump(), '')

    @CLIReadCommand('balancer ls')
    def plan_ls(self) -> Tuple[int, str, str]:
        """
        List all plans
        """
        return (0, json.dumps([p for p in self.plans], indent=4, sort_keys=True), '')

    @CLIReadCommand('balancer execute')
    def plan_execute(self, plan: str) -> Tuple[int, str, str]:
        """
        Execute an optimization plan
        """
        # The GIL can be released by the active balancer, so disallow when active
        if self.active:
            return (-errno.EINVAL, '', 'Balancer enabled, disable to execute a plan')
        if self.optimizing:
            return (-errno.EINVAL, '', 'Balancer finishing up....try again')
        plan_ = self.plans.get(plan)
        if not plan_:
            return (-errno.ENOENT, '', f'plan {plan} not found')
        r, detail = self.execute(plan_)
        self.plan_rm(plan)
        return (r, '', detail)
    def shutdown(self) -> None:
        self.log.info('Stopping')
        self.run = False
        self.event.set()
    def time_permit(self) -> bool:
        local_time = time.localtime()
        time_of_day = time.strftime('%H%M', local_time)
        weekday = (local_time.tm_wday + 1) % 7  # be compatible with C
        permit = False

        begin_time = cast(str, self.get_module_option('begin_time'))
        end_time = cast(str, self.get_module_option('end_time'))
        if begin_time <= end_time:
            permit = begin_time <= time_of_day < end_time
        else:
            permit = time_of_day >= begin_time or time_of_day < end_time
        if not permit:
            self.log.debug("should run between %s - %s, now %s, skipping",
                           begin_time, end_time, time_of_day)
            return False

        begin_weekday = cast(int, self.get_module_option('begin_weekday'))
        end_weekday = cast(int, self.get_module_option('end_weekday'))
        if begin_weekday <= end_weekday:
            permit = begin_weekday <= weekday < end_weekday
        else:
            permit = weekday >= begin_weekday or weekday < end_weekday
        if not permit:
            self.log.debug("should run between weekday %d - %d, now %d, skipping",
                           begin_weekday, end_weekday, weekday)
            return False

        return True
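    # The string comparison on HHMM works because zero-padded 24h times sort
    # lexically; e.g. begin_time='2300', end_time='0600' takes the wrap-around
    # branch above and permits 23:00-05:59, crossing midnight.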
    def serve(self) -> None:
        self.log.info('Starting')
        while self.run:
            self.active = cast(bool, self.get_module_option('active'))
            sleep_interval = cast(float, self.get_module_option('sleep_interval'))
            self.log.debug('Waking up [%s, now %s]',
                           "active" if self.active else "inactive",
                           time.strftime(TIME_FORMAT, time.localtime()))
            if self.active and self.time_permit():
                self.log.debug('Running')
                name = 'auto_%s' % time.strftime(TIME_FORMAT, time.gmtime())
                osdmap = self.get_osdmap()
                pool_ids = cast(str, self.get_module_option('pool_ids'))
                if pool_ids:
                    allow = [int(p) for p in pool_ids.split(',')]
                else:
                    allow = []
                final: List[str] = []
                if allow:
                    pools = osdmap.dump().get('pools', [])
                    valid = [p['pool'] for p in pools]
                    ids = set(allow) & set(valid)
                    if set(allow) - set(valid):  # some pools were gone, prune
                        self.set_module_option('pool_ids', ','.join(str(p) for p in ids))
                    pool_name_by_id = dict((p['pool'], p['pool_name']) for p in pools)
                    final = [pool_name_by_id[p] for p in ids if p in pool_name_by_id]
                plan = self.plan_create(name, osdmap, final)
                self.optimizing = True
                self.last_optimize_started = time.asctime(time.localtime())
                self.optimize_result = self.in_progress_string
                start = time.time()
                r, detail = self.optimize(plan)
                end = time.time()
                self.last_optimize_duration = str(datetime.timedelta(seconds=(end - start)))
                if r == 0:
                    self.optimize_result = self.success_string
                    self.execute(plan)
                else:
                    self.optimize_result = detail
                self.optimizing = False
            self.log.debug('Sleeping for %d', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()
    def plan_create(self, name: str, osdmap: OSDMap, pools: List[str]) -> Plan:
        mode = cast(str, self.get_module_option('mode'))
        if mode == 'upmap':
            # drop unnecessary MS member for upmap mode.
            # this way we could effectively eliminate the usage of a
            # complete pg_stats, which can become horribly inefficient
            # as pg_num grows..
            plan = Plan(name, mode, osdmap, pools)
        else:
            plan = MsPlan(name,
                          mode,
                          MappingState(osdmap,
                                       self.get("pg_stats"),
                                       self.get("pool_stats"),
                                       'plan %s initial' % name),
                          pools)
        return plan
    def calc_eval(self, ms: MappingState, pools: List[str]) -> Eval:
        pe = Eval(ms)
        pool_rule = {}
        pool_info = {}
        for p in ms.osdmap_dump.get('pools', []):
            if len(pools) and p['pool_name'] not in pools:
                continue
            # skip dead or not-yet-ready pools too
            if p['pool'] not in ms.poolids:
                continue
            pe.pool_name[p['pool']] = p['pool_name']
            pe.pool_id[p['pool_name']] = p['pool']
            pool_rule[p['pool_name']] = p['crush_rule']
            pe.pool_roots[p['pool_name']] = []
            pool_info[p['pool_name']] = p
        if len(pool_info) == 0:
            return pe
        self.log.debug('pool_name %s' % pe.pool_name)
        self.log.debug('pool_id %s' % pe.pool_id)
        self.log.debug('pools %s' % pools)
        self.log.debug('pool_rule %s' % pool_rule)

        osd_weight = {a['osd']: a['weight']
                      for a in ms.osdmap_dump.get('osds', []) if a['weight'] > 0}

        # get expected distributions by root
        actual_by_root: Dict[str, Dict[str, dict]] = {}
        rootids = ms.crush.find_takes()
        roots = []
        for rootid in rootids:
            ls = ms.osdmap.get_pools_by_take(rootid)
            want = []
            # find out roots associating with pools we are passed in
            for candidate in ls:
                if candidate in pe.pool_name:
                    want.append(candidate)
            if len(want) == 0:
                continue
            root = ms.crush.get_item_name(rootid)
            pe.root_pools[root] = []
            for poolid in want:
                pe.pool_roots[pe.pool_name[poolid]].append(root)
                pe.root_pools[root].append(pe.pool_name[poolid])
            pe.root_ids[root] = rootid
            roots.append(root)
            weight_map = ms.crush.get_take_weight_osd_map(rootid)
            adjusted_map = {
                osd: cw * osd_weight[osd]
                for osd, cw in weight_map.items() if osd in osd_weight and cw > 0
            }
            sum_w = sum(adjusted_map.values())
            assert len(adjusted_map) == 0 or sum_w > 0
            pe.target_by_root[root] = {osd: w / sum_w
                                       for osd, w in adjusted_map.items()}
            actual_by_root[root] = {
                'pgs': {},
                'objects': {},
                'bytes': {},
            }
            for osd in pe.target_by_root[root]:
                actual_by_root[root]['pgs'][osd] = 0
                actual_by_root[root]['objects'][osd] = 0
                actual_by_root[root]['bytes'][osd] = 0
            pe.total_by_root[root] = {
                'pgs': 0,
                'objects': 0,
                'bytes': 0,
            }
        self.log.debug('pool_roots %s' % pe.pool_roots)
        self.log.debug('root_pools %s' % pe.root_pools)
        self.log.debug('target_by_root %s' % pe.target_by_root)

        # pool and root actual
        for pool, pi in pool_info.items():
            poolid = pi['pool']
            pm = ms.pg_up_by_poolid[poolid]
            pgs = 0
            objects = 0
            bytes = 0
            pgs_by_osd = {}
            objects_by_osd = {}
            bytes_by_osd = {}
            for pgid, up in pm.items():
                for osd in [int(osd) for osd in up]:
                    if osd == CRUSHMap.ITEM_NONE:
                        continue
                    if osd not in pgs_by_osd:
                        pgs_by_osd[osd] = 0
                        objects_by_osd[osd] = 0
                        bytes_by_osd[osd] = 0
                    pgs_by_osd[osd] += 1
                    objects_by_osd[osd] += ms.pg_stat[pgid]['num_objects']
                    bytes_by_osd[osd] += ms.pg_stat[pgid]['num_bytes']
                    # pick a root to associate this pg instance with.
                    # note that this is imprecise if the roots have
                    # overlapping children.
                    # FIXME: divide bytes by k for EC pools.
                    for root in pe.pool_roots[pool]:
                        if osd in pe.target_by_root[root]:
                            actual_by_root[root]['pgs'][osd] += 1
                            actual_by_root[root]['objects'][osd] += ms.pg_stat[pgid]['num_objects']
                            actual_by_root[root]['bytes'][osd] += ms.pg_stat[pgid]['num_bytes']
                            pgs += 1
                            objects += ms.pg_stat[pgid]['num_objects']
                            bytes += ms.pg_stat[pgid]['num_bytes']
                            pe.total_by_root[root]['pgs'] += 1
                            pe.total_by_root[root]['objects'] += ms.pg_stat[pgid]['num_objects']
                            pe.total_by_root[root]['bytes'] += ms.pg_stat[pgid]['num_bytes']
                            break
            pe.count_by_pool[pool] = {
                'pgs': {
                    k: v
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: v
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: v
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.actual_by_pool[pool] = {
                'pgs': {
                    k: float(v) / float(max(pgs, 1))
                    for k, v in pgs_by_osd.items()
                },
                'objects': {
                    k: float(v) / float(max(objects, 1))
                    for k, v in objects_by_osd.items()
                },
                'bytes': {
                    k: float(v) / float(max(bytes, 1))
                    for k, v in bytes_by_osd.items()
                },
            }
            pe.total_by_pool[pool] = {
                'pgs': pgs,
                'objects': objects,
                'bytes': bytes,
            }
        for root in pe.total_by_root:
            pe.count_by_root[root] = {
                'pgs': {
                    k: float(v)
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v)
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v)
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
            pe.actual_by_root[root] = {
                'pgs': {
                    k: float(v) / float(max(pe.total_by_root[root]['pgs'], 1))
                    for k, v in actual_by_root[root]['pgs'].items()
                },
                'objects': {
                    k: float(v) / float(max(pe.total_by_root[root]['objects'], 1))
                    for k, v in actual_by_root[root]['objects'].items()
                },
                'bytes': {
                    k: float(v) / float(max(pe.total_by_root[root]['bytes'], 1))
                    for k, v in actual_by_root[root]['bytes'].items()
                },
            }
        self.log.debug('actual_by_pool %s' % pe.actual_by_pool)
        self.log.debug('actual_by_root %s' % pe.actual_by_root)

        # average and stddev and score
        pe.stats_by_root = {
            a: pe.calc_stats(
                b,
                pe.target_by_root[a],
                pe.total_by_root[a]
            ) for a, b in pe.count_by_root.items()
        }
        self.log.debug('stats_by_root %s' % pe.stats_by_root)

        # the scores are already normalized
        pe.score_by_root = {
            r: {
                'pgs': pe.stats_by_root[r]['pgs']['score'],
                'objects': pe.stats_by_root[r]['objects']['score'],
                'bytes': pe.stats_by_root[r]['bytes']['score'],
            } for r in pe.total_by_root.keys()
        }
        self.log.debug('score_by_root %s' % pe.score_by_root)

        # get the list of score metrics, comma separated
        metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')

        # total score is just average of normalized stddevs
        pe.score = 0.0
        for r, vs in pe.score_by_root.items():
            for k, v in vs.items():
                if k in metrics:
                    pe.score += v
        pe.score /= len(metrics) * len(roots)
        return pe
    def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
        pe = self.calc_eval(ms, pools)
        return pe.show(verbose=verbose)
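    # Typical brief output looks roughly like:
    #   current cluster score 0.013255 (lower is better)
    # where 0 would mean a perfectly even distribution across the selected
    # metrics (the number above is illustrative only).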
    def optimize(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Optimize plan %s' % plan.name)
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        self.log.info('Mode %s, max misplaced %f' %
                      (plan.mode, max_misplaced))

        info = self.get('pg_status')
        unknown = info.get('unknown_pgs_ratio', 0.0)
        degraded = info.get('degraded_ratio', 0.0)
        inactive = info.get('inactive_pgs_ratio', 0.0)
        misplaced = info.get('misplaced_ratio', 0.0)
        plan.pg_status = info
        self.log.debug('unknown %f degraded %f inactive %f misplaced %g',
                       unknown, degraded, inactive, misplaced)
        if unknown > 0.0:
            detail = 'Some PGs (%f) are unknown; try again later' % unknown
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif degraded > 0.0:
            detail = 'Some objects (%f) are degraded; try again later' % degraded
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif inactive > 0.0:
            detail = 'Some PGs (%f) are inactive; try again later' % inactive
            self.log.info(detail)
            return -errno.EAGAIN, detail
        elif misplaced >= max_misplaced:
            detail = 'Too many objects (%f > %f) are misplaced; ' \
                     'try again later' % (misplaced, max_misplaced)
            self.log.info(detail)
            return -errno.EAGAIN, detail
        else:
            if plan.mode == 'upmap':
                return self.do_upmap(plan)
            elif plan.mode == 'crush-compat':
                return self.do_crush_compat(cast(MsPlan, plan))
            elif plan.mode == 'none':
                detail = 'Please do "ceph balancer mode" to choose a valid mode first'
                self.log.info('Idle')
                return -errno.ENOEXEC, detail
            else:
                detail = 'Unrecognized mode %s' % plan.mode
                self.log.info(detail)
                return -errno.EINVAL, detail
    def do_upmap(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('do_upmap')
        max_optimizations = cast(float, self.get_module_option('upmap_max_optimizations'))
        max_deviation = cast(int, self.get_module_option('upmap_max_deviation'))
        osdmap_dump = plan.osdmap_dump

        if len(plan.pools):
            pools = plan.pools
        else:  # all
            pools = [str(i['pool_name']) for i in osdmap_dump.get('pools', [])]
        if len(pools) == 0:
            detail = 'No pools available'
            self.log.info(detail)
            return -errno.ENOENT, detail
        # shuffle pool list so they all get equal (in)attention
        random.shuffle(pools)
        self.log.info('pools %s' % pools)

        adjusted_pools = []
        inc = plan.inc
        total_did = 0
        left = max_optimizations
        pools_with_pg_merge = [p['pool_name'] for p in osdmap_dump.get('pools', [])
                               if p['pg_num'] > p['pg_num_target']]
        crush_rule_by_pool_name = dict((p['pool_name'], p['crush_rule'])
                                       for p in osdmap_dump.get('pools', []))
        for pool in pools:
            if pool not in crush_rule_by_pool_name:
                self.log.info('pool %s does not exist' % pool)
                continue
            if pool in pools_with_pg_merge:
                self.log.info('pool %s has pending PG(s) for merging, skipping for now' % pool)
                continue
            adjusted_pools.append(pool)
        # shuffle so all pools get equal (in)attention
        random.shuffle(adjusted_pools)
        pool_dump = osdmap_dump.get('pools', [])
        for pool in adjusted_pools:
            for p in pool_dump:
                if p['pool_name'] == pool:
                    pool_id = p['pool']
                    break
            # only process 'active+clean' pgs.
            # note that here we deliberately exclude any scrubbing pgs too
            # since scrubbing activities have significant impacts on performance
            num_pg_active_clean = 0
            for p in plan.pg_status.get('pgs_by_pool_state', []):
                pgs_pool_id = p['pool_id']
                if pgs_pool_id != pool_id:
                    continue
                for s in p['pg_state_counts']:
                    if s['state_name'] == 'active+clean':
                        num_pg_active_clean += s['count']
                        break
            available = min(left, num_pg_active_clean)
            did = plan.osdmap.calc_pg_upmaps(inc, max_deviation, available, [pool])
            total_did += did
            left -= did
            if left <= 0:
                break
        self.log.info('prepared %d/%d changes' % (total_did, max_optimizations))
        if total_did == 0:
            return -errno.EALREADY, 'Unable to find further optimization, ' \
                                    'or pool(s) pg_num is decreasing, ' \
                                    'or distribution is already perfect'
        return 0, ''
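    # The heavy lifting above is delegated to OSDMap.calc_pg_upmaps(), a
    # binding into the C++ OSDMap code, which stages pg-upmap-items entries
    # in the incremental `inc` and returns how many optimizations it made.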
    def do_crush_compat(self, plan: MsPlan) -> Tuple[int, str]:
        self.log.info('do_crush_compat')
        max_iterations = cast(int, self.get_module_option('crush_compat_max_iterations'))
        if max_iterations < 1:
            return -errno.EINVAL, '"crush_compat_max_iterations" must be >= 1'
        step = cast(float, self.get_module_option('crush_compat_step'))
        if step <= 0 or step >= 1.0:
            return -errno.EINVAL, '"crush_compat_step" must be in (0, 1)'
        max_misplaced = cast(float, self.get_ceph_option('target_max_misplaced_ratio'))
        min_pg_per_osd = 2

        ms = plan.initial
        osdmap = ms.osdmap
        crush = osdmap.get_crush()
        pe = self.calc_eval(ms, plan.pools)
        min_score_to_optimize = cast(float, self.get_module_option('min_score'))
        if pe.score <= min_score_to_optimize:
            if pe.score == 0:
                detail = 'Distribution is already perfect'
            else:
                detail = 'score %f <= min_score %f, will not optimize' \
                         % (pe.score, min_score_to_optimize)
            self.log.info(detail)
            return -errno.EALREADY, detail

        # get current osd reweights
        orig_osd_weight = {a['osd']: a['weight']
                           for a in ms.osdmap_dump.get('osds', [])}

        # get current compat weight-set weights
        orig_ws = self.get_compat_weight_set_weights(ms)
        if orig_ws is None:
            return -errno.EAGAIN, 'compat weight-set not available'
        orig_ws = {a: b for a, b in orig_ws.items() if a >= 0}

        # Make sure roots don't overlap their devices.  If so, we
        # can't proceed.
        roots = list(pe.target_by_root.keys())
        self.log.debug('roots %s', roots)
        visited: Dict[int, str] = {}
        overlap: Dict[int, List[str]] = {}
        for root, wm in pe.target_by_root.items():
            for osd in wm:
                if osd in visited:
                    if osd not in overlap:
                        overlap[osd] = [visited[osd]]
                    overlap[osd].append(root)
                visited[osd] = root
        if len(overlap) > 0:
            detail = 'Some osds belong to multiple subtrees: %s' % \
                     overlap
            self.log.error(detail)
            return -errno.EOPNOTSUPP, detail

        # rebalance by pgs, objects, or bytes
        metrics = cast(str, self.get_module_option('crush_compat_metrics')).split(',')
        key = metrics[0]  # balancing using the first score metric
        if key not in ['pgs', 'bytes', 'objects']:
            self.log.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key)
            key = 'pgs'

        # go
        best_ws = copy.deepcopy(orig_ws)
        best_ow = copy.deepcopy(orig_osd_weight)
        best_pe = pe
        left = max_iterations
        bad_steps = 0
        next_ws = copy.deepcopy(best_ws)
        next_ow = copy.deepcopy(best_ow)
        while left > 0:
            # adjust
            self.log.debug('best_ws %s' % best_ws)
            random.shuffle(roots)
            for root in roots:
                pools = best_pe.root_pools[root]
                osds = len(best_pe.target_by_root[root])
                min_pgs = osds * min_pg_per_osd
                if best_pe.total_by_root[root][key] < min_pgs:
                    self.log.info('Skipping root %s (pools %s), total pgs %d '
                                  '< minimum %d (%d per osd)',
                                  root, pools,
                                  best_pe.total_by_root[root][key],
                                  min_pgs, min_pg_per_osd)
                    continue
                self.log.info('Balancing root %s (pools %s) by %s' %
                              (root, pools, key))
                target = best_pe.target_by_root[root]
                actual = best_pe.actual_by_root[root][key]
                queue = sorted(actual.keys(),
                               key=lambda osd: -abs(target[osd] - actual[osd]))
                for osd in queue:
                    if orig_osd_weight[osd] == 0:
                        self.log.debug('skipping out osd.%d', osd)
                        continue
                    deviation = target[osd] - actual[osd]
                    if deviation == 0:
                        break
                    self.log.debug('osd.%d deviation %f', osd, deviation)
                    weight = best_ws[osd]
                    ow = orig_osd_weight[osd]
                    if actual[osd] > 0:
                        calc_weight = target[osd] / actual[osd] * weight * ow
                    else:
                        # for newly created osds, reset calc_weight at target value
                        # this way weight-set will end up absorbing *step* of its
                        # target (final) value at the very beginning and slowly catch up later.
                        # note that if this turns out causing too many misplaced
                        # pgs, then we'll reduce step and retry
                        calc_weight = target[osd]
                    new_weight = weight * (1.0 - step) + calc_weight * step
                    self.log.debug('Reweight osd.%d %f -> %f', osd, weight,
                                   new_weight)
                    next_ws[osd] = new_weight
                    if ow < 1.0:
                        new_ow = min(1.0, max(step + (1.0 - step) * ow,
                                              ow + .005))
                        self.log.debug('Reweight osd.%d reweight %f -> %f',
                                       osd, ow, new_ow)
                        next_ow[osd] = new_ow

                # normalize weights under this root
                root_weight = crush.get_item_weight(pe.root_ids[root])
                root_sum = sum(b for a, b in next_ws.items()
                               if a in target.keys())
                if root_sum > 0 and root_weight > 0:
                    factor = root_sum / root_weight
                    self.log.debug('normalizing root %s %d, weight %f, '
                                   'ws sum %f, factor %f',
                                   root, pe.root_ids[root], root_weight,
                                   root_sum, factor)
                    for osd in actual.keys():
                        next_ws[osd] = next_ws[osd] / factor

            # recalc
            plan.compat_ws = copy.deepcopy(next_ws)
            next_ms = plan.final_state()
            next_pe = self.calc_eval(next_ms, plan.pools)
            next_misplaced = next_ms.calc_misplaced_from(ms)
            self.log.debug('Step result score %f -> %f, misplacing %f',
                           best_pe.score, next_pe.score, next_misplaced)

            if next_misplaced > max_misplaced:
                if best_pe.score < pe.score:
                    self.log.debug('Step misplaced %f > max %f, stopping',
                                   next_misplaced, max_misplaced)
                    break
                step /= 2.0
                next_ws = copy.deepcopy(best_ws)
                next_ow = copy.deepcopy(best_ow)
                self.log.debug('Step misplaced %f > max %f, reducing step to %f',
                               next_misplaced, max_misplaced, step)
            else:
                if next_pe.score > best_pe.score * 1.0001:
                    bad_steps += 1
                    if bad_steps < 5 and random.randint(0, 100) < 70:
                        self.log.debug('Score got worse, taking another step')
                    else:
                        step /= 2.0
                        next_ws = copy.deepcopy(best_ws)
                        next_ow = copy.deepcopy(best_ow)
                        self.log.debug('Score got worse, trying smaller step %f',
                                       step)
                else:
                    bad_steps = 0
                    best_pe = next_pe
                    best_ws = copy.deepcopy(next_ws)
                    best_ow = copy.deepcopy(next_ow)
                    if best_pe.score == 0:
                        break
            left -= 1

        # allow a small regression if we are phasing out osd weights
        fudge = 0.0
        if best_ow != orig_osd_weight:
            fudge = .001

        if best_pe.score < pe.score + fudge:
            self.log.info('Success, score %f -> %f', pe.score, best_pe.score)
            plan.compat_ws = best_ws
            for osd, w in best_ow.items():
                if w != orig_osd_weight[osd]:
                    self.log.debug('osd.%d reweight %f', osd, w)
                    plan.osd_weights[osd] = w
            return 0, ''
        else:
            self.log.info('Failed to find further optimization, score %f',
                          pe.score)
            plan.compat_ws = {}
            return -errno.EDOM, 'Unable to find further optimization, ' \
                                'change balancer mode and retry might help'
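    # Each iteration nudges a weight toward its computed target:
    #   new_weight = weight * (1 - step) + calc_weight * step
    # e.g. with step=0.5, weight=1.0 and calc_weight=2.0, the new weight is
    # 1.5, so changes apply gradually and are rolled back (with a smaller
    # step) whenever they would misplace more than target_max_misplaced_ratio.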
    def get_compat_weight_set_weights(self, ms: MappingState):
        have_choose_args = CRUSHMap.have_default_choose_args(ms.crush_dump)
        if have_choose_args:
            # get number of buckets in choose_args
            choose_args_len = len(CRUSHMap.get_default_choose_args(ms.crush_dump))
        if not have_choose_args or choose_args_len != len(ms.crush_dump['buckets']):
            # enable compat weight-set first
            self.log.debug('no choose_args or all buckets do not have weight-sets')
            self.log.debug('ceph osd crush weight-set create-compat')
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set create-compat',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error creating compat weight-set')
                return

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush dump',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('Error dumping crush map')
                return
            try:
                crushmap = json.loads(outb)
            except json.JSONDecodeError:
                raise RuntimeError('unable to parse crush map')
        else:
            crushmap = ms.crush_dump

        raw = CRUSHMap.get_default_choose_args(crushmap)
        weight_set = {}
        for b in raw:
            bucket = None
            for t in crushmap['buckets']:
                if t['id'] == b['bucket_id']:
                    bucket = t
                    break
            if not bucket:
                raise RuntimeError('could not find bucket %s' % b['bucket_id'])
            self.log.debug('bucket items %s' % bucket['items'])
            self.log.debug('weight set %s' % b['weight_set'][0])
            if len(bucket['items']) != len(b['weight_set'][0]):
                raise RuntimeError('weight-set size does not match bucket items')
            for pos in range(len(bucket['items'])):
                weight_set[bucket['items'][pos]['id']] = b['weight_set'][0][pos]

        self.log.debug('weight_set weights %s' % weight_set)
        return weight_set
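    # For reference, each choose_args entry processed above looks roughly like
    # {'bucket_id': -1, 'weight_set': [[1.0, 1.0, ...]]}, i.e. one weight per
    # bucket item, which this method flattens into a single {item_id: weight} map.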
    def do_crush(self) -> None:
        self.log.info('do_crush (not yet implemented)')

    def do_osd_weight(self) -> None:
        self.log.info('do_osd_weight (not yet implemented)')
    def execute(self, plan: Plan) -> Tuple[int, str]:
        self.log.info('Executing plan %s' % plan.name)

        commands = []

        # compat weight-set
        if len(plan.compat_ws):
            ms_plan = cast(MsPlan, plan)
            if not CRUSHMap.have_default_choose_args(ms_plan.initial.crush_dump):
                self.log.debug('ceph osd crush weight-set create-compat')
                result = CommandResult('')
                self.send_command(result, 'mon', '', json.dumps({
                    'prefix': 'osd crush weight-set create-compat',
                    'format': 'json',
                }), '')
                r, outb, outs = result.wait()
                if r != 0:
                    self.log.error('Error creating compat weight-set')
                    return r, outs

        for osd, weight in plan.compat_ws.items():
            self.log.info('ceph osd crush weight-set reweight-compat osd.%d %f',
                          osd, weight)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd crush weight-set reweight-compat',
                'format': 'json',
                'item': 'osd.%d' % osd,
                'weight': [weight],
            }), '')
            commands.append(result)

        # new_weight
        reweightn = {}
        for osd, weight in plan.osd_weights.items():
            reweightn[str(osd)] = str(int(weight * float(0x10000)))
        if len(reweightn):
            self.log.info('ceph osd reweightn %s', reweightn)
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweightn',
                'format': 'json',
                'weights': json.dumps(reweightn),
            }), '')
            commands.append(result)

        # upmap
        incdump = plan.inc.dump()
        for item in incdump.get('new_pg_upmap', []):
            self.log.info('ceph osd pg-upmap %s mappings %s', item['pgid'],
                          item['osds'])
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap',
                'format': 'json',
                'pgid': item['pgid'],
                'id': item['osds'],
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap', []):
            self.log.info('ceph osd rm-pg-upmap %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        for item in incdump.get('new_pg_upmap_items', []):
            self.log.info('ceph osd pg-upmap-items %s mappings %s', item['pgid'],
                          item['mappings'])
            osdlist = []
            for m in item['mappings']:
                osdlist += [m['from'], m['to']]
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pg-upmap-items',
                'format': 'json',
                'pgid': item['pgid'],
                'id': osdlist,
            }), 'foo')
            commands.append(result)

        for pgid in incdump.get('old_pg_upmap_items', []):
            self.log.info('ceph osd rm-pg-upmap-items %s', pgid)
            result = CommandResult('foo')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd rm-pg-upmap-items',
                'format': 'json',
                'pgid': pgid,
            }), 'foo')
            commands.append(result)

        # wait for commands
        self.log.debug('commands %s' % commands)
        for result in commands:
            r, outb, outs = result.wait()
            if r != 0:
                self.log.error('execute error: r = %d, detail = %s' % (r, outs))
                return r, outs
        self.log.debug('done')
        return 0, ''
    def gather_telemetry(self) -> Dict[str, Any]:
        return {
            'active': self.active,
            'mode': self.mode,
        }