// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "include/compat.h"
#include "mdstypes.h"

#include "MDBalancer.h"
#include "MDSRank.h"
#include "mon/MonClient.h"
#include "MDSMap.h"
#include "CInode.h"
#include "CDir.h"
#include "MDCache.h"
#include "Migrator.h"
#include "Mantle.h"

#include "include/Context.h"
#include "msg/Messenger.h"
#include "messages/MHeartbeat.h"

#include <fstream>
#include <iostream>
#include <vector>
#include <map>
using std::map;
using std::vector;

#include "common/config.h"
#include "common/errno.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
#undef dout
#define dout(lvl) \
  do {\
    auto subsys = ceph_subsys_mds;\
    if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
      subsys = ceph_subsys_mds_balancer;\
    }\
    dout_impl(dout_context, subsys, lvl) dout_prefix
#undef dendl
#define dendl dendl_impl; } while (0)


#define MIN_LOAD    50   // ??
#define MIN_REEXPORT 5   // will automatically reexport
#define MIN_OFFLOAD 10   // point at which i stop trying, close enough


/* This function DOES put the passed message before returning */
int MDBalancer::proc_message(Message *m)
{
  switch (m->get_type()) {

  case MSG_MDS_HEARTBEAT:
    handle_heartbeat(static_cast<MHeartbeat*>(m));
    break;

  default:
    dout(0) << " balancer unknown message " << m->get_type() << dendl;
    assert(0 == "balancer unknown message");
  }

  return 0;
}

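/*
 * Two phases: first drain the export pin queue, exporting each pinned
 * inode's auth dirfrags to its pinned rank (and dropping entries that are
 * no longer exportable or for which we hold no auth dirfrag); then sweep
 * our auth subtrees for any whose export_pin names another rank and export
 * those as well.
 */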
void MDBalancer::handle_export_pins(void)
{
  auto &q = mds->mdcache->export_pin_queue;
  auto it = q.begin();
  dout(20) << "export_pin_queue size=" << q.size() << dendl;
  while (it != q.end()) {
    auto current = it++;
    CInode *in = *current;
    assert(in->is_dir());
    mds_rank_t export_pin = in->get_export_pin();
    if (!in->is_exportable(export_pin)) {
      dout(10) << "can no longer export " << *in << " because export pins have since changed" << dendl;
      q.erase(current);
      continue;
    }
    dout(10) << "exporting dirfrags of " << *in << " to " << export_pin << dendl;
    bool has_auth = false;
    list<frag_t> ls;
    in->dirfragtree.get_leaves(ls);
    for (const auto &fg : ls) {
      CDir *cd = in->get_dirfrag(fg);
      if (cd && cd->is_auth()) {
        /* N.B. when we are no longer auth after exporting, this function will remove the inode from the queue */
        mds->mdcache->migrator->export_dir(cd, export_pin);
        has_auth = true;
      }
    }
    if (!has_auth) {
      dout(10) << "can no longer export " << *in << " because I am not auth for any dirfrags" << dendl;
      q.erase(current);
      continue;
    }
  }

  set<CDir *> authsubs;
  mds->mdcache->get_auth_subtrees(authsubs);
  for (auto &cd : authsubs) {
    mds_rank_t export_pin = cd->inode->get_export_pin();
    dout(10) << "auth tree " << *cd << " export_pin=" << export_pin << dendl;
    if (export_pin >= 0 && export_pin != mds->get_nodeid()) {
      dout(10) << "exporting auth subtree " << *cd->inode << " to " << export_pin << dendl;
      mds->mdcache->migrator->export_dir(cd, export_pin);
    }
  }
}

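/*
 * Periodic driver: service export pins, refresh the popularity sample
 * clock, and (on rank 0 only, while active) kick off a balancing
 * heartbeat once mds_bal_interval seconds have passed since the last one.
 */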
void MDBalancer::tick()
{
  static int num_bal_times = g_conf->mds_bal_max;
  static utime_t first = ceph_clock_now();
  utime_t now = ceph_clock_now();
  utime_t elapsed = now;
  elapsed -= first;

  if (g_conf->mds_bal_export_pin) {
    handle_export_pins();
  }

  // sample?
  if ((double)now - (double)last_sample > g_conf->mds_bal_sample_interval) {
    dout(15) << "tick last_sample now " << now << dendl;
    last_sample = now;
  }

  // balance?
  if (last_heartbeat == utime_t())
    last_heartbeat = now;
  if (mds->get_nodeid() == 0 &&
      g_conf->mds_bal_interval > 0 &&
      (num_bal_times ||
       (g_conf->mds_bal_max_until >= 0 &&
        elapsed.sec() > g_conf->mds_bal_max_until)) &&
      mds->is_active() &&
      now.sec() - last_heartbeat.sec() >= g_conf->mds_bal_interval) {
    last_heartbeat = now;
    send_heartbeat();
    num_bal_times--;
  }
}




class C_Bal_SendHeartbeat : public MDSInternalContext {
public:
  explicit C_Bal_SendHeartbeat(MDSRank *mds_) : MDSInternalContext(mds_) { }
  void finish(int f) override {
    mds->balancer->send_heartbeat();
  }
};


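/*
 * Illustrative arithmetic for mode 0 (the numbers are made up, not
 * measured): with auth.meta_load() = 100, all.meta_load() = 50,
 * req_rate = 20 and queue_len = 3, the composite load is
 *   0.8*100 + 0.2*50 + 20 + 10.0*3 = 140.
 */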
double mds_load_t::mds_load()
{
  switch(g_conf->mds_bal_mode) {
  case 0:
    return
      .8 * auth.meta_load() +
      .2 * all.meta_load() +
      req_rate +
      10.0 * queue_len;

  case 1:
    return req_rate + 10.0*queue_len;

  case 2:
    return cpu_load_avg;

  }
  ceph_abort();
  return 0;
}

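/*
 * Snapshot this rank's load: sum the decayed popularity of the root
 * dirfrags (auth-subtree and total), then fold in the request rate, the
 * messenger dispatch queue length, and the host's loadavg.
 */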
mds_load_t MDBalancer::get_load(utime_t now)
{
  mds_load_t load(now);

  if (mds->mdcache->get_root()) {
    list<CDir*> ls;
    mds->mdcache->get_root()->get_dirfrags(ls);
    for (list<CDir*>::iterator p = ls.begin();
         p != ls.end();
         ++p) {
      load.auth.add(now, mds->mdcache->decayrate, (*p)->pop_auth_subtree_nested);
      load.all.add(now, mds->mdcache->decayrate, (*p)->pop_nested);
    }
  } else {
    dout(20) << "get_load no root, no load" << dendl;
  }

  load.req_rate = mds->get_req_rate();
  load.queue_len = messenger->get_dispatch_queue_len();

  ifstream cpu(PROCPREFIX "/proc/loadavg");
  if (cpu.is_open())
    cpu >> load.cpu_load_avg;
  else
    dout(0) << "input file '" PROCPREFIX "/proc/loadavg' not found" << dendl;

  dout(15) << "get_load " << load << dendl;
  return load;
}

/*
 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
 * fallbacks (i.e. kick off async read when we are processing the map and
 * check status when we get here) with the way the mds is structured.
 */
int MDBalancer::localize_balancer()
{
  /* reset everything */
  bool ack = false;
  int r = 0;
  bufferlist lua_src;
  Mutex lock("lock");
  Cond cond;

  /* we assume that balancer is in the metadata pool */
  object_t oid = object_t(mds->mdsmap->get_balancer());
  object_locator_t oloc(mds->mdsmap->get_metadata_pool());
  ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
                                       new C_SafeCond(&lock, &cond, &ack, &r));
  dout(15) << "launched non-blocking read tid=" << tid
           << " oid=" << oid << " oloc=" << oloc << dendl;

  /* timeout: if we waste half our time waiting for RADOS, then abort! */
  double t = ceph_clock_now() + g_conf->mds_bal_interval/2;
  utime_t timeout;
  timeout.set_from_double(t);
  lock.Lock();
  int ret_t = cond.WaitUntil(lock, timeout);
  lock.Unlock();

  /* success: store the balancer in memory and set the version. */
  if (!r) {
    if (ret_t == ETIMEDOUT) {
      mds->objecter->op_cancel(tid, -ECANCELED);
      return -ETIMEDOUT;
    }
    bal_code.assign(lua_src.to_str());
    bal_version.assign(oid.name);
    dout(0) << "localized balancer, bal_code=" << bal_code << dendl;
  }
  return r;
}

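/*
 * Broadcast this rank's load to every other up MDS. Rank 0 bumps the beat
 * epoch; the import map records how much decayed metadata load we carry
 * on behalf of each other rank (i.e. subtrees we imported from them).
 */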
void MDBalancer::send_heartbeat()
{
  utime_t now = ceph_clock_now();

  if (mds->is_cluster_degraded()) {
    dout(10) << "send_heartbeat degraded" << dendl;
    return;
  }

  if (!mds->mdcache->is_open()) {
    dout(5) << "not open" << dendl;
    mds->mdcache->wait_for_open(new C_Bal_SendHeartbeat(mds));
    return;
  }

  mds_load.clear();
  if (mds->get_nodeid() == 0)
    beat_epoch++;

  // my load
  mds_load_t load = get_load(now);
  map<mds_rank_t, mds_load_t>::value_type val(mds->get_nodeid(), load);
  mds_load.insert(val);

  // import_map -- how much do i import from whom
  map<mds_rank_t, float> import_map;
  set<CDir*> authsubs;
  mds->mdcache->get_auth_subtrees(authsubs);
  for (set<CDir*>::iterator it = authsubs.begin();
       it != authsubs.end();
       ++it) {
    CDir *im = *it;
    mds_rank_t from = im->inode->authority().first;
    if (from == mds->get_nodeid()) continue;
    if (im->get_inode()->is_stray()) continue;
    import_map[from] += im->pop_auth_subtree.meta_load(now, mds->mdcache->decayrate);
  }
  mds_import_map[ mds->get_nodeid() ] = import_map;


  dout(5) << "mds." << mds->get_nodeid() << " epoch " << beat_epoch << " load " << load << dendl;
  for (map<mds_rank_t, float>::iterator it = import_map.begin();
       it != import_map.end();
       ++it) {
    dout(5) << " import_map from " << it->first << " -> " << it->second << dendl;
  }


  set<mds_rank_t> up;
  mds->get_mds_map()->get_up_mds_set(up);
  for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
    if (*p == mds->get_nodeid())
      continue;
    MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
    hb->get_import_map() = import_map;
    messenger->send_message(hb,
                            mds->mdsmap->get_inst(*p));
  }
}

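/*
 * Record the sender's load and import map. A beat from rank 0 starts a
 * new epoch (and we answer with our own heartbeat); once we hold loads
 * from every in MDS we run the balancer, preferring Mantle when a
 * balancer is set in the MDSMap and falling back to the default
 * algorithm on error.
 */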
/* This function DOES put the passed message before returning */
void MDBalancer::handle_heartbeat(MHeartbeat *m)
{
  typedef map<mds_rank_t, mds_load_t> mds_load_map_t;

  mds_rank_t who = mds_rank_t(m->get_source().num());
  dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << dendl;

  if (!mds->is_active())
    goto out;

  if (!mds->mdcache->is_open()) {
    dout(10) << "opening root on handle_heartbeat" << dendl;
    mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (mds->is_cluster_degraded()) {
    dout(10) << " degraded, ignoring" << dendl;
    goto out;
  }

  if (who == 0) {
    dout(20) << " from mds0, new epoch" << dendl;
    beat_epoch = m->get_beat();
    send_heartbeat();

    mds->mdcache->show_subtrees();
  }

  {
    // set mds_load[who]
    mds_load_map_t::value_type val(who, m->get_load());
    pair < mds_load_map_t::iterator, bool > rval (mds_load.insert(val));
    if (!rval.second) {
      rval.first->second = val.second;
    }
  }
  mds_import_map[ who ] = m->get_import_map();

  //dout(0) << " load is " << load << " have " << mds_load.size() << dendl;

  {
    unsigned cluster_size = mds->get_mds_map()->get_num_in_mds();
    if (mds_load.size() == cluster_size) {
      // let's go!
      //export_empties();  // no!

      /* avoid spamming ceph -w if user does not turn mantle on */
      if (mds->mdsmap->get_balancer() != "") {
        int r = mantle_prep_rebalance();
        if (!r) return;
        mds->clog->warn() << "using old balancer; mantle failed for "
                          << "balancer=" << mds->mdsmap->get_balancer()
                          << " : " << cpp_strerror(r);
      }
      prep_rebalance(m->get_beat());
    }
  }

  // done
 out:
  m->put();
}


void MDBalancer::export_empties()
{
  dout(5) << "export_empties checking for empty imports" << dendl;

  std::set<CDir *> subtrees;
  mds->mdcache->get_fullauth_subtrees(subtrees);
  for (auto &dir : subtrees) {
    if (dir->is_freezing() || dir->is_frozen())
      continue;

    if (!dir->inode->is_base() &&
        !dir->inode->is_stray() &&
        dir->get_num_head_items() == 0)
      mds->mdcache->migrator->export_empty_import(dir);
  }
}



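/*
 * Transfer up to min(maxex, maxim) load units from exporter ex to
 * importer im, recording the move in the balance state (and in our own
 * target map when we are the exporter). maxex/maxim are decremented in
 * place so the caller's matching loops can keep going until either side
 * is exhausted.
 */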
double MDBalancer::try_match(balance_state_t& state, mds_rank_t ex, double& maxex,
                             mds_rank_t im, double& maxim)
{
  if (maxex <= 0 || maxim <= 0) return 0.0;

  double howmuch = MIN(maxex, maxim);
  if (howmuch <= 0) return 0.0;

  dout(5) << " - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;

  if (ex == mds->get_nodeid())
    state.targets[im] += howmuch;

  state.exported[ex] += howmuch;
  state.imported[im] += howmuch;

  maxex -= howmuch;
  maxim -= howmuch;

  return howmuch;
}

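/*
 * Queue a split of dir into 2^mds_bal_split_bits children. The slow path
 * defers the split by mds_bal_fragment_interval so a burst of ops can
 * drain before the dirfrag freezes; the fast path runs it via the MDSRank
 * waiters at the end of the current dispatch.
 */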
void MDBalancer::queue_split(const CDir *dir, bool fast)
{
  dout(10) << __func__ << " enqueuing " << *dir
                       << " (fast=" << fast << ")" << dendl;

  assert(mds->mdsmap->allows_dirfrags());
  const dirfrag_t frag = dir->dirfrag();

  auto callback = [this, frag](int r) {
    if (split_pending.erase(frag) == 0) {
      // Someone beat me to it. This can happen in the fast splitting
      // path, because we spawn two contexts, one with mds->timer and
      // one with mds->queue_waiter. The loser can safely just drop
      // out.
      return;
    }

    CDir *split_dir = mds->mdcache->get_dirfrag(frag);
    if (!split_dir) {
      dout(10) << "drop split on " << frag << " because not in cache" << dendl;
      return;
    }
    if (!split_dir->is_auth()) {
      dout(10) << "drop split on " << frag << " because non-auth" << dendl;
      return;
    }

    // Pass on to MDCache: note that the split might still not
    // happen if the checks in MDCache::can_fragment fail.
    dout(10) << __func__ << " splitting " << *split_dir << dendl;
    mds->mdcache->split_dir(split_dir, g_conf->mds_bal_split_bits);
  };

  bool is_new = false;
  if (split_pending.count(frag) == 0) {
    split_pending.insert(frag);
    is_new = true;
  }

  if (fast) {
    // Do the split ASAP: enqueue it in the MDSRank waiters which are
    // run at the end of dispatching the current request
    mds->queue_waiter(new MDSInternalContextWrapper(mds,
          new FunctionContext(callback)));
  } else if (is_new) {
    // Set a timer to really do the split: we don't do it immediately
    // so that bursts of ops on a directory have a chance to go through
    // before we freeze it.
    mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
                               new FunctionContext(callback));
  }
}

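/*
 * Queue a merge of dir with its siblings. When the timer fires, the
 * callback climbs the fragtree: starting from dir's frag it repeatedly
 * checks whether every cached sibling under the parent frag is auth and
 * should_merge(), widening the candidate to the parent frag while that
 * holds. The widest frag found is handed to MDCache::merge_dir().
 */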
void MDBalancer::queue_merge(CDir *dir)
{
  const auto frag = dir->dirfrag();
  auto callback = [this, frag](int r) {
    assert(frag.frag != frag_t());

    // frag must be in this set because only one context is in flight
    // for a given frag at a time (because merge_pending is checked before
    // starting one), and this context is the only one that erases it.
    merge_pending.erase(frag);

    CDir *dir = mds->mdcache->get_dirfrag(frag);
    if (!dir) {
      dout(10) << "drop merge on " << frag << " because not in cache" << dendl;
      return;
    }
    assert(dir->dirfrag() == frag);

    if(!dir->is_auth()) {
      dout(10) << "drop merge on " << *dir << " because lost auth" << dendl;
      return;
    }

    dout(10) << "merging " << *dir << dendl;

    CInode *diri = dir->get_inode();

    frag_t fg = dir->get_frag();
    while (fg != frag_t()) {
      frag_t sibfg = fg.get_sibling();
      list<CDir*> sibs;
      bool complete = diri->get_dirfrags_under(sibfg, sibs);
      if (!complete) {
        dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
        break;
      }
      bool all = true;
      for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
        CDir *sib = *p;
        if (!sib->is_auth() || !sib->should_merge()) {
          all = false;
          break;
        }
      }
      if (!all) {
        dout(10) << " not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
        break;
      }
      dout(10) << " all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
      fg = fg.parent();
    }

    if (fg != dir->get_frag())
      mds->mdcache->merge_dir(diri, fg);
  };

  if (merge_pending.count(frag) == 0) {
    dout(20) << __func__ << " enqueued dir " << *dir << dendl;
    merge_pending.insert(frag);
    mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
        new FunctionContext(callback));
  } else {
    dout(20) << __func__ << " dir already in queue " << *dir << dendl;
  }
}

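/*
 * The default balancer. Rescale every rank's composite mds_load into
 * meta_load units, take the mean as the target, and bail out unless we
 * have been overloaded for at least two consecutive epochs. Otherwise
 * split the ranks into importers (below target) and exporters (above),
 * first matching exporters against the ranks they already import from,
 * then pairing off the remainder (big-to-big on odd beats, small-to-big
 * on even ones). The resulting transfer map is applied by try_rebalance().
 */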
void MDBalancer::prep_rebalance(int beat)
{
  balance_state_t state;

  if (g_conf->mds_thrash_exports) {
    //we're going to randomly export to all the mds in the cluster
    set<mds_rank_t> up_mds;
    mds->get_mds_map()->get_up_mds_set(up_mds);
    for (const auto &rank : up_mds) {
      state.targets[rank] = 0.0;
    }
  } else {
    int cluster_size = mds->get_mds_map()->get_num_in_mds();
    mds_rank_t whoami = mds->get_nodeid();
    rebalance_time = ceph_clock_now();

    dout(5) << " prep_rebalance: cluster loads are" << dendl;

    mds->mdcache->migrator->clear_export_queue();

    // rescale! turn my mds_load back into meta_load units
    double load_fac = 1.0;
    map<mds_rank_t, mds_load_t>::iterator m = mds_load.find(whoami);
    if ((m != mds_load.end()) && (m->second.mds_load() > 0)) {
      double metald = m->second.auth.meta_load(rebalance_time, mds->mdcache->decayrate);
      double mdsld = m->second.mds_load();
      load_fac = metald / mdsld;
      dout(7) << " load_fac is " << load_fac
              << " <- " << m->second.auth << " " << metald
              << " / " << mdsld
              << dendl;
    }

    double total_load = 0.0;
    multimap<double,mds_rank_t> load_map;
    for (mds_rank_t i=mds_rank_t(0); i < mds_rank_t(cluster_size); i++) {
      map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
      std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
      mds_load_t &load(r.first->second);

      double l = load.mds_load() * load_fac;
      mds_meta_load[i] = l;

      if (whoami == 0)
        dout(0) << " mds." << i
                << " " << load
                << " = " << load.mds_load()
                << " ~ " << l << dendl;

      if (whoami == i) my_load = l;
      total_load += l;

      load_map.insert(pair<double,mds_rank_t>( l, i ));
    }

    // target load
    target_load = total_load / (double)cluster_size;
    dout(5) << "prep_rebalance: my load " << my_load
            << " target " << target_load
            << " total " << total_load
            << dendl;

    // under or over?
    if (my_load < target_load * (1.0 + g_conf->mds_bal_min_rebalance)) {
      dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl;
      last_epoch_under = beat_epoch;
      mds->mdcache->show_subtrees();
      return;
    }

    last_epoch_over = beat_epoch;

    // am i over long enough?
    if (last_epoch_under && beat_epoch - last_epoch_under < 2) {
      dout(5) << " i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
      return;
    }

    dout(5) << " i am sufficiently overloaded" << dendl;


    // first separate exporters and importers
    multimap<double,mds_rank_t> importers;
    multimap<double,mds_rank_t> exporters;
    set<mds_rank_t> importer_set;
    set<mds_rank_t> exporter_set;

    for (multimap<double,mds_rank_t>::iterator it = load_map.begin();
         it != load_map.end();
         ++it) {
      if (it->first < target_load) {
        dout(15) << " mds." << it->second << " is importer" << dendl;
        importers.insert(pair<double,mds_rank_t>(it->first,it->second));
        importer_set.insert(it->second);
      } else {
        dout(15) << " mds." << it->second << " is exporter" << dendl;
        exporters.insert(pair<double,mds_rank_t>(it->first,it->second));
        exporter_set.insert(it->second);
      }
    }


    // determine load transfer mapping

    if (true) {
      // analyze import_map; do any matches i can

      dout(15) << " matching exporters to import sources" << dendl;

      // big -> small exporters
      for (multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
           ex != exporters.rend();
           ++ex) {
        double maxex = get_maxex(state, ex->second);
        if (maxex <= .001) continue;

        // check importers. for now, just in arbitrary order (no intelligent matching).
        for (map<mds_rank_t, float>::iterator im = mds_import_map[ex->second].begin();
             im != mds_import_map[ex->second].end();
             ++im) {
          double maxim = get_maxim(state, im->first);
          if (maxim <= .001) continue;
          try_match(state, ex->second, maxex, im->first, maxim);
          if (maxex <= .001) break;
        }
      }
    }

    // old way
    if (beat % 2 == 1) {
      dout(15) << " matching big exporters to big importers" << dendl;
      // big exporters to big importers
      multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
      multimap<double,mds_rank_t>::iterator im = importers.begin();
      while (ex != exporters.rend() &&
             im != importers.end()) {
        double maxex = get_maxex(state, ex->second);
        double maxim = get_maxim(state, im->second);
        if (maxex < .001 || maxim < .001) break;
        try_match(state, ex->second, maxex, im->second, maxim);
        if (maxex <= .001) ++ex;
        if (maxim <= .001) ++im;
      }
    } else { // new way
      dout(15) << " matching small exporters to big importers" << dendl;
      // small exporters to big importers
      multimap<double,mds_rank_t>::iterator ex = exporters.begin();
      multimap<double,mds_rank_t>::iterator im = importers.begin();
      while (ex != exporters.end() &&
             im != importers.end()) {
        double maxex = get_maxex(state, ex->second);
        double maxim = get_maxim(state, im->second);
        if (maxex < .001 || maxim < .001) break;
        try_match(state, ex->second, maxex, im->second, maxim);
        if (maxex <= .001) ++ex;
        if (maxim <= .001) ++im;
      }
    }
  }
  try_rebalance(state);
}

void MDBalancer::hit_targets(const balance_state_t& state)
{
  utime_t now = ceph_clock_now();
  for (auto &it : state.targets) {
    mds_rank_t target = it.first;
    mds->hit_export_target(now, target, g_conf->mds_bal_target_decay);
  }
}

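/*
 * The Mantle (Lua) balancer. Re-fetch the balancer script from RADOS when
 * the version named in the MDSMap changes, build a per-rank metrics table
 * from the heartbeat loads, and let the script compute the target map. A
 * target vector of the wrong length is rejected with -EINVAL so the
 * caller can fall back to the default balancer.
 */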
int MDBalancer::mantle_prep_rebalance()
{
  balance_state_t state;

  /* refresh balancer if it has changed */
  if (bal_version != mds->mdsmap->get_balancer()) {
    bal_version.assign("");
    int r = localize_balancer();
    if (r) return r;

    /* only spam the cluster log from 1 mds on version changes */
    if (mds->get_nodeid() == 0)
      mds->clog->info() << "mantle balancer version changed: " << bal_version;
  }

  /* prepare for balancing */
  int cluster_size = mds->get_mds_map()->get_num_in_mds();
  rebalance_time = ceph_clock_now();
  mds->mdcache->migrator->clear_export_queue();

  /* fill in the metrics for each mds by grabbing load struct */
  vector < map<string, double> > metrics (cluster_size);
  for (mds_rank_t i=mds_rank_t(0);
       i < mds_rank_t(cluster_size);
       i++) {
    map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
    std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
    mds_load_t &load(r.first->second);

    metrics[i] = {{"auth.meta_load", load.auth.meta_load()},
                  {"all.meta_load", load.all.meta_load()},
                  {"req_rate", load.req_rate},
                  {"queue_len", load.queue_len},
                  {"cpu_load_avg", load.cpu_load_avg}};
  }

  /* execute the balancer */
  Mantle mantle;
  int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
  dout(2) << " mantle decided that new targets=" << state.targets << dendl;

  /* mantle doesn't know about cluster size, so check target len here */
  if ((int) state.targets.size() != cluster_size)
    return -EINVAL;
  else if (ret)
    return ret;

  try_rebalance(state);
  return 0;
}



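/*
 * Apply the transfer map. For each target rank we try, in order: handing
 * back subtrees we previously imported from that rank, a (disabled) pass
 * over all other imports, and finally carving export candidates out of
 * our own workload with find_exports(), until the wanted amount (less
 * MIN_OFFLOAD slack) has been queued.
 */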
void MDBalancer::try_rebalance(balance_state_t& state)
{
  if (!check_targets(state))
    return;

  if (g_conf->mds_thrash_exports) {
    dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
            << dendl;
    return;
  }

  // make a sorted list of my imports
  map<double,CDir*> import_pop_map;
  multimap<mds_rank_t,CDir*> import_from_map;
  set<CDir*> fullauthsubs;

  mds->mdcache->get_fullauth_subtrees(fullauthsubs);
  for (set<CDir*>::iterator it = fullauthsubs.begin();
       it != fullauthsubs.end();
       ++it) {
    CDir *im = *it;
    if (im->get_inode()->is_stray()) continue;

    double pop = im->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
    if (g_conf->mds_bal_idle_threshold > 0 &&
        pop < g_conf->mds_bal_idle_threshold &&
        im->inode != mds->mdcache->get_root() &&
        im->inode->authority().first != mds->get_nodeid()) {
      dout(0) << " exporting idle (" << pop << ") import " << *im
              << " back to mds." << im->inode->authority().first
              << dendl;
      mds->mdcache->migrator->export_dir_nicely(im, im->inode->authority().first);
      continue;
    }

    import_pop_map[ pop ] = im;
    mds_rank_t from = im->inode->authority().first;
    dout(15) << " map: i imported " << *im << " from " << from << dendl;
    import_from_map.insert(pair<mds_rank_t,CDir*>(from, im));
  }



  // do my exports!
  set<CDir*> already_exporting;

  for (auto &it : state.targets) {
    mds_rank_t target = it.first;
    double amount = it.second;

    if (amount < MIN_OFFLOAD) continue;
    if (amount / target_load < .2) continue;

    dout(5) << "want to send " << amount << " to mds." << target
      //<< " .. " << (*it).second << " * " << load_fac
            << " -> " << amount
            << dendl;//" .. fudge is " << fudge << dendl;
    double have = 0.0;


    mds->mdcache->show_subtrees();

    // search imports from target
    if (import_from_map.count(target)) {
      dout(5) << " aha, looking through imports from target mds." << target << dendl;
      pair<multimap<mds_rank_t,CDir*>::iterator, multimap<mds_rank_t,CDir*>::iterator> p =
        import_from_map.equal_range(target);
      while (p.first != p.second) {
        CDir *dir = (*p.first).second;
        dout(5) << "considering " << *dir << " from " << (*p.first).first << dendl;
        multimap<mds_rank_t,CDir*>::iterator plast = p.first++;

        if (dir->inode->is_base() ||
            dir->inode->is_stray())
          continue;
        if (dir->is_freezing() || dir->is_frozen()) continue;  // export probably already in progress
        double pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
        assert(dir->inode->authority().first == target);  // cuz that's how i put it in the map, dummy

        if (pop <= amount-have) {
          dout(0) << "reexporting " << *dir
                  << " pop " << pop
                  << " back to mds." << target << dendl;
          mds->mdcache->migrator->export_dir_nicely(dir, target);
          have += pop;
          import_from_map.erase(plast);
          import_pop_map.erase(pop);
        } else {
          dout(5) << "can't reexport " << *dir << ", too big " << pop << dendl;
        }
        if (amount-have < MIN_OFFLOAD) break;
      }
    }
    if (amount-have < MIN_OFFLOAD) {
      continue;
    }

    // any other imports
    if (false)
      for (map<double,CDir*>::iterator import = import_pop_map.begin();
           import != import_pop_map.end();
           import++) {
        CDir *imp = (*import).second;
        if (imp->inode->is_base() ||
            imp->inode->is_stray())
          continue;

        double pop = (*import).first;
        if (pop < amount-have || pop < MIN_REEXPORT) {
          dout(0) << "reexporting " << *imp
                  << " pop " << pop
                  << " back to mds." << imp->inode->authority()
                  << dendl;
          have += pop;
          mds->mdcache->migrator->export_dir_nicely(imp, imp->inode->authority().first);
        }
        if (amount-have < MIN_OFFLOAD) break;
      }
    if (amount-have < MIN_OFFLOAD) {
      //fudge = amount-have;
      continue;
    }

    // okay, search for fragments of my workload
    set<CDir*> candidates;
    mds->mdcache->get_fullauth_subtrees(candidates);

    list<CDir*> exports;

    for (set<CDir*>::iterator pot = candidates.begin();
         pot != candidates.end();
         ++pot) {
      if ((*pot)->get_inode()->is_stray()) continue;
      find_exports(*pot, amount, exports, have, already_exporting);
      if (have > amount-MIN_OFFLOAD)
        break;
    }
    //fudge = amount - have;

    for (list<CDir*>::iterator it = exports.begin(); it != exports.end(); ++it) {
      dout(0) << " - exporting "
              << (*it)->pop_auth_subtree
              << " "
              << (*it)->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate)
              << " to mds." << target
              << " " << **it
              << dendl;
      mds->mdcache->migrator->export_dir_nicely(*it, target);
    }
  }

  dout(5) << "rebalance done" << dendl;
  mds->mdcache->show_subtrees();
}


/* Check that all targets are in the MDSMap export_targets for my rank. */
bool MDBalancer::check_targets(const balance_state_t& state)
{
  for (const auto &it : state.targets) {
    if (!mds->is_export_target(it.first)) {
      return false;
    }
  }
  return true;
}

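/*
 * Recursively collect export candidates under dir worth roughly
 * (amount - have) of load. A lone subdir in the (needmin, needmax) window
 * wins outright; otherwise take large "smaller" chunks (>= midchunk),
 * then descend into unreplicated dirs bigger than the need, then sweep up
 * the remaining small chunks, and only then descend into replicated dirs.
 * Illustrative numbers (made up, assuming config values need_min=0.8,
 * need_max=1.2, midchunk=0.3, minchunk=0.001): with need=100, a subdir of
 * pop 90..110 is taken alone, chunks >= 30 are taken greedily, and
 * anything under 0.1 is ignored.
 */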
void MDBalancer::find_exports(CDir *dir,
                              double amount,
                              list<CDir*>& exports,
                              double& have,
                              set<CDir*>& already_exporting)
{
  double need = amount - have;
  if (need < amount * g_conf->mds_bal_min_start)
    return;  // good enough!
  double needmax = need * g_conf->mds_bal_need_max;
  double needmin = need * g_conf->mds_bal_need_min;
  double midchunk = need * g_conf->mds_bal_midchunk;
  double minchunk = need * g_conf->mds_bal_minchunk;

  list<CDir*> bigger_rep, bigger_unrep;
  multimap<double, CDir*> smaller;

  double dir_pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
  dout(7) << " find_exports in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;

  double subdir_sum = 0;
  for (CDir::map_t::iterator it = dir->begin();
       it != dir->end();
       ++it) {
    CInode *in = it->second->get_linkage()->get_inode();
    if (!in) continue;
    if (!in->is_dir()) continue;

    list<CDir*> dfls;
    in->get_dirfrags(dfls);
    for (list<CDir*>::iterator p = dfls.begin();
         p != dfls.end();
         ++p) {
      CDir *subdir = *p;
      if (!subdir->is_auth()) continue;
      if (already_exporting.count(subdir)) continue;

      if (subdir->is_frozen()) continue;  // can't export this right now!

      // how popular?
      double pop = subdir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
      subdir_sum += pop;
      dout(15) << " subdir pop " << pop << " " << *subdir << dendl;

      if (pop < minchunk) continue;

      // lucky find?
      if (pop > needmin && pop < needmax) {
        exports.push_back(subdir);
        already_exporting.insert(subdir);
        have += pop;
        return;
      }

      if (pop > need) {
        if (subdir->is_rep())
          bigger_rep.push_back(subdir);
        else
          bigger_unrep.push_back(subdir);
      } else
        smaller.insert(pair<double,CDir*>(pop, subdir));
    }
  }
  dout(15) << " sum " << subdir_sum << " / " << dir_pop << dendl;

  // grab some sufficiently big small items
  multimap<double,CDir*>::reverse_iterator it;
  for (it = smaller.rbegin();
       it != smaller.rend();
       ++it) {

    if ((*it).first < midchunk)
      break;  // try later

    dout(7) << " taking smaller " << *(*it).second << dendl;

    exports.push_back((*it).second);
    already_exporting.insert((*it).second);
    have += (*it).first;
    if (have > needmin)
      return;
  }

  // apparently not enough; drill deeper into the hierarchy (if non-replicated)
  for (list<CDir*>::iterator it = bigger_unrep.begin();
       it != bigger_unrep.end();
       ++it) {
    dout(15) << " descending into " << **it << dendl;
    find_exports(*it, amount, exports, have, already_exporting);
    if (have > needmin)
      return;
  }

  // ok fine, use smaller bits
  for (;
       it != smaller.rend();
       ++it) {
    dout(7) << " taking (much) smaller " << it->first << " " << *(*it).second << dendl;

    exports.push_back((*it).second);
    already_exporting.insert((*it).second);
    have += (*it).first;
    if (have > needmin)
      return;
  }

  // ok fine, drill into replicated dirs
  for (list<CDir*>::iterator it = bigger_rep.begin();
       it != bigger_rep.end();
       ++it) {
    dout(7) << " descending into replicated " << **it << dendl;
    find_exports(*it, amount, exports, have, already_exporting);
    if (have > needmin)
      return;
  }

}

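/*
 * Popularity accounting entry point: bump the inode's own decaying
 * counter for the access type, then charge the parent dirfrag via
 * hit_dir().
 */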
void MDBalancer::hit_inode(utime_t now, CInode *in, int type, int who)
{
  // hit inode
  in->pop.get(type).hit(now, mds->mdcache->decayrate);

  if (in->get_parent_dn())
    hit_dir(now, in->get_parent_dn()->get_dir(), type, who);
}

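/*
 * Decide whether dir should be queued for a split or a merge. Splits go
 * through the fast path when should_split_fast() says the frag is growing
 * too quickly to wait out the normal fragment interval.
 */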
void MDBalancer::maybe_fragment(CDir *dir, bool hot)
{
  // split/merge
  if (g_conf->mds_bal_frag && g_conf->mds_bal_fragment_interval > 0 &&
      !dir->inode->is_base() &&  // not root/base (for now at least)
      dir->is_auth()) {

    // split
    if (g_conf->mds_bal_split_size > 0 &&
        mds->mdsmap->allows_dirfrags() &&
        (dir->should_split() || hot))
    {
      if (split_pending.count(dir->dirfrag()) == 0) {
        queue_split(dir, false);
      } else {
        if (dir->should_split_fast()) {
          queue_split(dir, true);
        } else {
          dout(10) << __func__ << ": fragment already enqueued to split: "
                   << *dir << dendl;
        }
      }
    }

    // merge?
    if (dir->get_frag() != frag_t() && dir->should_merge() &&
        merge_pending.count(dir->dirfrag()) == 0) {
      queue_merge(dir);
    }
  }
}

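/*
 * Charge a hit of the given type against dir and all of its ancestors.
 * Along the way this may trigger fragmentation, and for hot read traffic
 * it may replicate (or unreplicate) the dirfrag across the cluster,
 * adjusting the read popularity downward to anticipate the spread.
 */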
void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amount)
{
  // hit me
  double v = dir->pop_me.get(type).hit(now, amount);

  const bool hot = (v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
                   (v > g_conf->mds_bal_split_wr && type == META_POP_IWR);

  dout(20) << "hit_dir " << type << " pop is " << v << ", frag " << dir->get_frag()
           << " size " << dir->get_frag_size() << dendl;

  maybe_fragment(dir, hot);

  // replicate?
  if (type == META_POP_IRD && who >= 0) {
    dir->pop_spread.hit(now, mds->mdcache->decayrate, who);
  }

  double rd_adj = 0.0;
  if (type == META_POP_IRD &&
      dir->last_popularity_sample < last_sample) {
    double dir_pop = dir->pop_auth_subtree.get(type).get(now, mds->mdcache->decayrate);  // hmm??
    dir->last_popularity_sample = last_sample;
    double pop_sp = dir->pop_spread.get(now, mds->mdcache->decayrate);
    dir_pop += pop_sp * 10;

    //if (dir->ino() == inodeno_t(0x10000000002))
    if (pop_sp > 0) {
      dout(20) << "hit_dir " << type << " pop " << dir_pop << " spread " << pop_sp
               << " " << dir->pop_spread.last[0]
               << " " << dir->pop_spread.last[1]
               << " " << dir->pop_spread.last[2]
               << " " << dir->pop_spread.last[3]
               << " in " << *dir << dendl;
    }

    if (dir->is_auth() && !dir->is_ambiguous_auth()) {
      if (!dir->is_rep() &&
          dir_pop >= g_conf->mds_bal_replicate_threshold) {
        // replicate
        double rdp = dir->pop_me.get(META_POP_IRD).get(now, mds->mdcache->decayrate);
        rd_adj = rdp / mds->get_mds_map()->get_num_in_mds() - rdp;
        rd_adj /= 2.0;  // temper somewhat

        dout(0) << "replicating dir " << *dir << " pop " << dir_pop << " .. rdp " << rdp << " adj " << rd_adj << dendl;

        dir->dir_rep = CDir::REP_ALL;
        mds->mdcache->send_dir_updates(dir, true);

        // fixme this should adjust the whole pop hierarchy
        dir->pop_me.get(META_POP_IRD).adjust(rd_adj);
        dir->pop_auth_subtree.get(META_POP_IRD).adjust(rd_adj);
      }

      if (dir->ino() != 1 &&
          dir->is_rep() &&
          dir_pop < g_conf->mds_bal_unreplicate_threshold) {
        // unreplicate
        dout(0) << "unreplicating dir " << *dir << " pop " << dir_pop << dendl;

        dir->dir_rep = CDir::REP_NONE;
        mds->mdcache->send_dir_updates(dir);
      }
    }
  }

  // adjust ancestors
  bool hit_subtree = dir->is_auth();         // current auth subtree (if any)
  bool hit_subtree_nested = dir->is_auth();  // all nested auth subtrees

  while (true) {
    dir->pop_nested.get(type).hit(now, amount);
    if (rd_adj != 0.0)
      dir->pop_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);

    if (hit_subtree) {
      dir->pop_auth_subtree.get(type).hit(now, amount);
      if (rd_adj != 0.0)
        dir->pop_auth_subtree.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
    }

    if (hit_subtree_nested) {
      dir->pop_auth_subtree_nested.get(type).hit(now, mds->mdcache->decayrate, amount);
      if (rd_adj != 0.0)
        dir->pop_auth_subtree_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
    }

    if (dir->is_subtree_root())
      hit_subtree = false;  // end of auth domain, stop hitting auth counters.

    if (dir->inode->get_parent_dn() == 0) break;
    dir = dir->inode->get_parent_dn()->get_dir();
  }
}


/*
 * subtract off an exported chunk.
 *  this excludes *dir itself (encode_export_dir should have taken care of that)
 *  we _just_ do the parents' nested counters.
 *
 * NOTE: call me _after_ forcing *dir into a subtree root,
 *       but _before_ doing the encode_export_dirs.
 */
void MDBalancer::subtract_export(CDir *dir, utime_t now)
{
  dirfrag_load_vec_t subload = dir->pop_auth_subtree;

  while (true) {
    dir = dir->inode->get_parent_dir();
    if (!dir) break;

    dir->pop_nested.sub(now, mds->mdcache->decayrate, subload);
    dir->pop_auth_subtree_nested.sub(now, mds->mdcache->decayrate, subload);
  }
}


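/* Mirror of subtract_export(): add an imported subtree's load to the
 * nested counters of every ancestor dirfrag. */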
void MDBalancer::add_import(CDir *dir, utime_t now)
{
  dirfrag_load_vec_t subload = dir->pop_auth_subtree;

  while (true) {
    dir = dir->inode->get_parent_dir();
    if (!dir) break;

    dir->pop_nested.add(now, mds->mdcache->decayrate, subload);
    dir->pop_auth_subtree_nested.add(now, mds->mdcache->decayrate, subload);
  }
}