]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/balancer/module.py
19d4b09cfe539a76d1e723fcf30038e1c61c5b79
2 Balance PG distribution across OSDs.
12 from mgr_module
import CLIReadCommand
, CLICommand
, CommandResult
, MgrModule
, Option
, OSDMap
13 from threading
import Event
14 from typing
import cast
, Any
, Dict
, List
, Optional
, Sequence
, Tuple
, Union
15 from mgr_module
import CRUSHMap
# Timestamp format used for auto-generated plan names and for debug logging
# of wake-up times in serve().
TIME_FORMAT = '%Y-%m-%d_%H:%M:%S'
22 def __init__(self
, osdmap
, raw_pg_stats
, raw_pool_stats
, desc
=''):
25 self
.osdmap_dump
= self
.osdmap
.dump()
26 self
.crush
= osdmap
.get_crush()
27 self
.crush_dump
= self
.crush
.dump()
28 self
.raw_pg_stats
= raw_pg_stats
29 self
.raw_pool_stats
= raw_pool_stats
31 i
['pgid']: i
['stat_sum'] for i
in raw_pg_stats
.get('pg_stats', [])
33 osd_poolids
= [p
['pool'] for p
in self
.osdmap_dump
.get('pools', [])]
34 pg_poolids
= [p
['poolid'] for p
in raw_pool_stats
.get('pool_stats', [])]
35 self
.poolids
= set(osd_poolids
) & set(pg_poolids
)
37 self
.pg_up_by_poolid
= {}
38 for poolid
in self
.poolids
:
39 self
.pg_up_by_poolid
[poolid
] = osdmap
.map_pool_pgs_up(poolid
)
40 for a
, b
in self
.pg_up_by_poolid
[poolid
].items():
43 def calc_misplaced_from(self
, other_ms
):
44 num
= len(other_ms
.pg_up
)
46 for pgid
, before
in other_ms
.pg_up
.items():
47 if before
!= self
.pg_up
.get(pgid
, []):
50 return float(misplaced
) / float(num
)
54 class Mode(enum
.Enum
):
56 crush_compat
= 'crush-compat'
61 def __init__(self
, name
, mode
, osdmap
, pools
):
65 self
.osdmap_dump
= osdmap
.dump()
69 self
.inc
= osdmap
.new_incremental()
def dump(self) -> str:
    """Serialize this plan's pending incremental osdmap change as pretty JSON."""
    incremental = self.inc.dump()
    return json.dumps(incremental, sort_keys=True, indent=4)
75 def show(self
) -> str:
81 Plan with a preloaded MappingState member.
84 def __init__(self
, name
: str, mode
: str, ms
: MappingState
, pools
: List
[str]) -> None:
85 super(MsPlan
, self
).__init
__(name
, mode
, ms
.osdmap
, pools
)
def final_state(self) -> MappingState:
    """Apply this plan's accumulated incremental to the starting osdmap and
    return the resulting MappingState (described as 'plan <name> final')."""
    inc = self.inc
    # fold the planned reweights into the incremental before applying it
    inc.set_osd_reweights(self.osd_weights)
    inc.set_crush_compat_weight_set_weights(self.compat_ws)
    final_osdmap = self.initial.osdmap.apply_incremental(inc)
    return MappingState(final_osdmap,
                        self.initial.raw_pg_stats,
                        self.initial.raw_pool_stats,
                        'plan %s final' % self.name)
96 def show(self
) -> str:
98 ls
.append('# starting osdmap epoch %d' % self
.initial
.osdmap
.get_epoch())
99 ls
.append('# starting crush version %d' %
100 self
.initial
.osdmap
.get_crush_version())
101 ls
.append('# mode %s' % self
.mode
)
102 if len(self
.compat_ws
) and \
103 not CRUSHMap
.have_default_choose_args(self
.initial
.crush_dump
):
104 ls
.append('ceph osd crush weight-set create-compat')
105 for osd
, weight
in self
.compat_ws
.items():
106 ls
.append('ceph osd crush weight-set reweight-compat %s %f' %
108 for osd
, weight
in self
.osd_weights
.items():
109 ls
.append('ceph osd reweight osd.%d %f' % (osd
, weight
))
110 incdump
= self
.inc
.dump()
111 for pgid
in incdump
.get('old_pg_upmap_items', []):
112 ls
.append('ceph osd rm-pg-upmap-items %s' % pgid
)
113 for item
in incdump
.get('new_pg_upmap_items', []):
115 for m
in item
['mappings']:
116 osdlist
+= [m
['from'], m
['to']]
117 ls
.append('ceph osd pg-upmap-items %s %s' %
118 (item
['pgid'], ' '.join([str(a
) for a
in osdlist
])))
123 def __init__(self
, ms
: MappingState
):
125 self
.root_ids
: Dict
[str, int] = {} # root name -> id
126 self
.pool_name
: Dict
[str, str] = {} # pool id -> pool name
127 self
.pool_id
: Dict
[str, int] = {} # pool name -> id
128 self
.pool_roots
: Dict
[str, List
[str]] = {} # pool name -> root name
129 self
.root_pools
: Dict
[str, List
[str]] = {} # root name -> pools
130 self
.target_by_root
: Dict
[str, Dict
[int, float]] = {} # root name -> target weight map
131 self
.count_by_pool
: Dict
[str, dict] = {}
132 self
.count_by_root
: Dict
[str, dict] = {}
133 self
.actual_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> actual weight map
134 self
.actual_by_root
: Dict
[str, dict] = {} # pool -> by_* -> actual weight map
135 self
.total_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> total
136 self
.total_by_root
: Dict
[str, dict] = {} # root -> by_* -> total
137 self
.stats_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> stddev or avg -> value
138 self
.stats_by_root
: Dict
[str, dict] = {} # root -> by_* -> stddev or avg -> value
140 self
.score_by_pool
: Dict
[str, float] = {}
141 self
.score_by_root
: Dict
[str, Dict
[str, float]] = {}
145 def show(self
, verbose
: bool = False) -> str:
147 r
= self
.ms
.desc
+ '\n'
148 r
+= 'target_by_root %s\n' % self
.target_by_root
149 r
+= 'actual_by_pool %s\n' % self
.actual_by_pool
150 r
+= 'actual_by_root %s\n' % self
.actual_by_root
151 r
+= 'count_by_pool %s\n' % self
.count_by_pool
152 r
+= 'count_by_root %s\n' % self
.count_by_root
153 r
+= 'total_by_pool %s\n' % self
.total_by_pool
154 r
+= 'total_by_root %s\n' % self
.total_by_root
155 r
+= 'stats_by_root %s\n' % self
.stats_by_root
156 r
+= 'score_by_pool %s\n' % self
.score_by_pool
157 r
+= 'score_by_root %s\n' % self
.score_by_root
159 r
= self
.ms
.desc
+ ' '
160 r
+= 'score %f (lower is better)\n' % self
.score
163 def calc_stats(self
, count
, target
, total
):
164 num
= max(len(target
), 1)
165 r
: Dict
[str, Dict
[str, Union
[int, float]]] = {}
166 for t
in ('pgs', 'objects', 'bytes'):
178 avg
= float(total
[t
]) / float(num
)
181 # score is a measure of how uneven the data distribution is.
182 # score lies between [0, 1), 0 means perfect distribution.
186 for k
, v
in count
[t
].items():
187 # adjust/normalize by weight
189 adjusted
= float(v
) / target
[k
] / float(num
)
193 # Overweighted devices and their weights are factors to calculate reweight_urgency.
194 # One 10% underfilled device with 5 2% overfilled devices, is arguably a better
195 # situation than one 10% overfilled with 5 2% underfilled devices
198 F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
199 x = (adjusted - avg)/avg.
200 Since, we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
201 To bring range of F(x) in range [0, 1), we need to make the above modification.
203 In general, we need to use a function F(x), where x = (adjusted - avg)/avg
204 1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
205 2. A larger value of x, should imply more urgency to reweight.
206 3. Also, the difference between F(x) when x is large, should be minimal.
207 4. The value of F(x) should get close to 1 (highest urgency to reweight) with steeply.
209 Could have used F(x) = (1 - e^(-x)). But that had slower convergence to 1, compared to the one currently in use.
211 cdf of standard normal distribution: https://stackoverflow.com/a/29273201
213 score
+= target
[k
] * (math
.erf(((adjusted
- avg
) / avg
) / math
.sqrt(2.0)))
214 sum_weight
+= target
[k
]
215 dev
+= (avg
- adjusted
) * (avg
- adjusted
)
216 stddev
= math
.sqrt(dev
/ float(max(num
- 1, 1)))
217 score
= score
/ max(sum_weight
, 1)
219 'max': max(count
[t
].values()),
220 'min': min(count
[t
].values()),
223 'sum_weight': sum_weight
,
229 class Module(MgrModule
):
231 Option(name
='active',
234 desc
='automatically balance PGs across cluster',
236 Option(name
='begin_time',
239 desc
='beginning time of day to automatically balance',
240 long_desc
='This is a time of day in the format HHMM.',
242 Option(name
='end_time',
245 desc
='ending time of day to automatically balance',
246 long_desc
='This is a time of day in the format HHMM.',
248 Option(name
='begin_weekday',
253 desc
='Restrict automatic balancing to this day of the week or later',
254 long_desc
='0 or 7 = Sunday, 1 = Monday, etc.',
256 Option(name
='end_weekday',
261 desc
='Restrict automatic balancing to days of the week earlier than this',
262 long_desc
='0 or 7 = Sunday, 1 = Monday, etc.',
264 Option(name
='crush_compat_max_iterations',
269 desc
='maximum number of iterations to attempt optimization',
271 Option(name
='crush_compat_metrics',
273 default
='pgs,objects,bytes',
274 desc
='metrics with which to calculate OSD utilization',
275 long_desc
='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
277 Option(name
='crush_compat_step',
282 desc
='aggressiveness of optimization',
283 long_desc
='.99 is very aggressive, .01 is less aggressive',
285 Option(name
='min_score',
288 desc
='minimum score, below which no optimization is attempted',
291 desc
='Balancer mode',
293 enum_allowed
=['none', 'crush-compat', 'upmap'],
295 Option(name
='sleep_interval',
298 desc
='how frequently to wake up and attempt optimization',
300 Option(name
='upmap_max_optimizations',
303 desc
='maximum upmap optimizations to make per attempt',
305 Option(name
='upmap_max_deviation',
309 desc
='deviation below which no optimization is attempted',
310 long_desc
='If the number of PGs are within this count then no optimization is attempted',
312 Option(name
='pool_ids',
315 desc
='pools which the automatic balancing will be limited to',
321 plans
: Dict
[str, Plan
] = {}
324 last_optimize_started
= ''
325 last_optimize_duration
= ''
327 no_optimization_needed
= False
328 success_string
= 'Optimization plan created successfully'
329 in_progress_string
= 'in progress'
331 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
332 super(Module
, self
).__init
__(*args
, **kwargs
)
335 @CLIReadCommand('balancer status')
336 def show_status(self
) -> Tuple
[int, str, str]:
341 'plans': list(self
.plans
.keys()),
342 'active': self
.active
,
343 'last_optimize_started': self
.last_optimize_started
,
344 'last_optimize_duration': self
.last_optimize_duration
,
345 'optimize_result': self
.optimize_result
,
346 'no_optimization_needed': self
.no_optimization_needed
,
347 'mode': self
.get_module_option('mode'),
349 return (0, json
.dumps(s
, indent
=4, sort_keys
=True), '')
351 @CLICommand('balancer mode')
352 def set_mode(self
, mode
: Mode
) -> Tuple
[int, str, str]:
356 if mode
== Mode
.upmap
:
357 min_compat_client
= self
.get_osdmap().dump().get('require_min_compat_client', '')
358 if min_compat_client
< 'luminous': # works well because version is alphabetized..
359 warn
= ('min_compat_client "%s" '
360 '< "luminous", which is required for pg-upmap. '
361 'Try "ceph osd set-require-min-compat-client luminous" '
362 'before enabling this mode' % min_compat_client
)
363 return (-errno
.EPERM
, '', warn
)
364 elif mode
== Mode
.crush_compat
:
365 ms
= MappingState(self
.get_osdmap(),
366 self
.get("pg_stats"),
367 self
.get("pool_stats"),
368 'initialize compat weight-set')
369 self
.get_compat_weight_set_weights(ms
) # ignore error
370 self
.set_module_option('mode', mode
.value
)
373 @CLICommand('balancer on')
374 def on(self
) -> Tuple
[int, str, str]:
376 Enable automatic balancing
379 self
.set_module_option('active', 'true')
384 @CLICommand('balancer off')
385 def off(self
) -> Tuple
[int, str, str]:
387 Disable automatic balancing
390 self
.set_module_option('active', 'false')
395 @CLIReadCommand('balancer pool ls')
396 def pool_ls(self
) -> Tuple
[int, str, str]:
398 List automatic balancing pools
400 Note that empty list means all existing pools will be automatic balancing targets,
401 which is the default behaviour of balancer.
403 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
406 pool_ids
= [int(p
) for p
in pool_ids
.split(',')]
407 pool_name_by_id
= dict((p
['pool'], p
['pool_name'])
408 for p
in self
.get_osdmap().dump().get('pools', []))
410 final_ids
: List
[int] = []
413 if p
in pool_name_by_id
:
415 final_names
.append(pool_name_by_id
[p
])
418 if should_prune
: # some pools were gone, prune
419 self
.set_module_option('pool_ids', ','.join(str(p
) for p
in final_ids
))
420 return (0, json
.dumps(sorted(final_names
), indent
=4, sort_keys
=True), '')
422 @CLICommand('balancer pool add')
423 def pool_add(self
, pools
: Sequence
[str]) -> Tuple
[int, str, str]:
425 Enable automatic balancing for specific pools
428 pool_id_by_name
= dict((p
['pool_name'], p
['pool'])
429 for p
in self
.get_osdmap().dump().get('pools', []))
430 invalid_names
= [p
for p
in raw_names
if p
not in pool_id_by_name
]
432 return (-errno
.EINVAL
, '', 'pool(s) %s not found' % invalid_names
)
433 to_add
= set(str(pool_id_by_name
[p
]) for p
in raw_names
if p
in pool_id_by_name
)
434 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
435 existing
= set(pool_ids
.split(',') if pool_ids
else [])
436 final
= to_add | existing
437 self
.set_module_option('pool_ids', ','.join(final
))
440 @CLICommand('balancer pool rm')
441 def pool_rm(self
, pools
: Sequence
[str]) -> Tuple
[int, str, str]:
443 Disable automatic balancing for specific pools
446 existing
= cast(str, self
.get_module_option('pool_ids'))
447 if existing
== '': # for idempotence
449 existing
= existing
.split(',')
450 osdmap
= self
.get_osdmap()
451 pool_ids
= [str(p
['pool']) for p
in osdmap
.dump().get('pools', [])]
452 pool_id_by_name
= dict((p
['pool_name'], p
['pool']) for p
in osdmap
.dump().get('pools', []))
453 final
= [p
for p
in existing
if p
in pool_ids
]
454 to_delete
= [str(pool_id_by_name
[p
]) for p
in raw_names
if p
in pool_id_by_name
]
455 final
= set(final
) - set(to_delete
)
456 self
.set_module_option('pool_ids', ','.join(final
))
459 def _state_from_option(self
, option
: Optional
[str] = None) -> Tuple
[MappingState
, List
[str]]:
462 ms
= MappingState(self
.get_osdmap(),
463 self
.get("pg_stats"),
464 self
.get("pool_stats"),
466 elif option
in self
.plans
:
467 plan
= self
.plans
.get(option
)
470 if plan
.mode
== 'upmap':
471 # Note that for upmap, to improve the efficiency,
472 # we use a basic version of Plan without keeping the obvious
473 # *redundant* MS member.
474 # Hence ms might not be accurate here since we are basically
475 # using an old snapshotted osdmap vs a fresh copy of pg_stats.
476 # It should not be a big deal though..
477 ms
= MappingState(plan
.osdmap
,
478 self
.get("pg_stats"),
479 self
.get("pool_stats"),
480 f
'plan "{plan.name}"')
482 ms
= cast(MsPlan
, plan
).final_state()
484 # not a plan, does it look like a pool?
485 osdmap
= self
.get_osdmap()
486 valid_pool_names
= [p
['pool_name'] for p
in osdmap
.dump().get('pools', [])]
487 if option
not in valid_pool_names
:
488 raise ValueError(f
'option "{option}" not a plan or a pool')
490 ms
= MappingState(osdmap
,
491 self
.get("pg_stats"),
492 self
.get("pool_stats"),
496 @CLIReadCommand('balancer eval-verbose')
497 def plan_eval_verbose(self
, option
: Optional
[str] = None):
499 Evaluate data distribution for the current cluster or specific pool or specific
503 ms
, pools
= self
._state
_from
_option
(option
)
504 return (0, self
.evaluate(ms
, pools
, verbose
=True), '')
505 except ValueError as e
:
506 return (-errno
.EINVAL
, '', str(e
))
508 @CLIReadCommand('balancer eval')
509 def plan_eval_brief(self
, option
: Optional
[str] = None):
511 Evaluate data distribution for the current cluster or specific pool or specific plan
514 ms
, pools
= self
._state
_from
_option
(option
)
515 return (0, self
.evaluate(ms
, pools
, verbose
=False), '')
516 except ValueError as e
:
517 return (-errno
.EINVAL
, '', str(e
))
519 @CLIReadCommand('balancer optimize')
520 def plan_optimize(self
, plan
: str, pools
: List
[str] = []) -> Tuple
[int, str, str]:
522 Run optimizer to create a new plan
524 # The GIL can be release by the active balancer, so disallow when active
526 return (-errno
.EINVAL
, '', 'Balancer enabled, disable to optimize manually')
528 return (-errno
.EINVAL
, '', 'Balancer finishing up....try again')
529 osdmap
= self
.get_osdmap()
530 valid_pool_names
= [p
['pool_name'] for p
in osdmap
.dump().get('pools', [])]
531 invalid_pool_names
= []
533 if p
not in valid_pool_names
:
534 invalid_pool_names
.append(p
)
535 if len(invalid_pool_names
):
536 return (-errno
.EINVAL
, '', 'pools %s not found' % invalid_pool_names
)
537 plan_
= self
.plan_create(plan
, osdmap
, pools
)
538 self
.last_optimize_started
= time
.asctime(time
.localtime())
539 self
.optimize_result
= self
.in_progress_string
541 r
, detail
= self
.optimize(plan_
)
543 self
.last_optimize_duration
= str(datetime
.timedelta(seconds
=(end
- start
)))
545 # Add plan if an optimization was created
546 self
.optimize_result
= self
.success_string
547 self
.plans
[plan
] = plan_
549 self
.optimize_result
= detail
550 return (r
, '', detail
)
552 @CLIReadCommand('balancer show')
553 def plan_show(self
, plan
: str) -> Tuple
[int, str, str]:
555 Show details of an optimization plan
557 plan_
= self
.plans
.get(plan
)
559 return (-errno
.ENOENT
, '', f
'plan {plan} not found')
560 return (0, plan_
.show(), '')
562 @CLICommand('balancer rm')
563 def plan_rm(self
, plan
: str) -> Tuple
[int, str, str]:
565 Discard an optimization plan
567 if plan
in self
.plans
:
571 @CLICommand('balancer reset')
572 def plan_reset(self
) -> Tuple
[int, str, str]:
574 Discard all optimization plans
579 @CLIReadCommand('balancer dump')
580 def plan_dump(self
, plan
: str) -> Tuple
[int, str, str]:
582 Show an optimization plan
584 plan_
= self
.plans
.get(plan
)
586 return -errno
.ENOENT
, '', f
'plan {plan} not found'
588 return (0, plan_
.dump(), '')
590 @CLIReadCommand('balancer ls')
591 def plan_ls(self
) -> Tuple
[int, str, str]:
595 return (0, json
.dumps([p
for p
in self
.plans
], indent
=4, sort_keys
=True), '')
597 @CLIReadCommand('balancer execute')
598 def plan_execute(self
, plan
: str) -> Tuple
[int, str, str]:
600 Execute an optimization plan
602 # The GIL can be release by the active balancer, so disallow when active
604 return (-errno
.EINVAL
, '', 'Balancer enabled, disable to execute a plan')
606 return (-errno
.EINVAL
, '', 'Balancer finishing up....try again')
607 plan_
= self
.plans
.get(plan
)
609 return (-errno
.ENOENT
, '', f
'plan {plan} not found')
610 r
, detail
= self
.execute(plan_
)
612 return (r
, '', detail
)
614 def shutdown(self
) -> None:
615 self
.log
.info('Stopping')
619 def time_permit(self
) -> bool:
620 local_time
= time
.localtime()
621 time_of_day
= time
.strftime('%H%M', local_time
)
622 weekday
= (local_time
.tm_wday
+ 1) % 7 # be compatible with C
625 begin_time
= cast(str, self
.get_module_option('begin_time'))
626 end_time
= cast(str, self
.get_module_option('end_time'))
627 if begin_time
<= end_time
:
628 permit
= begin_time
<= time_of_day
< end_time
630 permit
= time_of_day
>= begin_time
or time_of_day
< end_time
632 self
.log
.debug("should run between %s - %s, now %s, skipping",
633 begin_time
, end_time
, time_of_day
)
636 begin_weekday
= cast(int, self
.get_module_option('begin_weekday'))
637 end_weekday
= cast(int, self
.get_module_option('end_weekday'))
638 if begin_weekday
<= end_weekday
:
639 permit
= begin_weekday
<= weekday
< end_weekday
641 permit
= weekday
>= begin_weekday
or weekday
< end_weekday
643 self
.log
.debug("should run between weekday %d - %d, now %d, skipping",
644 begin_weekday
, end_weekday
, weekday
)
649 def serve(self
) -> None:
650 self
.log
.info('Starting')
652 self
.active
= cast(bool, self
.get_module_option('active'))
653 sleep_interval
= cast(float, self
.get_module_option('sleep_interval'))
654 self
.log
.debug('Waking up [%s, now %s]',
655 "active" if self
.active
else "inactive",
656 time
.strftime(TIME_FORMAT
, time
.localtime()))
657 if self
.active
and self
.time_permit():
658 self
.log
.debug('Running')
659 name
= 'auto_%s' % time
.strftime(TIME_FORMAT
, time
.gmtime())
660 osdmap
= self
.get_osdmap()
661 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
663 allow
= [int(p
) for p
in pool_ids
.split(',')]
666 final
: List
[str] = []
668 pools
= osdmap
.dump().get('pools', [])
669 valid
= [p
['pool'] for p
in pools
]
670 ids
= set(allow
) & set(valid
)
671 if set(allow
) - set(valid
): # some pools were gone, prune
672 self
.set_module_option('pool_ids', ','.join(str(p
) for p
in ids
))
673 pool_name_by_id
= dict((p
['pool'], p
['pool_name']) for p
in pools
)
674 final
= [pool_name_by_id
[p
] for p
in ids
if p
in pool_name_by_id
]
675 plan
= self
.plan_create(name
, osdmap
, final
)
676 self
.optimizing
= True
677 self
.last_optimize_started
= time
.asctime(time
.localtime())
678 self
.optimize_result
= self
.in_progress_string
680 r
, detail
= self
.optimize(plan
)
682 self
.last_optimize_duration
= str(datetime
.timedelta(seconds
=(end
- start
)))
684 self
.optimize_result
= self
.success_string
687 self
.optimize_result
= detail
688 self
.optimizing
= False
689 self
.log
.debug('Sleeping for %d', sleep_interval
)
690 self
.event
.wait(sleep_interval
)
693 def plan_create(self
, name
: str, osdmap
: OSDMap
, pools
: List
[str]) -> Plan
:
694 mode
= cast(str, self
.get_module_option('mode'))
696 # drop unnecessary MS member for upmap mode.
697 # this way we could effectively eliminate the usage of a
698 # complete pg_stats, which can become horribly inefficient
700 plan
= Plan(name
, mode
, osdmap
, pools
)
705 self
.get("pg_stats"),
706 self
.get("pool_stats"),
707 'plan %s initial' % name
),
711 def calc_eval(self
, ms
: MappingState
, pools
: List
[str]) -> Eval
:
715 for p
in ms
.osdmap_dump
.get('pools', []):
716 if len(pools
) and p
['pool_name'] not in pools
:
718 # skip dead or not-yet-ready pools too
719 if p
['pool'] not in ms
.poolids
:
721 pe
.pool_name
[p
['pool']] = p
['pool_name']
722 pe
.pool_id
[p
['pool_name']] = p
['pool']
723 pool_rule
[p
['pool_name']] = p
['crush_rule']
724 pe
.pool_roots
[p
['pool_name']] = []
725 pool_info
[p
['pool_name']] = p
726 if len(pool_info
) == 0:
728 self
.log
.debug('pool_name %s' % pe
.pool_name
)
729 self
.log
.debug('pool_id %s' % pe
.pool_id
)
730 self
.log
.debug('pools %s' % pools
)
731 self
.log
.debug('pool_rule %s' % pool_rule
)
733 osd_weight
= {a
['osd']: a
['weight']
734 for a
in ms
.osdmap_dump
.get('osds', []) if a
['weight'] > 0}
736 # get expected distributions by root
737 actual_by_root
: Dict
[str, Dict
[str, dict]] = {}
738 rootids
= ms
.crush
.find_takes()
740 for rootid
in rootids
:
741 ls
= ms
.osdmap
.get_pools_by_take(rootid
)
743 # find out roots associating with pools we are passed in
745 if candidate
in pe
.pool_name
:
746 want
.append(candidate
)
749 root
= ms
.crush
.get_item_name(rootid
)
750 pe
.root_pools
[root
] = []
752 pe
.pool_roots
[pe
.pool_name
[poolid
]].append(root
)
753 pe
.root_pools
[root
].append(pe
.pool_name
[poolid
])
754 pe
.root_ids
[root
] = rootid
756 weight_map
= ms
.crush
.get_take_weight_osd_map(rootid
)
758 osd
: cw
* osd_weight
[osd
]
759 for osd
, cw
in weight_map
.items() if osd
in osd_weight
and cw
> 0
761 sum_w
= sum(adjusted_map
.values())
762 assert len(adjusted_map
) == 0 or sum_w
> 0
763 pe
.target_by_root
[root
] = {osd
: w
/ sum_w
764 for osd
, w
in adjusted_map
.items()}
765 actual_by_root
[root
] = {
770 for osd
in pe
.target_by_root
[root
]:
771 actual_by_root
[root
]['pgs'][osd
] = 0
772 actual_by_root
[root
]['objects'][osd
] = 0
773 actual_by_root
[root
]['bytes'][osd
] = 0
774 pe
.total_by_root
[root
] = {
779 self
.log
.debug('pool_roots %s' % pe
.pool_roots
)
780 self
.log
.debug('root_pools %s' % pe
.root_pools
)
781 self
.log
.debug('target_by_root %s' % pe
.target_by_root
)
783 # pool and root actual
784 for pool
, pi
in pool_info
.items():
786 pm
= ms
.pg_up_by_poolid
[poolid
]
793 for pgid
, up
in pm
.items():
794 for osd
in [int(osd
) for osd
in up
]:
795 if osd
== CRUSHMap
.ITEM_NONE
:
797 if osd
not in pgs_by_osd
:
799 objects_by_osd
[osd
] = 0
800 bytes_by_osd
[osd
] = 0
802 objects_by_osd
[osd
] += ms
.pg_stat
[pgid
]['num_objects']
803 bytes_by_osd
[osd
] += ms
.pg_stat
[pgid
]['num_bytes']
804 # pick a root to associate this pg instance with.
805 # note that this is imprecise if the roots have
806 # overlapping children.
807 # FIXME: divide bytes by k for EC pools.
808 for root
in pe
.pool_roots
[pool
]:
809 if osd
in pe
.target_by_root
[root
]:
810 actual_by_root
[root
]['pgs'][osd
] += 1
811 actual_by_root
[root
]['objects'][osd
] += ms
.pg_stat
[pgid
]['num_objects']
812 actual_by_root
[root
]['bytes'][osd
] += ms
.pg_stat
[pgid
]['num_bytes']
814 objects
+= ms
.pg_stat
[pgid
]['num_objects']
815 bytes
+= ms
.pg_stat
[pgid
]['num_bytes']
816 pe
.total_by_root
[root
]['pgs'] += 1
817 pe
.total_by_root
[root
]['objects'] += ms
.pg_stat
[pgid
]['num_objects']
818 pe
.total_by_root
[root
]['bytes'] += ms
.pg_stat
[pgid
]['num_bytes']
820 pe
.count_by_pool
[pool
] = {
823 for k
, v
in pgs_by_osd
.items()
827 for k
, v
in objects_by_osd
.items()
831 for k
, v
in bytes_by_osd
.items()
834 pe
.actual_by_pool
[pool
] = {
836 k
: float(v
) / float(max(pgs
, 1))
837 for k
, v
in pgs_by_osd
.items()
840 k
: float(v
) / float(max(objects
, 1))
841 for k
, v
in objects_by_osd
.items()
844 k
: float(v
) / float(max(bytes
, 1))
845 for k
, v
in bytes_by_osd
.items()
848 pe
.total_by_pool
[pool
] = {
853 for root
in pe
.total_by_root
:
854 pe
.count_by_root
[root
] = {
857 for k
, v
in actual_by_root
[root
]['pgs'].items()
861 for k
, v
in actual_by_root
[root
]['objects'].items()
865 for k
, v
in actual_by_root
[root
]['bytes'].items()
868 pe
.actual_by_root
[root
] = {
870 k
: float(v
) / float(max(pe
.total_by_root
[root
]['pgs'], 1))
871 for k
, v
in actual_by_root
[root
]['pgs'].items()
874 k
: float(v
) / float(max(pe
.total_by_root
[root
]['objects'], 1))
875 for k
, v
in actual_by_root
[root
]['objects'].items()
878 k
: float(v
) / float(max(pe
.total_by_root
[root
]['bytes'], 1))
879 for k
, v
in actual_by_root
[root
]['bytes'].items()
882 self
.log
.debug('actual_by_pool %s' % pe
.actual_by_pool
)
883 self
.log
.debug('actual_by_root %s' % pe
.actual_by_root
)
885 # average and stddev and score
889 pe
.target_by_root
[a
],
891 ) for a
, b
in pe
.count_by_root
.items()
893 self
.log
.debug('stats_by_root %s' % pe
.stats_by_root
)
895 # the scores are already normalized
898 'pgs': pe
.stats_by_root
[r
]['pgs']['score'],
899 'objects': pe
.stats_by_root
[r
]['objects']['score'],
900 'bytes': pe
.stats_by_root
[r
]['bytes']['score'],
901 } for r
in pe
.total_by_root
.keys()
903 self
.log
.debug('score_by_root %s' % pe
.score_by_root
)
905 # get the list of score metrics, comma separated
906 metrics
= cast(str, self
.get_module_option('crush_compat_metrics')).split(',')
908 # total score is just average of normalized stddevs
910 for r
, vs
in pe
.score_by_root
.items():
911 for k
, v
in vs
.items():
914 pe
.score
/= len(metrics
) * len(roots
)
def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
    """Compute the distribution evaluation for *ms* restricted to *pools*
    and return it formatted as a human-readable report."""
    return self.calc_eval(ms, pools).show(verbose=verbose)
921 def optimize(self
, plan
: Plan
) -> Tuple
[int, str]:
922 self
.log
.info('Optimize plan %s' % plan
.name
)
923 max_misplaced
= cast(float, self
.get_ceph_option('target_max_misplaced_ratio'))
924 self
.log
.info('Mode %s, max misplaced %f' %
925 (plan
.mode
, max_misplaced
))
927 info
= self
.get('pg_status')
928 unknown
= info
.get('unknown_pgs_ratio', 0.0)
929 degraded
= info
.get('degraded_ratio', 0.0)
930 inactive
= info
.get('inactive_pgs_ratio', 0.0)
931 misplaced
= info
.get('misplaced_ratio', 0.0)
932 plan
.pg_status
= info
933 self
.log
.debug('unknown %f degraded %f inactive %f misplaced %g',
934 unknown
, degraded
, inactive
, misplaced
)
936 detail
= 'Some PGs (%f) are unknown; try again later' % unknown
937 self
.log
.info(detail
)
938 return -errno
.EAGAIN
, detail
940 detail
= 'Some objects (%f) are degraded; try again later' % degraded
941 self
.log
.info(detail
)
942 return -errno
.EAGAIN
, detail
944 detail
= 'Some PGs (%f) are inactive; try again later' % inactive
945 self
.log
.info(detail
)
946 return -errno
.EAGAIN
, detail
947 elif misplaced
>= max_misplaced
:
948 detail
= 'Too many objects (%f > %f) are misplaced; ' \
949 'try again later' % (misplaced
, max_misplaced
)
950 self
.log
.info(detail
)
951 return -errno
.EAGAIN
, detail
953 if plan
.mode
== 'upmap':
954 return self
.do_upmap(plan
)
955 elif plan
.mode
== 'crush-compat':
956 return self
.do_crush_compat(cast(MsPlan
, plan
))
957 elif plan
.mode
== 'none':
958 detail
= 'Please do "ceph balancer mode" to choose a valid mode first'
959 self
.log
.info('Idle')
960 return -errno
.ENOEXEC
, detail
962 detail
= 'Unrecognized mode %s' % plan
.mode
963 self
.log
.info(detail
)
964 return -errno
.EINVAL
, detail
966 def do_upmap(self
, plan
: Plan
) -> Tuple
[int, str]:
967 self
.log
.info('do_upmap')
968 max_optimizations
= cast(float, self
.get_module_option('upmap_max_optimizations'))
969 max_deviation
= cast(int, self
.get_module_option('upmap_max_deviation'))
970 osdmap_dump
= plan
.osdmap_dump
975 pools
= [str(i
['pool_name']) for i
in osdmap_dump
.get('pools', [])]
977 detail
= 'No pools available'
978 self
.log
.info(detail
)
979 return -errno
.ENOENT
, detail
980 # shuffle pool list so they all get equal (in)attention
981 random
.shuffle(pools
)
982 self
.log
.info('pools %s' % pools
)
987 left
= max_optimizations
988 pools_with_pg_merge
= [p
['pool_name'] for p
in osdmap_dump
.get('pools', [])
989 if p
['pg_num'] > p
['pg_num_target']]
990 crush_rule_by_pool_name
= dict((p
['pool_name'], p
['crush_rule'])
991 for p
in osdmap_dump
.get('pools', []))
993 if pool
not in crush_rule_by_pool_name
:
994 self
.log
.info('pool %s does not exist' % pool
)
996 if pool
in pools_with_pg_merge
:
997 self
.log
.info('pool %s has pending PG(s) for merging, skipping for now' % pool
)
999 adjusted_pools
.append(pool
)
1000 # shuffle so all pools get equal (in)attention
1001 random
.shuffle(adjusted_pools
)
1002 pool_dump
= osdmap_dump
.get('pools', [])
1003 for pool
in adjusted_pools
:
1005 if p
['pool_name'] == pool
:
1009 # note that here we deliberately exclude any scrubbing pgs too
1010 # since scrubbing activities have significant impacts on performance
1011 num_pg_active_clean
= 0
1012 for p
in plan
.pg_status
.get('pgs_by_pool_state', []):
1013 pgs_pool_id
= p
['pool_id']
1014 if pgs_pool_id
!= pool_id
:
1016 for s
in p
['pg_state_counts']:
1017 if s
['state_name'] == 'active+clean':
1018 num_pg_active_clean
+= s
['count']
1020 available
= min(left
, num_pg_active_clean
)
1021 did
= plan
.osdmap
.calc_pg_upmaps(inc
, max_deviation
, available
, [pool
])
1026 self
.log
.info('prepared %d/%d changes' % (total_did
, max_optimizations
))
1028 self
.no_optimization_needed
= True
1029 return -errno
.EALREADY
, 'Unable to find further optimization, ' \
1030 'or pool(s) pg_num is decreasing, ' \
1031 'or distribution is already perfect'
1034 def do_crush_compat(self
, plan
: MsPlan
) -> Tuple
[int, str]:
1035 self
.log
.info('do_crush_compat')
1036 max_iterations
= cast(int, self
.get_module_option('crush_compat_max_iterations'))
1037 if max_iterations
< 1:
1038 return -errno
.EINVAL
, '"crush_compat_max_iterations" must be >= 1'
1039 step
= cast(float, self
.get_module_option('crush_compat_step'))
1040 if step
<= 0 or step
>= 1.0:
1041 return -errno
.EINVAL
, '"crush_compat_step" must be in (0, 1)'
1042 max_misplaced
= cast(float, self
.get_ceph_option('target_max_misplaced_ratio'))
1047 crush
= osdmap
.get_crush()
1048 pe
= self
.calc_eval(ms
, plan
.pools
)
1049 min_score_to_optimize
= cast(float, self
.get_module_option('min_score'))
1050 if pe
.score
<= min_score_to_optimize
:
1052 detail
= 'Distribution is already perfect'
1054 detail
= 'score %f <= min_score %f, will not optimize' \
1055 % (pe
.score
, min_score_to_optimize
)
1056 self
.log
.info(detail
)
1057 return -errno
.EALREADY
, detail
1059 # get current osd reweights
1060 orig_osd_weight
= {a
['osd']: a
['weight']
1061 for a
in ms
.osdmap_dump
.get('osds', [])}
1063 # get current compat weight-set weights
1064 orig_ws
= self
.get_compat_weight_set_weights(ms
)
1066 return -errno
.EAGAIN
, 'compat weight-set not available'
1067 orig_ws
= {a
: b
for a
, b
in orig_ws
.items() if a
>= 0}
1069 # Make sure roots don't overlap their devices. If so, we
1071 roots
= list(pe
.target_by_root
.keys())
1072 self
.log
.debug('roots %s', roots
)
1073 visited
: Dict
[int, str] = {}
1074 overlap
: Dict
[int, List
[str]] = {}
1075 for root
, wm
in pe
.target_by_root
.items():
1078 if osd
not in overlap
:
1079 overlap
[osd
] = [visited
[osd
]]
1080 overlap
[osd
].append(root
)
1082 if len(overlap
) > 0:
1083 detail
= 'Some osds belong to multiple subtrees: %s' % \
1085 self
.log
.error(detail
)
1086 return -errno
.EOPNOTSUPP
, detail
1088 # rebalance by pgs, objects, or bytes
1089 metrics
= cast(str, self
.get_module_option('crush_compat_metrics')).split(',')
1090 key
= metrics
[0] # balancing using the first score metric
1091 if key
not in ['pgs', 'bytes', 'objects']:
1092 self
.log
.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key
)
1096 best_ws
= copy
.deepcopy(orig_ws
)
1097 best_ow
= copy
.deepcopy(orig_osd_weight
)
1099 left
= max_iterations
1101 next_ws
= copy
.deepcopy(best_ws
)
1102 next_ow
= copy
.deepcopy(best_ow
)
1105 self
.log
.debug('best_ws %s' % best_ws
)
1106 random
.shuffle(roots
)
1108 pools
= best_pe
.root_pools
[root
]
1109 osds
= len(best_pe
.target_by_root
[root
])
1110 min_pgs
= osds
* min_pg_per_osd
1111 if best_pe
.total_by_root
[root
][key
] < min_pgs
:
1112 self
.log
.info('Skipping root %s (pools %s), total pgs %d '
1113 '< minimum %d (%d per osd)',
1115 best_pe
.total_by_root
[root
][key
],
1116 min_pgs
, min_pg_per_osd
)
1118 self
.log
.info('Balancing root %s (pools %s) by %s' %
1120 target
= best_pe
.target_by_root
[root
]
1121 actual
= best_pe
.actual_by_root
[root
][key
]
1122 queue
= sorted(actual
.keys(),
1123 key
=lambda osd
: -abs(target
[osd
] - actual
[osd
]))
1125 if orig_osd_weight
[osd
] == 0:
1126 self
.log
.debug('skipping out osd.%d', osd
)
1128 deviation
= target
[osd
] - actual
[osd
]
1131 self
.log
.debug('osd.%d deviation %f', osd
, deviation
)
1132 weight
= best_ws
[osd
]
1133 ow
= orig_osd_weight
[osd
]
1135 calc_weight
= target
[osd
] / actual
[osd
] * weight
* ow
1137 # for newly created osds, reset calc_weight at target value
1138 # this way weight-set will end up absorbing *step* of its
1139 # target (final) value at the very beginning and slowly catch up later.
1140 # note that if this turns out causing too many misplaced
1141 # pgs, then we'll reduce step and retry
1142 calc_weight
= target
[osd
]
1143 new_weight
= weight
* (1.0 - step
) + calc_weight
* step
1144 self
.log
.debug('Reweight osd.%d %f -> %f', osd
, weight
,
1146 next_ws
[osd
] = new_weight
1148 new_ow
= min(1.0, max(step
+ (1.0 - step
) * ow
,
1150 self
.log
.debug('Reweight osd.%d reweight %f -> %f',
1152 next_ow
[osd
] = new_ow
1154 # normalize weights under this root
1155 root_weight
= crush
.get_item_weight(pe
.root_ids
[root
])
1156 root_sum
= sum(b
for a
, b
in next_ws
.items()
1157 if a
in target
.keys())
1158 if root_sum
> 0 and root_weight
> 0:
1159 factor
= root_sum
/ root_weight
1160 self
.log
.debug('normalizing root %s %d, weight %f, '
1161 'ws sum %f, factor %f',
1162 root
, pe
.root_ids
[root
], root_weight
,
1164 for osd
in actual
.keys():
1165 next_ws
[osd
] = next_ws
[osd
] / factor
1168 plan
.compat_ws
= copy
.deepcopy(next_ws
)
1169 next_ms
= plan
.final_state()
1170 next_pe
= self
.calc_eval(next_ms
, plan
.pools
)
1171 next_misplaced
= next_ms
.calc_misplaced_from(ms
)
1172 self
.log
.debug('Step result score %f -> %f, misplacing %f',
1173 best_pe
.score
, next_pe
.score
, next_misplaced
)
1175 if next_misplaced
> max_misplaced
:
1176 if best_pe
.score
< pe
.score
:
1177 self
.log
.debug('Step misplaced %f > max %f, stopping',
1178 next_misplaced
, max_misplaced
)
1181 next_ws
= copy
.deepcopy(best_ws
)
1182 next_ow
= copy
.deepcopy(best_ow
)
1183 self
.log
.debug('Step misplaced %f > max %f, reducing step to %f',
1184 next_misplaced
, max_misplaced
, step
)
1186 if next_pe
.score
> best_pe
.score
* 1.0001:
1188 if bad_steps
< 5 and random
.randint(0, 100) < 70:
1189 self
.log
.debug('Score got worse, taking another step')
1192 next_ws
= copy
.deepcopy(best_ws
)
1193 next_ow
= copy
.deepcopy(best_ow
)
1194 self
.log
.debug('Score got worse, trying smaller step %f',
1199 best_ws
= copy
.deepcopy(next_ws
)
1200 best_ow
= copy
.deepcopy(next_ow
)
1201 if best_pe
.score
== 0:
1205 # allow a small regression if we are phasing out osd weights
1207 if best_ow
!= orig_osd_weight
:
1210 if best_pe
.score
< pe
.score
+ fudge
:
1211 self
.log
.info('Success, score %f -> %f', pe
.score
, best_pe
.score
)
1212 plan
.compat_ws
= best_ws
1213 for osd
, w
in best_ow
.items():
1214 if w
!= orig_osd_weight
[osd
]:
1215 self
.log
.debug('osd.%d reweight %f', osd
, w
)
1216 plan
.osd_weights
[osd
] = w
1219 self
.log
.info('Failed to find further optimization, score %f',
1222 return -errno
.EDOM
, 'Unable to find further optimization, ' \
1223 'change balancer mode and retry might help'
1225 def get_compat_weight_set_weights(self
, ms
: MappingState
):
1226 have_choose_args
= CRUSHMap
.have_default_choose_args(ms
.crush_dump
)
1227 if have_choose_args
:
1228 # get number of buckets in choose_args
1229 choose_args_len
= len(CRUSHMap
.get_default_choose_args(ms
.crush_dump
))
1230 if not have_choose_args
or choose_args_len
!= len(ms
.crush_dump
['buckets']):
1231 # enable compat weight-set first
1232 self
.log
.debug('no choose_args or all buckets do not have weight-sets')
1233 self
.log
.debug('ceph osd crush weight-set create-compat')
1234 result
= CommandResult('')
1235 self
.send_command(result
, 'mon', '', json
.dumps({
1236 'prefix': 'osd crush weight-set create-compat',
1239 r
, outb
, outs
= result
.wait()
1241 self
.log
.error('Error creating compat weight-set')
1244 result
= CommandResult('')
1245 self
.send_command(result
, 'mon', '', json
.dumps({
1246 'prefix': 'osd crush dump',
1249 r
, outb
, outs
= result
.wait()
1251 self
.log
.error('Error dumping crush map')
1254 crushmap
= json
.loads(outb
)
1255 except json
.JSONDecodeError
:
1256 raise RuntimeError('unable to parse crush map')
1258 crushmap
= ms
.crush_dump
1260 raw
= CRUSHMap
.get_default_choose_args(crushmap
)
1264 for t
in crushmap
['buckets']:
1265 if t
['id'] == b
['bucket_id']:
1269 raise RuntimeError('could not find bucket %s' % b
['bucket_id'])
1270 self
.log
.debug('bucket items %s' % bucket
['items'])
1271 self
.log
.debug('weight set %s' % b
['weight_set'][0])
1272 if len(bucket
['items']) != len(b
['weight_set'][0]):
1273 raise RuntimeError('weight-set size does not match bucket items')
1274 for pos
in range(len(bucket
['items'])):
1275 weight_set
[bucket
['items'][pos
]['id']] = b
['weight_set'][0][pos
]
1277 self
.log
.debug('weight_set weights %s' % weight_set
)
def do_crush(self) -> None:
    """Stub for CRUSH-weight-based balancing; logs and does nothing yet."""
    message = 'do_crush (not yet implemented)'
    self.log.info(message)
def do_osd_weight(self) -> None:
    """Stub for osd-reweight-based balancing; logs and does nothing yet."""
    message = 'do_osd_weight (not yet implemented)'
    self.log.info(message)
1286 def execute(self
, plan
: Plan
) -> Tuple
[int, str]:
1287 self
.log
.info('Executing plan %s' % plan
.name
)
1292 if len(plan
.compat_ws
):
1293 ms_plan
= cast(MsPlan
, plan
)
1294 if not CRUSHMap
.have_default_choose_args(ms_plan
.initial
.crush_dump
):
1295 self
.log
.debug('ceph osd crush weight-set create-compat')
1296 result
= CommandResult('')
1297 self
.send_command(result
, 'mon', '', json
.dumps({
1298 'prefix': 'osd crush weight-set create-compat',
1301 r
, outb
, outs
= result
.wait()
1303 self
.log
.error('Error creating compat weight-set')
1306 for osd
, weight
in plan
.compat_ws
.items():
1307 self
.log
.info('ceph osd crush weight-set reweight-compat osd.%d %f',
1309 result
= CommandResult('')
1310 self
.send_command(result
, 'mon', '', json
.dumps({
1311 'prefix': 'osd crush weight-set reweight-compat',
1313 'item': 'osd.%d' % osd
,
1316 commands
.append(result
)
1320 for osd
, weight
in plan
.osd_weights
.items():
1321 reweightn
[str(osd
)] = str(int(weight
* float(0x10000)))
1323 self
.log
.info('ceph osd reweightn %s', reweightn
)
1324 result
= CommandResult('')
1325 self
.send_command(result
, 'mon', '', json
.dumps({
1326 'prefix': 'osd reweightn',
1328 'weights': json
.dumps(reweightn
),
1330 commands
.append(result
)
1333 incdump
= plan
.inc
.dump()
1334 for item
in incdump
.get('new_pg_upmap', []):
1335 self
.log
.info('ceph osd pg-upmap %s mappings %s', item
['pgid'],
1337 result
= CommandResult('foo')
1338 self
.send_command(result
, 'mon', '', json
.dumps({
1339 'prefix': 'osd pg-upmap',
1341 'pgid': item
['pgid'],
1344 commands
.append(result
)
1346 for pgid
in incdump
.get('old_pg_upmap', []):
1347 self
.log
.info('ceph osd rm-pg-upmap %s', pgid
)
1348 result
= CommandResult('foo')
1349 self
.send_command(result
, 'mon', '', json
.dumps({
1350 'prefix': 'osd rm-pg-upmap',
1354 commands
.append(result
)
1356 for item
in incdump
.get('new_pg_upmap_items', []):
1357 self
.log
.info('ceph osd pg-upmap-items %s mappings %s', item
['pgid'],
1360 for m
in item
['mappings']:
1361 osdlist
+= [m
['from'], m
['to']]
1362 result
= CommandResult('foo')
1363 self
.send_command(result
, 'mon', '', json
.dumps({
1364 'prefix': 'osd pg-upmap-items',
1366 'pgid': item
['pgid'],
1369 commands
.append(result
)
1371 for pgid
in incdump
.get('old_pg_upmap_items', []):
1372 self
.log
.info('ceph osd rm-pg-upmap-items %s', pgid
)
1373 result
= CommandResult('foo')
1374 self
.send_command(result
, 'mon', '', json
.dumps({
1375 'prefix': 'osd rm-pg-upmap-items',
1379 commands
.append(result
)
1382 self
.log
.debug('commands %s' % commands
)
1383 for result
in commands
:
1384 r
, outb
, outs
= result
.wait()
1386 self
.log
.error('execute error: r = %d, detail = %s' % (r
, outs
))
1388 self
.log
.debug('done')
1391 def gather_telemetry(self
) -> Dict
[str, Any
]:
1393 'active': self
.active
,