1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
18 #include "MDBalancer.h"
20 #include "mon/MonClient.h"
28 #include "include/Context.h"
29 #include "msg/Messenger.h"
30 #include "messages/MHeartbeat.h"
39 #include "common/config.h"
40 #include "common/errno.h"
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
45 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
49 auto subsys = ceph_subsys_mds;\
50 if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
51 subsys = ceph_subsys_mds_balancer;\
53 dout_impl(dout_context, subsys, lvl) dout_prefix
55 #define dendl dendl_impl; } while (0)
58 #define MIN_LOAD 50 // ??
59 #define MIN_REEXPORT 5 // will automatically reexport
60 #define MIN_OFFLOAD 10 // point at which i stop trying, close enough
63 /* This function DOES put the passed message before returning */
// Dispatch a message destined for the balancer by message type.
// Per the comment above, this function puts (drops its ref on) the
// passed message before returning.
// NOTE(review): interior lines of this function (opening brace and the
// lines between the visible statements) are missing from this
// extraction; the code below is reproduced byte-for-byte as found.
64 int MDBalancer::proc_message(Message
*m
)
66 switch (m
->get_type()) {
// Heartbeat from a peer MDS rank: hand off to handle_heartbeat(),
// which consumes the message.
68 case MSG_MDS_HEARTBEAT
:
69 handle_heartbeat(static_cast<MHeartbeat
*>(m
));
// Any other type is a programming error: log via dendl_impl (which
// bypasses the balancer-subsystem gating in the local dendl macro)
// and abort.
73 derr
<< " balancer unknown message " << m
->get_type() << dendl_impl
;
74 assert(0 == "balancer unknown message");
80 void MDBalancer::handle_export_pins(void)
82 auto &q
= mds
->mdcache
->export_pin_queue
;
84 dout(20) << "export_pin_queue size=" << q
.size() << dendl
;
85 while (it
!= q
.end()) {
89 mds_rank_t export_pin
= in
->get_export_pin(false);
93 in
->get_dirfrags(dfls
);
94 for (auto dir
: dfls
) {
98 if (export_pin
== MDS_RANK_NONE
) {
99 if (dir
->state_test(CDir::STATE_AUXSUBTREE
)) {
100 if (dir
->is_frozen() || dir
->is_freezing()) {
105 dout(10) << " clear auxsubtree on " << *dir
<< dendl
;
106 dir
->state_clear(CDir::STATE_AUXSUBTREE
);
107 mds
->mdcache
->try_subtree_merge(dir
);
109 } else if (export_pin
== mds
->get_nodeid()) {
110 if (dir
->state_test(CDir::STATE_CREATING
) ||
111 dir
->is_frozen() || dir
->is_freezing()) {
116 if (!dir
->is_subtree_root()) {
117 dir
->state_set(CDir::STATE_AUXSUBTREE
);
118 mds
->mdcache
->adjust_subtree_auth(dir
, mds
->get_nodeid());
119 dout(10) << " create aux subtree on " << *dir
<< dendl
;
120 } else if (!dir
->state_test(CDir::STATE_AUXSUBTREE
)) {
121 dout(10) << " set auxsubtree bit on " << *dir
<< dendl
;
122 dir
->state_set(CDir::STATE_AUXSUBTREE
);
125 mds
->mdcache
->migrator
->export_dir(dir
, export_pin
);
131 in
->state_clear(CInode::STATE_QUEUEDEXPORTPIN
);
136 set
<CDir
*> authsubs
;
137 mds
->mdcache
->get_auth_subtrees(authsubs
);
138 for (auto &cd
: authsubs
) {
139 mds_rank_t export_pin
= cd
->inode
->get_export_pin();
140 dout(10) << "auth tree " << *cd
<< " export_pin=" << export_pin
<< dendl
;
141 if (export_pin
>= 0 && export_pin
!= mds
->get_nodeid()) {
142 dout(10) << "exporting auth subtree " << *cd
->inode
<< " to " << export_pin
<< dendl
;
143 mds
->mdcache
->migrator
->export_dir(cd
, export_pin
);
// Periodic driver for the balancer, called on the MDS tick. Handles
// the export-pin queue, popularity sampling, and (on rank 0) deciding
// when to broadcast a heartbeat that starts a rebalance round.
// NOTE(review): interior lines are missing from this extraction; the
// code below is reproduced byte-for-byte as found.
148 void MDBalancer::tick()
// num_bal_times / first track the balancing budget configured via
// mds_bal_max and (below) mds_bal_max_until.
150 static int num_bal_times
= g_conf
->mds_bal_max
;
151 static utime_t first
= ceph_clock_now();
152 utime_t now
= ceph_clock_now();
153 utime_t elapsed
= now
;
// honor queued export pins when pinning is enabled
156 if (g_conf
->mds_bal_export_pin
) {
157 handle_export_pins();
// resample popularity at mds_bal_sample_interval granularity
161 if ((double)now
- (double)last_sample
> g_conf
->mds_bal_sample_interval
) {
162 dout(15) << "tick last_sample now " << now
<< dendl
;
// only rank 0 originates heartbeats, at most every mds_bal_interval
// seconds, and only while the mds_bal_max_until time budget (if set)
// has not been exceeded.
167 if (mds
->get_nodeid() == 0 &&
168 g_conf
->mds_bal_interval
> 0 &&
170 (g_conf
->mds_bal_max_until
>= 0 &&
171 elapsed
.sec() > g_conf
->mds_bal_max_until
)) &&
173 now
.sec() - last_heartbeat
.sec() >= g_conf
->mds_bal_interval
) {
174 last_heartbeat
= now
;
// One-shot internal context: when finished, asks the balancer to send
// its heartbeat. Queued as a waiter when the MDCache is not yet open
// (see send_heartbeat()'s wait_for_open path).
// NOTE(review): original line 184 (presumably an access specifier) and
// the class's closing braces are missing from this extraction.
183 class C_Bal_SendHeartbeat
: public MDSInternalContext
{
185 explicit C_Bal_SendHeartbeat(MDSRank
*mds_
) : MDSInternalContext(mds_
) { }
186 void finish(int f
) override
{
187 mds
->balancer
->send_heartbeat();
// Collapse the per-metric load vector into a single scalar used to
// compare MDS ranks; the blend of metrics is selected by the
// mds_bal_mode config value.
// NOTE(review): the case labels, remaining branches and the closing
// brace are missing from this extraction; the code below is reproduced
// byte-for-byte as found.
192 double mds_load_t::mds_load()
194 switch(g_conf
->mds_bal_mode
) {
// weighted mix: 80% locally-authoritative metadata load plus 20% of
// the load including non-auth replicas
197 .8 * auth
.meta_load() +
198 .2 * all
.meta_load() +
// request-rate mode: client request rate plus a 10x penalty per
// queued dispatch item
203 return req_rate
+ 10.0*queue_len
;
// Sample this rank's current load: dirfrag popularity rolled up from
// the root, client request rate, dispatch queue length, and the host
// load average read from /proc/loadavg.
// NOTE(review): interior lines are missing from this extraction; the
// code below is reproduced byte-for-byte as found.
213 mds_load_t
MDBalancer::get_load(utime_t now
)
215 mds_load_t
load(now
);
// roll up popularity from every dirfrag of the root inode
217 if (mds
->mdcache
->get_root()) {
219 mds
->mdcache
->get_root()->get_dirfrags(ls
);
220 for (list
<CDir
*>::iterator p
= ls
.begin();
223 load
.auth
.add(now
, mds
->mdcache
->decayrate
, (*p
)->pop_auth_subtree_nested
);
224 load
.all
.add(now
, mds
->mdcache
->decayrate
, (*p
)->pop_nested
);
227 dout(20) << "get_load no root, no load" << dendl
;
// derive the request rate from the request-counter delta since the
// previous sample; only valid when we have a prior sample, the clock
// moved forward, and the counter did not go backwards.
230 uint64_t num_requests
= mds
->get_num_requests();
231 bool new_req_rate
= false;
232 if (last_get_load
!= utime_t() &&
233 now
> last_get_load
&&
234 num_requests
>= last_num_requests
) {
238 load
.req_rate
= (num_requests
- last_num_requests
) / (double)el
;
// otherwise fall back to the rate we previously reported for
// ourselves, if any
243 auto p
= mds_load
.find(mds
->get_nodeid());
244 if (p
!= mds_load
.end())
245 load
.req_rate
= p
->second
.req_rate
;
248 last_num_requests
= num_requests
;
250 load
.queue_len
= messenger
->get_dispatch_queue_len();
// CPU load, read from procfs (PROCPREFIX lets a test harness redirect
// the path)
252 ifstream
cpu(PROCPREFIX
"/proc/loadavg");
254 cpu
>> load
.cpu_load_avg
;
256 derr
<< "input file " PROCPREFIX
"'/proc/loadavg' not found" << dendl_impl
;
258 dout(15) << "get_load " << load
<< dendl
;
263 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
264 * fallbacks (i.e. kick off async read when we are processing the map and
265 * check status when we get here) with the way the mds is structured.
// Fetch the Lua balancer object named in the MDSMap from the metadata
// pool into bal_code/bal_version, blocking with a timeout of half the
// balancing interval.
// NOTE(review): interior lines (locals such as lock/cond/ack/r and the
// early-return paths) are missing from this extraction; the code below
// is reproduced byte-for-byte as found.
267 int MDBalancer::localize_balancer()
269 /* reset everything */
276 /* we assume that balancer is in the metadata pool */
277 object_t oid
= object_t(mds
->mdsmap
->get_balancer());
278 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
// kick off the async read; C_SafeCond signals cond and records the
// completion status when the read finishes.
279 ceph_tid_t tid
= mds
->objecter
->read(oid
, oloc
, 0, 0, CEPH_NOSNAP
, &lua_src
, 0,
280 new C_SafeCond(&lock
, &cond
, &ack
, &r
));
281 dout(15) << "launched non-blocking read tid=" << tid
282 << " oid=" << oid
<< " oloc=" << oloc
<< dendl
;
284 /* timeout: if we waste half our time waiting for RADOS, then abort! */
285 double t
= ceph_clock_now() + g_conf
->mds_bal_interval
/2;
287 timeout
.set_from_double(t
);
289 int ret_t
= cond
.WaitUntil(lock
, timeout
);
292 /* success: store the balancer in memory and set the version. */
294 if (ret_t
== ETIMEDOUT
) {
// give up: cancel the in-flight objecter op so the buffer is not
// touched after we return
295 mds
->objecter
->op_cancel(tid
, -ECANCELED
);
298 bal_code
.assign(lua_src
.to_str());
// version is keyed by the object name so changes are detectable in
// mantle_prep_rebalance()
299 bal_version
.assign(oid
.name
);
300 dout(10) << "localized balancer, bal_code=" << bal_code
<< dendl
;
305 void MDBalancer::send_heartbeat()
307 utime_t now
= ceph_clock_now();
309 if (mds
->is_cluster_degraded()) {
310 dout(10) << "send_heartbeat degraded" << dendl
;
314 if (!mds
->mdcache
->is_open()) {
315 dout(5) << "not open" << dendl
;
316 mds
->mdcache
->wait_for_open(new C_Bal_SendHeartbeat(mds
));
320 if (mds
->get_nodeid() == 0) {
326 mds_load_t load
= get_load(now
);
327 mds
->logger
->set(l_mds_load_cent
, 100 * load
.mds_load());
328 mds
->logger
->set(l_mds_dispatch_queue_len
, load
.queue_len
);
330 mds_load
[mds
->get_nodeid()] = load
;
332 // import_map -- how much do i import from whom
333 map
<mds_rank_t
, float> import_map
;
335 mds
->mdcache
->get_auth_subtrees(authsubs
);
336 for (set
<CDir
*>::iterator it
= authsubs
.begin();
337 it
!= authsubs
.end();
340 mds_rank_t from
= im
->inode
->authority().first
;
341 if (from
== mds
->get_nodeid()) continue;
342 if (im
->get_inode()->is_stray()) continue;
343 import_map
[from
] += im
->pop_auth_subtree
.meta_load(now
, mds
->mdcache
->decayrate
);
345 mds_import_map
[ mds
->get_nodeid() ] = import_map
;
348 dout(5) << "mds." << mds
->get_nodeid() << " epoch " << beat_epoch
<< " load " << load
<< dendl
;
349 for (map
<mds_rank_t
, float>::iterator it
= import_map
.begin();
350 it
!= import_map
.end();
352 dout(5) << " import_map from " << it
->first
<< " -> " << it
->second
<< dendl
;
357 mds
->get_mds_map()->get_up_mds_set(up
);
358 for (set
<mds_rank_t
>::iterator p
= up
.begin(); p
!= up
.end(); ++p
) {
359 if (*p
== mds
->get_nodeid())
361 MHeartbeat
*hb
= new MHeartbeat(load
, beat_epoch
);
362 hb
->get_import_map() = import_map
;
363 messenger
->send_message(hb
,
364 mds
->mdsmap
->get_inst(*p
));
368 /* This function DOES put the passed message before returning */
369 void MDBalancer::handle_heartbeat(MHeartbeat
*m
)
371 mds_rank_t who
= mds_rank_t(m
->get_source().num());
372 dout(25) << "=== got heartbeat " << m
->get_beat() << " from " << m
->get_source().num() << " " << m
->get_load() << dendl
;
374 if (!mds
->is_active())
377 if (!mds
->mdcache
->is_open()) {
378 dout(10) << "opening root on handle_heartbeat" << dendl
;
379 mds
->mdcache
->wait_for_open(new C_MDS_RetryMessage(mds
, m
));
383 if (mds
->is_cluster_degraded()) {
384 dout(10) << " degraded, ignoring" << dendl
;
388 if (mds
->get_nodeid() != 0 && m
->get_beat() > beat_epoch
) {
389 dout(10) << "receive next epoch " << m
->get_beat() << " from mds." << who
<< " before mds0" << dendl
;
391 beat_epoch
= m
->get_beat();
392 // clear the mds load info whose epoch is less than beat_epoch
397 dout(20) << " from mds0, new epoch " << m
->get_beat() << dendl
;
398 if (beat_epoch
!= m
->get_beat()) {
399 beat_epoch
= m
->get_beat();
405 mds
->mdcache
->show_subtrees();
406 } else if (mds
->get_nodeid() == 0) {
407 if (beat_epoch
!= m
->get_beat()) {
408 dout(10) << " old heartbeat epoch, ignoring" << dendl
;
413 mds_load
[who
] = m
->get_load();
414 mds_import_map
[who
] = m
->get_import_map();
417 unsigned cluster_size
= mds
->get_mds_map()->get_num_in_mds();
418 if (mds_load
.size() == cluster_size
) {
420 //export_empties(); // no!
422 /* avoid spamming ceph -w if user does not turn mantle on */
423 if (mds
->mdsmap
->get_balancer() != "") {
424 int r
= mantle_prep_rebalance();
426 mds
->clog
->warn() << "using old balancer; mantle failed for "
427 << "balancer=" << mds
->mdsmap
->get_balancer()
428 << " : " << cpp_strerror(r
);
430 prep_rebalance(m
->get_beat());
// Record a load transfer of min(maxex, maxim) from exporter `ex` to
// importer `im` in the balance state. If we are the exporter, the
// amount also becomes one of our own migration targets.
// NOTE(review): the trailing lines (decrementing maxex/maxim and the
// final return) are missing from this extraction; the code below is
// reproduced byte-for-byte as found.
439 double MDBalancer::try_match(balance_state_t
& state
, mds_rank_t ex
, double& maxex
,
440 mds_rank_t im
, double& maxim
)
// nothing to move if either side has no remaining capacity
442 if (maxex
<= 0 || maxim
<= 0) return 0.0;
444 double howmuch
= MIN(maxex
, maxim
);
445 if (howmuch
<= 0) return 0.0;
447 dout(5) << " - mds." << ex
<< " exports " << howmuch
<< " to mds." << im
<< dendl
;
// only our own exports become actual migration targets
449 if (ex
== mds
->get_nodeid())
450 state
.targets
[im
] += howmuch
;
// bookkeeping for both sides so subsequent matching sees updated totals
452 state
.exported
[ex
] += howmuch
;
453 state
.imported
[im
] += howmuch
;
461 void MDBalancer::queue_split(const CDir
*dir
, bool fast
)
463 dout(10) << __func__
<< " enqueuing " << *dir
464 << " (fast=" << fast
<< ")" << dendl
;
466 assert(mds
->mdsmap
->allows_dirfrags());
467 const dirfrag_t frag
= dir
->dirfrag();
469 auto callback
= [this, frag
](int r
) {
470 if (split_pending
.erase(frag
) == 0) {
471 // Someone beat me to it. This can happen in the fast splitting
472 // path, because we spawn two contexts, one with mds->timer and
473 // one with mds->queue_waiter. The loser can safely just drop
478 CDir
*split_dir
= mds
->mdcache
->get_dirfrag(frag
);
480 dout(10) << "drop split on " << frag
<< " because not in cache" << dendl
;
483 if (!split_dir
->is_auth()) {
484 dout(10) << "drop split on " << frag
<< " because non-auth" << dendl
;
488 // Pass on to MDCache: note that the split might still not
489 // happen if the checks in MDCache::can_fragment fail.
490 dout(10) << __func__
<< " splitting " << *split_dir
<< dendl
;
491 mds
->mdcache
->split_dir(split_dir
, g_conf
->mds_bal_split_bits
);
495 if (split_pending
.count(frag
) == 0) {
496 split_pending
.insert(frag
);
501 // Do the split ASAP: enqueue it in the MDSRank waiters which are
502 // run at the end of dispatching the current request
503 mds
->queue_waiter(new MDSInternalContextWrapper(mds
,
504 new FunctionContext(callback
)));
506 // Set a timer to really do the split: we don't do it immediately
507 // so that bursts of ops on a directory have a chance to go through
508 // before we freeze it.
509 mds
->timer
.add_event_after(g_conf
->mds_bal_fragment_interval
,
510 new FunctionContext(callback
));
514 void MDBalancer::queue_merge(CDir
*dir
)
516 const auto frag
= dir
->dirfrag();
517 auto callback
= [this, frag
](int r
) {
518 assert(frag
.frag
!= frag_t());
520 // frag must be in this set because only one context is in flight
521 // for a given frag at a time (because merge_pending is checked before
522 // starting one), and this context is the only one that erases it.
523 merge_pending
.erase(frag
);
525 CDir
*dir
= mds
->mdcache
->get_dirfrag(frag
);
527 dout(10) << "drop merge on " << frag
<< " because not in cache" << dendl
;
530 assert(dir
->dirfrag() == frag
);
532 if(!dir
->is_auth()) {
533 dout(10) << "drop merge on " << *dir
<< " because lost auth" << dendl
;
537 dout(10) << "merging " << *dir
<< dendl
;
539 CInode
*diri
= dir
->get_inode();
541 frag_t fg
= dir
->get_frag();
542 while (fg
!= frag_t()) {
543 frag_t sibfg
= fg
.get_sibling();
545 bool complete
= diri
->get_dirfrags_under(sibfg
, sibs
);
547 dout(10) << " not all sibs under " << sibfg
<< " in cache (have " << sibs
<< ")" << dendl
;
551 for (list
<CDir
*>::iterator p
= sibs
.begin(); p
!= sibs
.end(); ++p
) {
553 if (!sib
->is_auth() || !sib
->should_merge()) {
559 dout(10) << " not all sibs under " << sibfg
<< " " << sibs
<< " should_merge" << dendl
;
562 dout(10) << " all sibs under " << sibfg
<< " " << sibs
<< " should merge" << dendl
;
566 if (fg
!= dir
->get_frag())
567 mds
->mdcache
->merge_dir(diri
, fg
);
570 if (merge_pending
.count(frag
) == 0) {
571 dout(20) << __func__
<< " enqueued dir " << *dir
<< dendl
;
572 merge_pending
.insert(frag
);
573 mds
->timer
.add_event_after(g_conf
->mds_bal_fragment_interval
,
574 new FunctionContext(callback
));
576 dout(20) << __func__
<< " dir already in queue " << *dir
<< dendl
;
580 void MDBalancer::prep_rebalance(int beat
)
582 balance_state_t state
;
584 if (g_conf
->mds_thrash_exports
) {
585 //we're going to randomly export to all the mds in the cluster
586 set
<mds_rank_t
> up_mds
;
587 mds
->get_mds_map()->get_up_mds_set(up_mds
);
588 for (const auto &rank
: up_mds
) {
589 state
.targets
[rank
] = 0.0;
592 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
593 mds_rank_t whoami
= mds
->get_nodeid();
594 rebalance_time
= ceph_clock_now();
596 dout(5) << " prep_rebalance: cluster loads are" << dendl
;
598 mds
->mdcache
->migrator
->clear_export_queue();
600 // rescale! turn my mds_load back into meta_load units
601 double load_fac
= 1.0;
602 map
<mds_rank_t
, mds_load_t
>::iterator m
= mds_load
.find(whoami
);
603 if ((m
!= mds_load
.end()) && (m
->second
.mds_load() > 0)) {
604 double metald
= m
->second
.auth
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
605 double mdsld
= m
->second
.mds_load();
606 load_fac
= metald
/ mdsld
;
607 dout(7) << " load_fac is " << load_fac
608 << " <- " << m
->second
.auth
<< " " << metald
613 mds_meta_load
.clear();
615 double total_load
= 0.0;
616 multimap
<double,mds_rank_t
> load_map
;
617 for (mds_rank_t i
=mds_rank_t(0); i
< mds_rank_t(cluster_size
); i
++) {
618 mds_load_t
& load
= mds_load
.at(i
);
620 double l
= load
.mds_load() * load_fac
;
621 mds_meta_load
[i
] = l
;
624 dout(5) << " mds." << i
626 << " = " << load
.mds_load()
627 << " ~ " << l
<< dendl
;
629 if (whoami
== i
) my_load
= l
;
632 load_map
.insert(pair
<double,mds_rank_t
>( l
, i
));
636 target_load
= total_load
/ (double)cluster_size
;
637 dout(5) << "prep_rebalance: my load " << my_load
638 << " target " << target_load
639 << " total " << total_load
643 for (auto p
: load_map
) {
644 if (p
.first
< target_load
* (1.0 + g_conf
->mds_bal_min_rebalance
)) {
645 dout(5) << " mds." << p
.second
<< " is underloaded or barely overloaded." << dendl
;
646 mds_last_epoch_under_map
[p
.second
] = beat_epoch
;
650 int last_epoch_under
= mds_last_epoch_under_map
[whoami
];
651 if (last_epoch_under
== beat_epoch
) {
652 dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl
;
655 // am i over long enough?
656 if (last_epoch_under
&& beat_epoch
- last_epoch_under
< 2) {
657 dout(5) << " i am overloaded, but only for " << (beat_epoch
- last_epoch_under
) << " epochs" << dendl
;
661 dout(5) << " i am sufficiently overloaded" << dendl
;
664 // first separate exporters and importers
665 multimap
<double,mds_rank_t
> importers
;
666 multimap
<double,mds_rank_t
> exporters
;
667 set
<mds_rank_t
> importer_set
;
668 set
<mds_rank_t
> exporter_set
;
670 for (multimap
<double,mds_rank_t
>::iterator it
= load_map
.begin();
671 it
!= load_map
.end();
673 if (it
->first
< target_load
) {
674 dout(15) << " mds." << it
->second
<< " is importer" << dendl
;
675 importers
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
676 importer_set
.insert(it
->second
);
678 int mds_last_epoch_under
= mds_last_epoch_under_map
[it
->second
];
679 if (!(mds_last_epoch_under
&& beat_epoch
- mds_last_epoch_under
< 2)) {
680 dout(15) << " mds." << it
->second
<< " is exporter" << dendl
;
681 exporters
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
682 exporter_set
.insert(it
->second
);
688 // determine load transfer mapping
691 // analyze import_map; do any matches i can
693 dout(15) << " matching exporters to import sources" << dendl
;
695 // big -> small exporters
696 for (multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
697 ex
!= exporters
.rend();
699 double maxex
= get_maxex(state
, ex
->second
);
700 if (maxex
<= .001) continue;
702 // check importers. for now, just in arbitrary order (no intelligent matching).
703 for (map
<mds_rank_t
, float>::iterator im
= mds_import_map
[ex
->second
].begin();
704 im
!= mds_import_map
[ex
->second
].end();
706 double maxim
= get_maxim(state
, im
->first
);
707 if (maxim
<= .001) continue;
708 try_match(state
, ex
->second
, maxex
, im
->first
, maxim
);
709 if (maxex
<= .001) break;
716 dout(15) << " matching big exporters to big importers" << dendl
;
717 // big exporters to big importers
718 multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
719 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
720 while (ex
!= exporters
.rend() &&
721 im
!= importers
.end()) {
722 double maxex
= get_maxex(state
, ex
->second
);
723 double maxim
= get_maxim(state
, im
->second
);
724 if (maxex
< .001 || maxim
< .001) break;
725 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
726 if (maxex
<= .001) ++ex
;
727 if (maxim
<= .001) ++im
;
730 dout(15) << " matching small exporters to big importers" << dendl
;
731 // small exporters to big importers
732 multimap
<double,mds_rank_t
>::iterator ex
= exporters
.begin();
733 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
734 while (ex
!= exporters
.end() &&
735 im
!= importers
.end()) {
736 double maxex
= get_maxex(state
, ex
->second
);
737 double maxim
= get_maxim(state
, im
->second
);
738 if (maxex
< .001 || maxim
< .001) break;
739 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
740 if (maxex
<= .001) ++ex
;
741 if (maxim
<= .001) ++im
;
745 try_rebalance(state
);
748 int MDBalancer::mantle_prep_rebalance()
750 balance_state_t state
;
752 /* refresh balancer if it has changed */
753 if (bal_version
!= mds
->mdsmap
->get_balancer()) {
754 bal_version
.assign("");
755 int r
= localize_balancer();
758 /* only spam the cluster log from 1 mds on version changes */
759 if (mds
->get_nodeid() == 0)
760 mds
->clog
->info() << "mantle balancer version changed: " << bal_version
;
763 /* prepare for balancing */
764 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
765 rebalance_time
= ceph_clock_now();
766 mds
->mdcache
->migrator
->clear_export_queue();
768 /* fill in the metrics for each mds by grabbing load struct */
769 vector
< map
<string
, double> > metrics (cluster_size
);
770 for (mds_rank_t i
=mds_rank_t(0); i
< mds_rank_t(cluster_size
); i
++) {
771 mds_load_t
& load
= mds_load
.at(i
);
773 metrics
[i
] = {{"auth.meta_load", load
.auth
.meta_load()},
774 {"all.meta_load", load
.all
.meta_load()},
775 {"req_rate", load
.req_rate
},
776 {"queue_len", load
.queue_len
},
777 {"cpu_load_avg", load
.cpu_load_avg
}};
780 /* execute the balancer */
782 int ret
= mantle
.balance(bal_code
, mds
->get_nodeid(), metrics
, state
.targets
);
783 dout(2) << " mantle decided that new targets=" << state
.targets
<< dendl
;
785 /* mantle doesn't know about cluster size, so check target len here */
786 if ((int) state
.targets
.size() != cluster_size
)
791 try_rebalance(state
);
797 void MDBalancer::try_rebalance(balance_state_t
& state
)
799 if (g_conf
->mds_thrash_exports
) {
800 dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
805 // make a sorted list of my imports
806 multimap
<double, CDir
*> import_pop_map
;
807 multimap
<mds_rank_t
, pair
<CDir
*, double> > import_from_map
;
808 set
<CDir
*> fullauthsubs
;
810 mds
->mdcache
->get_fullauth_subtrees(fullauthsubs
);
811 for (auto dir
: fullauthsubs
) {
812 CInode
*diri
= dir
->get_inode();
813 if (diri
->is_mdsdir())
815 if (diri
->get_export_pin(false) != MDS_RANK_NONE
)
817 if (dir
->is_freezing() || dir
->is_frozen())
818 continue; // export pbly already in progress
820 mds_rank_t from
= diri
->authority().first
;
821 double pop
= dir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
822 if (g_conf
->mds_bal_idle_threshold
> 0 &&
823 pop
< g_conf
->mds_bal_idle_threshold
&&
824 diri
!= mds
->mdcache
->get_root() &&
825 from
!= mds
->get_nodeid()) {
826 dout(5) << " exporting idle (" << pop
<< ") import " << *dir
827 << " back to mds." << from
<< dendl
;
828 mds
->mdcache
->migrator
->export_dir_nicely(dir
, from
);
832 dout(15) << " map: i imported " << *dir
<< " from " << from
<< dendl
;
833 import_pop_map
.insert(make_pair(pop
, dir
));
834 import_from_map
.insert(make_pair(from
, make_pair(dir
, pop
)));
838 map
<mds_rank_t
, double> export_pop_map
;
840 for (auto &it
: state
.targets
) {
841 mds_rank_t target
= it
.first
;
842 double amount
= it
.second
;
844 if (amount
/ target_load
< .2)
846 if (amount
< MIN_OFFLOAD
)
849 dout(5) << "want to send " << amount
<< " to mds." << target
850 //<< " .. " << (*it).second << " * " << load_fac
852 << dendl
;//" .. fudge is " << fudge << dendl;
854 double& have
= export_pop_map
[target
];
856 mds
->mdcache
->show_subtrees();
858 // search imports from target
859 if (import_from_map
.count(target
)) {
860 dout(5) << " aha, looking through imports from target mds." << target
<< dendl
;
861 for (auto p
= import_from_map
.equal_range(target
);
862 p
.first
!= p
.second
; ) {
863 CDir
*dir
= p
.first
->second
.first
;
864 double pop
= p
.first
->second
.second
;
865 dout(5) << "considering " << *dir
<< " from " << (*p
.first
).first
<< dendl
;
866 auto plast
= p
.first
++;
868 if (dir
->inode
->is_base())
870 assert(dir
->inode
->authority().first
== target
); // cuz that's how i put it in the map, dummy
872 if (pop
<= amount
-have
) {
873 dout(5) << "reexporting " << *dir
<< " pop " << pop
874 << " back to mds." << target
<< dendl
;
875 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
877 import_from_map
.erase(plast
);
878 for (auto q
= import_pop_map
.equal_range(pop
);
879 q
.first
!= q
.second
; ) {
880 if (q
.first
->second
== dir
) {
881 import_pop_map
.erase(q
.first
);
887 dout(5) << "can't reexport " << *dir
<< ", too big " << pop
<< dendl
;
889 if (amount
-have
< MIN_OFFLOAD
)
896 for (auto &it
: state
.targets
) {
897 mds_rank_t target
= it
.first
;
898 double amount
= it
.second
;
900 if (!export_pop_map
.count(target
))
902 double& have
= export_pop_map
[target
];
903 if (amount
-have
< MIN_OFFLOAD
)
906 for (auto p
= import_pop_map
.begin();
907 p
!= import_pop_map
.end(); ) {
908 CDir
*dir
= p
->second
;
909 if (dir
->inode
->is_base()) {
914 double pop
= p
->first
;
915 if (pop
<= amount
-have
&& pop
> MIN_REEXPORT
) {
916 dout(0) << "reexporting " << *dir
<< " pop " << pop
917 << " to mds." << target
<< dendl
;
919 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
920 import_pop_map
.erase(p
++);
924 if (amount
-have
< MIN_OFFLOAD
)
929 set
<CDir
*> already_exporting
;
931 for (auto &it
: state
.targets
) {
932 mds_rank_t target
= it
.first
;
933 double amount
= it
.second
;
935 if (!export_pop_map
.count(target
))
937 double& have
= export_pop_map
[target
];
938 if (amount
-have
< MIN_OFFLOAD
)
941 // okay, search for fragments of my workload
944 for (auto p
= import_pop_map
.rbegin();
945 p
!= import_pop_map
.rend();
947 CDir
*dir
= p
->second
;
948 find_exports(dir
, amount
, exports
, have
, already_exporting
);
949 if (amount
-have
< MIN_OFFLOAD
)
952 //fudge = amount - have;
954 for (auto dir
: exports
) {
955 dout(5) << " - exporting " << dir
->pop_auth_subtree
956 << " " << dir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
)
957 << " to mds." << target
<< " " << *dir
<< dendl
;
958 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
962 dout(5) << "rebalance done" << dendl
;
963 mds
->mdcache
->show_subtrees();
966 void MDBalancer::find_exports(CDir
*dir
,
968 list
<CDir
*>& exports
,
970 set
<CDir
*>& already_exporting
)
972 utime_t now
= ceph_clock_now();
973 if ((double)(now
- rebalance_time
) > 0.1) {
974 derr
<< " balancer runs too long" << dendl_impl
;
979 assert(dir
->is_auth());
981 double need
= amount
- have
;
982 if (need
< amount
* g_conf
->mds_bal_min_start
)
983 return; // good enough!
985 double needmax
= need
* g_conf
->mds_bal_need_max
;
986 double needmin
= need
* g_conf
->mds_bal_need_min
;
987 double midchunk
= need
* g_conf
->mds_bal_midchunk
;
988 double minchunk
= need
* g_conf
->mds_bal_minchunk
;
990 list
<CDir
*> bigger_rep
, bigger_unrep
;
991 multimap
<double, CDir
*> smaller
;
993 double dir_pop
= dir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
994 dout(7) << " find_exports in " << dir_pop
<< " " << *dir
<< " need " << need
<< " (" << needmin
<< " - " << needmax
<< ")" << dendl
;
996 double subdir_sum
= 0;
997 for (elist
<CInode
*>::iterator it
= dir
->pop_lru_subdirs
.begin_use_current();
1002 assert(in
->is_dir());
1003 assert(in
->get_parent_dir() == dir
);
1006 in
->get_nested_dirfrags(dfls
);
1008 size_t num_idle_frags
= 0;
1009 for (list
<CDir
*>::iterator p
= dfls
.begin();
1013 if (already_exporting
.count(subdir
))
1016 // we know all ancestor dirfrags up to subtree root are not freezing or frozen.
1017 // It's more efficient to use CDir::is_{freezing,frozen}_tree_root()
1018 if (subdir
->is_frozen_dir() || subdir
->is_frozen_tree_root() ||
1019 subdir
->is_freezing_dir() || subdir
->is_freezing_tree_root())
1020 continue; // can't export this right now!
1023 double pop
= subdir
->pop_auth_subtree
.meta_load(rebalance_time
, mds
->mdcache
->decayrate
);
1025 dout(15) << " subdir pop " << pop
<< " " << *subdir
<< dendl
;
1027 if (pop
< minchunk
) {
1033 if (pop
> needmin
&& pop
< needmax
) {
1034 exports
.push_back(subdir
);
1035 already_exporting
.insert(subdir
);
1041 if (subdir
->is_rep())
1042 bigger_rep
.push_back(subdir
);
1044 bigger_unrep
.push_back(subdir
);
1046 smaller
.insert(pair
<double,CDir
*>(pop
, subdir
));
1048 if (dfls
.size() == num_idle_frags
)
1049 in
->item_pop_lru
.remove_myself();
1051 dout(15) << " sum " << subdir_sum
<< " / " << dir_pop
<< dendl
;
1053 // grab some sufficiently big small items
1054 multimap
<double,CDir
*>::reverse_iterator it
;
1055 for (it
= smaller
.rbegin();
1056 it
!= smaller
.rend();
1059 if ((*it
).first
< midchunk
)
1062 dout(7) << " taking smaller " << *(*it
).second
<< dendl
;
1064 exports
.push_back((*it
).second
);
1065 already_exporting
.insert((*it
).second
);
1066 have
+= (*it
).first
;
1071 // apprently not enough; drill deeper into the hierarchy (if non-replicated)
1072 for (list
<CDir
*>::iterator it
= bigger_unrep
.begin();
1073 it
!= bigger_unrep
.end();
1075 dout(15) << " descending into " << **it
<< dendl
;
1076 find_exports(*it
, amount
, exports
, have
, already_exporting
);
1081 // ok fine, use smaller bits
1083 it
!= smaller
.rend();
1085 dout(7) << " taking (much) smaller " << it
->first
<< " " << *(*it
).second
<< dendl
;
1087 exports
.push_back((*it
).second
);
1088 already_exporting
.insert((*it
).second
);
1089 have
+= (*it
).first
;
1094 // ok fine, drill into replicated dirs
1095 for (list
<CDir
*>::iterator it
= bigger_rep
.begin();
1096 it
!= bigger_rep
.end();
1098 dout(7) << " descending into replicated " << **it
<< dendl
;
1099 find_exports(*it
, amount
, exports
, have
, already_exporting
);
// Record a popularity hit of `type` on an inode and propagate it into
// the containing dirfrag hierarchy via hit_dir().
1105 void MDBalancer::hit_inode(const utime_t
& now
, CInode
*in
, int type
, int who
)
// decay-weighted counter on the inode itself
1108 in
->pop
.get(type
).hit(now
, mds
->mdcache
->decayrate
);
// propagate to the parent dirfrag chain (skipped for inodes with no
// parent dentry, e.g. base inodes)
1110 if (in
->get_parent_dn())
1111 hit_dir(now
, in
->get_parent_dn()->get_dir(), type
, who
);
// Decide whether `dir` should be queued for a split or a merge based
// on its state and the mds_bal_* fragmentation settings; `hot` forces
// split consideration regardless of should_split().
// NOTE(review): interior lines are missing from this extraction; the
// code below is reproduced byte-for-byte as found.
1114 void MDBalancer::maybe_fragment(CDir
*dir
, bool hot
)
// fragmentation must be enabled and the dir must not be root/base
1117 if (g_conf
->mds_bal_frag
&& g_conf
->mds_bal_fragment_interval
> 0 &&
1118 !dir
->inode
->is_base() && // not root/base (for now at least)
// split when the map allows dirfrags and the dir is oversized or hot
1122 if (g_conf
->mds_bal_split_size
> 0 &&
1123 mds
->mdsmap
->allows_dirfrags() &&
1124 (dir
->should_split() || hot
))
1126 if (split_pending
.count(dir
->dirfrag()) == 0) {
1127 queue_split(dir
, false);
// already queued: upgrade to a fast split if it now qualifies
1129 if (dir
->should_split_fast()) {
1130 queue_split(dir
, true);
1132 dout(10) << __func__
<< ": fragment already enqueued to split: "
// merge when this is a non-root frag that should merge and no merge
// is already pending
1139 if (dir
->get_frag() != frag_t() && dir
->should_merge() &&
1140 merge_pending
.count(dir
->dirfrag()) == 0) {
1146 void MDBalancer::hit_dir(const utime_t
& now
, CDir
*dir
, int type
, int who
, double amount
)
1149 double v
= dir
->pop_me
.get(type
).hit(now
, mds
->mdcache
->decayrate
, amount
);
1151 const bool hot
= (v
> g_conf
->mds_bal_split_rd
&& type
== META_POP_IRD
) ||
1152 (v
> g_conf
->mds_bal_split_wr
&& type
== META_POP_IWR
);
1154 dout(20) << "hit_dir " << type
<< " pop is " << v
<< ", frag " << dir
->get_frag()
1155 << " size " << dir
->get_frag_size() << dendl
;
1157 maybe_fragment(dir
, hot
);
1160 if (type
== META_POP_IRD
&& who
>= 0) {
1161 dir
->pop_spread
.hit(now
, mds
->mdcache
->decayrate
, who
);
1164 double rd_adj
= 0.0;
1165 if (type
== META_POP_IRD
&&
1166 dir
->last_popularity_sample
< last_sample
) {
1167 double dir_pop
= dir
->pop_auth_subtree
.get(type
).get(now
, mds
->mdcache
->decayrate
); // hmm??
1168 dir
->last_popularity_sample
= last_sample
;
1169 double pop_sp
= dir
->pop_spread
.get(now
, mds
->mdcache
->decayrate
);
1170 dir_pop
+= pop_sp
* 10;
1172 //if (dir->ino() == inodeno_t(0x10000000002))
1174 dout(20) << "hit_dir " << type
<< " pop " << dir_pop
<< " spread " << pop_sp
1175 << " " << dir
->pop_spread
.last
[0]
1176 << " " << dir
->pop_spread
.last
[1]
1177 << " " << dir
->pop_spread
.last
[2]
1178 << " " << dir
->pop_spread
.last
[3]
1179 << " in " << *dir
<< dendl
;
1182 if (dir
->is_auth() && !dir
->is_ambiguous_auth()) {
1183 if (!dir
->is_rep() &&
1184 dir_pop
>= g_conf
->mds_bal_replicate_threshold
) {
1186 double rdp
= dir
->pop_me
.get(META_POP_IRD
).get(now
, mds
->mdcache
->decayrate
);
1187 rd_adj
= rdp
/ mds
->get_mds_map()->get_num_in_mds() - rdp
;
1188 rd_adj
/= 2.0; // temper somewhat
1190 dout(5) << "replicating dir " << *dir
<< " pop " << dir_pop
<< " .. rdp " << rdp
<< " adj " << rd_adj
<< dendl
;
1192 dir
->dir_rep
= CDir::REP_ALL
;
1193 mds
->mdcache
->send_dir_updates(dir
, true);
1195 // fixme this should adjust the whole pop hierarchy
1196 dir
->pop_me
.get(META_POP_IRD
).adjust(rd_adj
);
1197 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(rd_adj
);
1200 if (dir
->ino() != 1 &&
1202 dir_pop
< g_conf
->mds_bal_unreplicate_threshold
) {
1204 dout(5) << "unreplicating dir " << *dir
<< " pop " << dir_pop
<< dendl
;
1206 dir
->dir_rep
= CDir::REP_NONE
;
1207 mds
->mdcache
->send_dir_updates(dir
);
1213 bool hit_subtree
= dir
->is_auth(); // current auth subtree (if any)
1214 bool hit_subtree_nested
= dir
->is_auth(); // all nested auth subtrees
1217 CDir
*pdir
= dir
->inode
->get_parent_dir();
1218 dir
->pop_nested
.get(type
).hit(now
, mds
->mdcache
->decayrate
, amount
);
1220 dir
->pop_nested
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
1223 dir
->pop_auth_subtree
.get(type
).hit(now
, mds
->mdcache
->decayrate
, amount
);
1226 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
1228 if (dir
->is_subtree_root())
1229 hit_subtree
= false; // end of auth domain, stop hitting auth counters.
1231 pdir
->pop_lru_subdirs
.push_front(&dir
->get_inode()->item_pop_lru
);
1234 if (hit_subtree_nested
) {
1235 dir
->pop_auth_subtree_nested
.get(type
).hit(now
, mds
->mdcache
->decayrate
, amount
);
1237 dir
->pop_auth_subtree_nested
.get(META_POP_IRD
).adjust(now
, mds
->mdcache
->decayrate
, rd_adj
);
/*
 * subtract off an exported chunk.
 *  this excludes *dir itself (encode_export_dir should have taken care of that)
 *  we _just_ do the parents' nested counters.
 *
 * NOTE: call me _after_ forcing *dir into a subtree root,
 *       but _before_ doing the encode_export_dirs.
 */
1253 void MDBalancer::subtract_export(CDir
*dir
, utime_t now
)
1255 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1258 dir
= dir
->inode
->get_parent_dir();
1261 dir
->pop_nested
.sub(now
, mds
->mdcache
->decayrate
, subload
);
1262 dir
->pop_auth_subtree_nested
.sub(now
, mds
->mdcache
->decayrate
, subload
);
1267 void MDBalancer::add_import(CDir
*dir
, utime_t now
)
1269 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1272 dir
= dir
->inode
->get_parent_dir();
1275 dir
->pop_nested
.add(now
, mds
->mdcache
->decayrate
, subload
);
1276 dir
->pop_auth_subtree_nested
.add(now
, mds
->mdcache
->decayrate
, subload
);
1280 void MDBalancer::adjust_pop_for_rename(CDir
*pdir
, CDir
*dir
, utime_t now
, bool inc
)
1282 DecayRate
& rate
= mds
->mdcache
->decayrate
;
1284 bool adjust_subtree_nest
= dir
->is_auth();
1285 bool adjust_subtree
= adjust_subtree_nest
&& !dir
->is_subtree_root();
1289 pdir
->pop_nested
.add(now
, rate
, dir
->pop_nested
);
1290 if (adjust_subtree
) {
1291 pdir
->pop_auth_subtree
.add(now
, rate
, dir
->pop_auth_subtree
);
1292 pdir
->pop_lru_subdirs
.push_front(&cur
->get_inode()->item_pop_lru
);
1295 if (adjust_subtree_nest
)
1296 pdir
->pop_auth_subtree_nested
.add(now
, rate
, dir
->pop_auth_subtree_nested
);
1298 pdir
->pop_nested
.sub(now
, rate
, dir
->pop_nested
);
1300 pdir
->pop_auth_subtree
.sub(now
, rate
, dir
->pop_auth_subtree
);
1302 if (adjust_subtree_nest
)
1303 pdir
->pop_auth_subtree_nested
.sub(now
, rate
, dir
->pop_auth_subtree_nested
);
1306 if (pdir
->is_subtree_root())
1307 adjust_subtree
= false;
1309 pdir
= pdir
->inode
->get_parent_dir();
1314 void MDBalancer::handle_mds_failure(mds_rank_t who
)
1317 mds_last_epoch_under_map
.clear();
1321 int MDBalancer::dump_loads(Formatter
*f
)
1323 utime_t now
= ceph_clock_now();
1324 DecayRate
& decayrate
= mds
->mdcache
->decayrate
;
1327 if (mds
->mdcache
->get_root()) {
1328 mds
->mdcache
->get_root()->get_dirfrags(dfs
);
1330 dout(5) << "dump_load no root" << dendl
;
1333 f
->open_object_section("loads");
1335 f
->open_array_section("dirfrags");
1336 while (!dfs
.empty()) {
1337 CDir
*dir
= dfs
.front();
1341 f
->open_object_section("dir");
1342 dir
->dump_load(f
, now
, decayrate
);
1346 for (auto it
= dir
->begin(); it
!= dir
->end(); ++it
) {
1347 CInode
*in
= it
->second
->get_linkage()->get_inode();
1348 if (!in
|| !in
->is_dir())
1352 in
->get_dirfrags(ls
);
1353 for (auto subdir
: ls
) {
1354 if (subdir
->pop_nested
.meta_load() < .001)
1356 dfs
.push_back(subdir
);
1360 f
->close_section(); // dirfrags array
1362 f
->open_object_section("mds_load");
1365 auto dump_mds_load
= [f
, now
](mds_load_t
& load
) {
1366 f
->dump_float("request_rate", load
.req_rate
);
1367 f
->dump_float("cache_hit_rate", load
.cache_hit_rate
);
1368 f
->dump_float("queue_length", load
.queue_len
);
1369 f
->dump_float("cpu_load", load
.cpu_load_avg
);
1370 f
->dump_float("mds_load", load
.mds_load());
1372 DecayRate rate
; // no decay
1373 f
->open_object_section("auth_dirfrags");
1374 load
.auth
.dump(f
, now
, rate
);
1376 f
->open_object_section("all_dirfrags");
1377 load
.all
.dump(f
, now
, rate
);
1381 for (auto p
: mds_load
) {
1383 name
<< "mds." << p
.first
;
1384 f
->open_object_section(name
.str().c_str());
1385 dump_mds_load(p
.second
);
1389 f
->close_section(); // mds_load
1391 f
->open_object_section("mds_meta_load");
1392 for (auto p
: mds_meta_load
) {
1394 name
<< "mds." << p
.first
;
1395 f
->dump_float(name
.str().c_str(), p
.second
);
1397 f
->close_section(); // mds_meta_load
1399 f
->open_object_section("mds_import_map");
1400 for (auto p
: mds_import_map
) {
1402 name1
<< "mds." << p
.first
;
1403 f
->open_array_section(name1
.str().c_str());
1404 for (auto q
: p
.second
) {
1405 f
->open_object_section("from");
1407 name2
<< "mds." << q
.first
;
1408 f
->dump_float(name2
.str().c_str(), q
.second
);
1411 f
->close_section(); // mds.? array
1413 f
->close_section(); // mds_import_map
1415 f
->close_section(); // loads