// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "include/compat.h"
#include "mdstypes.h"

#include "MDBalancer.h"
#include "MDSRank.h"
#include "mon/MonClient.h"
#include "MDSMap.h"
#include "CInode.h"
#include "CDir.h"
#include "MDCache.h"
#include "Migrator.h"
#include "Mantle.h"

#include "include/Context.h"
#include "msg/Messenger.h"
#include "messages/MHeartbeat.h"

#include <fstream>
#include <iostream>
#include <vector>
#include <map>
using std::map;
using std::vector;

#include "common/config.h"
#include "common/errno.h"

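// This dout override routes balancer log messages to the mds_balancer
// debug subsystem when that subsystem is configured to gather at the
// requested level, and falls back to the general mds subsystem otherwise.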
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
#undef dout
#define dout(lvl) \
  do {\
    auto subsys = ceph_subsys_mds;\
    if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
      subsys = ceph_subsys_mds_balancer;\
    }\
    dout_impl(dout_context, subsys, lvl) dout_prefix
#undef dendl
#define dendl dendl_impl; } while (0)


#define MIN_LOAD     50   // ??
#define MIN_REEXPORT  5   // will automatically reexport
#define MIN_OFFLOAD  10   // point at which i stop trying, close enough


/* This function DOES put the passed message before returning */
int MDBalancer::proc_message(Message *m)
{
  switch (m->get_type()) {

  case MSG_MDS_HEARTBEAT:
    handle_heartbeat(static_cast<MHeartbeat*>(m));
    break;

  default:
    derr << " balancer unknown message " << m->get_type() << dendl_impl;
    assert(0 == "balancer unknown message");
  }

  return 0;
}

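// Walk the export pin queue: clear aux subtrees whose pin was removed,
// make locally pinned dirfrags aux subtree roots, and kick off migrations
// for dirfrags pinned to other ranks.  An inode stays queued (remove stays
// false) while a dirfrag is freezing/frozen so it can be retried later.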
void MDBalancer::handle_export_pins(void)
{
  auto &q = mds->mdcache->export_pin_queue;
  auto it = q.begin();
  dout(20) << "export_pin_queue size=" << q.size() << dendl;
  while (it != q.end()) {
    auto cur = it++;
    CInode *in = *cur;
    assert(in->is_dir());
    mds_rank_t export_pin = in->get_export_pin(false);

    bool remove = true;
    list<CDir*> dfls;
    in->get_dirfrags(dfls);
    for (auto dir : dfls) {
      if (!dir->is_auth())
        continue;

      if (export_pin == MDS_RANK_NONE) {
        if (dir->state_test(CDir::STATE_AUXSUBTREE)) {
          if (dir->is_frozen() || dir->is_freezing()) {
            // try again later
            remove = false;
            continue;
          }
          dout(10) << " clear auxsubtree on " << *dir << dendl;
          dir->state_clear(CDir::STATE_AUXSUBTREE);
          mds->mdcache->try_subtree_merge(dir);
        }
      } else if (export_pin == mds->get_nodeid()) {
        if (dir->state_test(CDir::STATE_CREATING) ||
            dir->is_frozen() || dir->is_freezing()) {
          // try again later
          remove = false;
          continue;
        }
        if (!dir->is_subtree_root()) {
          dir->state_set(CDir::STATE_AUXSUBTREE);
          mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
          dout(10) << " create aux subtree on " << *dir << dendl;
        } else if (!dir->state_test(CDir::STATE_AUXSUBTREE)) {
          dout(10) << " set auxsubtree bit on " << *dir << dendl;
          dir->state_set(CDir::STATE_AUXSUBTREE);
        }
      } else {
        mds->mdcache->migrator->export_dir(dir, export_pin);
        remove = false;
      }
    }

    if (remove) {
      in->state_clear(CInode::STATE_QUEUEDEXPORTPIN);
      q.erase(cur);
    }
  }

  set<CDir *> authsubs;
  mds->mdcache->get_auth_subtrees(authsubs);
  for (auto &cd : authsubs) {
    mds_rank_t export_pin = cd->inode->get_export_pin();
    dout(10) << "auth tree " << *cd << " export_pin=" << export_pin << dendl;
    if (export_pin >= 0 && export_pin != mds->get_nodeid()) {
      dout(10) << "exporting auth subtree " << *cd->inode << " to " << export_pin << dendl;
      mds->mdcache->migrator->export_dir(cd, export_pin);
    }
  }
}

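// Called on the MDS tick: process export pins, refresh the popularity
// sampling timestamp, and, on rank 0, send a balancing heartbeat when the
// configured interval and the mds_bal_max epoch budget allow it.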
void MDBalancer::tick()
{
  static int num_bal_times = g_conf->mds_bal_max;
  static utime_t first = ceph_clock_now();
  utime_t now = ceph_clock_now();
  utime_t elapsed = now;
  elapsed -= first;

  if (g_conf->mds_bal_export_pin) {
    handle_export_pins();
  }

  // sample?
  if ((double)now - (double)last_sample > g_conf->mds_bal_sample_interval) {
    dout(15) << "tick last_sample now " << now << dendl;
    last_sample = now;
  }

  // balance?
  if (last_heartbeat == utime_t())
    last_heartbeat = now;
  if (mds->get_nodeid() == 0 &&
      g_conf->mds_bal_interval > 0 &&
      (num_bal_times ||
       (g_conf->mds_bal_max_until >= 0 &&
        elapsed.sec() > g_conf->mds_bal_max_until)) &&
      mds->is_active() &&
      now.sec() - last_heartbeat.sec() >= g_conf->mds_bal_interval) {
    last_heartbeat = now;
    send_heartbeat();
    num_bal_times--;
  }
}


class C_Bal_SendHeartbeat : public MDSInternalContext {
public:
  explicit C_Bal_SendHeartbeat(MDSRank *mds_) : MDSInternalContext(mds_) { }
  void finish(int f) override {
    mds->balancer->send_heartbeat();
  }
};

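// Composite load metric for one MDS.  mds_bal_mode selects the policy:
//   0: weighted blend of authored (auth) and overall (all) metadata load,
//      plus the request rate and a 10x-weighted dispatch queue length
//   1: request rate and queue length only
//   2: host CPU load average only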
double mds_load_t::mds_load()
{
  switch(g_conf->mds_bal_mode) {
  case 0:
    return
      .8 * auth.meta_load() +
      .2 * all.meta_load() +
      req_rate +
      10.0 * queue_len;

  case 1:
    return req_rate + 10.0*queue_len;

  case 2:
    return cpu_load_avg;

  }
  ceph_abort();
  return 0;
}

mds_load_t MDBalancer::get_load(utime_t now)
{
  mds_load_t load(now);

  if (mds->mdcache->get_root()) {
    list<CDir*> ls;
    mds->mdcache->get_root()->get_dirfrags(ls);
    for (list<CDir*>::iterator p = ls.begin();
         p != ls.end();
         ++p) {
      load.auth.add(now, mds->mdcache->decayrate, (*p)->pop_auth_subtree_nested);
      load.all.add(now, mds->mdcache->decayrate, (*p)->pop_nested);
    }
  } else {
    dout(20) << "get_load no root, no load" << dendl;
  }

  load.req_rate = mds->get_req_rate();
  load.queue_len = messenger->get_dispatch_queue_len();

  ifstream cpu(PROCPREFIX "/proc/loadavg");
  if (cpu.is_open())
    cpu >> load.cpu_load_avg;
  else
    derr << "input file " PROCPREFIX "'/proc/loadavg' not found" << dendl_impl;

  dout(15) << "get_load " << load << dendl;
  return load;
}

/*
 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
 * fallbacks (i.e. kick off async read when we are processing the map and
 * check status when we get here) with the way the mds is structured.
 */
int MDBalancer::localize_balancer()
{
  /* reset everything */
  bool ack = false;
  int r = 0;
  bufferlist lua_src;
  Mutex lock("lock");
  Cond cond;

  /* we assume that balancer is in the metadata pool */
  object_t oid = object_t(mds->mdsmap->get_balancer());
  object_locator_t oloc(mds->mdsmap->get_metadata_pool());
  ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
                                       new C_SafeCond(&lock, &cond, &ack, &r));
  dout(15) << "launched non-blocking read tid=" << tid
           << " oid=" << oid << " oloc=" << oloc << dendl;

  /* timeout: if we waste half our time waiting for RADOS, then abort! */
  double t = ceph_clock_now() + g_conf->mds_bal_interval/2;
  utime_t timeout;
  timeout.set_from_double(t);
  lock.Lock();
  int ret_t = cond.WaitUntil(lock, timeout);
  lock.Unlock();

  /* success: store the balancer in memory and set the version. */
  if (!r) {
    if (ret_t == ETIMEDOUT) {
      mds->objecter->op_cancel(tid, -ECANCELED);
      return -ETIMEDOUT;
    }
    bal_code.assign(lua_src.to_str());
    bal_version.assign(oid.name);
    dout(10) << "localized balancer, bal_code=" << bal_code << dendl;
  }
  return r;
}

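// Broadcast this rank's load to every other up MDS.  Rank 0 drives the
// epoch: it bumps beat_epoch and clears stale per-rank loads before
// inserting its own.  Each heartbeat also carries an import map describing
// how much metadata load this rank imports from each peer.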
void MDBalancer::send_heartbeat()
{
  utime_t now = ceph_clock_now();

  if (mds->is_cluster_degraded()) {
    dout(10) << "send_heartbeat degraded" << dendl;
    return;
  }

  if (!mds->mdcache->is_open()) {
    dout(5) << "not open" << dendl;
    mds->mdcache->wait_for_open(new C_Bal_SendHeartbeat(mds));
    return;
  }

  if (mds->get_nodeid() == 0) {
    beat_epoch++;
    mds_load.clear();
  }

  // my load
  mds_load_t load = get_load(now);
  map<mds_rank_t, mds_load_t>::value_type val(mds->get_nodeid(), load);
  mds_load.insert(val);

  // import_map -- how much do i import from whom
  map<mds_rank_t, float> import_map;
  set<CDir*> authsubs;
  mds->mdcache->get_auth_subtrees(authsubs);
  for (set<CDir*>::iterator it = authsubs.begin();
       it != authsubs.end();
       ++it) {
    CDir *im = *it;
    mds_rank_t from = im->inode->authority().first;
    if (from == mds->get_nodeid()) continue;
    if (im->get_inode()->is_stray()) continue;
    import_map[from] += im->pop_auth_subtree.meta_load(now, mds->mdcache->decayrate);
  }
  mds_import_map[ mds->get_nodeid() ] = import_map;


  dout(5) << "mds." << mds->get_nodeid() << " epoch " << beat_epoch << " load " << load << dendl;
  for (map<mds_rank_t, float>::iterator it = import_map.begin();
       it != import_map.end();
       ++it) {
    dout(5) << " import_map from " << it->first << " -> " << it->second << dendl;
  }


  set<mds_rank_t> up;
  mds->get_mds_map()->get_up_mds_set(up);
  for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
    if (*p == mds->get_nodeid())
      continue;
    MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
    hb->get_import_map() = import_map;
    messenger->send_message(hb,
                            mds->mdsmap->get_inst(*p));
  }
}

/* This function DOES put the passed message before returning */
void MDBalancer::handle_heartbeat(MHeartbeat *m)
{
  typedef map<mds_rank_t, mds_load_t> mds_load_map_t;

  mds_rank_t who = mds_rank_t(m->get_source().num());
  dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << dendl;

  if (!mds->is_active())
    goto out;

  if (!mds->mdcache->is_open()) {
    dout(10) << "opening root on handle_heartbeat" << dendl;
    mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (mds->is_cluster_degraded()) {
    dout(10) << " degraded, ignoring" << dendl;
    goto out;
  }

  if (mds->get_nodeid() != 0 && m->get_beat() > beat_epoch) {
    dout(10) << "receive next epoch " << m->get_beat() << " from mds." << who << " before mds0" << dendl;

    beat_epoch = m->get_beat();
    // clear the mds load info whose epoch is less than beat_epoch
    mds_load.clear();
  }

  if (who == 0) {
    dout(20) << " from mds0, new epoch " << m->get_beat() << dendl;
    if (beat_epoch != m->get_beat()) {
      mds_load.clear();
    }
    beat_epoch = m->get_beat();
    send_heartbeat();

    mds->mdcache->show_subtrees();
  }

  {
    // set mds_load[who]
    mds_load_map_t::value_type val(who, m->get_load());
    pair < mds_load_map_t::iterator, bool > rval (mds_load.insert(val));
    if (!rval.second) {
      rval.first->second = val.second;
    }
  }
  mds_import_map[ who ] = m->get_import_map();

  {
    unsigned cluster_size = mds->get_mds_map()->get_num_in_mds();
    if (mds_load.size() == cluster_size) {
      // let's go!
      //export_empties();  // no!

      /* avoid spamming ceph -w if user does not turn mantle on */
      if (mds->mdsmap->get_balancer() != "") {
        int r = mantle_prep_rebalance();
        if (!r) goto out;
        mds->clog->warn() << "using old balancer; mantle failed for "
                          << "balancer=" << mds->mdsmap->get_balancer()
                          << " : " << cpp_strerror(r);
      }
      prep_rebalance(m->get_beat());
    }
  }

  // done
 out:
  m->put();
}


void MDBalancer::export_empties()
{
  dout(5) << "export_empties checking for empty imports" << dendl;

  std::set<CDir *> subtrees;
  mds->mdcache->get_fullauth_subtrees(subtrees);
  for (auto &dir : subtrees) {
    if (dir->is_freezing() || dir->is_frozen())
      continue;

    if (!dir->inode->is_base() &&
        !dir->inode->is_stray() &&
        dir->get_num_head_items() == 0)
      mds->mdcache->migrator->export_empty_import(dir);
  }
}


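// Transfer up to min(maxex, maxim) load units from exporter ex to importer
// im, recording the flow in the balance state.  Only flows that originate
// at this rank become actual export targets; maxex/maxim are decremented
// in place so callers can keep matching against the remaining capacity.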
double MDBalancer::try_match(balance_state_t& state, mds_rank_t ex, double& maxex,
                             mds_rank_t im, double& maxim)
{
  if (maxex <= 0 || maxim <= 0) return 0.0;

  double howmuch = MIN(maxex, maxim);
  if (howmuch <= 0) return 0.0;

  dout(5) << " - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;

  if (ex == mds->get_nodeid())
    state.targets[im] += howmuch;

  state.exported[ex] += howmuch;
  state.imported[im] += howmuch;

  maxex -= howmuch;
  maxim -= howmuch;

  return howmuch;
}

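// Queue a split of the given dirfrag.  The split_pending set debounces
// duplicate requests; the fast path runs via the MDSRank waiter queue at
// the end of the current dispatch, while the normal path is delayed by
// mds_bal_fragment_interval so a burst of ops can drain before the
// fragment is frozen for splitting.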
void MDBalancer::queue_split(const CDir *dir, bool fast)
{
  dout(10) << __func__ << " enqueuing " << *dir
           << " (fast=" << fast << ")" << dendl;

  assert(mds->mdsmap->allows_dirfrags());
  const dirfrag_t frag = dir->dirfrag();

  auto callback = [this, frag](int r) {
    if (split_pending.erase(frag) == 0) {
      // Someone beat me to it.  This can happen in the fast splitting
      // path, because we spawn two contexts, one with mds->timer and
      // one with mds->queue_waiter.  The loser can safely just drop
      // out.
      return;
    }

    CDir *split_dir = mds->mdcache->get_dirfrag(frag);
    if (!split_dir) {
      dout(10) << "drop split on " << frag << " because not in cache" << dendl;
      return;
    }
    if (!split_dir->is_auth()) {
      dout(10) << "drop split on " << frag << " because non-auth" << dendl;
      return;
    }

    // Pass on to MDCache: note that the split might still not
    // happen if the checks in MDCache::can_fragment fail.
    dout(10) << __func__ << " splitting " << *split_dir << dendl;
    mds->mdcache->split_dir(split_dir, g_conf->mds_bal_split_bits);
  };

  bool is_new = false;
  if (split_pending.count(frag) == 0) {
    split_pending.insert(frag);
    is_new = true;
  }

  if (fast) {
    // Do the split ASAP: enqueue it in the MDSRank waiters which are
    // run at the end of dispatching the current request
    mds->queue_waiter(new MDSInternalContextWrapper(mds,
                      new FunctionContext(callback)));
  } else if (is_new) {
    // Set a timer to really do the split: we don't do it immediately
    // so that bursts of ops on a directory have a chance to go through
    // before we freeze it.
    mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
                               new FunctionContext(callback));
  }
}

void MDBalancer::queue_merge(CDir *dir)
{
  const auto frag = dir->dirfrag();
  auto callback = [this, frag](int r) {
    assert(frag.frag != frag_t());

    // frag must be in this set because only one context is in flight
    // for a given frag at a time (because merge_pending is checked before
    // starting one), and this context is the only one that erases it.
    merge_pending.erase(frag);

    CDir *dir = mds->mdcache->get_dirfrag(frag);
    if (!dir) {
      dout(10) << "drop merge on " << frag << " because not in cache" << dendl;
      return;
    }
    assert(dir->dirfrag() == frag);

    if (!dir->is_auth()) {
      dout(10) << "drop merge on " << *dir << " because lost auth" << dendl;
      return;
    }

    dout(10) << "merging " << *dir << dendl;

    CInode *diri = dir->get_inode();

    frag_t fg = dir->get_frag();
    while (fg != frag_t()) {
      frag_t sibfg = fg.get_sibling();
      list<CDir*> sibs;
      bool complete = diri->get_dirfrags_under(sibfg, sibs);
      if (!complete) {
        dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
        break;
      }
      bool all = true;
      for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
        CDir *sib = *p;
        if (!sib->is_auth() || !sib->should_merge()) {
          all = false;
          break;
        }
      }
      if (!all) {
        dout(10) << " not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
        break;
      }
      dout(10) << " all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
      fg = fg.parent();
    }

    if (fg != dir->get_frag())
      mds->mdcache->merge_dir(diri, fg);
  };

  if (merge_pending.count(frag) == 0) {
    dout(20) << __func__ << " enqueued dir " << *dir << dendl;
    merge_pending.insert(frag);
    mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
                               new FunctionContext(callback));
  } else {
    dout(20) << __func__ << " dir already in queue " << *dir << dendl;
  }
}

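// Default (non-Mantle) rebalancing policy.  Loads from the last heartbeat
// epoch are rescaled into meta_load units, the cluster mean becomes
// target_load, and ranks are split into importers (below target) and
// exporters (above).  A rank only acts once it has been overloaded past
// the mds_bal_min_rebalance margin for at least two epochs; matching then
// proceeds in passes: first pair exporters with the ranks they already
// import from, then big-to-big or small-to-big matching on alternate beats.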
void MDBalancer::prep_rebalance(int beat)
{
  balance_state_t state;

  if (g_conf->mds_thrash_exports) {
    //we're going to randomly export to all the mds in the cluster
    set<mds_rank_t> up_mds;
    mds->get_mds_map()->get_up_mds_set(up_mds);
    for (const auto &rank : up_mds) {
      state.targets[rank] = 0.0;
    }
  } else {
    int cluster_size = mds->get_mds_map()->get_num_in_mds();
    mds_rank_t whoami = mds->get_nodeid();
    rebalance_time = ceph_clock_now();

    dout(5) << " prep_rebalance: cluster loads are" << dendl;

    mds->mdcache->migrator->clear_export_queue();

    // rescale!  turn my mds_load back into meta_load units
    double load_fac = 1.0;
    map<mds_rank_t, mds_load_t>::iterator m = mds_load.find(whoami);
    if ((m != mds_load.end()) && (m->second.mds_load() > 0)) {
      double metald = m->second.auth.meta_load(rebalance_time, mds->mdcache->decayrate);
      double mdsld = m->second.mds_load();
      load_fac = metald / mdsld;
      dout(7) << " load_fac is " << load_fac
              << " <- " << m->second.auth << " " << metald
              << " / " << mdsld
              << dendl;
    }

    double total_load = 0.0;
    multimap<double,mds_rank_t> load_map;
    for (mds_rank_t i=mds_rank_t(0); i < mds_rank_t(cluster_size); i++) {
      map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
      std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
      mds_load_t &load(r.first->second);

      double l = load.mds_load() * load_fac;
      mds_meta_load[i] = l;

      if (whoami == 0)
        dout(5) << " mds." << i
                << " " << load
                << " = " << load.mds_load()
                << " ~ " << l << dendl;

      if (whoami == i) my_load = l;
      total_load += l;

      load_map.insert(pair<double,mds_rank_t>( l, i ));
    }

    // target load
    target_load = total_load / (double)cluster_size;
    dout(5) << "prep_rebalance: my load " << my_load
            << " target " << target_load
            << " total " << total_load
            << dendl;

    // under or over?
    if (my_load < target_load * (1.0 + g_conf->mds_bal_min_rebalance)) {
      dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl;
      last_epoch_under = beat_epoch;
      mds->mdcache->show_subtrees();
      return;
    }

    // am i over long enough?
    if (last_epoch_under && beat_epoch - last_epoch_under < 2) {
      dout(5) << " i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
      return;
    }

    dout(5) << " i am sufficiently overloaded" << dendl;


    // first separate exporters and importers
    multimap<double,mds_rank_t> importers;
    multimap<double,mds_rank_t> exporters;
    set<mds_rank_t> importer_set;
    set<mds_rank_t> exporter_set;

    for (multimap<double,mds_rank_t>::iterator it = load_map.begin();
         it != load_map.end();
         ++it) {
      if (it->first < target_load) {
        dout(15) << " mds." << it->second << " is importer" << dendl;
        importers.insert(pair<double,mds_rank_t>(it->first,it->second));
        importer_set.insert(it->second);
      } else {
        dout(15) << " mds." << it->second << " is exporter" << dendl;
        exporters.insert(pair<double,mds_rank_t>(it->first,it->second));
        exporter_set.insert(it->second);
      }
    }


    // determine load transfer mapping

    if (true) {
      // analyze import_map; do any matches i can

      dout(15) << " matching exporters to import sources" << dendl;

      // big -> small exporters
      for (multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
           ex != exporters.rend();
           ++ex) {
        double maxex = get_maxex(state, ex->second);
        if (maxex <= .001) continue;

        // check importers.  for now, just in arbitrary order (no intelligent matching).
        for (map<mds_rank_t, float>::iterator im = mds_import_map[ex->second].begin();
             im != mds_import_map[ex->second].end();
             ++im) {
          double maxim = get_maxim(state, im->first);
          if (maxim <= .001) continue;
          try_match(state, ex->second, maxex, im->first, maxim);
          if (maxex <= .001) break;
        }
      }
    }

    // old way
    if (beat % 2 == 1) {
      dout(15) << " matching big exporters to big importers" << dendl;
      // big exporters to big importers
      multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
      multimap<double,mds_rank_t>::iterator im = importers.begin();
      while (ex != exporters.rend() &&
             im != importers.end()) {
        double maxex = get_maxex(state, ex->second);
        double maxim = get_maxim(state, im->second);
        if (maxex < .001 || maxim < .001) break;
        try_match(state, ex->second, maxex, im->second, maxim);
        if (maxex <= .001) ++ex;
        if (maxim <= .001) ++im;
      }
    } else { // new way
      dout(15) << " matching small exporters to big importers" << dendl;
      // small exporters to big importers
      multimap<double,mds_rank_t>::iterator ex = exporters.begin();
      multimap<double,mds_rank_t>::iterator im = importers.begin();
      while (ex != exporters.end() &&
             im != importers.end()) {
        double maxex = get_maxex(state, ex->second);
        double maxim = get_maxim(state, im->second);
        if (maxex < .001 || maxim < .001) break;
        try_match(state, ex->second, maxex, im->second, maxim);
        if (maxex <= .001) ++ex;
        if (maxim <= .001) ++im;
      }
    }
  }
  try_rebalance(state);
}

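// Mantle rebalancing: fetch the Lua balancer named in the MDSMap whenever
// its version changes, hand it one metrics table per MDS, and let the
// script decide the target loads.  As an illustrative sketch only (the
// exact Lua environment is defined by Mantle, not here), a policy could
// look roughly like:
//
//   -- hypothetical policy: rank 0 spills half its authored load to rank 1
//   targets = {}
//   for i = 1, #mds do targets[i] = 0 end
//   targets[2] = mds[1]["auth.meta_load"] / 2
//   return targets
//
// The metric keys below ("auth.meta_load", "all.meta_load", "req_rate",
// "queue_len", "cpu_load_avg") are the ones this function actually exports
// to the script.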
int MDBalancer::mantle_prep_rebalance()
{
  balance_state_t state;

  /* refresh balancer if it has changed */
  if (bal_version != mds->mdsmap->get_balancer()) {
    bal_version.assign("");
    int r = localize_balancer();
    if (r) return r;

    /* only spam the cluster log from 1 mds on version changes */
    if (mds->get_nodeid() == 0)
      mds->clog->info() << "mantle balancer version changed: " << bal_version;
  }

  /* prepare for balancing */
  int cluster_size = mds->get_mds_map()->get_num_in_mds();
  rebalance_time = ceph_clock_now();
  mds->mdcache->migrator->clear_export_queue();

  /* fill in the metrics for each mds by grabbing load struct */
  vector < map<string, double> > metrics (cluster_size);
  for (mds_rank_t i=mds_rank_t(0);
       i < mds_rank_t(cluster_size);
       i++) {
    map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
    std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
    mds_load_t &load(r.first->second);

    metrics[i] = {{"auth.meta_load", load.auth.meta_load()},
                  {"all.meta_load", load.all.meta_load()},
                  {"req_rate", load.req_rate},
                  {"queue_len", load.queue_len},
                  {"cpu_load_avg", load.cpu_load_avg}};
  }

  /* execute the balancer */
  Mantle mantle;
  int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
  dout(2) << " mantle decided that new targets=" << state.targets << dendl;

  /* mantle doesn't know about cluster size, so check target len here */
  if ((int) state.targets.size() != cluster_size)
    return -EINVAL;
  else if (ret)
    return ret;

  try_rebalance(state);
  return 0;
}


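// Turn the per-target load amounts in the balance state into actual
// exports.  Preference order for each target: first re-export subtrees we
// originally imported from that rank, then (if still short by more than
// MIN_OFFLOAD) carve fragments out of our own workload via find_exports.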
void MDBalancer::try_rebalance(balance_state_t& state)
{
  if (g_conf->mds_thrash_exports) {
    dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
            << dendl;
    return;
  }

  // make a sorted list of my imports
  map<double,CDir*> import_pop_map;
  multimap<mds_rank_t,CDir*> import_from_map;
  set<CDir*> fullauthsubs;

  mds->mdcache->get_fullauth_subtrees(fullauthsubs);
  for (set<CDir*>::iterator it = fullauthsubs.begin();
       it != fullauthsubs.end();
       ++it) {
    CDir *im = *it;
    if (im->get_inode()->is_stray()) continue;

    double pop = im->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
    if (g_conf->mds_bal_idle_threshold > 0 &&
        pop < g_conf->mds_bal_idle_threshold &&
        im->inode != mds->mdcache->get_root() &&
        im->inode->authority().first != mds->get_nodeid()) {
      dout(5) << " exporting idle (" << pop << ") import " << *im
              << " back to mds." << im->inode->authority().first
              << dendl;
      mds->mdcache->migrator->export_dir_nicely(im, im->inode->authority().first);
      continue;
    }

    import_pop_map[ pop ] = im;
    mds_rank_t from = im->inode->authority().first;
    dout(15) << " map: i imported " << *im << " from " << from << dendl;
    import_from_map.insert(pair<mds_rank_t,CDir*>(from, im));
  }


  // do my exports!
  set<CDir*> already_exporting;

  for (auto &it : state.targets) {
    mds_rank_t target = it.first;
    double amount = it.second;

    if (amount < MIN_OFFLOAD) continue;
    if (amount / target_load < .2) continue;

    dout(5) << "want to send " << amount << " to mds." << target
      //<< " .. " << (*it).second << " * " << load_fac
            << " -> " << amount
            << dendl;//" .. fudge is " << fudge << dendl;
    double have = 0.0;

    mds->mdcache->show_subtrees();

    // search imports from target
    if (import_from_map.count(target)) {
      dout(5) << " aha, looking through imports from target mds." << target << dendl;
      pair<multimap<mds_rank_t,CDir*>::iterator, multimap<mds_rank_t,CDir*>::iterator> p =
        import_from_map.equal_range(target);
      while (p.first != p.second) {
        CDir *dir = (*p.first).second;
        dout(5) << "considering " << *dir << " from " << (*p.first).first << dendl;
        multimap<mds_rank_t,CDir*>::iterator plast = p.first++;

        if (dir->inode->is_base() ||
            dir->inode->is_stray())
          continue;
        if (dir->is_freezing() || dir->is_frozen()) continue;  // export probably already in progress
        double pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
        assert(dir->inode->authority().first == target);  // because that's how it was inserted into import_from_map

        if (pop <= amount-have) {
          dout(5) << "reexporting " << *dir
                  << " pop " << pop
                  << " back to mds." << target << dendl;
          mds->mdcache->migrator->export_dir_nicely(dir, target);
          have += pop;
          import_from_map.erase(plast);
          import_pop_map.erase(pop);
        } else {
          dout(5) << "can't reexport " << *dir << ", too big " << pop << dendl;
        }
        if (amount-have < MIN_OFFLOAD) break;
      }
    }
    if (amount-have < MIN_OFFLOAD) {
      continue;
    }

    // any other imports
    if (false)
      for (map<double,CDir*>::iterator import = import_pop_map.begin();
           import != import_pop_map.end();
           import++) {
        CDir *imp = (*import).second;
        if (imp->inode->is_base() ||
            imp->inode->is_stray())
          continue;

        double pop = (*import).first;
        if (pop < amount-have || pop < MIN_REEXPORT) {
          dout(5) << "reexporting " << *imp
                  << " pop " << pop
                  << " back to mds." << imp->inode->authority()
                  << dendl;
          have += pop;
          mds->mdcache->migrator->export_dir_nicely(imp, imp->inode->authority().first);
        }
        if (amount-have < MIN_OFFLOAD) break;
      }
    if (amount-have < MIN_OFFLOAD) {
      //fudge = amount-have;
      continue;
    }

    // okay, search for fragments of my workload
    set<CDir*> candidates;
    mds->mdcache->get_fullauth_subtrees(candidates);

    list<CDir*> exports;

    for (set<CDir*>::iterator pot = candidates.begin();
         pot != candidates.end();
         ++pot) {
      if ((*pot)->get_inode()->is_stray()) continue;
      find_exports(*pot, amount, exports, have, already_exporting);
      if (have > amount-MIN_OFFLOAD)
        break;
    }
    //fudge = amount - have;

    for (list<CDir*>::iterator it = exports.begin(); it != exports.end(); ++it) {
      dout(5) << " - exporting "
              << (*it)->pop_auth_subtree
              << " "
              << (*it)->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate)
              << " to mds." << target
              << " " << **it
              << dendl;
      mds->mdcache->migrator->export_dir_nicely(*it, target);
    }
  }

  dout(5) << "rebalance done" << dendl;
  mds->mdcache->show_subtrees();
}

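// Greedily collect auth dirfrags under *dir until 'have' approaches
// 'amount'.  Thresholds scale with the remaining need: a single fragment
// in (needmin, needmax) is a lucky find; otherwise take large "smaller"
// chunks (>= midchunk), recurse into big unreplicated fragments, fall back
// to the little chunks, and only then descend into replicated fragments.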
void MDBalancer::find_exports(CDir *dir,
                              double amount,
                              list<CDir*>& exports,
                              double& have,
                              set<CDir*>& already_exporting)
{
  double need = amount - have;
  if (need < amount * g_conf->mds_bal_min_start)
    return;   // good enough!
  double needmax = need * g_conf->mds_bal_need_max;
  double needmin = need * g_conf->mds_bal_need_min;
  double midchunk = need * g_conf->mds_bal_midchunk;
  double minchunk = need * g_conf->mds_bal_minchunk;

  list<CDir*> bigger_rep, bigger_unrep;
  multimap<double, CDir*> smaller;

  double dir_pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
  dout(7) << " find_exports in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;

  double subdir_sum = 0;
  for (auto it = dir->begin(); it != dir->end(); ++it) {
    CInode *in = it->second->get_linkage()->get_inode();
    if (!in) continue;
    if (!in->is_dir()) continue;

    list<CDir*> dfls;
    in->get_dirfrags(dfls);
    for (list<CDir*>::iterator p = dfls.begin();
         p != dfls.end();
         ++p) {
      CDir *subdir = *p;
      if (!subdir->is_auth()) continue;
      if (already_exporting.count(subdir)) continue;

      if (subdir->is_frozen()) continue;  // can't export this right now!

      // how popular?
      double pop = subdir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
      subdir_sum += pop;
      dout(15) << " subdir pop " << pop << " " << *subdir << dendl;

      if (pop < minchunk) continue;

      // lucky find?
      if (pop > needmin && pop < needmax) {
        exports.push_back(subdir);
        already_exporting.insert(subdir);
        have += pop;
        return;
      }

      if (pop > need) {
        if (subdir->is_rep())
          bigger_rep.push_back(subdir);
        else
          bigger_unrep.push_back(subdir);
      } else
        smaller.insert(pair<double,CDir*>(pop, subdir));
    }
  }
  dout(15) << " sum " << subdir_sum << " / " << dir_pop << dendl;

  // grab some sufficiently big small items
  multimap<double,CDir*>::reverse_iterator it;
  for (it = smaller.rbegin();
       it != smaller.rend();
       ++it) {

    if ((*it).first < midchunk)
      break;  // try later

    dout(7) << " taking smaller " << *(*it).second << dendl;

    exports.push_back((*it).second);
    already_exporting.insert((*it).second);
    have += (*it).first;
    if (have > needmin)
      return;
  }

  // apparently not enough; drill deeper into the hierarchy (if non-replicated)
  for (list<CDir*>::iterator it = bigger_unrep.begin();
       it != bigger_unrep.end();
       ++it) {
    dout(15) << " descending into " << **it << dendl;
    find_exports(*it, amount, exports, have, already_exporting);
    if (have > needmin)
      return;
  }

  // ok fine, use smaller bits
  for (;
       it != smaller.rend();
       ++it) {
    dout(7) << " taking (much) smaller " << it->first << " " << *(*it).second << dendl;

    exports.push_back((*it).second);
    already_exporting.insert((*it).second);
    have += (*it).first;
    if (have > needmin)
      return;
  }

  // ok fine, drill into replicated dirs
  for (list<CDir*>::iterator it = bigger_rep.begin();
       it != bigger_rep.end();
       ++it) {
    dout(7) << " descending into replicated " << **it << dendl;
    find_exports(*it, amount, exports, have, already_exporting);
    if (have > needmin)
      return;
  }

}

void MDBalancer::hit_inode(utime_t now, CInode *in, int type, int who)
{
  // hit inode
  in->pop.get(type).hit(now, mds->mdcache->decayrate);

  if (in->get_parent_dn())
    hit_dir(now, in->get_parent_dn()->get_dir(), type, who);
}

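// Decide whether the dirfrag should be queued for a split or a merge.
// Splits trigger when the fragment should_split() (or is "hot"), and an
// already-queued fragment can be upgraded to the fast path; merges trigger
// once should_merge() reports the fragment has shrunk below the merge
// threshold.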
void MDBalancer::maybe_fragment(CDir *dir, bool hot)
{
  // split/merge
  if (g_conf->mds_bal_frag && g_conf->mds_bal_fragment_interval > 0 &&
      !dir->inode->is_base() &&        // not root/base (for now at least)
      dir->is_auth()) {

    // split
    if (g_conf->mds_bal_split_size > 0 &&
        mds->mdsmap->allows_dirfrags() &&
        (dir->should_split() || hot))
    {
      if (split_pending.count(dir->dirfrag()) == 0) {
        queue_split(dir, false);
      } else {
        if (dir->should_split_fast()) {
          queue_split(dir, true);
        } else {
          dout(10) << __func__ << ": fragment already enqueued to split: "
                   << *dir << dendl;
        }
      }
    }

    // merge?
    if (dir->get_frag() != frag_t() && dir->should_merge() &&
        merge_pending.count(dir->dirfrag()) == 0) {
      queue_merge(dir);
    }
  }
}

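// Record a popularity hit on the dirfrag and propagate it up the ancestor
// chain (nested counters all the way to the root, auth-subtree counters up
// to the enclosing subtree root).  Read-heavy dirfrags whose load spreads
// across clients past mds_bal_replicate_threshold get replicated to all
// ranks; quiet replicated ones below mds_bal_unreplicate_threshold are
// unreplicated.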
void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amount)
{
  // hit me
  double v = dir->pop_me.get(type).hit(now, mds->mdcache->decayrate, amount);

  const bool hot = (v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
                   (v > g_conf->mds_bal_split_wr && type == META_POP_IWR);

  dout(20) << "hit_dir " << type << " pop is " << v << ", frag " << dir->get_frag()
           << " size " << dir->get_frag_size() << dendl;

  maybe_fragment(dir, hot);

  // replicate?
  if (type == META_POP_IRD && who >= 0) {
    dir->pop_spread.hit(now, mds->mdcache->decayrate, who);
  }

  double rd_adj = 0.0;
  if (type == META_POP_IRD &&
      dir->last_popularity_sample < last_sample) {
    double dir_pop = dir->pop_auth_subtree.get(type).get(now, mds->mdcache->decayrate);  // hmm??
    dir->last_popularity_sample = last_sample;
    double pop_sp = dir->pop_spread.get(now, mds->mdcache->decayrate);
    dir_pop += pop_sp * 10;

    //if (dir->ino() == inodeno_t(0x10000000002))
    if (pop_sp > 0) {
      dout(20) << "hit_dir " << type << " pop " << dir_pop << " spread " << pop_sp
               << " " << dir->pop_spread.last[0]
               << " " << dir->pop_spread.last[1]
               << " " << dir->pop_spread.last[2]
               << " " << dir->pop_spread.last[3]
               << " in " << *dir << dendl;
    }

    if (dir->is_auth() && !dir->is_ambiguous_auth()) {
      if (!dir->is_rep() &&
          dir_pop >= g_conf->mds_bal_replicate_threshold) {
        // replicate
        double rdp = dir->pop_me.get(META_POP_IRD).get(now, mds->mdcache->decayrate);
        rd_adj = rdp / mds->get_mds_map()->get_num_in_mds() - rdp;
        rd_adj /= 2.0;  // temper somewhat

        dout(5) << "replicating dir " << *dir << " pop " << dir_pop << " .. rdp " << rdp << " adj " << rd_adj << dendl;

        dir->dir_rep = CDir::REP_ALL;
        mds->mdcache->send_dir_updates(dir, true);

        // fixme this should adjust the whole pop hierarchy
        dir->pop_me.get(META_POP_IRD).adjust(rd_adj);
        dir->pop_auth_subtree.get(META_POP_IRD).adjust(rd_adj);
      }

      if (dir->ino() != 1 &&
          dir->is_rep() &&
          dir_pop < g_conf->mds_bal_unreplicate_threshold) {
        // unreplicate
        dout(5) << "unreplicating dir " << *dir << " pop " << dir_pop << dendl;

        dir->dir_rep = CDir::REP_NONE;
        mds->mdcache->send_dir_updates(dir);
      }
    }
  }

  // adjust ancestors
  bool hit_subtree = dir->is_auth();         // current auth subtree (if any)
  bool hit_subtree_nested = dir->is_auth();  // all nested auth subtrees

  while (true) {
    dir->pop_nested.get(type).hit(now, mds->mdcache->decayrate, amount);
    if (rd_adj != 0.0)
      dir->pop_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);

    if (hit_subtree) {
      dir->pop_auth_subtree.get(type).hit(now, mds->mdcache->decayrate, amount);
      if (rd_adj != 0.0)
        dir->pop_auth_subtree.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
    }

    if (hit_subtree_nested) {
      dir->pop_auth_subtree_nested.get(type).hit(now, mds->mdcache->decayrate, amount);
      if (rd_adj != 0.0)
        dir->pop_auth_subtree_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
    }

    if (dir->is_subtree_root())
      hit_subtree = false;  // end of auth domain, stop hitting auth counters.

    if (dir->inode->get_parent_dn() == 0) break;
    dir = dir->inode->get_parent_dn()->get_dir();
  }
}


/*
 * subtract off an exported chunk.
 *  this excludes *dir itself (encode_export_dir should have taken care of that)
 *  we _just_ do the parents' nested counters.
 *
 * NOTE: call me _after_ forcing *dir into a subtree root,
 *       but _before_ doing the encode_export_dirs.
 */
void MDBalancer::subtract_export(CDir *dir, utime_t now)
{
  dirfrag_load_vec_t subload = dir->pop_auth_subtree;

  while (true) {
    dir = dir->inode->get_parent_dir();
    if (!dir) break;

    dir->pop_nested.sub(now, mds->mdcache->decayrate, subload);
    dir->pop_auth_subtree_nested.sub(now, mds->mdcache->decayrate, subload);
  }
}

void MDBalancer::add_import(CDir *dir, utime_t now)
{
  dirfrag_load_vec_t subload = dir->pop_auth_subtree;

  while (true) {
    dir = dir->inode->get_parent_dir();
    if (!dir) break;

    dir->pop_nested.add(now, mds->mdcache->decayrate, subload);
    dir->pop_auth_subtree_nested.add(now, mds->mdcache->decayrate, subload);
  }
}

void MDBalancer::handle_mds_failure(mds_rank_t who)
{
  if (0 == who) {
    last_epoch_under = 0;
  }
}