1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
18 #include "MDBalancer.h"
20 #include "mon/MonClient.h"
28 #include "include/Context.h"
29 #include "msg/Messenger.h"
30 #include "messages/MHeartbeat.h"
39 #include "common/config.h"
40 #include "common/errno.h"
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
45 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
49 auto subsys = ceph_subsys_mds;\
50 if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
51 subsys = ceph_subsys_mds_balancer;\
53 dout_impl(dout_context, subsys, lvl) dout_prefix
55 #define dendl dendl_impl; } while (0)
58 #define MIN_LOAD 50 // ??
59 #define MIN_REEXPORT 5 // will automatically reexport
60 #define MIN_OFFLOAD 10 // point at which i stop trying, close enough
63 /* This function DOES put the passed message before returning */
64 int MDBalancer::proc_message(Message
*m
)
66 switch (m
->get_type()) {
68 case MSG_MDS_HEARTBEAT
:
69 handle_heartbeat(static_cast<MHeartbeat
*>(m
));
73 dout(0) << " balancer unknown message " << m
->get_type() << dendl
;
74 assert(0 == "balancer unknown message");
// Walk the cache's export_pin_queue and export each queued inode's auth
// dirfrags to the rank its export pin names; then sweep all auth subtrees
// and export any whose inode carries a pin for a different rank.
// NOTE(review): the loop below reads `it` and `*current` — the iterator
// setup lines are not visible in this extraction; presumably `current`
// is a saved copy of `it` advanced inside the loop — confirm upstream.
80 void MDBalancer::handle_export_pins(void)
82 auto &q
= mds
->mdcache
->export_pin_queue
;
84 dout(20) << "export_pin_queue size=" << q
.size() << dendl
;
85 while (it
!= q
.end()) {
87 CInode
*in
= *current
;
89 mds_rank_t export_pin
= in
->get_export_pin();
90 if (!in
->is_exportable(export_pin
)) {
91 dout(10) << "can no longer export " << *in
<< " because export pins have since changed" << dendl
;
95 dout(10) << "exporting dirfrags of " << *in
<< " to " << export_pin
<< dendl
;
96 bool has_auth
= false;
98 in
->dirfragtree
.get_leaves(ls
);
// Export every auth dirfrag of the pinned inode to the pin target.
99 for (const auto &fg
: ls
) {
100 CDir
*cd
= in
->get_dirfrag(fg
);
101 if (cd
&& cd
->is_auth()) {
102 /* N.B. when we are no longer auth after exporting, this function will remove the inode from the queue */
103 mds
->mdcache
->migrator
->export_dir(cd
, export_pin
);
108 dout(10) << "can no longer export " << *in
<< " because I am not auth for any dirfrags" << dendl
;
// Second pass: push away whole auth subtrees pinned to another rank.
114 set
<CDir
*> authsubs
;
115 mds
->mdcache
->get_auth_subtrees(authsubs
);
116 for (auto &cd
: authsubs
) {
117 mds_rank_t export_pin
= cd
->inode
->get_export_pin();
118 dout(10) << "auth tree " << *cd
<< " export_pin=" << export_pin
<< dendl
;
119 if (export_pin
>= 0 && export_pin
!= mds
->get_nodeid()) {
120 dout(10) << "exporting auth subtree " << *cd
->inode
<< " to " << export_pin
<< dendl
;
121 mds
->mdcache
->migrator
->export_dir(cd
, export_pin
);
// Periodic balancer driver. Handles export pins each tick, samples
// popularity at mds_bal_sample_interval, and (on rank 0 only) kicks off
// a heartbeat round every mds_bal_interval seconds, bounded by
// mds_bal_max / mds_bal_max_until.
126 void MDBalancer::tick()
128 static int num_bal_times
= g_conf
->mds_bal_max
;
129 static utime_t first
= ceph_clock_now();
130 utime_t now
= ceph_clock_now();
131 utime_t elapsed
= now
;
// NOTE(review): `elapsed` is presumably `now - first`; the subtraction
// line is not visible in this extraction — confirm upstream.
134 if (g_conf
->mds_bal_export_pin
) {
135 handle_export_pins();
// Take a new popularity sample when the sample interval has passed.
139 if ((double)now
- (double)last_sample
> g_conf
->mds_bal_sample_interval
) {
140 dout(15) << "tick last_sample now " << now
<< dendl
;
145 if (last_heartbeat
== utime_t())
146 last_heartbeat
= now
;
// Only rank 0 initiates a balancing round.
147 if (mds
->get_nodeid() == 0 &&
148 g_conf
->mds_bal_interval
> 0 &&
150 (g_conf
->mds_bal_max_until
>= 0 &&
151 elapsed
.sec() > g_conf
->mds_bal_max_until
)) &&
153 now
.sec() - last_heartbeat
.sec() >= g_conf
->mds_bal_interval
) {
154 last_heartbeat
= now
;
163 class C_Bal_SendHeartbeat
: public MDSInternalContext
{
165 explicit C_Bal_SendHeartbeat(MDSRank
*mds_
) : MDSInternalContext(mds_
) { }
166 void finish(int f
) override
{
167 mds
->balancer
->send_heartbeat();
// Collapse this load record into a single scalar according to the
// configured mds_bal_mode. Only fragments of the switch are visible here;
// the visible arm blends auth/all meta_load, and another arm combines
// request rate with a queue-length penalty.
172 double mds_load_t::mds_load()
174 switch(g_conf
->mds_bal_mode
) {
// weighted blend: 80% auth subtree load, 20% overall load (+ more terms
// not visible in this extraction)
177 .8 * auth
.meta_load() +
178 .2 * all
.meta_load() +
// alternate mode: request rate plus a 10x penalty per queued dispatch
183 return req_rate
+ 10.0*queue_len
;
// Build this rank's mds_load_t snapshot: accumulate root dirfrag
// popularity (auth-subtree-nested and nested), the MDS request rate, the
// messenger dispatch queue length, and the host's 1-minute loadavg.
193 mds_load_t
MDBalancer::get_load(utime_t now
)
195 mds_load_t
load(now
);
197 if (mds
->mdcache
->get_root()) {
199 mds
->mdcache
->get_root()->get_dirfrags(ls
);
200 for (list
<CDir
*>::iterator p
= ls
.begin();
203 load
.auth
.add(now
, mds
->mdcache
->decayrate
, (*p
)->pop_auth_subtree_nested
);
204 load
.all
.add(now
, mds
->mdcache
->decayrate
, (*p
)->pop_nested
);
207 dout(20) << "get_load no root, no load" << dendl
;
210 load
.req_rate
= mds
->get_req_rate();
211 load
.queue_len
= messenger
->get_dispatch_queue_len();
// Read host CPU load average; missing /proc/loadavg is only logged.
213 ifstream
cpu(PROCPREFIX
"/proc/loadavg");
215 cpu
>> load
.cpu_load_avg
;
217 dout(0) << "input file " PROCPREFIX
"'/proc/loadavg' not found" << dendl
;
219 dout(15) << "get_load " << load
<< dendl
;
224 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
225 * fallbacks (i.e. kick off async read when we are processing the map and
226 * check status when we get here) with the way the mds is structured.
// Fetch the Lua balancer object named by the MDSMap from the metadata
// pool, waiting at most half of mds_bal_interval; on success, cache the
// code in bal_code and record the object name as bal_version.
228 int MDBalancer::localize_balancer()
230 /* reset everything */
237 /* we assume that balancer is in the metadata pool */
238 object_t oid
= object_t(mds
->mdsmap
->get_balancer());
239 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
// Issue the async read; C_SafeCond signals `cond` and fills ack/r.
240 ceph_tid_t tid
= mds
->objecter
->read(oid
, oloc
, 0, 0, CEPH_NOSNAP
, &lua_src
, 0,
241 new C_SafeCond(&lock
, &cond
, &ack
, &r
));
242 dout(15) << "launched non-blocking read tid=" << tid
243 << " oid=" << oid
<< " oloc=" << oloc
<< dendl
;
245 /* timeout: if we waste half our time waiting for RADOS, then abort! */
246 double t
= ceph_clock_now() + g_conf
->mds_bal_interval
/2;
248 timeout
.set_from_double(t
);
250 int ret_t
= cond
.WaitUntil(lock
, timeout
);
253 /* success: store the balancer in memory and set the version. */
// On timeout, cancel the outstanding objecter op before giving up.
255 if (ret_t
== ETIMEDOUT
) {
256 mds
->objecter
->op_cancel(tid
, -ECANCELED
);
259 bal_code
.assign(lua_src
.to_str());
260 bal_version
.assign(oid
.name
);
261 dout(0) << "localized balancer, bal_code=" << bal_code
<< dendl
;
// Broadcast this rank's load to every other up MDS. Skips when the
// cluster is degraded; retries (via C_Bal_SendHeartbeat) if the cache is
// not yet open. Records our own load and import map locally, then sends
// an MHeartbeat to each peer.
266 void MDBalancer::send_heartbeat()
268 utime_t now
= ceph_clock_now();
270 if (mds
->is_cluster_degraded()) {
271 dout(10) << "send_heartbeat degraded" << dendl
;
275 if (!mds
->mdcache
->is_open()) {
276 dout(5) << "not open" << dendl
;
277 mds
->mdcache
->wait_for_open(new C_Bal_SendHeartbeat(mds
));
// NOTE(review): rank 0 presumably resets epoch/load state here — the
// statements under this condition are not visible in this extraction.
282 if (mds
->get_nodeid() == 0)
286 mds_load_t load
= get_load(now
);
287 map
<mds_rank_t
, mds_load_t
>::value_type
val(mds
->get_nodeid(), load
);
288 mds_load
.insert(val
);
290 // import_map -- how much do i import from whom
291 map
<mds_rank_t
, float> import_map
;
293 mds
->mdcache
->get_auth_subtrees(authsubs
);
294 for (set
<CDir
*>::iterator it
= authsubs
.begin();
295 it
!= authsubs
.end();
298 mds_rank_t from
= im
->inode
->authority().first
;
// Skip subtrees we own outright and stray dirs; count the rest per rank.
299 if (from
== mds
->get_nodeid()) continue;
300 if (im
->get_inode()->is_stray()) continue;
301 import_map
[from
] += im
->pop_auth_subtree
.meta_load(now
, mds
->mdcache
->decayrate
);
303 mds_import_map
[ mds
->get_nodeid() ] = import_map
;
306 dout(5) << "mds." << mds
->get_nodeid() << " epoch " << beat_epoch
<< " load " << load
<< dendl
;
307 for (map
<mds_rank_t
, float>::iterator it
= import_map
.begin();
308 it
!= import_map
.end();
310 dout(5) << "  import_map from " << it
->first
<< " -> " << it
->second
<< dendl
;
// Send to every up rank except ourselves.
315 mds
->get_mds_map()->get_up_mds_set(up
);
316 for (set
<mds_rank_t
>::iterator p
= up
.begin(); p
!= up
.end(); ++p
) {
317 if (*p
== mds
->get_nodeid())
319 MHeartbeat
*hb
= new MHeartbeat(load
, beat_epoch
);
320 hb
->get_import_map() = import_map
;
321 messenger
->send_message(hb
,
322 mds
->mdsmap
->get_inst(*p
));
326 /* This function DOES put the passed message before returning */
// Receive a peer's load report: record its load and import map, and once
// reports from the whole cluster are in, run a rebalance (Mantle if a
// balancer object is configured, falling back to the classic algorithm).
327 void MDBalancer::handle_heartbeat(MHeartbeat
*m
)
329 typedef map
<mds_rank_t
, mds_load_t
> mds_load_map_t
;
331 mds_rank_t who
= mds_rank_t(m
->get_source().num());
332 dout(25) << "=== got heartbeat " << m
->get_beat() << " from " << m
->get_source().num() << " " << m
->get_load() << dendl
;
334 if (!mds
->is_active())
// If the cache isn't open yet, requeue this message and retry later.
337 if (!mds
->mdcache
->is_open()) {
338 dout(10) << "opening root on handle_heartbeat" << dendl
;
339 mds
->mdcache
->wait_for_open(new C_MDS_RetryMessage(mds
, m
));
343 if (mds
->is_cluster_degraded()) {
344 dout(10) << " degraded, ignoring" << dendl
;
// A beat from mds0 starts a new epoch of load collection.
349 dout(20) << " from mds0, new epoch" << dendl
;
350 beat_epoch
= m
->get_beat();
353 mds
->mdcache
->show_subtrees();
// Insert-or-overwrite this rank's load record.
358 mds_load_map_t::value_type
val(who
, m
->get_load());
359 pair
< mds_load_map_t::iterator
, bool > rval (mds_load
.insert(val
));
361 rval
.first
->second
= val
.second
;
364 mds_import_map
[ who
] = m
->get_import_map();
366 //dout(0) << " load is " << load << " have " << mds_load.size() << dendl;
// Rebalance only when every in-cluster rank has reported.
369 unsigned cluster_size
= mds
->get_mds_map()->get_num_in_mds();
370 if (mds_load
.size() == cluster_size
) {
372 //export_empties();  // no!
374 /* avoid spamming ceph -w if user does not turn mantle on */
375 if (mds
->mdsmap
->get_balancer() != "") {
376 int r
= mantle_prep_rebalance();
378 mds
->clog
->warn() << "using old balancer; mantle failed for "
379 << "balancer=" << mds
->mdsmap
->get_balancer()
380 << " : " << cpp_strerror(r
);
382 prep_rebalance(m
->get_beat());
392 void MDBalancer::export_empties()
394 dout(5) << "export_empties checking for empty imports" << dendl
;
396 std::set
<CDir
*> subtrees
;
397 mds
->mdcache
->get_fullauth_subtrees(subtrees
);
398 for (auto &dir
: subtrees
) {
399 if (dir
->is_freezing() || dir
->is_frozen())
402 if (!dir
->inode
->is_base() &&
403 !dir
->inode
->is_stray() &&
404 dir
->get_num_head_items() == 0)
405 mds
->mdcache
->migrator
->export_empty_import(dir
);
411 double MDBalancer::try_match(balance_state_t
& state
, mds_rank_t ex
, double& maxex
,
412 mds_rank_t im
, double& maxim
)
414 if (maxex
<= 0 || maxim
<= 0) return 0.0;
416 double howmuch
= MIN(maxex
, maxim
);
417 if (howmuch
<= 0) return 0.0;
419 dout(5) << " - mds." << ex
<< " exports " << howmuch
<< " to mds." << im
<< dendl
;
421 if (ex
== mds
->get_nodeid())
422 state
.targets
[im
] += howmuch
;
424 state
.exported
[ex
] += howmuch
;
425 state
.imported
[im
] += howmuch
;
// Schedule a split of `dir`. A callback (deduplicated via split_pending)
// re-resolves the dirfrag at fire time and asks MDCache to split it; it
// runs either ASAP via the MDSRank waiter queue (fast=true) or after
// mds_bal_fragment_interval on the MDS timer.
433 void MDBalancer::queue_split(const CDir
*dir
, bool fast
)
435 dout(10) << __func__
<< " enqueuing " << *dir
436 << " (fast=" << fast
<< ")" << dendl
;
438 assert(mds
->mdsmap
->allows_dirfrags());
439 const dirfrag_t frag
= dir
->dirfrag();
441 auto callback
= [this, frag
](int r
) {
442 if (split_pending
.erase(frag
) == 0) {
443 // Someone beat me to it. This can happen in the fast splitting
444 // path, because we spawn two contexts, one with mds->timer and
445 // one with mds->queue_waiter. The loser can safely just drop
// Re-resolve the dirfrag: it may have been trimmed or migrated since.
450 CDir
*split_dir
= mds
->mdcache
->get_dirfrag(frag
);
452 dout(10) << "drop split on " << frag
<< " because not in cache" << dendl
;
455 if (!split_dir
->is_auth()) {
456 dout(10) << "drop split on " << frag
<< " because non-auth" << dendl
;
460 // Pass on to MDCache: note that the split might still not
461 // happen if the checks in MDCache::can_fragment fail.
462 dout(10) << __func__
<< " splitting " << *split_dir
<< dendl
;
463 mds
->mdcache
->split_dir(split_dir
, g_conf
->mds_bal_split_bits
);
// Only enqueue if no split is already pending for this frag.
467 if (split_pending
.count(frag
) == 0) {
468 split_pending
.insert(frag
);
473 // Do the split ASAP: enqueue it in the MDSRank waiters which are
474 // run at the end of dispatching the current request
475 mds
->queue_waiter(new MDSInternalContextWrapper(mds
,
476 new FunctionContext(callback
)));
478 // Set a timer to really do the split: we don't do it immediately
479 // so that bursts of ops on a directory have a chance to go through
480 // before we freeze it.
481 mds
->timer
.add_event_after(g_conf
->mds_bal_fragment_interval
,
482 new FunctionContext(callback
));
// Schedule a merge of `dir` with its siblings. The deferred callback
// (deduplicated via merge_pending) walks up the fragtree, checking at
// each level that all sibling frags are cached, auth, and willing to
// merge, then asks MDCache to merge at the widest qualifying frag.
486 void MDBalancer::queue_merge(CDir
*dir
)
488 const auto frag
= dir
->dirfrag();
489 auto callback
= [this, frag
](int r
) {
490 assert(frag
.frag
!= frag_t());
492 // frag must be in this set because only one context is in flight
493 // for a given frag at a time (because merge_pending is checked before
494 // starting one), and this context is the only one that erases it.
495 merge_pending
.erase(frag
);
497 CDir
*dir
= mds
->mdcache
->get_dirfrag(frag
);
499 dout(10) << "drop merge on " << frag
<< " because not in cache" << dendl
;
502 assert(dir
->dirfrag() == frag
);
504 if(!dir
->is_auth()) {
505 dout(10) << "drop merge on " << *dir
<< " because lost auth" << dendl
;
509 dout(10) << "merging " << *dir
<< dendl
;
511 CInode
*diri
= dir
->get_inode();
// Climb the fragtree as long as each sibling set can merge.
513 frag_t fg
= dir
->get_frag();
514 while (fg
!= frag_t()) {
515 frag_t sibfg
= fg
.get_sibling();
517 bool complete
= diri
->get_dirfrags_under(sibfg
, sibs
);
519 dout(10) << " not all sibs under " << sibfg
<< " in cache (have " << sibs
<< ")" << dendl
;
523 for (list
<CDir
*>::iterator p
= sibs
.begin(); p
!= sibs
.end(); ++p
) {
525 if (!sib
->is_auth() || !sib
->should_merge()) {
531 dout(10) << " not all sibs under " << sibfg
<< " " << sibs
<< " should_merge" << dendl
;
534 dout(10) << " all sibs under " << sibfg
<< " " << sibs
<< " should merge" << dendl
;
// Merge only if we climbed at least one level.
538 if (fg
!= dir
->get_frag())
539 mds
->mdcache
->merge_dir(diri
, fg
);
// Enqueue at most one pending merge per frag, on the MDS timer.
542 if (merge_pending
.count(frag
) == 0) {
543 dout(20) << __func__
<< " enqueued dir " << *dir
<< dendl
;
544 merge_pending
.insert(frag
);
545 mds
->timer
.add_event_after(g_conf
->mds_bal_fragment_interval
,
546 new FunctionContext(callback
));
548 dout(20) << __func__
<< " dir already in queue " << *dir
<< dendl
;
// Classic balancing algorithm. Converts collected mds_load into
// meta_load units, computes the per-rank target load, decides whether
// this rank is overloaded, partitions ranks into importers/exporters,
// and matches exporters to importers (by import source, then big-to-big,
// then small-to-big) before handing the resulting state to try_rebalance.
552 void MDBalancer::prep_rebalance(int beat
)
554 balance_state_t state
;
// Thrash mode: target every up rank with zero load (testing aid).
556 if (g_conf
->mds_thrash_exports
) {
557 //we're going to randomly export to all the mds in the cluster
558 set
<mds_rank_t
> up_mds
;
559 mds
->get_mds_map()->get_up_mds_set(up_mds
);
560 for (const auto &rank
: up_mds
) {
561 state
.targets
[rank
] = 0.0;
564 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
565 mds_rank_t whoami
= mds
->get_nodeid();
566 rebalance_time
= ceph_clock_now();
568 dout(5) << " prep_rebalance: cluster loads are" << dendl
;
570 mds
->mdcache
->migrator
->clear_export_queue();
572 // rescale! turn my mds_load back into meta_load units
573 double load_fac
= 1.0;
574 map
<mds_rank_t
, mds_load_t
>::iterator m
= mds_load
.find(whoami
);
575 if ((m
!= mds_load
.end()) && (m
->second
.mds_load() > 0)) {
576 double metald
= m
->second
.auth
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
577 double mdsld
= m
->second
.mds_load();
578 load_fac
= metald
/ mdsld
;
579 dout(7) << " load_fac is " << load_fac
580 << " <- " << m
->second
.auth
<< " " << metald
// Build the sorted (load -> rank) map; missing ranks get a fresh
// zero-load record via the insert-if-absent idiom below.
585 double total_load
= 0.0;
586 multimap
<double,mds_rank_t
> load_map
;
587 for (mds_rank_t i
=mds_rank_t(0); i
< mds_rank_t(cluster_size
); i
++) {
588 map
<mds_rank_t
, mds_load_t
>::value_type
val(i
, mds_load_t(ceph_clock_now()));
589 std::pair
< map
<mds_rank_t
, mds_load_t
>::iterator
, bool > r(mds_load
.insert(val
));
590 mds_load_t
&load(r
.first
->second
);
592 double l
= load
.mds_load() * load_fac
;
593 mds_meta_load
[i
] = l
;
596 dout(0) << " mds." << i
598 << " = " << load
.mds_load()
599 << " ~ " << l
<< dendl
;
601 if (whoami
== i
) my_load
= l
;
604 load_map
.insert(pair
<double,mds_rank_t
>( l
, i
));
// Average load is the per-rank target.
608 target_load
= total_load
/ (double)cluster_size
;
609 dout(5) << "prep_rebalance: my load " << my_load
610 << " target " << target_load
611 << " total " << total_load
// Bail out unless we exceed the target by mds_bal_min_rebalance.
615 if (my_load
< target_load
* (1.0 + g_conf
->mds_bal_min_rebalance
)) {
616 dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl
;
617 last_epoch_under
= beat_epoch
;
618 mds
->mdcache
->show_subtrees();
622 last_epoch_over
= beat_epoch
;
624 // am i over long enough?
625 if (last_epoch_under
&& beat_epoch
- last_epoch_under
< 2) {
626 dout(5) << " i am overloaded, but only for " << (beat_epoch
- last_epoch_under
) << " epochs" << dendl
;
630 dout(5) << " i am sufficiently overloaded" << dendl
;
633 // first separate exporters and importers
634 multimap
<double,mds_rank_t
> importers
;
635 multimap
<double,mds_rank_t
> exporters
;
636 set
<mds_rank_t
> importer_set
;
637 set
<mds_rank_t
> exporter_set
;
639 for (multimap
<double,mds_rank_t
>::iterator it
= load_map
.begin();
640 it
!= load_map
.end();
642 if (it
->first
< target_load
) {
643 dout(15) << " mds." << it
->second
<< " is importer" << dendl
;
644 importers
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
645 importer_set
.insert(it
->second
);
647 dout(15) << " mds." << it
->second
<< " is exporter" << dendl
;
648 exporters
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
649 exporter_set
.insert(it
->second
);
654 // determine load transfer mapping
657 // analyze import_map; do any matches i can
659 dout(15) << " matching exporters to import sources" << dendl
;
661 // big -> small exporters
662 for (multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
663 ex
!= exporters
.rend();
665 double maxex
= get_maxex(state
, ex
->second
);
666 if (maxex
<= .001) continue;
668 // check importers. for now, just in arbitrary order (no intelligent matching).
669 for (map
<mds_rank_t
, float>::iterator im
= mds_import_map
[ex
->second
].begin();
670 im
!= mds_import_map
[ex
->second
].end();
672 double maxim
= get_maxim(state
, im
->first
);
673 if (maxim
<= .001) continue;
674 try_match(state
, ex
->second
, maxex
, im
->first
, maxim
);
675 if (maxex
<= .001) break;
682 dout(15) << " matching big exporters to big importers" << dendl
;
683 // big exporters to big importers
684 multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
685 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
686 while (ex
!= exporters
.rend() &&
687 im
!= importers
.end()) {
688 double maxex
= get_maxex(state
, ex
->second
);
689 double maxim
= get_maxim(state
, im
->second
);
690 if (maxex
< .001 || maxim
< .001) break;
691 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
692 if (maxex
<= .001) ++ex
;
693 if (maxim
<= .001) ++im
;
696 dout(15) << " matching small exporters to big importers" << dendl
;
697 // small exporters to big importers
698 multimap
<double,mds_rank_t
>::iterator ex
= exporters
.begin();
699 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
700 while (ex
!= exporters
.end() &&
701 im
!= importers
.end()) {
702 double maxex
= get_maxex(state
, ex
->second
);
703 double maxim
= get_maxim(state
, im
->second
);
704 if (maxex
< .001 || maxim
< .001) break;
705 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
706 if (maxex
<= .001) ++ex
;
707 if (maxim
<= .001) ++im
;
711 try_rebalance(state
);
// Touch each export target in the balance state so MDSRank keeps it
// alive in the MDSMap export_targets set (decayed by
// mds_bal_target_decay).
714 void MDBalancer::hit_targets(const balance_state_t
& state
)
716 utime_t now
= ceph_clock_now();
717 for (auto &it
: state
.targets
) {
718 mds_rank_t target
= it
.first
;
719 mds
->hit_export_target(now
, target
, g_conf
->mds_bal_target_decay
);
// Mantle (Lua) balancing path: refresh the balancer script if the MDSMap
// version changed, gather per-rank metrics from the collected loads, run
// the Lua policy to produce targets, sanity-check the target count, and
// hand the state to try_rebalance. Returns nonzero on failure so the
// caller can fall back to the classic balancer.
723 int MDBalancer::mantle_prep_rebalance()
725 balance_state_t state
;
727 /* refresh balancer if it has changed */
728 if (bal_version
!= mds
->mdsmap
->get_balancer()) {
729 bal_version
.assign("");
730 int r
= localize_balancer();
733 /* only spam the cluster log from 1 mds on version changes */
734 if (mds
->get_nodeid() == 0)
735 mds
->clog
->info() << "mantle balancer version changed: " << bal_version
;
738 /* prepare for balancing */
739 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
740 rebalance_time
= ceph_clock_now();
741 mds
->mdcache
->migrator
->clear_export_queue();
743 /* fill in the metrics for each mds by grabbing load struct */
744 vector
< map
<string
, double> > metrics (cluster_size
);
745 for (mds_rank_t i
=mds_rank_t(0);
746 i
< mds_rank_t(cluster_size
);
// insert-if-absent: ranks with no report get a fresh zero load
748 map
<mds_rank_t
, mds_load_t
>::value_type
val(i
, mds_load_t(ceph_clock_now()));
749 std::pair
< map
<mds_rank_t
, mds_load_t
>::iterator
, bool > r(mds_load
.insert(val
));
750 mds_load_t
&load(r
.first
->second
);
752 metrics
[i
] = {{"auth.meta_load", load
.auth
.meta_load()},
753 {"all.meta_load", load
.all
.meta_load()},
754 {"req_rate", load
.req_rate
},
755 {"queue_len", load
.queue_len
},
756 {"cpu_load_avg", load
.cpu_load_avg
}};
759 /* execute the balancer */
761 int ret
= mantle
.balance(bal_code
, mds
->get_nodeid(), metrics
, state
.targets
);
762 dout(2) << " mantle decided that new targets=" << state
.targets
<< dendl
;
764 /* mantle doesn't know about cluster size, so check target len here */
765 if ((int) state
.targets
.size() != cluster_size
)
770 try_rebalance(state
);
// Execute the export plan in `state`: first return idle imports to their
// owners, then for each target rank satisfy its amount by (1) re-exporting
// dirs originally imported from that rank, (2) re-exporting any small
// imports to their owners, and (3) recursively picking fragments of our
// own workload via find_exports.
776 void MDBalancer::try_rebalance(balance_state_t
& state
)
778 if (!check_targets(state
))
781 if (g_conf
->mds_thrash_exports
) {
782 dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
787 // make a sorted list of my imports
788 map
<double,CDir
*> import_pop_map
;
789 multimap
<mds_rank_t
,CDir
*> import_from_map
;
790 set
<CDir
*> fullauthsubs
;
792 mds
->mdcache
->get_fullauth_subtrees(fullauthsubs
);
793 for (set
<CDir
*>::iterator it
= fullauthsubs
.begin();
794 it
!= fullauthsubs
.end();
797 if (im
->get_inode()->is_stray()) continue;
799 double pop
= im
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
// Hand idle imports straight back to their authoritative owner.
800 if (g_conf
->mds_bal_idle_threshold
> 0 &&
801 pop
< g_conf
->mds_bal_idle_threshold
&&
802 im
->inode
!= mds
->mdcache
->get_root() &&
803 im
->inode
->authority().first
!= mds
->get_nodeid()) {
804 dout(0) << " exporting idle (" << pop
<< ") import " << *im
805 << " back to mds." << im
->inode
->authority().first
807 mds
->mdcache
->migrator
->export_dir_nicely(im
, im
->inode
->authority().first
);
811 import_pop_map
[ pop
] = im
;
812 mds_rank_t from
= im
->inode
->authority().first
;
813 dout(15) << " map: i imported " << *im
<< " from " << from
<< dendl
;
814 import_from_map
.insert(pair
<mds_rank_t
,CDir
*>(from
, im
));
820 set
<CDir
*> already_exporting
;
// Satisfy each target's requested amount.
822 for (auto &it
: state
.targets
) {
823 mds_rank_t target
= it
.first
;
824 double amount
= it
.second
;
826 if (amount
< MIN_OFFLOAD
) continue;
827 if (amount
/ target_load
< .2) continue;
829 dout(5) << "want to send " << amount
<< " to mds." << target
830 //<< " .. " << (*it).second << " * " << load_fac
832 << dendl
;//" .. fudge is " << fudge << dendl;
836 mds
->mdcache
->show_subtrees();
838 // search imports from target
839 if (import_from_map
.count(target
)) {
840 dout(5) << " aha, looking through imports from target mds." << target
<< dendl
;
841 pair
<multimap
<mds_rank_t
,CDir
*>::iterator
, multimap
<mds_rank_t
,CDir
*>::iterator
> p
=
842 import_from_map
.equal_range(target
);
843 while (p
.first
!= p
.second
) {
844 CDir
*dir
= (*p
.first
).second
;
845 dout(5) << "considering " << *dir
<< " from " << (*p
.first
).first
<< dendl
;
// save and advance: we may erase `plast` below
846 multimap
<mds_rank_t
,CDir
*>::iterator plast
= p
.first
++;
848 if (dir
->inode
->is_base() ||
849 dir
->inode
->is_stray())
851 if (dir
->is_freezing() || dir
->is_frozen()) continue; // export pbly already in progress
852 double pop
= dir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
853 assert(dir
->inode
->authority().first
== target
); // cuz that's how i put it in the map, dummy
855 if (pop
<= amount
-have
) {
856 dout(0) << "reexporting " << *dir
858 << " back to mds." << target
<< dendl
;
859 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
861 import_from_map
.erase(plast
);
862 import_pop_map
.erase(pop
);
864 dout(5) << "can't reexport " << *dir
<< ", too big " << pop
<< dendl
;
866 if (amount
-have
< MIN_OFFLOAD
) break;
869 if (amount
-have
< MIN_OFFLOAD
) {
// Phase 2: re-export any small imports back to their owners.
875 for (map
<double,CDir
*>::iterator import
= import_pop_map
.begin();
876 import
!= import_pop_map
.end();
878 CDir
*imp
= (*import
).second
;
879 if (imp
->inode
->is_base() ||
880 imp
->inode
->is_stray())
883 double pop
= (*import
).first
;
884 if (pop
< amount
-have
|| pop
< MIN_REEXPORT
) {
885 dout(0) << "reexporting " << *imp
887 << " back to mds." << imp
->inode
->authority()
890 mds
->mdcache
->migrator
->export_dir_nicely(imp
, imp
->inode
->authority().first
);
892 if (amount
-have
< MIN_OFFLOAD
) break;
894 if (amount
-have
< MIN_OFFLOAD
) {
895 //fudge = amount-have;
899 // okay, search for fragments of my workload
900 set
<CDir
*> candidates
;
901 mds
->mdcache
->get_fullauth_subtrees(candidates
);
905 for (set
<CDir
*>::iterator pot
= candidates
.begin();
906 pot
!= candidates
.end();
908 if ((*pot
)->get_inode()->is_stray()) continue;
909 find_exports(*pot
, amount
, exports
, have
, already_exporting
);
910 if (have
> amount
-MIN_OFFLOAD
)
913 //fudge = amount - have;
915 for (list
<CDir
*>::iterator it
= exports
.begin(); it
!= exports
.end(); ++it
) {
916 dout(0) << " - exporting "
917 << (*it
)->pop_auth_subtree
919 << (*it
)->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
)
920 << " to mds." << target
923 mds
->mdcache
->migrator
->export_dir_nicely(*it
, target
);
927 dout(5) << "rebalance done" << dendl
;
928 mds
->mdcache
->show_subtrees();
932 /* Check that all targets are in the MDSMap export_targets for my rank. */
933 bool MDBalancer::check_targets(const balance_state_t
& state
)
935 for (const auto &it
: state
.targets
) {
936 if (!mds
->is_export_target(it
.first
)) {
// Recursively collect exportable dirfrags under `dir` until `have`
// approaches `amount`. Subdirs are binned relative to the remaining need:
// just-right ones are taken outright, then large-ish "smaller" ones, then
// recursion into bigger unreplicated dirs, then the rest of "smaller",
// and finally recursion into bigger replicated dirs.
943 void MDBalancer::find_exports(CDir
*dir
,
945 list
<CDir
*>& exports
,
947 set
<CDir
*>& already_exporting
)
949 double need
= amount
- have
;
950 if (need
< amount
* g_conf
->mds_bal_min_start
)
951 return; // good enough!
952 double needmax
= need
* g_conf
->mds_bal_need_max
;
953 double needmin
= need
* g_conf
->mds_bal_need_min
;
954 double midchunk
= need
* g_conf
->mds_bal_midchunk
;
955 double minchunk
= need
* g_conf
->mds_bal_minchunk
;
957 list
<CDir
*> bigger_rep
, bigger_unrep
;
958 multimap
<double, CDir
*> smaller
;
960 double dir_pop
= dir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
961 dout(7) << " find_exports in " << dir_pop
<< " " << *dir
<< " need " << need
<< " (" << needmin
<< " - " << needmax
<< ")" << dendl
;
// Classify each auth, unfrozen, not-yet-exported subdir by its load.
963 double subdir_sum
= 0;
964 for (CDir::map_t::iterator it
= dir
->begin();
967 CInode
*in
= it
->second
->get_linkage()->get_inode();
969 if (!in
->is_dir()) continue;
972 in
->get_dirfrags(dfls
);
973 for (list
<CDir
*>::iterator p
= dfls
.begin();
977 if (!subdir
->is_auth()) continue;
978 if (already_exporting
.count(subdir
)) continue;
980 if (subdir
->is_frozen()) continue; // can't export this right now!
983 double pop
= subdir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
985 dout(15) << " subdir pop " << pop
<< " " << *subdir
<< dendl
;
987 if (pop
< minchunk
) continue;
// just right: take it directly
990 if (pop
> needmin
&& pop
< needmax
) {
991 exports
.push_back(subdir
);
992 already_exporting
.insert(subdir
);
998 if (subdir
->is_rep())
999 bigger_rep
.push_back(subdir
);
1001 bigger_unrep
.push_back(subdir
);
1003 smaller
.insert(pair
<double,CDir
*>(pop
, subdir
));
1006 dout(15) << " sum " << subdir_sum
<< " / " << dir_pop
<< dendl
;
1008 // grab some sufficiently big small items
1009 multimap
<double,CDir
*>::reverse_iterator it
;
1010 for (it
= smaller
.rbegin();
1011 it
!= smaller
.rend();
1014 if ((*it
).first
< midchunk
)
1017 dout(7) << " taking smaller " << *(*it
).second
<< dendl
;
1019 exports
.push_back((*it
).second
);
1020 already_exporting
.insert((*it
).second
);
1021 have
+= (*it
).first
;
1026 // apprently not enough; drill deeper into the hierarchy (if non-replicated)
1027 for (list
<CDir
*>::iterator it
= bigger_unrep
.begin();
1028 it
!= bigger_unrep
.end();
1030 dout(15) << " descending into " << **it
<< dendl
;
1031 find_exports(*it
, amount
, exports
, have
, already_exporting
);
1036 // ok fine, use smaller bits
1038 it
!= smaller
.rend();
1040 dout(7) << " taking (much) smaller " << it
->first
<< " " << *(*it
).second
<< dendl
;
1042 exports
.push_back((*it
).second
);
1043 already_exporting
.insert((*it
).second
);
1044 have
+= (*it
).first
;
1049 // ok fine, drill into replicated dirs
1050 for (list
<CDir
*>::iterator it
= bigger_rep
.begin();
1051 it
!= bigger_rep
.end();
1053 dout(7) << " descending into replicated " << **it
<< dendl
;
1054 find_exports(*it
, amount
, exports
, have
, already_exporting
);
// Record a popularity hit of `type` on an inode and propagate it to the
// inode's parent dirfrag (which walks it up the hierarchy via hit_dir).
1061 void MDBalancer::hit_inode(utime_t now
, CInode
*in
, int type
, int who
)
1064 in
->pop
.get(type
).hit(now
, mds
->mdcache
->decayrate
);
1066 if (in
->get_parent_dn())
1067 hit_dir(now
, in
->get_parent_dn()->get_dir(), type
, who
);
// Decide whether `dir` should be queued for a split (it exceeds the split
// size, or the caller saw a hot read/write rate) or a merge (it has
// shrunk below should_merge and none is already pending). Fragmentation
// must be enabled and the dir must not be a base inode.
1070 void MDBalancer::maybe_fragment(CDir
*dir
, bool hot
)
1073 if (g_conf
->mds_bal_frag
&& g_conf
->mds_bal_fragment_interval
> 0 &&
1074 !dir
->inode
->is_base() && // not root/base (for now at least)
1078 if (g_conf
->mds_bal_split_size
> 0 &&
1079 mds
->mdsmap
->allows_dirfrags() &&
1080 (dir
->should_split() || hot
))
1082 if (split_pending
.count(dir
->dirfrag()) == 0) {
1083 queue_split(dir
, false);
// a split is already queued; only fast-path it if urgently oversized
1085 if (dir
->should_split_fast()) {
1086 queue_split(dir
, true);
1088 dout(10) << __func__
<< ": fragment already enqueued to split: "
1095 if (dir
->get_frag() != frag_t() && dir
->should_merge() &&
1096 merge_pending
.count(dir
->dirfrag()) == 0) {
// Record a popularity hit on a dirfrag: update its own counters, possibly
// queue a split/merge, track read spread across clients, decide on
// replicating/unreplicating the dir based on thresholds, and then walk up
// the hierarchy updating nested and auth-subtree counters.
1102 void MDBalancer::hit_dir(utime_t now
, CDir
*dir
, int type
, int who
, double amount
)
1105 double v
= dir
->pop_me
.get(type
).hit(now
, amount
);
// hot = per-frag read or write rate beyond the split thresholds
1107 const bool hot
= (v
> g_conf
->mds_bal_split_rd
&& type
== META_POP_IRD
) ||
1108 (v
> g_conf
->mds_bal_split_wr
&& type
== META_POP_IWR
);
1110 dout(20) << "hit_dir " << type
<< " pop is " << v
<< ", frag " << dir
->get_frag()
1111 << " size " << dir
->get_frag_size() << dendl
;
1113 maybe_fragment(dir
, hot
);
// track which client ranks are reading, to measure spread
1116 if (type
== META_POP_IRD
&& who
>= 0) {
1117 dir
->pop_spread
.hit(now
, mds
->mdcache
->decayrate
, who
);
1120 double rd_adj
= 0.0;
1121 if (type
== META_POP_IRD
&&
1122 dir
->last_popularity_sample
< last_sample
) {
1123 double dir_pop
= dir
->pop_auth_subtree
.get(type
).get(now
, mds
->mdcache
->decayrate
); // hmm??
1124 dir
->last_popularity_sample
= last_sample
;
1125 double pop_sp
= dir
->pop_spread
.get(now
, mds
->mdcache
->decayrate
);
1126 dir_pop
+= pop_sp
* 10;
1128 //if (dir->ino() == inodeno_t(0x10000000002))
1130 dout(20) << "hit_dir " << type
<< " pop " << dir_pop
<< " spread " << pop_sp
1131 << " " << dir
->pop_spread
.last
[0]
1132 << " " << dir
->pop_spread
.last
[1]
1133 << " " << dir
->pop_spread
.last
[2]
1134 << " " << dir
->pop_spread
.last
[3]
1135 << " in " << *dir
<< dendl
;
// replicate hot dirs; unreplicate ones that cooled down
1138 if (dir
->is_auth() && !dir
->is_ambiguous_auth()) {
1139 if (!dir
->is_rep() &&
1140 dir_pop
>= g_conf
->mds_bal_replicate_threshold
) {
1142 double rdp
= dir
->pop_me
.get(META_POP_IRD
).get(now
, mds
->mdcache
->decayrate
);
// spread the read load over all in-cluster ranks
1143 rd_adj
= rdp
/ mds
->get_mds_map()->get_num_in_mds() - rdp
;
1144 rd_adj
/= 2.0; // temper somewhat
1146 dout(0) << "replicating dir " << *dir
<< " pop " << dir_pop
<< " .. rdp " << rdp
<< " adj " << rd_adj
<< dendl
;
1148 dir
->dir_rep
= CDir::REP_ALL
;
1149 mds
->mdcache
->send_dir_updates(dir
, true);
1151 // fixme this should adjust the whole pop hierarchy
1152 dir
->pop_me
.get(META_POP_IRD
).adjust(rd_adj
);
1153 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(rd_adj
);
1156 if (dir
->ino() != 1 &&
1158 dir_pop
< g_conf
->mds_bal_unreplicate_threshold
) {
1160 dout(0) << "unreplicating dir " << *dir
<< " pop " << dir_pop
<< dendl
;
1162 dir
->dir_rep
= CDir::REP_NONE
;
1163 mds
->mdcache
->send_dir_updates(dir
);
// walk up the hierarchy, hitting nested and auth-subtree counters
1169 bool hit_subtree
= dir
->is_auth(); // current auth subtree (if any)
1170 bool hit_subtree_nested
= dir
->is_auth(); // all nested auth subtrees
1173 dir
->pop_nested
.get(type
).hit(now
, amount
);
1175 dir
->pop_nested
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
1178 dir
->pop_auth_subtree
.get(type
).hit(now
, amount
);
1180 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
1183 if (hit_subtree_nested
) {
1184 dir
->pop_auth_subtree_nested
.get(type
).hit(now
, mds
->mdcache
->decayrate
, amount
);
1186 dir
->pop_auth_subtree_nested
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
1189 if (dir
->is_subtree_root())
1190 hit_subtree
= false; // end of auth domain, stop hitting auth counters.
1192 if (dir
->inode
->get_parent_dn() == 0) break;
1193 dir
= dir
->inode
->get_parent_dn()->get_dir();
1199 * subtract off an exported chunk.
1200 * this excludes *dir itself (encode_export_dir should have take care of that)
1201 * we _just_ do the parents' nested counters.
1203 * NOTE: call me _after_ forcing *dir into a subtree root,
1204 * but _before_ doing the encode_export_dirs.
1206 void MDBalancer::subtract_export(CDir
*dir
, utime_t now
)
1208 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1211 dir
= dir
->inode
->get_parent_dir();
1214 dir
->pop_nested
.sub(now
, mds
->mdcache
->decayrate
, subload
);
1215 dir
->pop_auth_subtree_nested
.sub(now
, mds
->mdcache
->decayrate
, subload
);
// Inverse of subtract_export: after importing a subtree, add its load
// into the nested counters of every ancestor dirfrag. The tail of this
// function is not visible in this extraction.
1220 void MDBalancer::add_import(CDir
*dir
, utime_t now
)
1222 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1225 dir
= dir
->inode
->get_parent_dir();
1228 dir
->pop_nested
.add(now
, mds
->mdcache
->decayrate
, subload
);
1229 dir
->pop_auth_subtree_nested
.add(now
, mds
->mdcache
->decayrate
, subload
);