]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/balancer/module.py
2 Balance PG distribution across OSDs.
12 from mgr_module
import CLIReadCommand
, CLICommand
, CommandResult
, MgrModule
, Option
, OSDMap
13 from threading
import Event
14 from typing
import cast
, Any
, Dict
, List
, Optional
, Sequence
, Tuple
, Union
15 from mgr_module
import CRUSHMap
18 TIME_FORMAT
= '%Y-%m-%d_%H:%M:%S'
22 def __init__(self
, osdmap
, raw_pg_stats
, raw_pool_stats
, desc
=''):
25 self
.osdmap_dump
= self
.osdmap
.dump()
26 self
.crush
= osdmap
.get_crush()
27 self
.crush_dump
= self
.crush
.dump()
28 self
.raw_pg_stats
= raw_pg_stats
29 self
.raw_pool_stats
= raw_pool_stats
31 i
['pgid']: i
['stat_sum'] for i
in raw_pg_stats
.get('pg_stats', [])
33 osd_poolids
= [p
['pool'] for p
in self
.osdmap_dump
.get('pools', [])]
34 pg_poolids
= [p
['poolid'] for p
in raw_pool_stats
.get('pool_stats', [])]
35 self
.poolids
= set(osd_poolids
) & set(pg_poolids
)
37 self
.pg_up_by_poolid
= {}
38 for poolid
in self
.poolids
:
39 self
.pg_up_by_poolid
[poolid
] = osdmap
.map_pool_pgs_up(poolid
)
40 for a
, b
in self
.pg_up_by_poolid
[poolid
].items():
43 def calc_misplaced_from(self
, other_ms
):
44 num
= len(other_ms
.pg_up
)
46 for pgid
, before
in other_ms
.pg_up
.items():
47 if before
!= self
.pg_up
.get(pgid
, []):
50 return float(misplaced
) / float(num
)
54 class Mode(enum
.Enum
):
56 crush_compat
= 'crush-compat'
61 def __init__(self
, name
, mode
, osdmap
, pools
):
65 self
.osdmap_dump
= osdmap
.dump()
69 self
.inc
= osdmap
.new_incremental()
def dump(self) -> str:
    """Serialize this plan's pending osdmap incremental as pretty JSON."""
    incremental = self.inc.dump()
    return json.dumps(incremental, indent=4, sort_keys=True)
75 def show(self
) -> str:
81 Plan with a preloaded MappingState member.
84 def __init__(self
, name
: str, mode
: str, ms
: MappingState
, pools
: List
[str]) -> None:
85 super(MsPlan
, self
).__init
__(name
, mode
, ms
.osdmap
, pools
)
def final_state(self) -> MappingState:
    """Return the MappingState produced by applying this plan.

    Folds the accumulated osd reweights and compat weight-set weights
    into the pending incremental, then applies that incremental to the
    initial osdmap to obtain the post-plan mapping state.
    """
    self.inc.set_osd_reweights(self.osd_weights)
    self.inc.set_crush_compat_weight_set_weights(self.compat_ws)
    new_osdmap = self.initial.osdmap.apply_incremental(self.inc)
    return MappingState(new_osdmap,
                        self.initial.raw_pg_stats,
                        self.initial.raw_pool_stats,
                        'plan %s final' % self.name)
96 def show(self
) -> str:
98 ls
.append('# starting osdmap epoch %d' % self
.initial
.osdmap
.get_epoch())
99 ls
.append('# starting crush version %d' %
100 self
.initial
.osdmap
.get_crush_version())
101 ls
.append('# mode %s' % self
.mode
)
102 if len(self
.compat_ws
) and \
103 not CRUSHMap
.have_default_choose_args(self
.initial
.crush_dump
):
104 ls
.append('ceph osd crush weight-set create-compat')
105 for osd
, weight
in self
.compat_ws
.items():
106 ls
.append('ceph osd crush weight-set reweight-compat %s %f' %
108 for osd
, weight
in self
.osd_weights
.items():
109 ls
.append('ceph osd reweight osd.%d %f' % (osd
, weight
))
110 incdump
= self
.inc
.dump()
111 for pgid
in incdump
.get('old_pg_upmap_items', []):
112 ls
.append('ceph osd rm-pg-upmap-items %s' % pgid
)
113 for item
in incdump
.get('new_pg_upmap_items', []):
115 for m
in item
['mappings']:
116 osdlist
+= [m
['from'], m
['to']]
117 ls
.append('ceph osd pg-upmap-items %s %s' %
118 (item
['pgid'], ' '.join([str(a
) for a
in osdlist
])))
123 def __init__(self
, ms
: MappingState
):
125 self
.root_ids
: Dict
[str, int] = {} # root name -> id
126 self
.pool_name
: Dict
[str, str] = {} # pool id -> pool name
127 self
.pool_id
: Dict
[str, int] = {} # pool name -> id
128 self
.pool_roots
: Dict
[str, List
[str]] = {} # pool name -> root name
129 self
.root_pools
: Dict
[str, List
[str]] = {} # root name -> pools
130 self
.target_by_root
: Dict
[str, Dict
[int, float]] = {} # root name -> target weight map
131 self
.count_by_pool
: Dict
[str, dict] = {}
132 self
.count_by_root
: Dict
[str, dict] = {}
133 self
.actual_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> actual weight map
134 self
.actual_by_root
: Dict
[str, dict] = {} # pool -> by_* -> actual weight map
135 self
.total_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> total
136 self
.total_by_root
: Dict
[str, dict] = {} # root -> by_* -> total
137 self
.stats_by_pool
: Dict
[str, dict] = {} # pool -> by_* -> stddev or avg -> value
138 self
.stats_by_root
: Dict
[str, dict] = {} # root -> by_* -> stddev or avg -> value
140 self
.score_by_pool
: Dict
[str, float] = {}
141 self
.score_by_root
: Dict
[str, Dict
[str, float]] = {}
145 def show(self
, verbose
: bool = False) -> str:
147 r
= self
.ms
.desc
+ '\n'
148 r
+= 'target_by_root %s\n' % self
.target_by_root
149 r
+= 'actual_by_pool %s\n' % self
.actual_by_pool
150 r
+= 'actual_by_root %s\n' % self
.actual_by_root
151 r
+= 'count_by_pool %s\n' % self
.count_by_pool
152 r
+= 'count_by_root %s\n' % self
.count_by_root
153 r
+= 'total_by_pool %s\n' % self
.total_by_pool
154 r
+= 'total_by_root %s\n' % self
.total_by_root
155 r
+= 'stats_by_root %s\n' % self
.stats_by_root
156 r
+= 'score_by_pool %s\n' % self
.score_by_pool
157 r
+= 'score_by_root %s\n' % self
.score_by_root
159 r
= self
.ms
.desc
+ ' '
160 r
+= 'score %f (lower is better)\n' % self
.score
163 def calc_stats(self
, count
, target
, total
):
164 num
= max(len(target
), 1)
165 r
: Dict
[str, Dict
[str, Union
[int, float]]] = {}
166 for t
in ('pgs', 'objects', 'bytes'):
178 avg
= float(total
[t
]) / float(num
)
181 # score is a measure of how uneven the data distribution is.
182 # score lies between [0, 1), 0 means perfect distribution.
186 for k
, v
in count
[t
].items():
187 # adjust/normalize by weight
189 adjusted
= float(v
) / target
[k
] / float(num
)
193 # Overweighted devices and their weights are factors to calculate reweight_urgency.
194 # One 10% underfilled device with 5 2% overfilled devices, is arguably a better
195 # situation than one 10% overfilled with 5 2% underfilled devices
198 F(x) = 2*phi(x) - 1, where phi(x) = cdf of standard normal distribution
199 x = (adjusted - avg)/avg.
200 Since, we're considering only over-weighted devices, x >= 0, and so phi(x) lies in [0.5, 1).
201 To bring range of F(x) in range [0, 1), we need to make the above modification.
203 In general, we need to use a function F(x), where x = (adjusted - avg)/avg
204 1. which is bounded between 0 and 1, so that ultimately reweight_urgency will also be bounded.
205 2. A larger value of x, should imply more urgency to reweight.
206 3. Also, the difference between F(x) when x is large, should be minimal.
207 4. The value of F(x) should get close to 1 (highest urgency to reweight) with steeply.
209 Could have used F(x) = (1 - e^(-x)). But that had slower convergence to 1, compared to the one currently in use.
211 cdf of standard normal distribution: https://stackoverflow.com/a/29273201
213 score
+= target
[k
] * (math
.erf(((adjusted
- avg
) / avg
) / math
.sqrt(2.0)))
214 sum_weight
+= target
[k
]
215 dev
+= (avg
- adjusted
) * (avg
- adjusted
)
216 stddev
= math
.sqrt(dev
/ float(max(num
- 1, 1)))
217 score
= score
/ max(sum_weight
, 1)
219 'max': max(count
[t
].values()),
220 'min': min(count
[t
].values()),
223 'sum_weight': sum_weight
,
229 class Module(MgrModule
):
231 Option(name
='active',
234 desc
='automatically balance PGs across cluster',
236 Option(name
='begin_time',
239 desc
='beginning time of day to automatically balance',
240 long_desc
='This is a time of day in the format HHMM.',
242 Option(name
='end_time',
245 desc
='ending time of day to automatically balance',
246 long_desc
='This is a time of day in the format HHMM.',
248 Option(name
='begin_weekday',
253 desc
='Restrict automatic balancing to this day of the week or later',
254 long_desc
='0 = Sunday, 1 = Monday, etc.',
256 Option(name
='end_weekday',
261 desc
='Restrict automatic balancing to days of the week earlier than this',
262 long_desc
='0 = Sunday, 1 = Monday, etc.',
264 Option(name
='crush_compat_max_iterations',
269 desc
='maximum number of iterations to attempt optimization',
271 Option(name
='crush_compat_metrics',
273 default
='pgs,objects,bytes',
274 desc
='metrics with which to calculate OSD utilization',
275 long_desc
='Value is a list of one or more of "pgs", "objects", or "bytes", and indicates which metrics to use to balance utilization.',
277 Option(name
='crush_compat_step',
282 desc
='aggressiveness of optimization',
283 long_desc
='.99 is very aggressive, .01 is less aggressive',
285 Option(name
='min_score',
288 desc
='minimum score, below which no optimization is attempted',
291 desc
='Balancer mode',
293 enum_allowed
=['none', 'crush-compat', 'upmap'],
295 Option(name
='sleep_interval',
298 desc
='how frequently to wake up and attempt optimization',
300 Option(name
='upmap_max_optimizations',
303 desc
='maximum upmap optimizations to make per attempt',
305 Option(name
='upmap_max_deviation',
309 desc
='deviation below which no optimization is attempted',
310 long_desc
='If the number of PGs are within this count then no optimization is attempted',
312 Option(name
='pool_ids',
315 desc
='pools which the automatic balancing will be limited to',
321 plans
: Dict
[str, Plan
] = {}
324 last_optimize_started
= ''
325 last_optimize_duration
= ''
327 no_optimization_needed
= False
328 success_string
= 'Optimization plan created successfully'
329 in_progress_string
= 'in progress'
331 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
332 super(Module
, self
).__init
__(*args
, **kwargs
)
335 @CLIReadCommand('balancer status')
336 def show_status(self
) -> Tuple
[int, str, str]:
341 'plans': list(self
.plans
.keys()),
342 'active': self
.active
,
343 'last_optimize_started': self
.last_optimize_started
,
344 'last_optimize_duration': self
.last_optimize_duration
,
345 'optimize_result': self
.optimize_result
,
346 'no_optimization_needed': self
.no_optimization_needed
,
347 'mode': self
.get_module_option('mode'),
349 return (0, json
.dumps(s
, indent
=4, sort_keys
=True), '')
351 @CLICommand('balancer mode')
352 def set_mode(self
, mode
: Mode
) -> Tuple
[int, str, str]:
356 if mode
== Mode
.upmap
:
357 min_compat_client
= self
.get_osdmap().dump().get('require_min_compat_client', '')
358 if min_compat_client
< 'luminous': # works well because version is alphabetized..
359 warn
= ('min_compat_client "%s" '
360 '< "luminous", which is required for pg-upmap. '
361 'Try "ceph osd set-require-min-compat-client luminous" '
362 'before enabling this mode' % min_compat_client
)
363 return (-errno
.EPERM
, '', warn
)
364 elif mode
== Mode
.crush_compat
:
365 ms
= MappingState(self
.get_osdmap(),
366 self
.get("pg_stats"),
367 self
.get("pool_stats"),
368 'initialize compat weight-set')
369 self
.get_compat_weight_set_weights(ms
) # ignore error
370 self
.set_module_option('mode', mode
.value
)
373 @CLICommand('balancer on')
374 def on(self
) -> Tuple
[int, str, str]:
376 Enable automatic balancing
379 self
.set_module_option('active', 'true')
384 @CLICommand('balancer off')
385 def off(self
) -> Tuple
[int, str, str]:
387 Disable automatic balancing
390 self
.set_module_option('active', 'false')
395 @CLIReadCommand('balancer pool ls')
396 def pool_ls(self
) -> Tuple
[int, str, str]:
398 List automatic balancing pools
400 Note that empty list means all existing pools will be automatic balancing targets,
401 which is the default behaviour of balancer.
403 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
406 pool_ids
= [int(p
) for p
in pool_ids
.split(',')]
407 pool_name_by_id
= dict((p
['pool'], p
['pool_name'])
408 for p
in self
.get_osdmap().dump().get('pools', []))
410 final_ids
: List
[int] = []
413 if p
in pool_name_by_id
:
415 final_names
.append(pool_name_by_id
[p
])
418 if should_prune
: # some pools were gone, prune
419 self
.set_module_option('pool_ids', ','.join(str(p
) for p
in final_ids
))
420 return (0, json
.dumps(sorted(final_names
), indent
=4, sort_keys
=True), '')
422 @CLICommand('balancer pool add')
423 def pool_add(self
, pools
: Sequence
[str]) -> Tuple
[int, str, str]:
425 Enable automatic balancing for specific pools
428 pool_id_by_name
= dict((p
['pool_name'], p
['pool'])
429 for p
in self
.get_osdmap().dump().get('pools', []))
430 invalid_names
= [p
for p
in raw_names
if p
not in pool_id_by_name
]
432 return (-errno
.EINVAL
, '', 'pool(s) %s not found' % invalid_names
)
433 to_add
= set(str(pool_id_by_name
[p
]) for p
in raw_names
if p
in pool_id_by_name
)
434 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
435 existing
= set(pool_ids
.split(',') if pool_ids
else [])
436 final
= to_add | existing
437 self
.set_module_option('pool_ids', ','.join(final
))
440 @CLICommand('balancer pool rm')
441 def pool_rm(self
, pools
: Sequence
[str]) -> Tuple
[int, str, str]:
443 Disable automatic balancing for specific pools
446 existing
= cast(str, self
.get_module_option('pool_ids'))
447 if existing
== '': # for idempotence
449 existing
= existing
.split(',')
450 osdmap
= self
.get_osdmap()
451 pool_ids
= [str(p
['pool']) for p
in osdmap
.dump().get('pools', [])]
452 pool_id_by_name
= dict((p
['pool_name'], p
['pool']) for p
in osdmap
.dump().get('pools', []))
453 final
= [p
for p
in existing
if p
in pool_ids
]
454 to_delete
= [str(pool_id_by_name
[p
]) for p
in raw_names
if p
in pool_id_by_name
]
455 final
= set(final
) - set(to_delete
)
456 self
.set_module_option('pool_ids', ','.join(final
))
459 def _state_from_option(self
, option
: Optional
[str] = None) -> Tuple
[MappingState
, List
[str]]:
462 ms
= MappingState(self
.get_osdmap(),
463 self
.get("pg_stats"),
464 self
.get("pool_stats"),
466 elif option
in self
.plans
:
467 plan
= self
.plans
.get(option
)
470 if plan
.mode
== 'upmap':
471 # Note that for upmap, to improve the efficiency,
472 # we use a basic version of Plan without keeping the obvious
473 # *redundant* MS member.
474 # Hence ms might not be accurate here since we are basically
475 # using an old snapshotted osdmap vs a fresh copy of pg_stats.
476 # It should not be a big deal though..
477 ms
= MappingState(plan
.osdmap
,
478 self
.get("pg_stats"),
479 self
.get("pool_stats"),
480 f
'plan "{plan.name}"')
482 ms
= cast(MsPlan
, plan
).final_state()
484 # not a plan, does it look like a pool?
485 osdmap
= self
.get_osdmap()
486 valid_pool_names
= [p
['pool_name'] for p
in osdmap
.dump().get('pools', [])]
487 if option
not in valid_pool_names
:
488 raise ValueError(f
'option "{option}" not a plan or a pool')
490 ms
= MappingState(osdmap
,
491 self
.get("pg_stats"),
492 self
.get("pool_stats"),
496 @CLIReadCommand('balancer eval-verbose')
497 def plan_eval_verbose(self
, option
: Optional
[str] = None):
499 Evaluate data distribution for the current cluster or specific pool or specific
503 ms
, pools
= self
._state
_from
_option
(option
)
504 return (0, self
.evaluate(ms
, pools
, verbose
=True), '')
505 except ValueError as e
:
506 return (-errno
.EINVAL
, '', str(e
))
508 @CLIReadCommand('balancer eval')
509 def plan_eval_brief(self
, option
: Optional
[str] = None):
511 Evaluate data distribution for the current cluster or specific pool or specific plan
514 ms
, pools
= self
._state
_from
_option
(option
)
515 return (0, self
.evaluate(ms
, pools
, verbose
=False), '')
516 except ValueError as e
:
517 return (-errno
.EINVAL
, '', str(e
))
519 @CLIReadCommand('balancer optimize')
520 def plan_optimize(self
, plan
: str, pools
: List
[str] = []) -> Tuple
[int, str, str]:
522 Run optimizer to create a new plan
524 # The GIL can be release by the active balancer, so disallow when active
526 return (-errno
.EINVAL
, '', 'Balancer enabled, disable to optimize manually')
528 return (-errno
.EINVAL
, '', 'Balancer finishing up....try again')
529 osdmap
= self
.get_osdmap()
530 valid_pool_names
= [p
['pool_name'] for p
in osdmap
.dump().get('pools', [])]
531 invalid_pool_names
= []
533 if p
not in valid_pool_names
:
534 invalid_pool_names
.append(p
)
535 if len(invalid_pool_names
):
536 return (-errno
.EINVAL
, '', 'pools %s not found' % invalid_pool_names
)
537 plan_
= self
.plan_create(plan
, osdmap
, pools
)
538 self
.last_optimize_started
= time
.asctime(time
.localtime())
539 self
.optimize_result
= self
.in_progress_string
541 r
, detail
= self
.optimize(plan_
)
543 self
.last_optimize_duration
= str(datetime
.timedelta(seconds
=(end
- start
)))
545 # Add plan if an optimization was created
546 self
.optimize_result
= self
.success_string
547 self
.plans
[plan
] = plan_
549 self
.optimize_result
= detail
550 return (r
, '', detail
)
552 @CLIReadCommand('balancer show')
553 def plan_show(self
, plan
: str) -> Tuple
[int, str, str]:
555 Show details of an optimization plan
557 plan_
= self
.plans
.get(plan
)
559 return (-errno
.ENOENT
, '', f
'plan {plan} not found')
560 return (0, plan_
.show(), '')
562 @CLICommand('balancer rm')
563 def plan_rm(self
, plan
: str) -> Tuple
[int, str, str]:
565 Discard an optimization plan
567 if plan
in self
.plans
:
571 @CLICommand('balancer reset')
572 def plan_reset(self
) -> Tuple
[int, str, str]:
574 Discard all optimization plans
579 @CLIReadCommand('balancer dump')
580 def plan_dump(self
, plan
: str) -> Tuple
[int, str, str]:
582 Show an optimization plan
584 plan_
= self
.plans
.get(plan
)
586 return -errno
.ENOENT
, '', f
'plan {plan} not found'
588 return (0, plan_
.dump(), '')
590 @CLIReadCommand('balancer ls')
591 def plan_ls(self
) -> Tuple
[int, str, str]:
595 return (0, json
.dumps([p
for p
in self
.plans
], indent
=4, sort_keys
=True), '')
597 @CLIReadCommand('balancer execute')
598 def plan_execute(self
, plan
: str) -> Tuple
[int, str, str]:
600 Execute an optimization plan
602 # The GIL can be release by the active balancer, so disallow when active
604 return (-errno
.EINVAL
, '', 'Balancer enabled, disable to execute a plan')
606 return (-errno
.EINVAL
, '', 'Balancer finishing up....try again')
607 plan_
= self
.plans
.get(plan
)
609 return (-errno
.ENOENT
, '', f
'plan {plan} not found')
610 r
, detail
= self
.execute(plan_
)
612 return (r
, '', detail
)
614 def shutdown(self
) -> None:
615 self
.log
.info('Stopping')
619 def time_permit(self
) -> bool:
620 local_time
= time
.localtime()
621 time_of_day
= time
.strftime('%H%M', local_time
)
622 weekday
= (local_time
.tm_wday
+ 1) % 7 # be compatible with C
625 def check_time(time
: str, option
: str):
627 self
.log
.error('invalid time for %s - expected HHMM format', option
)
629 datetime
.time(int(time
[:2]), int(time
[2:]))
630 except ValueError as err
:
631 self
.log
.error('invalid time for %s - %s', option
, err
)
633 begin_time
= cast(str, self
.get_module_option('begin_time'))
634 check_time(begin_time
, 'begin_time')
635 end_time
= cast(str, self
.get_module_option('end_time'))
636 check_time(end_time
, 'end_time')
637 if begin_time
< end_time
:
638 permit
= begin_time
<= time_of_day
< end_time
639 elif begin_time
== end_time
:
642 permit
= time_of_day
>= begin_time
or time_of_day
< end_time
644 self
.log
.debug("should run between %s - %s, now %s, skipping",
645 begin_time
, end_time
, time_of_day
)
648 begin_weekday
= cast(int, self
.get_module_option('begin_weekday'))
649 end_weekday
= cast(int, self
.get_module_option('end_weekday'))
650 if begin_weekday
< end_weekday
:
651 permit
= begin_weekday
<= weekday
<= end_weekday
652 elif begin_weekday
== end_weekday
:
655 permit
= weekday
>= begin_weekday
or weekday
< end_weekday
657 self
.log
.debug("should run between weekday %d - %d, now %d, skipping",
658 begin_weekday
, end_weekday
, weekday
)
663 def serve(self
) -> None:
664 self
.log
.info('Starting')
666 self
.active
= cast(bool, self
.get_module_option('active'))
667 sleep_interval
= cast(float, self
.get_module_option('sleep_interval'))
668 self
.log
.debug('Waking up [%s, now %s]',
669 "active" if self
.active
else "inactive",
670 time
.strftime(TIME_FORMAT
, time
.localtime()))
671 if self
.active
and self
.time_permit():
672 self
.log
.debug('Running')
673 name
= 'auto_%s' % time
.strftime(TIME_FORMAT
, time
.gmtime())
674 osdmap
= self
.get_osdmap()
675 pool_ids
= cast(str, self
.get_module_option('pool_ids'))
677 allow
= [int(p
) for p
in pool_ids
.split(',')]
680 final
: List
[str] = []
682 pools
= osdmap
.dump().get('pools', [])
683 valid
= [p
['pool'] for p
in pools
]
684 ids
= set(allow
) & set(valid
)
685 if set(allow
) - set(valid
): # some pools were gone, prune
686 self
.set_module_option('pool_ids', ','.join(str(p
) for p
in ids
))
687 pool_name_by_id
= dict((p
['pool'], p
['pool_name']) for p
in pools
)
688 final
= [pool_name_by_id
[p
] for p
in ids
if p
in pool_name_by_id
]
689 plan
= self
.plan_create(name
, osdmap
, final
)
690 self
.optimizing
= True
691 self
.last_optimize_started
= time
.asctime(time
.localtime())
692 self
.optimize_result
= self
.in_progress_string
694 r
, detail
= self
.optimize(plan
)
696 self
.last_optimize_duration
= str(datetime
.timedelta(seconds
=(end
- start
)))
698 self
.optimize_result
= self
.success_string
701 self
.optimize_result
= detail
702 self
.optimizing
= False
703 self
.log
.debug('Sleeping for %d', sleep_interval
)
704 self
.event
.wait(sleep_interval
)
707 def plan_create(self
, name
: str, osdmap
: OSDMap
, pools
: List
[str]) -> Plan
:
708 mode
= cast(str, self
.get_module_option('mode'))
710 # drop unnecessary MS member for upmap mode.
711 # this way we could effectively eliminate the usage of a
712 # complete pg_stats, which can become horribly inefficient
714 plan
= Plan(name
, mode
, osdmap
, pools
)
719 self
.get("pg_stats"),
720 self
.get("pool_stats"),
721 'plan %s initial' % name
),
725 def calc_eval(self
, ms
: MappingState
, pools
: List
[str]) -> Eval
:
729 for p
in ms
.osdmap_dump
.get('pools', []):
730 if len(pools
) and p
['pool_name'] not in pools
:
732 # skip dead or not-yet-ready pools too
733 if p
['pool'] not in ms
.poolids
:
735 pe
.pool_name
[p
['pool']] = p
['pool_name']
736 pe
.pool_id
[p
['pool_name']] = p
['pool']
737 pool_rule
[p
['pool_name']] = p
['crush_rule']
738 pe
.pool_roots
[p
['pool_name']] = []
739 pool_info
[p
['pool_name']] = p
740 if len(pool_info
) == 0:
742 self
.log
.debug('pool_name %s' % pe
.pool_name
)
743 self
.log
.debug('pool_id %s' % pe
.pool_id
)
744 self
.log
.debug('pools %s' % pools
)
745 self
.log
.debug('pool_rule %s' % pool_rule
)
747 osd_weight
= {a
['osd']: a
['weight']
748 for a
in ms
.osdmap_dump
.get('osds', []) if a
['weight'] > 0}
750 # get expected distributions by root
751 actual_by_root
: Dict
[str, Dict
[str, dict]] = {}
752 rootids
= ms
.crush
.find_takes()
754 for rootid
in rootids
:
755 ls
= ms
.osdmap
.get_pools_by_take(rootid
)
757 # find out roots associating with pools we are passed in
759 if candidate
in pe
.pool_name
:
760 want
.append(candidate
)
763 root
= ms
.crush
.get_item_name(rootid
)
764 pe
.root_pools
[root
] = []
766 pe
.pool_roots
[pe
.pool_name
[poolid
]].append(root
)
767 pe
.root_pools
[root
].append(pe
.pool_name
[poolid
])
768 pe
.root_ids
[root
] = rootid
770 weight_map
= ms
.crush
.get_take_weight_osd_map(rootid
)
772 osd
: cw
* osd_weight
[osd
]
773 for osd
, cw
in weight_map
.items() if osd
in osd_weight
and cw
> 0
775 sum_w
= sum(adjusted_map
.values())
776 assert len(adjusted_map
) == 0 or sum_w
> 0
777 pe
.target_by_root
[root
] = {osd
: w
/ sum_w
778 for osd
, w
in adjusted_map
.items()}
779 actual_by_root
[root
] = {
784 for osd
in pe
.target_by_root
[root
]:
785 actual_by_root
[root
]['pgs'][osd
] = 0
786 actual_by_root
[root
]['objects'][osd
] = 0
787 actual_by_root
[root
]['bytes'][osd
] = 0
788 pe
.total_by_root
[root
] = {
793 self
.log
.debug('pool_roots %s' % pe
.pool_roots
)
794 self
.log
.debug('root_pools %s' % pe
.root_pools
)
795 self
.log
.debug('target_by_root %s' % pe
.target_by_root
)
797 # pool and root actual
798 for pool
, pi
in pool_info
.items():
800 pm
= ms
.pg_up_by_poolid
[poolid
]
807 for pgid
, up
in pm
.items():
808 for osd
in [int(osd
) for osd
in up
]:
809 if osd
== CRUSHMap
.ITEM_NONE
:
811 if osd
not in pgs_by_osd
:
813 objects_by_osd
[osd
] = 0
814 bytes_by_osd
[osd
] = 0
816 objects_by_osd
[osd
] += ms
.pg_stat
[pgid
]['num_objects']
817 bytes_by_osd
[osd
] += ms
.pg_stat
[pgid
]['num_bytes']
818 # pick a root to associate this pg instance with.
819 # note that this is imprecise if the roots have
820 # overlapping children.
821 # FIXME: divide bytes by k for EC pools.
822 for root
in pe
.pool_roots
[pool
]:
823 if osd
in pe
.target_by_root
[root
]:
824 actual_by_root
[root
]['pgs'][osd
] += 1
825 actual_by_root
[root
]['objects'][osd
] += ms
.pg_stat
[pgid
]['num_objects']
826 actual_by_root
[root
]['bytes'][osd
] += ms
.pg_stat
[pgid
]['num_bytes']
828 objects
+= ms
.pg_stat
[pgid
]['num_objects']
829 bytes
+= ms
.pg_stat
[pgid
]['num_bytes']
830 pe
.total_by_root
[root
]['pgs'] += 1
831 pe
.total_by_root
[root
]['objects'] += ms
.pg_stat
[pgid
]['num_objects']
832 pe
.total_by_root
[root
]['bytes'] += ms
.pg_stat
[pgid
]['num_bytes']
834 pe
.count_by_pool
[pool
] = {
837 for k
, v
in pgs_by_osd
.items()
841 for k
, v
in objects_by_osd
.items()
845 for k
, v
in bytes_by_osd
.items()
848 pe
.actual_by_pool
[pool
] = {
850 k
: float(v
) / float(max(pgs
, 1))
851 for k
, v
in pgs_by_osd
.items()
854 k
: float(v
) / float(max(objects
, 1))
855 for k
, v
in objects_by_osd
.items()
858 k
: float(v
) / float(max(bytes
, 1))
859 for k
, v
in bytes_by_osd
.items()
862 pe
.total_by_pool
[pool
] = {
867 for root
in pe
.total_by_root
:
868 pe
.count_by_root
[root
] = {
871 for k
, v
in actual_by_root
[root
]['pgs'].items()
875 for k
, v
in actual_by_root
[root
]['objects'].items()
879 for k
, v
in actual_by_root
[root
]['bytes'].items()
882 pe
.actual_by_root
[root
] = {
884 k
: float(v
) / float(max(pe
.total_by_root
[root
]['pgs'], 1))
885 for k
, v
in actual_by_root
[root
]['pgs'].items()
888 k
: float(v
) / float(max(pe
.total_by_root
[root
]['objects'], 1))
889 for k
, v
in actual_by_root
[root
]['objects'].items()
892 k
: float(v
) / float(max(pe
.total_by_root
[root
]['bytes'], 1))
893 for k
, v
in actual_by_root
[root
]['bytes'].items()
896 self
.log
.debug('actual_by_pool %s' % pe
.actual_by_pool
)
897 self
.log
.debug('actual_by_root %s' % pe
.actual_by_root
)
899 # average and stddev and score
903 pe
.target_by_root
[a
],
905 ) for a
, b
in pe
.count_by_root
.items()
907 self
.log
.debug('stats_by_root %s' % pe
.stats_by_root
)
909 # the scores are already normalized
912 'pgs': pe
.stats_by_root
[r
]['pgs']['score'],
913 'objects': pe
.stats_by_root
[r
]['objects']['score'],
914 'bytes': pe
.stats_by_root
[r
]['bytes']['score'],
915 } for r
in pe
.total_by_root
.keys()
917 self
.log
.debug('score_by_root %s' % pe
.score_by_root
)
919 # get the list of score metrics, comma separated
920 metrics
= cast(str, self
.get_module_option('crush_compat_metrics')).split(',')
922 # total score is just average of normalized stddevs
924 for r
, vs
in pe
.score_by_root
.items():
925 for k
, v
in vs
.items():
928 pe
.score
/= len(metrics
) * len(roots
)
def evaluate(self, ms: MappingState, pools: List[str], verbose: bool = False) -> str:
    """Render a textual report of PG-distribution quality for *ms*.

    The evaluation is restricted to *pools* (empty list means all pools);
    *verbose* selects the detailed per-root/per-pool dump over the
    one-line score summary.
    """
    return self.calc_eval(ms, pools).show(verbose=verbose)
935 def optimize(self
, plan
: Plan
) -> Tuple
[int, str]:
936 self
.log
.info('Optimize plan %s' % plan
.name
)
937 max_misplaced
= cast(float, self
.get_ceph_option('target_max_misplaced_ratio'))
938 self
.log
.info('Mode %s, max misplaced %f' %
939 (plan
.mode
, max_misplaced
))
941 info
= self
.get('pg_status')
942 unknown
= info
.get('unknown_pgs_ratio', 0.0)
943 degraded
= info
.get('degraded_ratio', 0.0)
944 inactive
= info
.get('inactive_pgs_ratio', 0.0)
945 misplaced
= info
.get('misplaced_ratio', 0.0)
946 plan
.pg_status
= info
947 self
.log
.debug('unknown %f degraded %f inactive %f misplaced %g',
948 unknown
, degraded
, inactive
, misplaced
)
950 detail
= 'Some PGs (%f) are unknown; try again later' % unknown
951 self
.log
.info(detail
)
952 return -errno
.EAGAIN
, detail
954 detail
= 'Some objects (%f) are degraded; try again later' % degraded
955 self
.log
.info(detail
)
956 return -errno
.EAGAIN
, detail
958 detail
= 'Some PGs (%f) are inactive; try again later' % inactive
959 self
.log
.info(detail
)
960 return -errno
.EAGAIN
, detail
961 elif misplaced
>= max_misplaced
:
962 detail
= 'Too many objects (%f > %f) are misplaced; ' \
963 'try again later' % (misplaced
, max_misplaced
)
964 self
.log
.info(detail
)
965 return -errno
.EAGAIN
, detail
967 if plan
.mode
== 'upmap':
968 return self
.do_upmap(plan
)
969 elif plan
.mode
== 'crush-compat':
970 return self
.do_crush_compat(cast(MsPlan
, plan
))
971 elif plan
.mode
== 'none':
972 detail
= 'Please do "ceph balancer mode" to choose a valid mode first'
973 self
.log
.info('Idle')
974 return -errno
.ENOEXEC
, detail
976 detail
= 'Unrecognized mode %s' % plan
.mode
977 self
.log
.info(detail
)
978 return -errno
.EINVAL
, detail
980 def do_upmap(self
, plan
: Plan
) -> Tuple
[int, str]:
981 self
.log
.info('do_upmap')
982 max_optimizations
= cast(float, self
.get_module_option('upmap_max_optimizations'))
983 max_deviation
= cast(int, self
.get_module_option('upmap_max_deviation'))
984 osdmap_dump
= plan
.osdmap_dump
989 pools
= [str(i
['pool_name']) for i
in osdmap_dump
.get('pools', [])]
991 detail
= 'No pools available'
992 self
.log
.info(detail
)
993 return -errno
.ENOENT
, detail
994 # shuffle pool list so they all get equal (in)attention
995 random
.shuffle(pools
)
996 self
.log
.info('pools %s' % pools
)
1001 left
= max_optimizations
1002 pools_with_pg_merge
= [p
['pool_name'] for p
in osdmap_dump
.get('pools', [])
1003 if p
['pg_num'] > p
['pg_num_target']]
1004 crush_rule_by_pool_name
= dict((p
['pool_name'], p
['crush_rule'])
1005 for p
in osdmap_dump
.get('pools', []))
1007 if pool
not in crush_rule_by_pool_name
:
1008 self
.log
.info('pool %s does not exist' % pool
)
1010 if pool
in pools_with_pg_merge
:
1011 self
.log
.info('pool %s has pending PG(s) for merging, skipping for now' % pool
)
1013 adjusted_pools
.append(pool
)
1014 # shuffle so all pools get equal (in)attention
1015 random
.shuffle(adjusted_pools
)
1016 pool_dump
= osdmap_dump
.get('pools', [])
1017 for pool
in adjusted_pools
:
1019 if p
['pool_name'] == pool
:
1023 # note that here we deliberately exclude any scrubbing pgs too
1024 # since scrubbing activities have significant impacts on performance
1025 num_pg_active_clean
= 0
1026 for p
in plan
.pg_status
.get('pgs_by_pool_state', []):
1027 pgs_pool_id
= p
['pool_id']
1028 if pgs_pool_id
!= pool_id
:
1030 for s
in p
['pg_state_counts']:
1031 if s
['state_name'] == 'active+clean':
1032 num_pg_active_clean
+= s
['count']
1034 available
= min(left
, num_pg_active_clean
)
1035 did
= plan
.osdmap
.calc_pg_upmaps(inc
, max_deviation
, available
, [pool
])
1040 self
.log
.info('prepared %d/%d changes' % (total_did
, max_optimizations
))
1042 self
.no_optimization_needed
= True
1043 return -errno
.EALREADY
, 'Unable to find further optimization, ' \
1044 'or pool(s) pg_num is decreasing, ' \
1045 'or distribution is already perfect'
1048 def do_crush_compat(self
, plan
: MsPlan
) -> Tuple
[int, str]:
1049 self
.log
.info('do_crush_compat')
1050 max_iterations
= cast(int, self
.get_module_option('crush_compat_max_iterations'))
1051 if max_iterations
< 1:
1052 return -errno
.EINVAL
, '"crush_compat_max_iterations" must be >= 1'
1053 step
= cast(float, self
.get_module_option('crush_compat_step'))
1054 if step
<= 0 or step
>= 1.0:
1055 return -errno
.EINVAL
, '"crush_compat_step" must be in (0, 1)'
1056 max_misplaced
= cast(float, self
.get_ceph_option('target_max_misplaced_ratio'))
1061 crush
= osdmap
.get_crush()
1062 pe
= self
.calc_eval(ms
, plan
.pools
)
1063 min_score_to_optimize
= cast(float, self
.get_module_option('min_score'))
1064 if pe
.score
<= min_score_to_optimize
:
1066 detail
= 'Distribution is already perfect'
1068 detail
= 'score %f <= min_score %f, will not optimize' \
1069 % (pe
.score
, min_score_to_optimize
)
1070 self
.log
.info(detail
)
1071 return -errno
.EALREADY
, detail
1073 # get current osd reweights
1074 orig_osd_weight
= {a
['osd']: a
['weight']
1075 for a
in ms
.osdmap_dump
.get('osds', [])}
1077 # get current compat weight-set weights
1078 orig_ws
= self
.get_compat_weight_set_weights(ms
)
1080 return -errno
.EAGAIN
, 'compat weight-set not available'
1081 orig_ws
= {a
: b
for a
, b
in orig_ws
.items() if a
>= 0}
1083 # Make sure roots don't overlap their devices. If so, we
1085 roots
= list(pe
.target_by_root
.keys())
1086 self
.log
.debug('roots %s', roots
)
1087 visited
: Dict
[int, str] = {}
1088 overlap
: Dict
[int, List
[str]] = {}
1089 for root
, wm
in pe
.target_by_root
.items():
1092 if osd
not in overlap
:
1093 overlap
[osd
] = [visited
[osd
]]
1094 overlap
[osd
].append(root
)
1096 if len(overlap
) > 0:
1097 detail
= 'Some osds belong to multiple subtrees: %s' % \
1099 self
.log
.error(detail
)
1100 return -errno
.EOPNOTSUPP
, detail
1102 # rebalance by pgs, objects, or bytes
1103 metrics
= cast(str, self
.get_module_option('crush_compat_metrics')).split(',')
1104 key
= metrics
[0] # balancing using the first score metric
1105 if key
not in ['pgs', 'bytes', 'objects']:
1106 self
.log
.warning("Invalid crush_compat balancing key %s. Using 'pgs'." % key
)
1110 best_ws
= copy
.deepcopy(orig_ws
)
1111 best_ow
= copy
.deepcopy(orig_osd_weight
)
1113 left
= max_iterations
1115 next_ws
= copy
.deepcopy(best_ws
)
1116 next_ow
= copy
.deepcopy(best_ow
)
1119 self
.log
.debug('best_ws %s' % best_ws
)
1120 random
.shuffle(roots
)
1122 pools
= best_pe
.root_pools
[root
]
1123 osds
= len(best_pe
.target_by_root
[root
])
1124 min_pgs
= osds
* min_pg_per_osd
1125 if best_pe
.total_by_root
[root
][key
] < min_pgs
:
1126 self
.log
.info('Skipping root %s (pools %s), total pgs %d '
1127 '< minimum %d (%d per osd)',
1129 best_pe
.total_by_root
[root
][key
],
1130 min_pgs
, min_pg_per_osd
)
1132 self
.log
.info('Balancing root %s (pools %s) by %s' %
1134 target
= best_pe
.target_by_root
[root
]
1135 actual
= best_pe
.actual_by_root
[root
][key
]
1136 queue
= sorted(actual
.keys(),
1137 key
=lambda osd
: -abs(target
[osd
] - actual
[osd
]))
1139 if orig_osd_weight
[osd
] == 0:
1140 self
.log
.debug('skipping out osd.%d', osd
)
1142 deviation
= target
[osd
] - actual
[osd
]
1145 self
.log
.debug('osd.%d deviation %f', osd
, deviation
)
1146 weight
= best_ws
[osd
]
1147 ow
= orig_osd_weight
[osd
]
1149 calc_weight
= target
[osd
] / actual
[osd
] * weight
* ow
1151 # for newly created osds, reset calc_weight at target value
1152 # this way weight-set will end up absorbing *step* of its
1153 # target (final) value at the very beginning and slowly catch up later.
1154 # note that if this turns out causing too many misplaced
1155 # pgs, then we'll reduce step and retry
1156 calc_weight
= target
[osd
]
1157 new_weight
= weight
* (1.0 - step
) + calc_weight
* step
1158 self
.log
.debug('Reweight osd.%d %f -> %f', osd
, weight
,
1160 next_ws
[osd
] = new_weight
1162 new_ow
= min(1.0, max(step
+ (1.0 - step
) * ow
,
1164 self
.log
.debug('Reweight osd.%d reweight %f -> %f',
1166 next_ow
[osd
] = new_ow
1168 # normalize weights under this root
1169 root_weight
= crush
.get_item_weight(pe
.root_ids
[root
])
1170 root_sum
= sum(b
for a
, b
in next_ws
.items()
1171 if a
in target
.keys())
1172 if root_sum
> 0 and root_weight
> 0:
1173 factor
= root_sum
/ root_weight
1174 self
.log
.debug('normalizing root %s %d, weight %f, '
1175 'ws sum %f, factor %f',
1176 root
, pe
.root_ids
[root
], root_weight
,
1178 for osd
in actual
.keys():
1179 next_ws
[osd
] = next_ws
[osd
] / factor
1182 plan
.compat_ws
= copy
.deepcopy(next_ws
)
1183 next_ms
= plan
.final_state()
1184 next_pe
= self
.calc_eval(next_ms
, plan
.pools
)
1185 next_misplaced
= next_ms
.calc_misplaced_from(ms
)
1186 self
.log
.debug('Step result score %f -> %f, misplacing %f',
1187 best_pe
.score
, next_pe
.score
, next_misplaced
)
1189 if next_misplaced
> max_misplaced
:
1190 if best_pe
.score
< pe
.score
:
1191 self
.log
.debug('Step misplaced %f > max %f, stopping',
1192 next_misplaced
, max_misplaced
)
1195 next_ws
= copy
.deepcopy(best_ws
)
1196 next_ow
= copy
.deepcopy(best_ow
)
1197 self
.log
.debug('Step misplaced %f > max %f, reducing step to %f',
1198 next_misplaced
, max_misplaced
, step
)
1200 if next_pe
.score
> best_pe
.score
* 1.0001:
1202 if bad_steps
< 5 and random
.randint(0, 100) < 70:
1203 self
.log
.debug('Score got worse, taking another step')
1206 next_ws
= copy
.deepcopy(best_ws
)
1207 next_ow
= copy
.deepcopy(best_ow
)
1208 self
.log
.debug('Score got worse, trying smaller step %f',
1213 best_ws
= copy
.deepcopy(next_ws
)
1214 best_ow
= copy
.deepcopy(next_ow
)
1215 if best_pe
.score
== 0:
1219 # allow a small regression if we are phasing out osd weights
1221 if best_ow
!= orig_osd_weight
:
1224 if best_pe
.score
< pe
.score
+ fudge
:
1225 self
.log
.info('Success, score %f -> %f', pe
.score
, best_pe
.score
)
1226 plan
.compat_ws
= best_ws
1227 for osd
, w
in best_ow
.items():
1228 if w
!= orig_osd_weight
[osd
]:
1229 self
.log
.debug('osd.%d reweight %f', osd
, w
)
1230 plan
.osd_weights
[osd
] = w
1233 self
.log
.info('Failed to find further optimization, score %f',
1236 return -errno
.EDOM
, 'Unable to find further optimization, ' \
1237 'change balancer mode and retry might help'
1239 def get_compat_weight_set_weights(self
, ms
: MappingState
):
1240 have_choose_args
= CRUSHMap
.have_default_choose_args(ms
.crush_dump
)
1241 if have_choose_args
:
1242 # get number of buckets in choose_args
1243 choose_args_len
= len(CRUSHMap
.get_default_choose_args(ms
.crush_dump
))
1244 if not have_choose_args
or choose_args_len
!= len(ms
.crush_dump
['buckets']):
1245 # enable compat weight-set first
1246 self
.log
.debug('no choose_args or all buckets do not have weight-sets')
1247 self
.log
.debug('ceph osd crush weight-set create-compat')
1248 result
= CommandResult('')
1249 self
.send_command(result
, 'mon', '', json
.dumps({
1250 'prefix': 'osd crush weight-set create-compat',
1253 r
, outb
, outs
= result
.wait()
1255 self
.log
.error('Error creating compat weight-set')
1258 result
= CommandResult('')
1259 self
.send_command(result
, 'mon', '', json
.dumps({
1260 'prefix': 'osd crush dump',
1263 r
, outb
, outs
= result
.wait()
1265 self
.log
.error('Error dumping crush map')
1268 crushmap
= json
.loads(outb
)
1269 except json
.JSONDecodeError
:
1270 raise RuntimeError('unable to parse crush map')
1272 crushmap
= ms
.crush_dump
1274 raw
= CRUSHMap
.get_default_choose_args(crushmap
)
1278 for t
in crushmap
['buckets']:
1279 if t
['id'] == b
['bucket_id']:
1283 raise RuntimeError('could not find bucket %s' % b
['bucket_id'])
1284 self
.log
.debug('bucket items %s' % bucket
['items'])
1285 self
.log
.debug('weight set %s' % b
['weight_set'][0])
1286 if len(bucket
['items']) != len(b
['weight_set'][0]):
1287 raise RuntimeError('weight-set size does not match bucket items')
1288 for pos
in range(len(bucket
['items'])):
1289 weight_set
[bucket
['items'][pos
]['id']] = b
['weight_set'][0][pos
]
1291 self
.log
.debug('weight_set weights %s' % weight_set
)
def do_crush(self) -> None:
    """Stub for the 'crush' balancing mode; not implemented yet.

    Only emits a log line so that selecting this mode is visible in the
    mgr log rather than failing silently.
    """
    self.log.info('do_crush (not yet implemented)')
def do_osd_weight(self) -> None:
    """Stub for the 'osd_weight' balancing mode; not implemented yet.

    Only emits a log line so that selecting this mode is visible in the
    mgr log rather than failing silently.
    """
    self.log.info('do_osd_weight (not yet implemented)')
1300 def execute(self
, plan
: Plan
) -> Tuple
[int, str]:
1301 self
.log
.info('Executing plan %s' % plan
.name
)
1306 if len(plan
.compat_ws
):
1307 ms_plan
= cast(MsPlan
, plan
)
1308 if not CRUSHMap
.have_default_choose_args(ms_plan
.initial
.crush_dump
):
1309 self
.log
.debug('ceph osd crush weight-set create-compat')
1310 result
= CommandResult('')
1311 self
.send_command(result
, 'mon', '', json
.dumps({
1312 'prefix': 'osd crush weight-set create-compat',
1315 r
, outb
, outs
= result
.wait()
1317 self
.log
.error('Error creating compat weight-set')
1320 for osd
, weight
in plan
.compat_ws
.items():
1321 self
.log
.info('ceph osd crush weight-set reweight-compat osd.%d %f',
1323 result
= CommandResult('')
1324 self
.send_command(result
, 'mon', '', json
.dumps({
1325 'prefix': 'osd crush weight-set reweight-compat',
1327 'item': 'osd.%d' % osd
,
1330 commands
.append(result
)
1334 for osd
, weight
in plan
.osd_weights
.items():
1335 reweightn
[str(osd
)] = str(int(weight
* float(0x10000)))
1337 self
.log
.info('ceph osd reweightn %s', reweightn
)
1338 result
= CommandResult('')
1339 self
.send_command(result
, 'mon', '', json
.dumps({
1340 'prefix': 'osd reweightn',
1342 'weights': json
.dumps(reweightn
),
1344 commands
.append(result
)
1347 incdump
= plan
.inc
.dump()
1348 for item
in incdump
.get('new_pg_upmap', []):
1349 self
.log
.info('ceph osd pg-upmap %s mappings %s', item
['pgid'],
1351 result
= CommandResult('foo')
1352 self
.send_command(result
, 'mon', '', json
.dumps({
1353 'prefix': 'osd pg-upmap',
1355 'pgid': item
['pgid'],
1358 commands
.append(result
)
1360 for pgid
in incdump
.get('old_pg_upmap', []):
1361 self
.log
.info('ceph osd rm-pg-upmap %s', pgid
)
1362 result
= CommandResult('foo')
1363 self
.send_command(result
, 'mon', '', json
.dumps({
1364 'prefix': 'osd rm-pg-upmap',
1368 commands
.append(result
)
1370 for item
in incdump
.get('new_pg_upmap_items', []):
1371 self
.log
.info('ceph osd pg-upmap-items %s mappings %s', item
['pgid'],
1374 for m
in item
['mappings']:
1375 osdlist
+= [m
['from'], m
['to']]
1376 result
= CommandResult('foo')
1377 self
.send_command(result
, 'mon', '', json
.dumps({
1378 'prefix': 'osd pg-upmap-items',
1380 'pgid': item
['pgid'],
1383 commands
.append(result
)
1385 for pgid
in incdump
.get('old_pg_upmap_items', []):
1386 self
.log
.info('ceph osd rm-pg-upmap-items %s', pgid
)
1387 result
= CommandResult('foo')
1388 self
.send_command(result
, 'mon', '', json
.dumps({
1389 'prefix': 'osd rm-pg-upmap-items',
1393 commands
.append(result
)
1396 self
.log
.debug('commands %s' % commands
)
1397 for result
in commands
:
1398 r
, outb
, outs
= result
.wait()
1400 self
.log
.error('execute error: r = %d, detail = %s' % (r
, outs
))
1402 self
.log
.debug('done')
1405 def gather_telemetry(self
) -> Dict
[str, Any
]:
1407 'active': self
.active
,