1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
18 #include "mon/MonClient.h"
19 #include "MDBalancer.h"
28 #include "include/Context.h"
29 #include "msg/Messenger.h"
37 using std::chrono::duration_cast
;
39 #include "common/config.h"
40 #include "common/errno.h"
42 #define dout_context g_ceph_context
44 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal " << __func__ << " "
48 auto subsys = ceph_subsys_mds;\
49 if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
50 subsys = ceph_subsys_mds_balancer;\
52 dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
54 #define dendl dendl_impl; } while (0)
57 #define MIN_LOAD 50 // ??
58 #define MIN_REEXPORT 5 // will automatically reexport
59 #define MIN_OFFLOAD 10 // point at which i stop trying, close enough
62 int MDBalancer::proc_message(const cref_t
<Message
> &m
)
64 switch (m
->get_type()) {
66 case MSG_MDS_HEARTBEAT
:
67 handle_heartbeat(ref_cast
<MHeartbeat
>(m
));
71 derr
<< " balancer unknown message " << m
->get_type() << dendl_impl
;
72 ceph_abort_msg("balancer unknown message");
78 MDBalancer::MDBalancer(MDSRank
*m
, Messenger
*msgr
, MonClient
*monc
) :
79 mds(m
), messenger(msgr
), mon_client(monc
)
81 bal_fragment_dirs
= g_conf().get_val
<bool>("mds_bal_fragment_dirs");
82 bal_fragment_interval
= g_conf().get_val
<int64_t>("mds_bal_fragment_interval");
85 void MDBalancer::handle_conf_change(const std::set
<std::string
>& changed
, const MDSMap
& mds_map
)
87 if (changed
.count("mds_bal_fragment_dirs"))
88 bal_fragment_dirs
= g_conf().get_val
<bool>("mds_bal_fragment_dirs");
89 if (changed
.count("mds_bal_fragment_interval"))
90 bal_fragment_interval
= g_conf().get_val
<int64_t>("mds_bal_fragment_interval");
93 void MDBalancer::handle_export_pins(void)
95 auto &q
= mds
->mdcache
->export_pin_queue
;
97 dout(20) << "export_pin_queue size=" << q
.size() << dendl
;
98 while (it
!= q
.end()) {
101 ceph_assert(in
->is_dir());
102 mds_rank_t export_pin
= in
->get_export_pin(false);
103 if (export_pin
>= mds
->mdsmap
->get_max_mds()) {
104 dout(20) << " delay export pin on " << *in
<< dendl
;
105 in
->state_clear(CInode::STATE_QUEUEDEXPORTPIN
);
108 in
->state_set(CInode::STATE_DELAYEDEXPORTPIN
);
109 mds
->mdcache
->export_pin_delayed_queue
.insert(in
);
114 auto&& dfls
= in
->get_dirfrags();
115 for (auto dir
: dfls
) {
119 if (export_pin
== MDS_RANK_NONE
) {
120 if (dir
->state_test(CDir::STATE_AUXSUBTREE
)) {
121 if (dir
->is_frozen() || dir
->is_freezing()) {
126 dout(10) << " clear auxsubtree on " << *dir
<< dendl
;
127 dir
->state_clear(CDir::STATE_AUXSUBTREE
);
128 mds
->mdcache
->try_subtree_merge(dir
);
130 } else if (export_pin
== mds
->get_nodeid()) {
131 if (dir
->state_test(CDir::STATE_CREATING
) ||
132 dir
->is_frozen() || dir
->is_freezing()) {
137 if (!dir
->is_subtree_root()) {
138 dir
->state_set(CDir::STATE_AUXSUBTREE
);
139 mds
->mdcache
->adjust_subtree_auth(dir
, mds
->get_nodeid());
140 dout(10) << " create aux subtree on " << *dir
<< dendl
;
141 } else if (!dir
->state_test(CDir::STATE_AUXSUBTREE
)) {
142 dout(10) << " set auxsubtree bit on " << *dir
<< dendl
;
143 dir
->state_set(CDir::STATE_AUXSUBTREE
);
146 mds
->mdcache
->migrator
->export_dir(dir
, export_pin
);
152 in
->state_clear(CInode::STATE_QUEUEDEXPORTPIN
);
157 std::vector
<CDir
*> authsubs
= mds
->mdcache
->get_auth_subtrees();
158 bool print_auth_subtrees
= true;
160 if (authsubs
.size() > AUTH_TREES_THRESHOLD
&&
161 !g_conf()->subsys
.should_gather
<ceph_subsys_mds
, 25>()) {
162 dout(15) << "number of auth trees = " << authsubs
.size() << "; not "
163 "printing auth trees" << dendl
;
164 print_auth_subtrees
= false;
167 for (auto &cd
: authsubs
) {
168 mds_rank_t export_pin
= cd
->inode
->get_export_pin();
170 if (print_auth_subtrees
) {
171 dout(25) << "auth tree " << *cd
<< " export_pin=" << export_pin
<<
175 if (export_pin
>= 0 && export_pin
< mds
->mdsmap
->get_max_mds()
176 && export_pin
!= mds
->get_nodeid()) {
177 mds
->mdcache
->migrator
->export_dir(cd
, export_pin
);
182 void MDBalancer::tick()
184 static int num_bal_times
= g_conf()->mds_bal_max
;
185 auto bal_interval
= g_conf().get_val
<int64_t>("mds_bal_interval");
186 auto bal_max_until
= g_conf().get_val
<int64_t>("mds_bal_max_until");
187 time now
= clock::now();
189 if (g_conf()->mds_bal_export_pin
) {
190 handle_export_pins();
194 if (chrono::duration
<double>(now
-last_sample
).count() >
195 g_conf()->mds_bal_sample_interval
) {
196 dout(15) << "tick last_sample now " << now
<< dendl
;
200 // We can use duration_cast below, although the result is an int,
201 // because the values from g_conf are also integers.
203 if (mds
->get_nodeid() == 0
206 && duration_cast
<chrono::seconds
>(now
- last_heartbeat
).count() >= bal_interval
207 && (num_bal_times
|| (bal_max_until
>= 0 && mds
->get_uptime().count() > bal_max_until
))) {
208 last_heartbeat
= now
;
213 mds
->mdcache
->show_subtrees(10, true);
219 class C_Bal_SendHeartbeat
: public MDSInternalContext
{
221 explicit C_Bal_SendHeartbeat(MDSRank
*mds_
) : MDSInternalContext(mds_
) { }
222 void finish(int f
) override
{
223 mds
->balancer
->send_heartbeat();
228 double mds_load_t::mds_load() const
230 switch(g_conf()->mds_bal_mode
) {
233 .8 * auth
.meta_load() +
234 .2 * all
.meta_load() +
239 return req_rate
+ 10.0*queue_len
;
249 mds_load_t
MDBalancer::get_load()
251 auto now
= clock::now();
253 mds_load_t load
{DecayRate()}; /* zero DecayRate! */
255 if (mds
->mdcache
->get_root()) {
256 auto&& ls
= mds
->mdcache
->get_root()->get_dirfrags();
258 load
.auth
.add(d
->pop_auth_subtree_nested
);
259 load
.all
.add(d
->pop_nested
);
262 dout(20) << "no root, no load" << dendl
;
265 uint64_t num_requests
= mds
->get_num_requests();
267 uint64_t cpu_time
= 1;
269 string stat_path
= PROCPREFIX
"/proc/self/stat";
270 ifstream
stat_file(stat_path
);
271 if (stat_file
.is_open()) {
272 vector
<string
> stat_vec(std::istream_iterator
<string
>{stat_file
},
273 std::istream_iterator
<string
>());
274 if (stat_vec
.size() >= 15) {
276 cpu_time
= strtoll(stat_vec
[13].c_str(), nullptr, 10) +
277 strtoll(stat_vec
[14].c_str(), nullptr, 10);
279 derr
<< "input file '" << stat_path
<< "' not resolvable" << dendl_impl
;
282 derr
<< "input file '" << stat_path
<< "' not found" << dendl_impl
;
286 load
.queue_len
= messenger
->get_dispatch_queue_len();
288 bool update_last
= true;
289 if (last_get_load
!= clock::zero() &&
290 now
> last_get_load
) {
291 double el
= std::chrono::duration
<double>(now
-last_get_load
).count();
293 if (num_requests
> last_num_requests
)
294 load
.req_rate
= (num_requests
- last_num_requests
) / el
;
295 if (cpu_time
> last_cpu_time
)
296 load
.cpu_load_avg
= (cpu_time
- last_cpu_time
) / el
;
298 auto p
= mds_load
.find(mds
->get_nodeid());
299 if (p
!= mds_load
.end()) {
300 load
.req_rate
= p
->second
.req_rate
;
301 load
.cpu_load_avg
= p
->second
.cpu_load_avg
;
303 if (num_requests
>= last_num_requests
&& cpu_time
>= last_cpu_time
)
309 last_num_requests
= num_requests
;
310 last_cpu_time
= cpu_time
;
314 dout(15) << load
<< dendl
;
319 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
320 * fallbacks (i.e. kick off async read when we are processing the map and
321 * check status when we get here) with the way the mds is structured.
323 int MDBalancer::localize_balancer()
325 /* reset everything */
329 ceph::mutex lock
= ceph::make_mutex("lock");
330 ceph::condition_variable cond
;
332 /* we assume that balancer is in the metadata pool */
333 object_t oid
= object_t(mds
->mdsmap
->get_balancer());
334 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
335 ceph_tid_t tid
= mds
->objecter
->read(oid
, oloc
, 0, 0, CEPH_NOSNAP
, &lua_src
, 0,
336 new C_SafeCond(lock
, cond
, &ack
, &r
));
337 dout(15) << "launched non-blocking read tid=" << tid
338 << " oid=" << oid
<< " oloc=" << oloc
<< dendl
;
340 /* timeout: if we waste half our time waiting for RADOS, then abort! */
341 std::cv_status ret_t
= [&] {
342 auto bal_interval
= g_conf().get_val
<int64_t>("mds_bal_interval");
343 std::unique_lock locker
{lock
};
344 return cond
.wait_for(locker
, std::chrono::seconds(bal_interval
/ 2));
346 /* success: store the balancer in memory and set the version. */
348 if (ret_t
== std::cv_status::timeout
) {
349 mds
->objecter
->op_cancel(tid
, -ECANCELED
);
352 bal_code
.assign(lua_src
.to_str());
353 bal_version
.assign(oid
.name
);
354 dout(10) "bal_code=" << bal_code
<< dendl
;
359 void MDBalancer::send_heartbeat()
361 if (mds
->is_cluster_degraded()) {
362 dout(10) << "degraded" << dendl
;
366 if (!mds
->mdcache
->is_open()) {
367 dout(10) << "not open" << dendl
;
368 mds
->mdcache
->wait_for_open(new C_Bal_SendHeartbeat(mds
));
372 if (mds
->get_nodeid() == 0) {
378 mds_load_t load
= get_load();
379 mds
->logger
->set(l_mds_load_cent
, 100 * load
.mds_load());
380 mds
->logger
->set(l_mds_dispatch_queue_len
, load
.queue_len
);
382 auto em
= mds_load
.emplace(std::piecewise_construct
, std::forward_as_tuple(mds
->get_nodeid()), std::forward_as_tuple(load
));
384 em
.first
->second
= load
;
387 // import_map -- how much do i import from whom
388 map
<mds_rank_t
, float> import_map
;
389 for (auto& im
: mds
->mdcache
->get_auth_subtrees()) {
390 mds_rank_t from
= im
->inode
->authority().first
;
391 if (from
== mds
->get_nodeid()) continue;
392 if (im
->get_inode()->is_stray()) continue;
393 import_map
[from
] += im
->pop_auth_subtree
.meta_load();
395 mds_import_map
[ mds
->get_nodeid() ] = import_map
;
398 dout(3) << " epoch " << beat_epoch
<< " load " << load
<< dendl
;
399 for (const auto& [rank
, load
] : import_map
) {
400 dout(5) << " import_map from " << rank
<< " -> " << load
<< dendl
;
405 mds
->get_mds_map()->get_up_mds_set(up
);
406 for (const auto& r
: up
) {
407 if (r
== mds
->get_nodeid())
409 auto hb
= make_message
<MHeartbeat
>(load
, beat_epoch
);
410 hb
->get_import_map() = import_map
;
411 mds
->send_message_mds(hb
, r
);
415 void MDBalancer::handle_heartbeat(const cref_t
<MHeartbeat
> &m
)
417 mds_rank_t who
= mds_rank_t(m
->get_source().num());
418 dout(25) << "=== got heartbeat " << m
->get_beat() << " from " << m
->get_source().num() << " " << m
->get_load() << dendl
;
420 if (!mds
->is_active())
423 if (!mds
->mdcache
->is_open()) {
424 dout(10) << "opening root on handle_heartbeat" << dendl
;
425 mds
->mdcache
->wait_for_open(new C_MDS_RetryMessage(mds
, m
));
429 if (mds
->is_cluster_degraded()) {
430 dout(10) << " degraded, ignoring" << dendl
;
434 if (mds
->get_nodeid() != 0 && m
->get_beat() > beat_epoch
) {
435 dout(10) << "receive next epoch " << m
->get_beat() << " from mds." << who
<< " before mds0" << dendl
;
437 beat_epoch
= m
->get_beat();
438 // clear the mds load info whose epoch is less than beat_epoch
443 dout(20) << " from mds0, new epoch " << m
->get_beat() << dendl
;
444 if (beat_epoch
!= m
->get_beat()) {
445 beat_epoch
= m
->get_beat();
451 mds
->mdcache
->show_subtrees();
452 } else if (mds
->get_nodeid() == 0) {
453 if (beat_epoch
!= m
->get_beat()) {
454 dout(10) << " old heartbeat epoch, ignoring" << dendl
;
460 auto em
= mds_load
.emplace(std::piecewise_construct
, std::forward_as_tuple(who
), std::forward_as_tuple(m
->get_load()));
462 em
.first
->second
= m
->get_load();
465 mds_import_map
[who
] = m
->get_import_map();
468 unsigned cluster_size
= mds
->get_mds_map()->get_num_in_mds();
469 if (mds_load
.size() == cluster_size
) {
471 //export_empties(); // no!
473 /* avoid spamming ceph -w if user does not turn mantle on */
474 if (mds
->mdsmap
->get_balancer() != "") {
475 int r
= mantle_prep_rebalance();
477 mds
->clog
->warn() << "using old balancer; mantle failed for "
478 << "balancer=" << mds
->mdsmap
->get_balancer()
479 << " : " << cpp_strerror(r
);
481 prep_rebalance(m
->get_beat());
486 double MDBalancer::try_match(balance_state_t
& state
, mds_rank_t ex
, double& maxex
,
487 mds_rank_t im
, double& maxim
)
489 if (maxex
<= 0 || maxim
<= 0) return 0.0;
491 double howmuch
= std::min(maxex
, maxim
);
493 dout(5) << " - mds." << ex
<< " exports " << howmuch
<< " to mds." << im
<< dendl
;
495 if (ex
== mds
->get_nodeid())
496 state
.targets
[im
] += howmuch
;
498 state
.exported
[ex
] += howmuch
;
499 state
.imported
[im
] += howmuch
;
507 void MDBalancer::queue_split(const CDir
*dir
, bool fast
)
509 dout(10) << __func__
<< " enqueuing " << *dir
510 << " (fast=" << fast
<< ")" << dendl
;
512 const dirfrag_t frag
= dir
->dirfrag();
514 auto callback
= [this, frag
](int r
) {
515 if (split_pending
.erase(frag
) == 0) {
516 // Someone beat me to it. This can happen in the fast splitting
517 // path, because we spawn two contexts, one with mds->timer and
518 // one with mds->queue_waiter. The loser can safely just drop
523 CDir
*split_dir
= mds
->mdcache
->get_dirfrag(frag
);
525 dout(10) << "drop split on " << frag
<< " because not in cache" << dendl
;
528 if (!split_dir
->is_auth()) {
529 dout(10) << "drop split on " << frag
<< " because non-auth" << dendl
;
533 // Pass on to MDCache: note that the split might still not
534 // happen if the checks in MDCache::can_fragment fail.
535 dout(10) << __func__
<< " splitting " << *split_dir
<< dendl
;
536 mds
->mdcache
->split_dir(split_dir
, g_conf()->mds_bal_split_bits
);
540 if (split_pending
.count(frag
) == 0) {
541 split_pending
.insert(frag
);
546 // Do the split ASAP: enqueue it in the MDSRank waiters which are
547 // run at the end of dispatching the current request
548 mds
->queue_waiter(new MDSInternalContextWrapper(mds
,
549 new LambdaContext(std::move(callback
))));
551 // Set a timer to really do the split: we don't do it immediately
552 // so that bursts of ops on a directory have a chance to go through
553 // before we freeze it.
554 mds
->timer
.add_event_after(bal_fragment_interval
,
555 new LambdaContext(std::move(callback
)));
559 void MDBalancer::queue_merge(CDir
*dir
)
561 const auto frag
= dir
->dirfrag();
562 auto callback
= [this, frag
](int r
) {
563 ceph_assert(frag
.frag
!= frag_t());
565 // frag must be in this set because only one context is in flight
566 // for a given frag at a time (because merge_pending is checked before
567 // starting one), and this context is the only one that erases it.
568 merge_pending
.erase(frag
);
570 CDir
*dir
= mds
->mdcache
->get_dirfrag(frag
);
572 dout(10) << "drop merge on " << frag
<< " because not in cache" << dendl
;
575 ceph_assert(dir
->dirfrag() == frag
);
577 if(!dir
->is_auth()) {
578 dout(10) << "drop merge on " << *dir
<< " because lost auth" << dendl
;
582 dout(10) << "merging " << *dir
<< dendl
;
584 CInode
*diri
= dir
->get_inode();
586 frag_t fg
= dir
->get_frag();
587 while (fg
!= frag_t()) {
588 frag_t sibfg
= fg
.get_sibling();
589 auto&& [complete
, sibs
] = diri
->get_dirfrags_under(sibfg
);
591 dout(10) << " not all sibs under " << sibfg
<< " in cache (have " << sibs
<< ")" << dendl
;
595 for (auto& sib
: sibs
) {
596 if (!sib
->is_auth() || !sib
->should_merge()) {
602 dout(10) << " not all sibs under " << sibfg
<< " " << sibs
<< " should_merge" << dendl
;
605 dout(10) << " all sibs under " << sibfg
<< " " << sibs
<< " should merge" << dendl
;
609 if (fg
!= dir
->get_frag())
610 mds
->mdcache
->merge_dir(diri
, fg
);
613 if (merge_pending
.count(frag
) == 0) {
614 dout(20) << " enqueued dir " << *dir
<< dendl
;
615 merge_pending
.insert(frag
);
616 mds
->timer
.add_event_after(bal_fragment_interval
,
617 new LambdaContext(std::move(callback
)));
619 dout(20) << " dir already in queue " << *dir
<< dendl
;
623 void MDBalancer::prep_rebalance(int beat
)
625 balance_state_t state
;
627 if (g_conf()->mds_thrash_exports
) {
628 //we're going to randomly export to all the mds in the cluster
629 set
<mds_rank_t
> up_mds
;
630 mds
->get_mds_map()->get_up_mds_set(up_mds
);
631 for (const auto &rank
: up_mds
) {
632 state
.targets
[rank
] = 0.0;
635 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
636 mds_rank_t whoami
= mds
->get_nodeid();
637 rebalance_time
= clock::now();
639 dout(7) << "cluster loads are" << dendl
;
641 mds
->mdcache
->migrator
->clear_export_queue();
643 // rescale! turn my mds_load back into meta_load units
644 double load_fac
= 1.0;
645 map
<mds_rank_t
, mds_load_t
>::iterator m
= mds_load
.find(whoami
);
646 if ((m
!= mds_load
.end()) && (m
->second
.mds_load() > 0)) {
647 double metald
= m
->second
.auth
.meta_load();
648 double mdsld
= m
->second
.mds_load();
649 load_fac
= metald
/ mdsld
;
650 dout(7) << " load_fac is " << load_fac
651 << " <- " << m
->second
.auth
<< " " << metald
656 mds_meta_load
.clear();
658 double total_load
= 0.0;
659 multimap
<double,mds_rank_t
> load_map
;
660 for (mds_rank_t i
=mds_rank_t(0); i
< mds_rank_t(cluster_size
); i
++) {
661 mds_load_t
& load
= mds_load
.at(i
);
663 double l
= load
.mds_load() * load_fac
;
664 mds_meta_load
[i
] = l
;
667 dout(7) << " mds." << i
669 << " = " << load
.mds_load()
670 << " ~ " << l
<< dendl
;
672 if (whoami
== i
) my_load
= l
;
675 load_map
.insert(pair
<double,mds_rank_t
>( l
, i
));
679 target_load
= total_load
/ (double)cluster_size
;
680 dout(7) << "my load " << my_load
681 << " target " << target_load
682 << " total " << total_load
686 for (const auto& [load
, rank
] : load_map
) {
687 if (load
< target_load
* (1.0 + g_conf()->mds_bal_min_rebalance
)) {
688 dout(7) << " mds." << rank
<< " is underloaded or barely overloaded." << dendl
;
689 mds_last_epoch_under_map
[rank
] = beat_epoch
;
693 int last_epoch_under
= mds_last_epoch_under_map
[whoami
];
694 if (last_epoch_under
== beat_epoch
) {
695 dout(7) << " i am underloaded or barely overloaded, doing nothing." << dendl
;
698 // am i over long enough?
699 if (last_epoch_under
&& beat_epoch
- last_epoch_under
< 2) {
700 dout(7) << " i am overloaded, but only for " << (beat_epoch
- last_epoch_under
) << " epochs" << dendl
;
704 dout(7) << " i am sufficiently overloaded" << dendl
;
707 // first separate exporters and importers
708 multimap
<double,mds_rank_t
> importers
;
709 multimap
<double,mds_rank_t
> exporters
;
710 set
<mds_rank_t
> importer_set
;
711 set
<mds_rank_t
> exporter_set
;
713 for (multimap
<double,mds_rank_t
>::iterator it
= load_map
.begin();
714 it
!= load_map
.end();
716 if (it
->first
< target_load
) {
717 dout(15) << " mds." << it
->second
<< " is importer" << dendl
;
718 importers
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
719 importer_set
.insert(it
->second
);
721 int mds_last_epoch_under
= mds_last_epoch_under_map
[it
->second
];
722 if (!(mds_last_epoch_under
&& beat_epoch
- mds_last_epoch_under
< 2)) {
723 dout(15) << " mds." << it
->second
<< " is exporter" << dendl
;
724 exporters
.insert(pair
<double,mds_rank_t
>(it
->first
,it
->second
));
725 exporter_set
.insert(it
->second
);
731 // determine load transfer mapping
734 // analyze import_map; do any matches i can
736 dout(15) << " matching exporters to import sources" << dendl
;
738 // big -> small exporters
739 for (multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
740 ex
!= exporters
.rend();
742 double maxex
= get_maxex(state
, ex
->second
);
743 if (maxex
<= .001) continue;
745 // check importers. for now, just in arbitrary order (no intelligent matching).
746 for (map
<mds_rank_t
, float>::iterator im
= mds_import_map
[ex
->second
].begin();
747 im
!= mds_import_map
[ex
->second
].end();
749 double maxim
= get_maxim(state
, im
->first
);
750 if (maxim
<= .001) continue;
751 try_match(state
, ex
->second
, maxex
, im
->first
, maxim
);
752 if (maxex
<= .001) break;
759 dout(15) << " matching big exporters to big importers" << dendl
;
760 // big exporters to big importers
761 multimap
<double,mds_rank_t
>::reverse_iterator ex
= exporters
.rbegin();
762 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
763 while (ex
!= exporters
.rend() &&
764 im
!= importers
.end()) {
765 double maxex
= get_maxex(state
, ex
->second
);
766 double maxim
= get_maxim(state
, im
->second
);
767 if (maxex
< .001 || maxim
< .001) break;
768 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
769 if (maxex
<= .001) ++ex
;
770 if (maxim
<= .001) ++im
;
773 dout(15) << " matching small exporters to big importers" << dendl
;
774 // small exporters to big importers
775 multimap
<double,mds_rank_t
>::iterator ex
= exporters
.begin();
776 multimap
<double,mds_rank_t
>::iterator im
= importers
.begin();
777 while (ex
!= exporters
.end() &&
778 im
!= importers
.end()) {
779 double maxex
= get_maxex(state
, ex
->second
);
780 double maxim
= get_maxim(state
, im
->second
);
781 if (maxex
< .001 || maxim
< .001) break;
782 try_match(state
, ex
->second
, maxex
, im
->second
, maxim
);
783 if (maxex
<= .001) ++ex
;
784 if (maxim
<= .001) ++im
;
788 try_rebalance(state
);
791 int MDBalancer::mantle_prep_rebalance()
793 balance_state_t state
;
795 /* refresh balancer if it has changed */
796 if (bal_version
!= mds
->mdsmap
->get_balancer()) {
797 bal_version
.assign("");
798 int r
= localize_balancer();
801 /* only spam the cluster log from 1 mds on version changes */
802 if (mds
->get_nodeid() == 0)
803 mds
->clog
->info() << "mantle balancer version changed: " << bal_version
;
806 /* prepare for balancing */
807 int cluster_size
= mds
->get_mds_map()->get_num_in_mds();
808 rebalance_time
= clock::now();
809 mds
->mdcache
->migrator
->clear_export_queue();
811 /* fill in the metrics for each mds by grabbing load struct */
812 vector
< map
<string
, double> > metrics (cluster_size
);
813 for (mds_rank_t i
=mds_rank_t(0); i
< mds_rank_t(cluster_size
); i
++) {
814 mds_load_t
& load
= mds_load
.at(i
);
816 metrics
[i
] = {{"auth.meta_load", load
.auth
.meta_load()},
817 {"all.meta_load", load
.all
.meta_load()},
818 {"req_rate", load
.req_rate
},
819 {"queue_len", load
.queue_len
},
820 {"cpu_load_avg", load
.cpu_load_avg
}};
823 /* execute the balancer */
825 int ret
= mantle
.balance(bal_code
, mds
->get_nodeid(), metrics
, state
.targets
);
826 dout(7) << " mantle decided that new targets=" << state
.targets
<< dendl
;
828 /* mantle doesn't know about cluster size, so check target len here */
829 if ((int) state
.targets
.size() != cluster_size
)
834 try_rebalance(state
);
840 void MDBalancer::try_rebalance(balance_state_t
& state
)
842 if (g_conf()->mds_thrash_exports
) {
843 dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
848 // make a sorted list of my imports
849 multimap
<double, CDir
*> import_pop_map
;
850 multimap
<mds_rank_t
, pair
<CDir
*, double> > import_from_map
;
852 for (auto& dir
: mds
->mdcache
->get_fullauth_subtrees()) {
853 CInode
*diri
= dir
->get_inode();
854 if (diri
->is_mdsdir())
856 if (diri
->get_export_pin(false) != MDS_RANK_NONE
)
858 if (dir
->is_freezing() || dir
->is_frozen())
859 continue; // export pbly already in progress
861 mds_rank_t from
= diri
->authority().first
;
862 double pop
= dir
->pop_auth_subtree
.meta_load();
863 if (g_conf()->mds_bal_idle_threshold
> 0 &&
864 pop
< g_conf()->mds_bal_idle_threshold
&&
865 diri
!= mds
->mdcache
->get_root() &&
866 from
!= mds
->get_nodeid()) {
867 dout(5) << " exporting idle (" << pop
<< ") import " << *dir
868 << " back to mds." << from
<< dendl
;
869 mds
->mdcache
->migrator
->export_dir_nicely(dir
, from
);
873 dout(15) << " map: i imported " << *dir
<< " from " << from
<< dendl
;
874 import_pop_map
.insert(make_pair(pop
, dir
));
875 import_from_map
.insert(make_pair(from
, make_pair(dir
, pop
)));
879 map
<mds_rank_t
, double> export_pop_map
;
881 for (auto &it
: state
.targets
) {
882 mds_rank_t target
= it
.first
;
883 double amount
= it
.second
;
885 if (amount
< MIN_OFFLOAD
)
887 if (amount
* 10 * state
.targets
.size() < target_load
)
890 dout(5) << "want to send " << amount
<< " to mds." << target
891 //<< " .. " << (*it).second << " * " << load_fac
893 << dendl
;//" .. fudge is " << fudge << dendl;
895 double& have
= export_pop_map
[target
];
897 mds
->mdcache
->show_subtrees();
899 // search imports from target
900 if (import_from_map
.count(target
)) {
901 dout(7) << " aha, looking through imports from target mds." << target
<< dendl
;
902 for (auto p
= import_from_map
.equal_range(target
);
903 p
.first
!= p
.second
; ) {
904 CDir
*dir
= p
.first
->second
.first
;
905 double pop
= p
.first
->second
.second
;
906 dout(7) << "considering " << *dir
<< " from " << (*p
.first
).first
<< dendl
;
907 auto plast
= p
.first
++;
909 if (dir
->inode
->is_base())
911 ceph_assert(dir
->inode
->authority().first
== target
); // cuz that's how i put it in the map, dummy
913 if (pop
<= amount
-have
) {
914 dout(7) << "reexporting " << *dir
<< " pop " << pop
915 << " back to mds." << target
<< dendl
;
916 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
918 import_from_map
.erase(plast
);
919 for (auto q
= import_pop_map
.equal_range(pop
);
920 q
.first
!= q
.second
; ) {
921 if (q
.first
->second
== dir
) {
922 import_pop_map
.erase(q
.first
);
928 dout(7) << "can't reexport " << *dir
<< ", too big " << pop
<< dendl
;
930 if (amount
-have
< MIN_OFFLOAD
)
937 for (auto &it
: state
.targets
) {
938 mds_rank_t target
= it
.first
;
939 double amount
= it
.second
;
941 if (!export_pop_map
.count(target
))
943 double& have
= export_pop_map
[target
];
944 if (amount
-have
< MIN_OFFLOAD
)
947 for (auto p
= import_pop_map
.begin();
948 p
!= import_pop_map
.end(); ) {
949 CDir
*dir
= p
->second
;
950 if (dir
->inode
->is_base()) {
955 double pop
= p
->first
;
956 if (pop
<= amount
-have
&& pop
> MIN_REEXPORT
) {
957 dout(5) << "reexporting " << *dir
<< " pop " << pop
958 << " to mds." << target
<< dendl
;
960 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
961 import_pop_map
.erase(p
++);
965 if (amount
-have
< MIN_OFFLOAD
)
970 set
<CDir
*> already_exporting
;
972 for (auto &it
: state
.targets
) {
973 mds_rank_t target
= it
.first
;
974 double amount
= it
.second
;
976 if (!export_pop_map
.count(target
))
978 double& have
= export_pop_map
[target
];
979 if (amount
-have
< MIN_OFFLOAD
)
982 // okay, search for fragments of my workload
983 std::vector
<CDir
*> exports
;
985 for (auto p
= import_pop_map
.rbegin();
986 p
!= import_pop_map
.rend();
988 CDir
*dir
= p
->second
;
989 find_exports(dir
, amount
, &exports
, have
, already_exporting
);
990 if (amount
-have
< MIN_OFFLOAD
)
993 //fudge = amount - have;
995 for (const auto& dir
: exports
) {
996 dout(5) << " - exporting " << dir
->pop_auth_subtree
997 << " " << dir
->pop_auth_subtree
.meta_load()
998 << " to mds." << target
<< " " << *dir
<< dendl
;
999 mds
->mdcache
->migrator
->export_dir_nicely(dir
, target
);
1003 dout(7) << "done" << dendl
;
1004 mds
->mdcache
->show_subtrees();
1007 void MDBalancer::find_exports(CDir
*dir
,
1009 std::vector
<CDir
*>* exports
,
1011 set
<CDir
*>& already_exporting
)
1013 auto now
= clock::now();
1014 auto duration
= std::chrono::duration
<double>(now
-rebalance_time
).count();
1015 if (duration
> 0.1) {
1016 derr
<< " balancer runs too long" << dendl_impl
;
1021 ceph_assert(dir
->is_auth());
1023 double need
= amount
- have
;
1024 if (need
< amount
* g_conf()->mds_bal_min_start
)
1025 return; // good enough!
1027 double needmax
= need
* g_conf()->mds_bal_need_max
;
1028 double needmin
= need
* g_conf()->mds_bal_need_min
;
1029 double midchunk
= need
* g_conf()->mds_bal_midchunk
;
1030 double minchunk
= need
* g_conf()->mds_bal_minchunk
;
1032 std::vector
<CDir
*> bigger_rep
, bigger_unrep
;
1033 multimap
<double, CDir
*> smaller
;
1035 double dir_pop
= dir
->pop_auth_subtree
.meta_load();
1036 dout(7) << "in " << dir_pop
<< " " << *dir
<< " need " << need
<< " (" << needmin
<< " - " << needmax
<< ")" << dendl
;
1038 double subdir_sum
= 0;
1039 for (elist
<CInode
*>::iterator it
= dir
->pop_lru_subdirs
.begin_use_current();
1044 ceph_assert(in
->is_dir());
1045 ceph_assert(in
->get_parent_dir() == dir
);
1047 auto&& dfls
= in
->get_nested_dirfrags();
1049 size_t num_idle_frags
= 0;
1050 for (const auto& subdir
: dfls
) {
1051 if (already_exporting
.count(subdir
))
1054 // we know all ancestor dirfrags up to subtree root are not freezing or frozen.
1055 // It's more efficient to use CDir::is_{freezing,frozen}_tree_root()
1056 if (subdir
->is_frozen_dir() || subdir
->is_frozen_tree_root() ||
1057 subdir
->is_freezing_dir() || subdir
->is_freezing_tree_root())
1058 continue; // can't export this right now!
1061 double pop
= subdir
->pop_auth_subtree
.meta_load();
1063 dout(15) << " subdir pop " << pop
<< " " << *subdir
<< dendl
;
1065 if (pop
< minchunk
) {
1071 if (pop
> needmin
&& pop
< needmax
) {
1072 exports
->push_back(subdir
);
1073 already_exporting
.insert(subdir
);
1079 if (subdir
->is_rep())
1080 bigger_rep
.push_back(subdir
);
1082 bigger_unrep
.push_back(subdir
);
1084 smaller
.insert(pair
<double,CDir
*>(pop
, subdir
));
1086 if (dfls
.size() == num_idle_frags
)
1087 in
->item_pop_lru
.remove_myself();
1089 dout(15) << " sum " << subdir_sum
<< " / " << dir_pop
<< dendl
;
1091 // grab some sufficiently big small items
1092 multimap
<double,CDir
*>::reverse_iterator it
;
1093 for (it
= smaller
.rbegin();
1094 it
!= smaller
.rend();
1097 if ((*it
).first
< midchunk
)
1100 dout(7) << " taking smaller " << *(*it
).second
<< dendl
;
1102 exports
->push_back((*it
).second
);
1103 already_exporting
.insert((*it
).second
);
1104 have
+= (*it
).first
;
1109 // apprently not enough; drill deeper into the hierarchy (if non-replicated)
1110 for (const auto& dir
: bigger_unrep
) {
1111 dout(15) << " descending into " << *dir
<< dendl
;
1112 find_exports(dir
, amount
, exports
, have
, already_exporting
);
1117 // ok fine, use smaller bits
1119 it
!= smaller
.rend();
1121 dout(7) << " taking (much) smaller " << it
->first
<< " " << *(*it
).second
<< dendl
;
1123 exports
->push_back((*it
).second
);
1124 already_exporting
.insert((*it
).second
);
1125 have
+= (*it
).first
;
1130 // ok fine, drill into replicated dirs
1131 for (const auto& dir
: bigger_rep
) {
1132 dout(7) << " descending into replicated " << *dir
<< dendl
;
1133 find_exports(dir
, amount
, exports
, have
, already_exporting
);
1139 void MDBalancer::hit_inode(CInode
*in
, int type
, int who
)
1142 in
->pop
.get(type
).hit();
1144 if (in
->get_parent_dn())
1145 hit_dir(in
->get_parent_dn()->get_dir(), type
, who
);
1148 void MDBalancer::maybe_fragment(CDir
*dir
, bool hot
)
1151 if (bal_fragment_dirs
&& bal_fragment_interval
> 0 &&
1153 !dir
->inode
->is_base() && // not root/mdsdir (for now at least)
1154 !dir
->inode
->is_stray()) { // not straydir
1157 if (g_conf()->mds_bal_split_size
> 0 && (dir
->should_split() || hot
)) {
1158 if (split_pending
.count(dir
->dirfrag()) == 0) {
1159 queue_split(dir
, false);
1161 if (dir
->should_split_fast()) {
1162 queue_split(dir
, true);
1164 dout(10) << ": fragment already enqueued to split: "
1171 if (dir
->get_frag() != frag_t() && dir
->should_merge() &&
1172 merge_pending
.count(dir
->dirfrag()) == 0) {
1178 void MDBalancer::hit_dir(CDir
*dir
, int type
, int who
, double amount
)
1181 double v
= dir
->pop_me
.get(type
).hit(amount
);
1183 const bool hot
= (v
> g_conf()->mds_bal_split_rd
&& type
== META_POP_IRD
) ||
1184 (v
> g_conf()->mds_bal_split_wr
&& type
== META_POP_IWR
);
1186 dout(20) << type
<< " pop is " << v
<< ", frag " << dir
->get_frag()
1187 << " size " << dir
->get_frag_size() << " " << dir
->pop_me
<< dendl
;
1189 maybe_fragment(dir
, hot
);
1192 if (type
== META_POP_IRD
&& who
>= 0) {
1193 dir
->pop_spread
.hit(who
);
1196 double rd_adj
= 0.0;
1197 if (type
== META_POP_IRD
&&
1198 dir
->last_popularity_sample
< last_sample
) {
1199 double dir_pop
= dir
->pop_auth_subtree
.get(type
).get(); // hmm??
1200 dir
->last_popularity_sample
= last_sample
;
1201 double pop_sp
= dir
->pop_spread
.get();
1202 dir_pop
+= pop_sp
* 10;
1204 //if (dir->ino() == inodeno_t(0x10000000002))
1206 dout(20) << type
<< " pop " << dir_pop
<< " spread " << pop_sp
1207 << " " << dir
->pop_spread
.last
[0]
1208 << " " << dir
->pop_spread
.last
[1]
1209 << " " << dir
->pop_spread
.last
[2]
1210 << " " << dir
->pop_spread
.last
[3]
1211 << " in " << *dir
<< dendl
;
1214 if (dir
->is_auth() && !dir
->is_ambiguous_auth()) {
1215 if (!dir
->is_rep() &&
1216 dir_pop
>= g_conf()->mds_bal_replicate_threshold
) {
1218 double rdp
= dir
->pop_me
.get(META_POP_IRD
).get();
1219 rd_adj
= rdp
/ mds
->get_mds_map()->get_num_in_mds() - rdp
;
1220 rd_adj
/= 2.0; // temper somewhat
1222 dout(5) << "replicating dir " << *dir
<< " pop " << dir_pop
<< " .. rdp " << rdp
<< " adj " << rd_adj
<< dendl
;
1224 dir
->dir_rep
= CDir::REP_ALL
;
1225 mds
->mdcache
->send_dir_updates(dir
, true);
1227 // fixme this should adjust the whole pop hierarchy
1228 dir
->pop_me
.get(META_POP_IRD
).adjust(rd_adj
);
1229 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(rd_adj
);
1232 if (dir
->ino() != 1 &&
1234 dir_pop
< g_conf()->mds_bal_unreplicate_threshold
) {
1236 dout(5) << "unreplicating dir " << *dir
<< " pop " << dir_pop
<< dendl
;
1238 dir
->dir_rep
= CDir::REP_NONE
;
1239 mds
->mdcache
->send_dir_updates(dir
);
1245 bool hit_subtree
= dir
->is_auth(); // current auth subtree (if any)
1246 bool hit_subtree_nested
= dir
->is_auth(); // all nested auth subtrees
1249 CDir
*pdir
= dir
->inode
->get_parent_dir();
1250 dir
->pop_nested
.get(type
).hit(amount
);
1252 dir
->pop_nested
.get(META_POP_IRD
).adjust(rd_adj
);
1255 dir
->pop_auth_subtree
.get(type
).hit(amount
);
1258 dir
->pop_auth_subtree
.get(META_POP_IRD
).adjust(rd_adj
);
1260 if (dir
->is_subtree_root())
1261 hit_subtree
= false; // end of auth domain, stop hitting auth counters.
1263 pdir
->pop_lru_subdirs
.push_front(&dir
->get_inode()->item_pop_lru
);
1266 if (hit_subtree_nested
) {
1267 dir
->pop_auth_subtree_nested
.get(type
).hit(amount
);
1269 dir
->pop_auth_subtree_nested
.get(META_POP_IRD
).adjust(rd_adj
);
1278 * subtract off an exported chunk.
1279 * this excludes *dir itself (encode_export_dir should have take care of that)
1280 * we _just_ do the parents' nested counters.
1282 * NOTE: call me _after_ forcing *dir into a subtree root,
1283 * but _before_ doing the encode_export_dirs.
1285 void MDBalancer::subtract_export(CDir
*dir
)
1287 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1290 dir
= dir
->inode
->get_parent_dir();
1293 dir
->pop_nested
.sub(subload
);
1294 dir
->pop_auth_subtree_nested
.sub(subload
);
1299 void MDBalancer::add_import(CDir
*dir
)
1301 dirfrag_load_vec_t subload
= dir
->pop_auth_subtree
;
1304 dir
= dir
->inode
->get_parent_dir();
1307 dir
->pop_nested
.add(subload
);
1308 dir
->pop_auth_subtree_nested
.add(subload
);
1312 void MDBalancer::adjust_pop_for_rename(CDir
*pdir
, CDir
*dir
, bool inc
)
1314 bool adjust_subtree_nest
= dir
->is_auth();
1315 bool adjust_subtree
= adjust_subtree_nest
&& !dir
->is_subtree_root();
1319 pdir
->pop_nested
.add(dir
->pop_nested
);
1320 if (adjust_subtree
) {
1321 pdir
->pop_auth_subtree
.add(dir
->pop_auth_subtree
);
1322 pdir
->pop_lru_subdirs
.push_front(&cur
->get_inode()->item_pop_lru
);
1325 if (adjust_subtree_nest
)
1326 pdir
->pop_auth_subtree_nested
.add(dir
->pop_auth_subtree_nested
);
1328 pdir
->pop_nested
.sub(dir
->pop_nested
);
1330 pdir
->pop_auth_subtree
.sub(dir
->pop_auth_subtree
);
1332 if (adjust_subtree_nest
)
1333 pdir
->pop_auth_subtree_nested
.sub(dir
->pop_auth_subtree_nested
);
1336 if (pdir
->is_subtree_root())
1337 adjust_subtree
= false;
1339 pdir
= pdir
->inode
->get_parent_dir();
1344 void MDBalancer::handle_mds_failure(mds_rank_t who
)
1347 mds_last_epoch_under_map
.clear();
1351 int MDBalancer::dump_loads(Formatter
*f
) const
1353 std::deque
<CDir
*> dfs
;
1354 if (mds
->mdcache
->get_root()) {
1355 mds
->mdcache
->get_root()->get_dirfrags(dfs
);
1357 dout(10) << "no root" << dendl
;
1360 f
->open_object_section("loads");
1362 f
->open_array_section("dirfrags");
1363 while (!dfs
.empty()) {
1364 CDir
*dir
= dfs
.front();
1367 f
->open_object_section("dir");
1371 for (auto it
= dir
->begin(); it
!= dir
->end(); ++it
) {
1372 CInode
*in
= it
->second
->get_linkage()->get_inode();
1373 if (!in
|| !in
->is_dir())
1376 auto&& ls
= in
->get_dirfrags();
1377 for (const auto& subdir
: ls
) {
1378 if (subdir
->pop_nested
.meta_load() < .001)
1380 dfs
.push_back(subdir
);
1384 f
->close_section(); // dirfrags array
1386 f
->open_object_section("mds_load");
1389 auto dump_mds_load
= [f
](mds_load_t
& load
) {
1390 f
->dump_float("request_rate", load
.req_rate
);
1391 f
->dump_float("cache_hit_rate", load
.cache_hit_rate
);
1392 f
->dump_float("queue_length", load
.queue_len
);
1393 f
->dump_float("cpu_load", load
.cpu_load_avg
);
1394 f
->dump_float("mds_load", load
.mds_load());
1396 f
->open_object_section("auth_dirfrags");
1399 f
->open_object_section("all_dirfrags");
1404 for (auto p
: mds_load
) {
1406 name
<< "mds." << p
.first
;
1407 f
->open_object_section(name
.str().c_str());
1408 dump_mds_load(p
.second
);
1412 f
->close_section(); // mds_load
1414 f
->open_object_section("mds_meta_load");
1415 for (auto p
: mds_meta_load
) {
1417 name
<< "mds." << p
.first
;
1418 f
->dump_float(name
.str().c_str(), p
.second
);
1420 f
->close_section(); // mds_meta_load
1422 f
->open_object_section("mds_import_map");
1423 for (auto p
: mds_import_map
) {
1425 name1
<< "mds." << p
.first
;
1426 f
->open_array_section(name1
.str().c_str());
1427 for (auto q
: p
.second
) {
1428 f
->open_object_section("from");
1430 name2
<< "mds." << q
.first
;
1431 f
->dump_float(name2
.str().c_str(), q
.second
);
1434 f
->close_section(); // mds.? array
1436 f
->close_section(); // mds_import_map
1438 f
->close_section(); // loads