// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "MDSRank.h"
#include "MDCache.h"
#include "CInode.h"
#include "CDir.h"
#include "CDentry.h"
#include "Migrator.h"
#include "Locker.h"
#include "Server.h"

#include "MDBalancer.h"
#include "MDLog.h"
#include "MDSMap.h"
#include "Mutation.h"

#include "include/filepath.h"
#include "common/likely.h"

#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
#include "events/ESessions.h"

#include "msg/Messenger.h"

#include "messages/MClientCaps.h"

/*
 * this is what the dir->dir_auth values look like
 *
 *   dir_auth    authbits
 * export:
 *   me          me       - before
 *   me, me      me       - still me, but preparing for export
 *   me, them    me       - send MExportDir (peer is preparing)
 *   them, me    me       - journaled EExport
 *   them        them     - done
 *
 * import:
 *   them        them     - before
 *   me, them    me       - journaled EImportStart
 *   me          me       - done
 *
 * which implies:
 *  - auth bit is set if i am listed as first _or_ second dir_auth.
 */
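// e.g. if this rank is mds.0 exporting to mds.1, dir_auth moves
//   <0,-2> -> <0,0> -> <0,1> -> <1,0> -> <1,-2>
// (-2 being CDIR_AUTH_UNKNOWN, i.e. the plain single-auth form), and
// mds.0 keeps its auth bit until the final step because it appears in
// one slot or the other.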

#include "common/config.h"


#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".mig " << __func__ << " "


class MigratorContext : public MDSContext {
protected:
  Migrator *mig;
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorContext(Migrator *mig_) : mig(mig_) {
    ceph_assert(mig != NULL);
  }
};

class MigratorLogContext : public MDSLogContextBase {
protected:
  Migrator *mig;
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
    ceph_assert(mig != NULL);
  }
};

void Migrator::dispatch(const cref_t<Message> &m)
{
  switch (m->get_type()) {
    // import
  case MSG_MDS_EXPORTDIRDISCOVER:
    handle_export_discover(ref_cast<MExportDirDiscover>(m));
    break;
  case MSG_MDS_EXPORTDIRPREP:
    handle_export_prep(ref_cast<MExportDirPrep>(m));
    break;
  case MSG_MDS_EXPORTDIR:
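    // debug hook: when inject_session_race is set, delay handling of
    // the MExportDir message until a client connection is established,
    // to exercise the import-vs-session-open race in testing.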
    if (unlikely(inject_session_race)) {
      dout(0) << "waiting for inject_session_race" << dendl;
      mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
    } else {
      handle_export_dir(ref_cast<MExportDir>(m));
    }
    break;
  case MSG_MDS_EXPORTDIRFINISH:
    handle_export_finish(ref_cast<MExportDirFinish>(m));
    break;
  case MSG_MDS_EXPORTDIRCANCEL:
    handle_export_cancel(ref_cast<MExportDirCancel>(m));
    break;

    // export
  case MSG_MDS_EXPORTDIRDISCOVERACK:
    handle_export_discover_ack(ref_cast<MExportDirDiscoverAck>(m));
    break;
  case MSG_MDS_EXPORTDIRPREPACK:
    handle_export_prep_ack(ref_cast<MExportDirPrepAck>(m));
    break;
  case MSG_MDS_EXPORTDIRACK:
    handle_export_ack(ref_cast<MExportDirAck>(m));
    break;
  case MSG_MDS_EXPORTDIRNOTIFYACK:
    handle_export_notify_ack(ref_cast<MExportDirNotifyAck>(m));
    break;

    // export 3rd party (dir_auth adjustments)
  case MSG_MDS_EXPORTDIRNOTIFY:
    handle_export_notify(ref_cast<MExportDirNotify>(m));
    break;

    // caps
  case MSG_MDS_EXPORTCAPS:
    handle_export_caps(ref_cast<MExportCaps>(m));
    break;
  case MSG_MDS_EXPORTCAPSACK:
    handle_export_caps_ack(ref_cast<MExportCapsAck>(m));
    break;
  case MSG_MDS_GATHERCAPS:
    handle_gather_caps(ref_cast<MGatherCaps>(m));
    break;

  default:
    derr << "migrator unknown message " << m->get_type() << dendl;
    ceph_abort_msg("migrator unknown message");
  }
}


class C_MDC_EmptyImport : public MigratorContext {
  CDir *dir;
public:
  C_MDC_EmptyImport(Migrator *m, CDir *d) :
    MigratorContext(m), dir(d) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->export_empty_import(dir);
    dir->put(CDir::PIN_PTRWAITER);
  }
};


void Migrator::export_empty_import(CDir *dir)
{
  dout(7) << *dir << dendl;
  ceph_assert(dir->is_subtree_root());

  if (dir->inode->is_auth()) {
    dout(7) << " inode is auth" << dendl;
    return;
  }
  if (!dir->is_auth()) {
    dout(7) << " not auth" << dendl;
    return;
  }
  if (dir->is_freezing() || dir->is_frozen()) {
    dout(7) << " freezing or frozen" << dendl;
    return;
  }
  if (dir->get_num_head_items() > 0) {
    dout(7) << " not actually empty" << dendl;
    return;
  }
  if (dir->inode->is_root()) {
    dout(7) << " root" << dendl;
    return;
  }

  mds_rank_t dest = dir->inode->authority().first;
  //if (mds->is_shutting_down()) dest = 0;  // this is more efficient.

  dout(7) << " really empty, exporting to " << dest << dendl;
  ceph_assert(dest != mds->get_nodeid());

  dout(7) << "exporting to mds." << dest
          << " empty import " << *dir << dendl;
  export_dir(dir, dest);
}

void Migrator::find_stale_export_freeze()
{
  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= g_conf()->mds_freeze_tree_timeout;


  /*
   * We could have situations like:
   *
   * - mds.0 authpins an item in subtree A
   * - mds.0 sends request to mds.1 to authpin an item in subtree B
   * - mds.0 freezes subtree A
   * - mds.1 authpins an item in subtree B
   * - mds.1 sends request to mds.0 to authpin an item in subtree A
   * - mds.1 freezes subtree B
   * - mds.1 receives the remote authpin request from mds.0
   *   (wait because subtree B is freezing)
   * - mds.0 receives the remote authpin request from mds.1
   *   (wait because subtree A is freezing)
   *
   *
   * - client request authpins items in subtree B
   * - freeze subtree B
   * - import subtree A which is parent of subtree B
   *   (authpins parent inode of subtree B, see CDir::set_dir_auth())
   * - freeze subtree A
   * - client request tries authpinning items in subtree A
   *   (wait because subtree A is freezing)
   */
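  // A freeze whose cumulative auth_pin count hasn't budged for
  // mds_freeze_tree_timeout seconds, while remote waiters exist (or the
  // parent is itself freezing), is assumed stuck; cancel the export to
  // break the potential deadlock.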
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end(); ) {
    CDir* dir = p->first;
    export_state_t& stat = p->second;
    ++p;
    if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
      continue;
    ceph_assert(dir->freeze_tree_state);
    if (stat.last_cum_auth_pins != dir->freeze_tree_state->auth_pins) {
      stat.last_cum_auth_pins = dir->freeze_tree_state->auth_pins;
      stat.last_cum_auth_pins_change = now;
      continue;
    }
    if (stat.last_cum_auth_pins_change >= cutoff)
      continue;
    if (stat.num_remote_waiters > 0 ||
        (!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
      export_try_cancel(dir);
    }
  }
}

void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
{
  dout(10) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  ceph_assert(it != export_state.end());

  int state = it->second.state;
  switch (state) {
  case EXPORT_LOCKING:
    dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
    num_locking_exports--;
    it->second.state = EXPORT_CANCELLED;
    dir->auth_unpin(this);
    break;
  case EXPORT_DISCOVERING:
    dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    dir->auth_unpin(this);
    if (notify_peer &&
        (!mds->is_cluster_degraded() ||
         mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)))  // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
                                                           it->second.tid),
                            it->second.peer);
    break;

  case EXPORT_FREEZING:
    dout(10) << "export state=freezing : canceling freeze" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    if (dir->is_subtree_root())
      cache->try_subtree_merge(dir);
    if (notify_peer &&
        (!mds->is_cluster_degraded() ||
         mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)))  // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
                                                           it->second.tid),
                            it->second.peer);
    break;

    // NOTE: state order reversal, warning comes after prepping
  case EXPORT_WARNING:
    dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
    it->second.state = EXPORT_CANCELLING;
    // fall-thru

  case EXPORT_PREPPING:
    if (state != EXPORT_WARNING) {
      dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
      it->second.state = EXPORT_CANCELLED;
    }

    {
      // unpin bounds
      set<CDir*> bounds;
      cache->get_subtree_bounds(dir, bounds);
      for (set<CDir*>::iterator q = bounds.begin();
           q != bounds.end();
           ++q) {
        CDir *bd = *q;
        bd->put(CDir::PIN_EXPORTBOUND);
        bd->state_clear(CDir::STATE_EXPORTBOUND);
      }
      if (state == EXPORT_WARNING) {
        // notify bystanders
        export_notify_abort(dir, it->second, bounds);
        // process delayed expires
        cache->process_delayed_expire(dir);
      }
    }
    dir->unfreeze_tree();
    cache->try_subtree_merge(dir);
    if (notify_peer &&
        (!mds->is_cluster_degraded() ||
         mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)))  // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
                                                           it->second.tid),
                            it->second.peer);
    break;

  case EXPORT_EXPORTING:
    dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
    it->second.state = EXPORT_CANCELLING;
    export_reverse(dir, it->second);
    break;

  case EXPORT_LOGGINGFINISH:
  case EXPORT_NOTIFYING:
    dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
    // leave export_state, don't clean up now.
    break;
  case EXPORT_CANCELLING:
    break;

  default:
    ceph_abort();
  }

  // finish clean-up?
  if (it->second.state == EXPORT_CANCELLING ||
      it->second.state == EXPORT_CANCELLED) {
    MutationRef mut;
    mut.swap(it->second.mut);

    if (it->second.state == EXPORT_CANCELLED) {
      export_cancel_finish(it);
    }

    // drop locks
    if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
      MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
      ceph_assert(mdr);
      mds->mdcache->request_kill(mdr);
    } else if (mut) {
      mds->locker->drop_locks(mut.get());
      mut->cleanup();
    }

    cache->show_subtrees();

    maybe_do_queued_export();
  }
}

void Migrator::export_cancel_finish(export_state_iterator& it)
{
  CDir *dir = it->first;
  bool unpin = (it->second.state == EXPORT_CANCELLING);
  auto parent = std::move(it->second.parent);

  total_exporting_size -= it->second.approx_size;
  export_state.erase(it);

  ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
  dir->clear_exporting();

  if (unpin) {
    // pinned by Migrator::export_notify_abort()
    dir->auth_unpin(this);
  }
  // send pending import_maps?  (these need to go out when all exports have finished.)
  cache->maybe_send_pending_resolves();

  if (parent)
    child_export_finish(parent, false);
}

// ==========================================================
// mds failure handling

void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
{
  dout(5) << who << dendl;

  // check my exports

  // first add an extra auth_pin on any freezes, so that canceling a
  // nested freeze doesn't complete one further up the hierarchy and
  // confuse the shit out of us.  we'll remove it after canceling the
  // freeze.  this way no freeze completions run before we want them
  // to.
  std::vector<CDir*> pinned_dirs;
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    if (p->second.state == EXPORT_FREEZING) {
      CDir *dir = p->first;
      dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
      dir->auth_pin(this);
      pinned_dirs.push_back(dir);
    }
  }

  map<CDir*,export_state_t>::iterator p = export_state.begin();
  while (p != export_state.end()) {
    map<CDir*,export_state_t>::iterator next = p;
    ++next;
    CDir *dir = p->first;

    // abort exports:
    //  - that are going to the failed node
    //  - that aren't frozen yet (to avoid auth_pin deadlock)
    //  - that haven't prepped yet (they may need to discover bounds to do that)
    if ((p->second.peer == who &&
         p->second.state != EXPORT_CANCELLING) ||
        p->second.state == EXPORT_LOCKING ||
        p->second.state == EXPORT_DISCOVERING ||
        p->second.state == EXPORT_FREEZING ||
        p->second.state == EXPORT_PREPPING) {
      // the guy i'm exporting to failed, or we're just freezing.
      dout(10) << "cleaning up export state (" << p->second.state << ")"
               << get_export_statename(p->second.state) << " of " << *dir << dendl;
      export_try_cancel(dir);
    } else if (p->second.peer != who) {
      // bystander failed.
      if (p->second.warning_ack_waiting.erase(who)) {
        if (p->second.state == EXPORT_WARNING) {
          p->second.notify_ack_waiting.erase(who);  // they won't get a notify either.
          // exporter waiting for warning acks, let's fake theirs.
          dout(10) << "faking export_warning_ack from mds." << who
                   << " on " << *dir << " to mds." << p->second.peer
                   << dendl;
          if (p->second.warning_ack_waiting.empty())
            export_go(dir);
        }
      }
      if (p->second.notify_ack_waiting.erase(who)) {
        // exporter is waiting for notify acks, fake it
        dout(10) << "faking export_notify_ack from mds." << who
                 << " on " << *dir << " to mds." << p->second.peer
                 << dendl;
        if (p->second.state == EXPORT_NOTIFYING) {
          if (p->second.notify_ack_waiting.empty())
            export_finish(dir);
        } else if (p->second.state == EXPORT_CANCELLING) {
          if (p->second.notify_ack_waiting.empty()) {
            export_cancel_finish(p);
          }
        }
      }
    }

    // next!
    p = next;
  }


  // check my imports
  map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
  while (q != import_state.end()) {
    map<dirfrag_t,import_state_t>::iterator next = q;
    ++next;
    dirfrag_t df = q->first;
    CInode *diri = mds->mdcache->get_inode(df.ino);
    CDir *dir = mds->mdcache->get_dirfrag(df);

    if (q->second.peer == who) {
      if (dir)
        dout(10) << "cleaning up import state (" << q->second.state << ")"
                 << get_import_statename(q->second.state) << " of " << *dir << dendl;
      else
        dout(10) << "cleaning up import state (" << q->second.state << ")"
                 << get_import_statename(q->second.state) << " of " << df << dendl;

      switch (q->second.state) {
      case IMPORT_DISCOVERING:
        dout(10) << "import state=discovering : clearing state" << dendl;
        import_reverse_discovering(df);
        break;

      case IMPORT_DISCOVERED:
        ceph_assert(diri);
        dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
        import_reverse_discovered(df, diri);
        break;

      case IMPORT_PREPPING:
        ceph_assert(dir);
        dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
        import_reverse_prepping(dir, q->second);
        break;

      case IMPORT_PREPPED:
        ceph_assert(dir);
        dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
        {
          set<CDir*> bounds;
          cache->get_subtree_bounds(dir, bounds);
          import_remove_pins(dir, bounds);

          // adjust auth back to the exporter
          cache->adjust_subtree_auth(dir, q->second.peer);

          // notify bystanders; wait in aborting state
          q->second.state = IMPORT_ABORTING;
          import_notify_abort(dir, bounds);
          ceph_assert(g_conf()->mds_kill_import_at != 10);
        }
        break;

      case IMPORT_LOGGINGSTART:
        ceph_assert(dir);
        dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
        import_reverse(dir);
        break;

      case IMPORT_ACKING:
        ceph_assert(dir);
        // hrm.  make this an ambiguous import, and wait for exporter recovery to disambiguate
        dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
        {
          set<CDir*> bounds;
          cache->get_subtree_bounds(dir, bounds);
          cache->add_ambiguous_import(dir, bounds);
        }
        break;

      case IMPORT_FINISHING:
        ceph_assert(dir);
        dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
        import_finish(dir, true);
        break;

      case IMPORT_ABORTING:
        ceph_assert(dir);
        dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
        break;
      }
    } else {
      auto bystanders_entry = q->second.bystanders.find(who);
      if (bystanders_entry != q->second.bystanders.end()) {
        q->second.bystanders.erase(bystanders_entry);
        if (q->second.state == IMPORT_ABORTING) {
          ceph_assert(dir);
          dout(10) << "faking export_notify_ack from mds." << who
                   << " on aborting import " << *dir << " from mds." << q->second.peer
                   << dendl;
          if (q->second.bystanders.empty())
            import_reverse_unfreeze(dir);
        }
      }
    }

    // next!
    q = next;
  }

  for (const auto& dir : pinned_dirs) {
    dout(10) << "removing temp auth_pin on " << *dir << dendl;
    dir->auth_unpin(this);
  }
}



void Migrator::show_importing()
{
  dout(10) << dendl;
  for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
       p != import_state.end();
       ++p) {
    CDir *dir = mds->mdcache->get_dirfrag(p->first);
    if (dir) {
      dout(10) << " importing from " << p->second.peer
               << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
               << " " << p->first << " " << *dir << dendl;
    } else {
      dout(10) << " importing from " << p->second.peer
               << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
               << " " << p->first << dendl;
    }
  }
}

void Migrator::show_exporting()
{
  dout(10) << dendl;
  for (const auto& [dir, state] : export_state) {
    dout(10) << " exporting to " << state.peer
             << ": (" << state.state << ") " << get_export_statename(state.state)
             << " " << dir->dirfrag() << " " << *dir << dendl;
  }
}



void Migrator::audit()
{
  if (!g_conf()->subsys.should_gather<ceph_subsys_mds, 5>())
    return;  // hrm.

  // import_state
  show_importing();
  for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
       p != import_state.end();
       ++p) {
    if (p->second.state == IMPORT_DISCOVERING)
      continue;
    if (p->second.state == IMPORT_DISCOVERED) {
      CInode *in = cache->get_inode(p->first.ino);
      ceph_assert(in);
      continue;
    }
    CDir *dir = cache->get_dirfrag(p->first);
    ceph_assert(dir);
    if (p->second.state == IMPORT_PREPPING)
      continue;
    if (p->second.state == IMPORT_ABORTING) {
      ceph_assert(!dir->is_ambiguous_dir_auth());
      ceph_assert(dir->get_dir_auth().first != mds->get_nodeid());
      continue;
    }
    ceph_assert(dir->is_ambiguous_dir_auth());
    ceph_assert(dir->authority().first == mds->get_nodeid() ||
                dir->authority().second == mds->get_nodeid());
  }

  // export_state
  show_exporting();
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    CDir *dir = p->first;
    if (p->second.state == EXPORT_LOCKING ||
        p->second.state == EXPORT_DISCOVERING ||
        p->second.state == EXPORT_FREEZING ||
        p->second.state == EXPORT_CANCELLING)
      continue;
    ceph_assert(dir->is_ambiguous_dir_auth());
    ceph_assert(dir->authority().first == mds->get_nodeid() ||
                dir->authority().second == mds->get_nodeid());
  }

  // ambiguous+me subtrees should be importing|exporting

  // write me
}




// ==========================================================
// EXPORT

void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
{
  // enqueue
  dout(7) << *dir << " to " << dest << dendl;
  export_queue.push_back(pair<dirfrag_t,mds_rank_t>(dir->dirfrag(), dest));

  maybe_do_queued_export();
}

void Migrator::maybe_do_queued_export()
{
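  // not re-entrant: export_dir() below may fail and land back here
  // (e.g. via export_try_cancel() -> export_cancel_finish()), so a
  // static flag turns nested calls into no-ops and only the outermost
  // invocation drains the queue.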
  static bool running;
  if (running)
    return;
  running = true;

  uint64_t max_total_size = max_export_size * 2;

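  // throttle: only start another export while the remaining budget
  // (max_total_size - total_exporting_size) still covers a full
  // max_export_size for every export in the locking phase plus the one
  // about to start.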
  while (!export_queue.empty() &&
         max_total_size > total_exporting_size &&
         max_total_size - total_exporting_size >=
         max_export_size * (num_locking_exports + 1)) {

    dirfrag_t df = export_queue.front().first;
    mds_rank_t dest = export_queue.front().second;
    export_queue.pop_front();

    CDir *dir = mds->mdcache->get_dirfrag(df);
    if (!dir) continue;
    if (!dir->is_auth()) continue;

    dout(7) << "nicely exporting to mds." << dest << " " << *dir << dendl;

    export_dir(dir, dest);
  }

  running = false;
}




class C_MDC_ExportFreeze : public MigratorContext {
  CDir *dir;  // dir i'm exporting
  uint64_t tid;
public:
  C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
    MigratorContext(m), dir(e), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (r >= 0)
      mig->export_frozen(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};


bool Migrator::export_try_grab_locks(CDir *dir, MutationRef& mut)
{
  CInode *diri = dir->get_inode();

  if (!diri->filelock.can_wrlock(diri->get_loner()) ||
      !diri->nestlock.can_wrlock(diri->get_loner()))
    return false;

  MutationImpl::LockOpVec lov;

  set<CDir*> wouldbe_bounds;
  set<CInode*> bound_inodes;
  cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
  for (auto& bound : wouldbe_bounds)
    bound_inodes.insert(bound->get_inode());
  for (auto& in : bound_inodes)
    lov.add_rdlock(&in->dirfragtreelock);

  lov.add_rdlock(&diri->dirfragtreelock);

  CInode* in = diri;
  while (true) {
    lov.add_rdlock(&in->snaplock);
    CDentry* pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    in = pdn->get_dir()->get_inode();
  }

  if (!mds->locker->rdlock_try_set(lov, mut))
    return false;

  mds->locker->wrlock_force(&diri->filelock, mut);
  mds->locker->wrlock_force(&diri->nestlock, mut);

  return true;
}


/** export_dir(dir, dest)
 * public method to initiate an export.
 * will fail if the directory is freezing, frozen, unpinnable, or root.
 */
void Migrator::export_dir(CDir *dir, mds_rank_t dest)
{
  ceph_assert(dir->is_auth());
  ceph_assert(dest != mds->get_nodeid());

  CDir* parent = dir->inode->get_projected_parent_dir();
  if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": dir is export pinned" << dendl;
    return;
  } else if (!(mds->is_active() || mds->is_stopping())) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": not active" << dendl;
    return;
  } else if (mds->mdcache->is_readonly()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": read-only FS, no exports for now" << dendl;
    return;
  } else if (!mds->mdsmap->is_active(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": destination not active" << dendl;
    return;
  } else if (mds->is_cluster_degraded()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": cluster degraded" << dendl;
    return;
  } else if (dir->inode->is_system()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is a system directory" << dendl;
    return;
  } else if (dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is frozen" << dendl;
    return;
  } else if (dir->state_test(CDir::STATE_EXPORTING)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": already exporting" << dendl;
    return;
  } else if (parent && parent->inode->is_stray()
             && parent->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": in stray directory" << dendl;
    return;
  }

  if (unlikely(g_conf()->mds_thrash_exports)) {
    // create random subtree bound (which will not be exported)
    std::vector<CDir*> ls;
    for (auto p = dir->begin(); p != dir->end(); ++p) {
      auto dn = p->second;
      CDentry::linkage_t *dnl = dn->get_linkage();
      if (dnl->is_primary()) {
        CInode *in = dnl->get_inode();
        if (in->is_dir()) {
          auto&& dirs = in->get_nested_dirfrags();
          ls.insert(std::end(ls), std::begin(dirs), std::end(dirs));
        }
      }
    }
    if (ls.size() > 0) {
      int n = rand() % ls.size();
      auto p = ls.begin();
      while (n--) ++p;
      CDir *bd = *p;
      if (!(bd->is_frozen() || bd->is_freezing())) {
        ceph_assert(bd->is_auth());
        dir->state_set(CDir::STATE_AUXSUBTREE);
        mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
        dout(7) << "create aux subtree " << *bd << " under " << *dir << dendl;
      }
    }
  }

  dout(4) << "Starting export to mds." << dest << " " << *dir << dendl;

  mds->hit_export_target(dest, -1);

  dir->auth_pin(this);
  dir->mark_exporting();

  MDRequestRef mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
  mdr->more()->export_dir = dir;
  mdr->pin(dir);

  ceph_assert(export_state.count(dir) == 0);
  export_state_t& stat = export_state[dir];
  num_locking_exports++;
  stat.state = EXPORT_LOCKING;
  stat.peer = dest;
  stat.tid = mdr->reqid.tid;
  stat.mut = mdr;

  mds->mdcache->dispatch_request(mdr);
}

/*
 * Check if the directory is too large to be exported whole.  If it is,
 * choose some subdirs whose total size is suitable.
 */
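// On return, 'results' holds (dirfrag, approximate size) pairs to
// export.  If nothing could be split off, the fallback is the single
// pair (dir, total size); 'results' is left empty only when subtrees
// had to be skipped and null_okay is true.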
void Migrator::maybe_split_export(CDir* dir, uint64_t max_size, bool null_okay,
                                  vector<pair<CDir*, size_t> >& results)
{
  static const unsigned frag_size = 800;
  static const unsigned inode_size = 1000;
  static const unsigned cap_size = 80;
  static const unsigned remote_size = 10;
  static const unsigned null_size = 1;

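  // the constants above are rough per-item size estimates (in bytes)
  // used to approximate the memory footprint of an export; e.g. a
  // dirfrag holding two primary dentries, one with a client cap, scores
  // about frag_size + 2*inode_size + cap_size plus the name lengths.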
  // state for depth-first search
  struct LevelData {
    CDir *dir;
    CDir::dentry_key_map::iterator iter;
    size_t dirfrag_size = frag_size;
    size_t subdirs_size = 0;
    bool complete = true;
    vector<CDir*> siblings;
    vector<pair<CDir*, size_t> > subdirs;
    LevelData(const LevelData&) = default;
    LevelData(CDir *d) :
      dir(d), iter(d->begin()) {}
  };

  vector<LevelData> stack;
  stack.emplace_back(dir);

  size_t found_size = 0;
  size_t skipped_size = 0;

  for (;;) {
    auto& data = stack.back();
    CDir *cur = data.dir;
    auto& it = data.iter;
    auto& dirfrag_size = data.dirfrag_size;

    while (it != cur->end()) {
      CDentry *dn = it->second;
      ++it;

      dirfrag_size += dn->name.size();
      if (dn->get_linkage()->is_null()) {
        dirfrag_size += null_size;
        continue;
      }
      if (dn->get_linkage()->is_remote()) {
        dirfrag_size += remote_size;
        continue;
      }

      CInode *in = dn->get_linkage()->get_inode();
      dirfrag_size += inode_size;
      dirfrag_size += in->get_client_caps().size() * cap_size;

      if (in->is_dir()) {
        auto ls = in->get_nested_dirfrags();
        std::reverse(ls.begin(), ls.end());

        bool complete = true;
        for (auto p = ls.begin(); p != ls.end(); ) {
          if ((*p)->state_test(CDir::STATE_EXPORTING) ||
              (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
            complete = false;
            p = ls.erase(p);
          } else {
            ++p;
          }
        }
        if (!complete) {
          // skip the exporting dir's ancestors, because they can't get
          // frozen (the exporting dir's parent inode is auth pinned).
          for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
            if (!p->complete)
              break;
            p->complete = false;
          }
        }
        if (!ls.empty()) {
          stack.emplace_back(ls.back());
          ls.pop_back();
          stack.back().siblings.swap(ls);
          break;
        }
      }
    }
    // did the above loop push a new dirfrag onto the stack?
    if (stack.back().dir != cur)
      continue;

    if (data.complete) {
      auto cur_size = data.subdirs_size + dirfrag_size;
      // we can do nothing with an oversized dirfrag
      if (cur_size >= max_size && found_size * 2 > max_size)
        break;

      found_size += dirfrag_size;

      if (stack.size() > 1) {
        auto& parent = stack[stack.size() - 2];
        parent.subdirs.emplace_back(cur, cur_size);
        parent.subdirs_size += cur_size;
      }
    } else {
      // can't merge the current dirfrag into its parent if any subdir was skipped
      results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
      skipped_size += dirfrag_size;
    }

    vector<CDir*> ls;
    ls.swap(data.siblings);

    stack.pop_back();
    if (stack.empty())
      break;

    if (found_size >= max_size)
      break;

    // next dirfrag
    if (!ls.empty()) {
      stack.emplace_back(ls.back());
      ls.pop_back();
      stack.back().siblings.swap(ls);
    }
  }

  for (auto& p : stack)
    results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());

  if (results.empty() && (!skipped_size || !null_okay))
    results.emplace_back(dir, found_size + skipped_size);
}

class C_M_ExportDirWait : public MigratorContext {
  MDRequestRef mdr;
  int count;
public:
  C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
    : MigratorContext(m), mdr(mdr), count(count) {}
  void finish(int r) override {
    mig->dispatch_export_dir(mdr, count);
  }
};

void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  CDir *dir = mdr->more()->export_dir;
  dout(7) << *mdr << " " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    ceph_assert(mdr->killed || mdr->aborted);
    if (mdr->aborted) {
      mdr->aborted = false;
      mds->mdcache->request_kill(mdr);
    }
    return;
  }
  ceph_assert(it->second.state == EXPORT_LOCKING);

  if (mdr->more()->slave_error || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  mds_rank_t dest = it->second.peer;
  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(7) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }

    mds->locker->drop_locks(mdr.get());
    mdr->drop_local_auth_pins();

    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  // locks?
  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
    MutationImpl::LockOpVec lov;
    // If the auth MDS of the subtree root inode is neither the exporter
    // nor the importer, it may gather the subtree root's
    // fragstat/neststat while the subtree is being exported.  The
    // exporter and the importer may both consider themselves auth for
    // the subtree root (or neither may) at the time they receive the
    // lock messages, so the auth MDS of the subtree root inode may get
    // no fragstat/neststat for the subtree root dirfrag, or duplicates.
    lov.lock_scatter_gather(&dir->get_inode()->filelock);
    lov.lock_scatter_gather(&dir->get_inode()->nestlock);
    if (dir->get_inode()->is_auth()) {
      dir->get_inode()->filelock.set_scatter_wanted();
      dir->get_inode()->nestlock.set_scatter_wanted();
    }
    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);

    if (!mds->locker->acquire_locks(mdr, lov, nullptr, true)) {
      if (mdr->aborted)
        export_try_cancel(dir);
      return;
    }

    lov.clear();
    // bound dftlocks:
    // NOTE: We need to take an rdlock on bounding dirfrags during
    //  migration for a rather irritating reason: when we export the
    //  bound inode, we need to send scatterlock state for the dirfrags
    //  as well, so that the new auth also gets the correct info.  If we
    //  race with a refragment, this info is useless, as we can't
    //  redivvy it up.  And it's needed for the scatterlocks to work
    //  properly: when the auth is in a sync/lock state it keeps each
    //  dirfrag's portion in the local (auth OR replica) dirfrag.
    set<CDir*> wouldbe_bounds;
    set<CInode*> bound_inodes;
    cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
    for (auto& bound : wouldbe_bounds)
      bound_inodes.insert(bound->get_inode());
    for (auto& in : bound_inodes)
      lov.add_rdlock(&in->dirfragtreelock);

    if (!mds->locker->rdlock_try_set(lov, mdr))
      return;

    if (!mds->locker->try_rdlock_snap_layout(dir->get_inode(), mdr))
      return;

    mdr->locking_state |= MutationImpl::ALL_LOCKED;
  }

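  // mds_kill_export_at is a fault-injection knob for testing: setting
  // it to N makes the matching assertion below (and its siblings at the
  // other numbered checkpoints) abort the MDS at step N of the export,
  // so failure/recovery paths can be exercised.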
  ceph_assert(g_conf()->mds_kill_export_at != 1);

  auto parent = it->second.parent;

  vector<pair<CDir*, size_t> > results;
  maybe_split_export(dir, max_export_size, (bool)parent, results);

  if (results.size() == 1 && results.front().first == dir) {
    num_locking_exports--;
    it->second.state = EXPORT_DISCOVERING;
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
    auto discover = make_message<MExportDirDiscover>(dir->dirfrag(), path,
                                                     mds->get_nodeid(),
                                                     it->second.tid);
    mds->send_message_mds(discover, dest);
    ceph_assert(g_conf()->mds_kill_export_at != 2);

    it->second.last_cum_auth_pins_change = ceph_clock_now();
    it->second.approx_size = results.front().second;
    total_exporting_size += it->second.approx_size;

    // start the freeze, but hold it up with an auth_pin.
    dir->freeze_tree();
    ceph_assert(dir->is_freezing_tree());
    dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
    return;
  }

  if (parent) {
    parent->pending_children += results.size();
  } else {
    parent = std::make_shared<export_base_t>(dir->dirfrag(), dest,
                                             results.size(), export_queue_gen);
  }

  if (results.empty()) {
    dout(7) << "all of the subtree's children are already being exported; retrying the remaining parts of the parent export "
            << parent->dirfrag << dendl;
    parent->restart = true;
  } else {
    dout(7) << "subtree is too large, splitting it into: " << dendl;
  }

  for (auto& p : results) {
    CDir *sub = p.first;
    ceph_assert(sub != dir);
    dout(7) << " sub " << *sub << dendl;

    sub->auth_pin(this);
    sub->mark_exporting();

    MDRequestRef _mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
    _mdr->more()->export_dir = sub;
    _mdr->pin(sub);

    ceph_assert(export_state.count(sub) == 0);
    auto& stat = export_state[sub];
    num_locking_exports++;
    stat.state = EXPORT_LOCKING;
    stat.peer = dest;
    stat.tid = _mdr->reqid.tid;
    stat.mut = _mdr;
    stat.parent = parent;
    mds->mdcache->dispatch_request(_mdr);
  }

  // cancel the original one
  export_try_cancel(dir);
}

void Migrator::child_export_finish(std::shared_ptr<export_base_t>& parent, bool success)
{
  if (success)
    parent->restart = true;
  if (--parent->pending_children == 0) {
    if (parent->restart &&
        parent->export_queue_gen == export_queue_gen) {
      CDir *origin = mds->mdcache->get_dirfrag(parent->dirfrag);
      if (origin && origin->is_auth()) {
        dout(7) << "child_export_finish requeue " << *origin << dendl;
        export_queue.emplace_front(origin->dirfrag(), parent->dest);
      }
    }
  }
}

/*
 * called on receipt of MExportDirDiscoverAck
 * the importer now has the directory's _inode_ in memory, and pinned.
 */
void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m)
{
  CDir *dir = cache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  ceph_assert(dir);

  dout(7) << "from " << m->get_source()
          << " on " << *dir << dendl;

  mds->hit_export_target(dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != dest) {
    dout(7) << "must have aborted" << dendl;
  } else {
    ceph_assert(it->second.state == EXPORT_DISCOVERING);

    if (m->is_success()) {
      // release locks to avoid deadlock
      MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
      ceph_assert(mdr);
      mds->mdcache->request_finish(mdr);
      it->second.mut.reset();
      // freeze the subtree
      it->second.state = EXPORT_FREEZING;
      dir->auth_unpin(this);
      ceph_assert(g_conf()->mds_kill_export_at != 3);

    } else {
      dout(7) << "peer failed to discover (not active?), canceling" << dendl;
      export_try_cancel(dir, false);
    }
  }
}

class C_M_ExportSessionsFlushed : public MigratorContext {
  CDir *dir;
  uint64_t tid;
public:
  C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
    MigratorContext(m), dir(d), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->export_sessions_flushed(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};

void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
  ceph_assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
  it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
  if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
    export_go(dir);  // start export.
}

void Migrator::encode_export_prep_trace(bufferlist &final_bl, CDir *bound,
                                        CDir *dir, export_state_t &es,
                                        set<inodeno_t> &inodes_added,
                                        set<dirfrag_t> &dirfrags_added)
{
  ENCODE_START(1, 1, final_bl);

  dout(7) << " started to encode dir " << *bound << dendl;
  CDir *cur = bound;
  bufferlist tracebl;
  char start = '-';

  while (1) {
    // don't repeat inodes
    if (inodes_added.count(cur->inode->ino()))
      break;
    inodes_added.insert(cur->inode->ino());

    // prepend dentry + inode
    ceph_assert(cur->inode->is_auth());
    bufferlist bl;
    cache->encode_replica_dentry(cur->inode->parent, es.peer, bl);
    dout(7) << " added " << *cur->inode->parent << dendl;
    cache->encode_replica_inode(cur->inode, es.peer, bl, mds->mdsmap->get_up_features());
    dout(7) << " added " << *cur->inode << dendl;
    bl.claim_append(tracebl);
    tracebl.claim(bl);

    cur = cur->get_parent_dir();
    // don't repeat dirfrags
    if (dirfrags_added.count(cur->dirfrag()) || cur == dir) {
      start = 'd';  // start with dentry
      break;
    }
    dirfrags_added.insert(cur->dirfrag());

    // prepend dir
    cache->encode_replica_dir(cur, es.peer, bl);
    dout(7) << " added " << *cur << dendl;
    bl.claim_append(tracebl);
    tracebl.claim(bl);
    start = 'f';  // start with dirfrag
  }
  dirfrag_t df = cur->dirfrag();
  encode(df, final_bl);
  encode(start, final_bl);
  final_bl.claim_append(tracebl);

  ENCODE_FINISH(final_bl);
}

void Migrator::export_frozen(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != tid) {
    dout(7) << "export must have aborted" << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_FREEZING);
  ceph_assert(dir->is_frozen_tree_root());

  it->second.mut = new MutationImpl();

  // ok, try to grab all my locks.
  CInode *diri = dir->get_inode();
  if ((diri->is_auth() && diri->is_frozen()) ||
      !export_try_grab_locks(dir, it->second.mut)) {
    dout(7) << "export_dir couldn't acquire all needed locks, failing. "
            << *dir << dendl;
    export_try_cancel(dir);
    return;
  }

  if (diri->is_auth())
    it->second.mut->auth_pin(diri);

  cache->show_subtrees();

  // CDir::_freeze_tree() should have forced it into a subtree.
  ceph_assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
  // note the bounds.
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  // generate prep message, log entry.
  auto prep = make_message<MExportDirPrep>(dir->dirfrag(), it->second.tid);

  // include list of bystanders
  for (const auto &p : dir->get_replicas()) {
    if (p.first != it->second.peer) {
      dout(10) << "bystander mds." << p.first << dendl;
      prep->add_bystander(p.first);
    }
  }

  // include base dirfrag
  cache->encode_replica_dir(dir, it->second.peer, prep->basedir);

  /*
   * include spanning tree for all nested exports.
   * these need to be on the destination _before_ the final export so that
   * dir_auth updates on any nested exports are properly absorbed.
   * this includes inodes and dirfrags included in the subtree, but
   * only the inodes at the bounds.
   *
   * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
   */
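  // e.g. a bound at <base>/a/b with nothing sent yet encodes as
  //   df(<base>) 'd' dentry(a) inode(a) dir(a) dentry(b) inode(b)
  // while a bound whose parent inode is already in the message but
  // whose parent dirfrag is not starts from that dirfrag with 'f'.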
  set<inodeno_t> inodes_added;
  set<dirfrag_t> dirfrags_added;

  // check bounds
  for (auto &bound : bounds) {
    // pin it.
    bound->get(CDir::PIN_EXPORTBOUND);
    bound->state_set(CDir::STATE_EXPORTBOUND);

    dout(7) << " export bound " << *bound << dendl;
    prep->add_bound(bound->dirfrag());

    bufferlist final_bl;
    encode_export_prep_trace(final_bl, bound, dir, it->second, inodes_added, dirfrags_added);
    prep->add_trace(final_bl);
  }

  // send.
  it->second.state = EXPORT_PREPPING;
  mds->send_message_mds(prep, it->second.peer);
  ceph_assert(g_conf()->mds_kill_export_at != 4);

  // make sure any new instantiations of caps are flushed out
  ceph_assert(it->second.warning_ack_waiting.empty());

  set<client_t> export_client_set;
  get_export_client_set(dir, export_client_set);

  MDSGatherBuilder gather(g_ceph_context);
  mds->server->flush_client_sessions(export_client_set, gather);
  if (gather.has_subs()) {
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
    gather.activate();
  }
}

void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
{
  deque<CDir*> dfs;
  dfs.push_back(dir);
  while (!dfs.empty()) {
    CDir *dir = dfs.front();
    dfs.pop_front();
    for (auto& p : *dir) {
      CDentry *dn = p.second;
      if (!dn->get_linkage()->is_primary())
        continue;
      CInode *in = dn->get_linkage()->get_inode();
      if (in->is_dir()) {
        // directory?
        auto&& ls = in->get_dirfrags();
        for (auto& q : ls) {
          if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
            // include nested dirfrag
            ceph_assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
            dfs.push_back(q);  // it's ours, recurse (later)
          }
        }
      }
      for (auto& q : in->get_client_caps()) {
        client_set.insert(q.first);
      }
    }
  }
}

void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
{
  for (const auto &p : in->get_client_caps()) {
    client_set.insert(p.first);
  }
}

void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
{
  CDir *dir = cache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  ceph_assert(dir);

  dout(7) << *dir << dendl;

  mds->hit_export_target(dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != mds_rank_t(m->get_source().num())) {
    // export must have aborted.
    dout(7) << "export must have aborted" << dendl;
    return;
  }
  ceph_assert(it->second.state == EXPORT_PREPPING);

  if (!m->is_success()) {
    dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
    export_try_cancel(dir, false);
    return;
  }

  ceph_assert(g_conf()->mds_kill_export_at != 5);
  // send warnings
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  ceph_assert(it->second.warning_ack_waiting.empty() ||
              (it->second.warning_ack_waiting.size() == 1 &&
               it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
  ceph_assert(it->second.notify_ack_waiting.empty());

  for (const auto &p : dir->get_replicas()) {
    if (p.first == it->second.peer) continue;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
      continue;  // only if active
    it->second.warning_ack_waiting.insert(p.first);
    it->second.notify_ack_waiting.insert(p.first);  // we'll eventually get a notifyack, too!

    auto notify = make_message<MExportDirNotify>(dir->dirfrag(), it->second.tid, true,
                                                 mds_authority_t(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
                                                 mds_authority_t(mds->get_nodeid(), it->second.peer));
    for (auto &cdir : bounds) {
      notify->get_bounds().push_back(cdir->dirfrag());
    }
    mds->send_message_mds(notify, p.first);
  }

  it->second.state = EXPORT_WARNING;

  ceph_assert(g_conf()->mds_kill_export_at != 6);
  // nobody to warn?
  if (it->second.warning_ack_waiting.empty())
    export_go(dir);  // start export.
}


class C_M_ExportGo : public MigratorContext {
  CDir *dir;
  uint64_t tid;
public:
  C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
    MigratorContext(m), dir(d), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->export_go_synced(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};

void Migrator::export_go(CDir *dir)
{
  auto it = export_state.find(dir);
  ceph_assert(it != export_state.end());
  dout(7) << *dir << " to " << it->second.peer << dendl;

  // first sync log to flush out e.g. any cap imports
  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
  mds->mdlog->flush();
}

void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }
  ceph_assert(it->second.state == EXPORT_WARNING);
  mds_rank_t dest = it->second.peer;

  dout(7) << *dir << " to " << dest << dendl;

  cache->show_subtrees();

  it->second.state = EXPORT_EXPORTING;
  ceph_assert(g_conf()->mds_kill_export_at != 7);

  ceph_assert(dir->is_frozen_tree_root());

  // set ambiguous auth
  cache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);

  // take away the popularity we're sending.
  mds->balancer->subtract_export(dir);

  // fill export message with cache data
  auto req = make_message<MExportDir>(dir->dirfrag(), it->second.tid);
  map<client_t,entity_inst_t> exported_client_map;
  map<client_t,client_metadata_t> exported_client_metadata_map;
  uint64_t num_exported_inodes = 0;
  encode_export_dir(req->export_data, dir,  // recur start point
                    exported_client_map, exported_client_metadata_map,
                    num_exported_inodes);
  encode(exported_client_map, req->client_map, mds->mdsmap->get_up_features());
  encode(exported_client_metadata_map, req->client_map);

  // add bounds to message
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p)
    req->add_export((*p)->dirfrag());

  // send
  mds->send_message_mds(req, dest);
  ceph_assert(g_conf()->mds_kill_export_at != 8);

  mds->hit_export_target(dest, num_exported_inodes+1);

  // stats
  if (mds->logger) mds->logger->inc(l_mds_exported);
  if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);

  cache->show_subtrees();
}


/** encode_export_inode
 * update our local state for this inode to export.
 * encode relevant state to be sent over the wire.
 * used by: encode_export_dir, file_rename (if foreign)
 *
 * FIXME: the separation between CInode.encode_export and these methods
 * is pretty arbitrary and dumb.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
                                   map<client_t,entity_inst_t>& exported_client_map,
                                   map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, enc_state);
  dout(7) << *in << dendl;
  ceph_assert(!in->is_replica(mds->get_nodeid()));

  encode(in->inode.ino, enc_state);
  encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps
  encode_export_inode_caps(in, true, enc_state, exported_client_map, exported_client_metadata_map);
  ENCODE_FINISH(enc_state);
}

void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
                                        map<client_t,entity_inst_t>& exported_client_map,
                                        map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, bl);
  dout(20) << *in << dendl;
  // encode caps
  map<client_t,Capability::Export> cap_map;
  in->export_client_caps(cap_map);
  encode(cap_map, bl);
  if (auth_cap) {
    encode(in->get_mds_caps_wanted(), bl);

    in->state_set(CInode::STATE_EXPORTINGCAPS);
    in->get(CInode::PIN_EXPORTINGCAPS);
  }

  // make note of clients named by exported capabilities
  for (const auto &p : in->get_client_caps()) {
    if (exported_client_map.count(p.first))
      continue;
    Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p.first.v));
    exported_client_map[p.first] = session->info.inst;
    exported_client_metadata_map[p.first] = session->info.client_metadata;
  }
  ENCODE_FINISH(bl);
}

void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
                                        map<client_t,Capability::Import>& peer_imported)
{
  dout(20) << *in << dendl;

  in->state_clear(CInode::STATE_EXPORTINGCAPS);
  in->put(CInode::PIN_EXPORTINGCAPS);

  // tell (all) clients about migrating caps..
  for (const auto &p : in->get_client_caps()) {
    const Capability *cap = &p.second;
    dout(7) << p.first
            << " exported caps on " << *in << dendl;
    auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
                                       cap->get_cap_id(), cap->get_mseq(),
                                       mds->get_osd_epoch_barrier());
    map<client_t,Capability::Import>::iterator q = peer_imported.find(p.first);
    ceph_assert(q != peer_imported.end());
    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
                    (q->second.cap_id > 0 ? peer : -1), 0);
    mds->send_message_client_counted(m, p.first);
  }
  in->clear_client_caps_after_export();
  mds->locker->eval(in, CEPH_CAP_LOCKS);
}

void Migrator::finish_export_inode(CInode *in, mds_rank_t peer,
                                   map<client_t,Capability::Import>& peer_imported,
                                   MDSContext::vec& finished)
{
  dout(12) << *in << dendl;

  // clean
  if (in->is_dirty())
    in->mark_clean();

  // clear/unpin cached_by (we're no longer the authority)
  in->clear_replica_map();

  // twiddle lock states for auth -> replica transition
  in->authlock.export_twiddle();
  in->linklock.export_twiddle();
  in->dirfragtreelock.export_twiddle();
  in->filelock.export_twiddle();
  in->nestlock.export_twiddle();
  in->xattrlock.export_twiddle();
  in->snaplock.export_twiddle();
  in->flocklock.export_twiddle();
  in->policylock.export_twiddle();

  // mark auth
  ceph_assert(in->is_auth());
  in->state_clear(CInode::STATE_AUTH);
  in->replica_nonce = CInode::EXPORT_NONCE;

  in->clear_dirty_rstat();

  // no more auth subtree? clear scatter dirty
  if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
    in->clear_scatter_dirty();

  in->clear_dirty_parent();

  in->clear_file_locks();

  // waiters
  in->take_waiting(CInode::WAIT_ANY_MASK, finished);

  in->finish_export();

  finish_export_inode_caps(in, peer, peer_imported);
}

void Migrator::encode_export_dir(bufferlist& exportbl,
                                 CDir *dir,
                                 map<client_t,entity_inst_t>& exported_client_map,
                                 map<client_t,client_metadata_t>& exported_client_metadata_map,
                                 uint64_t &num_exported)
{
  // This has to be declared before ENCODE_START as it will need to be referenced after ENCODE_FINISH.
  std::vector<CDir*> subdirs;

  ENCODE_START(1, 1, exportbl);
  dout(7) << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  ceph_assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  encode(nden, exportbl);

  // dentries
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    num_exported++;

    // -- dentry
    dout(7) << " exporting " << *dn << dendl;

    // dn name
    encode(dn->get_name(), exportbl);
    encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1);  // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      // remote link
      exportbl.append("L", 1);  // remote link

      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      encode(ino, exportbl);
      encode(d_type, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("I", 1);  // inode dentry

    encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map);  // encode, and (update state for) export

    // directory?
    auto&& dfs = in->get_dirfrags();
    for (const auto& t : dfs) {
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
        // include nested dirfrag
        ceph_assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
        subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  ENCODE_FINISH(exportbl);
  // subdirs
  for (const auto &dir : subdirs) {
    encode_export_dir(exportbl, dir, exported_client_map, exported_client_metadata_map, num_exported);
  }
}
1825
1826 void Migrator::finish_export_dir(CDir *dir, mds_rank_t peer,
1827 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
1828 MDSContext::vec& finished, int *num_dentries)
1829 {
1830 dout(10) << *dir << dendl;
1831
1832 // release open_by
1833 dir->clear_replica_map();
1834
1835 // mark
1836 ceph_assert(dir->is_auth());
1837 dir->state_clear(CDir::STATE_AUTH);
1838 dir->remove_bloom();
1839 dir->replica_nonce = CDir::EXPORT_NONCE;
1840
1841 if (dir->is_dirty())
1842 dir->mark_clean();
1843
1844 // suck up all waiters
1845 dir->take_waiting(CDir::WAIT_ANY_MASK, finished); // all dir waiters
1846
1847 // pop
1848 dir->finish_export();
1849
1850 // dentries
1851 std::vector<CDir*> subdirs;
1852 for (auto &p : *dir) {
1853 CDentry *dn = p.second;
1854 CInode *in = dn->get_linkage()->get_inode();
1855
1856 // dentry
1857 dn->finish_export();
1858
1859 // inode?
1860 if (dn->get_linkage()->is_primary()) {
1861 finish_export_inode(in, peer, peer_imported[in->ino()], finished);
1862
1863 // subdirs?
1864 auto&& dirs = in->get_nested_dirfrags();
1865 subdirs.insert(std::end(subdirs), std::begin(dirs), std::end(dirs));
1866 }
1867
1868 cache->touch_dentry_bottom(dn); // move dentry to tail of LRU
1869 ++(*num_dentries);
1870 }
1871
1872 // subdirs
1873 for (const auto& dir : subdirs) {
1874 finish_export_dir(dir, peer, peer_imported, finished, num_dentries);
1875 }
1876 }
1877
1878 class C_MDS_ExportFinishLogged : public MigratorLogContext {
1879 CDir *dir;
1880 public:
1881 C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
1882 void finish(int r) override {
1883 mig->export_logged_finish(dir);
1884 }
1885 };
1886
1887
1888 /*
1889 * i should get an export_ack from the export target.
1890 */
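/*
 * the ack means the importer has journaled its EImportStart, so we
 * can safely journal our own EExport (including the bounds), flip the
 * subtree auth to (peer, me), and wait for the event to commit before
 * sending notifies.
 */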
1891 void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
1892 {
1893 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
1894 mds_rank_t dest(m->get_source().num());
1895 ceph_assert(dir);
1896 ceph_assert(dir->is_frozen_tree_root()); // i'm exporting!
1897
1898 // yay!
1899 dout(7) << *dir << dendl;
1900
1901 mds->hit_export_target(dest, -1);
1902
1903 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1904 ceph_assert(it != export_state.end());
1905 ceph_assert(it->second.state == EXPORT_EXPORTING);
1906 ceph_assert(it->second.tid == m->get_tid());
1907
1908 auto bp = m->imported_caps.cbegin();
1909 decode(it->second.peer_imported, bp);
1910
1911 it->second.state = EXPORT_LOGGINGFINISH;
1912 assert (g_conf()->mds_kill_export_at != 9);
1913 set<CDir*> bounds;
1914 cache->get_subtree_bounds(dir, bounds);
1915
1916 // log completion.
1917 // include export bounds, to ensure they're in the journal.
1918 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);
1919 mds->mdlog->start_entry(le);
1920
1921 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
1922 le->metablob.add_dir(dir, false);
1923 for (set<CDir*>::iterator p = bounds.begin();
1924 p != bounds.end();
1925 ++p) {
1926 CDir *bound = *p;
1927 le->get_bounds().insert(bound->dirfrag());
1928 le->metablob.add_dir_context(bound);
1929 le->metablob.add_dir(bound, false);
1930 }
1931
1932 // list us second, them first.
1933 // this keeps authority().first in sync with subtree auth state in the journal.
1934 cache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
1935
1936 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1937 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1938 mds->mdlog->flush();
1939 assert (g_conf()->mds_kill_export_at != 10);
1940 }
1941
1942 void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
1943 {
1944 dout(7) << *dir << dendl;
1945
1946 ceph_assert(stat.state == EXPORT_CANCELLING);
1947
1948 if (stat.notify_ack_waiting.empty()) {
1949 stat.state = EXPORT_CANCELLED;
1950 return;
1951 }
1952
1953 dir->auth_pin(this);
1954
1955 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
1956 p != stat.notify_ack_waiting.end();
1957 ++p) {
1958 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
1959 pair<int,int>(mds->get_nodeid(), stat.peer),
1960 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
1961 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
1962 notify->get_bounds().push_back((*i)->dirfrag());
1963 mds->send_message_mds(notify, *p);
1964 }
1965 }
1966
1967 /*
1968 * this happens if the dest fails after i send the export data but before it is acked;
1969 * that is, we don't know they safely received and logged it, so we reverse our changes
1970 * and go on.
1971 */
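/*
 * the reversal walks the subtree breadth-first, calling abort_export()
 * on every dir, dentry and inode, unpins the bounds, notifies the
 * bystanders, adjusts the subtree auth back to us, and unfreezes.
 */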
1972 void Migrator::export_reverse(CDir *dir, export_state_t& stat)
1973 {
1974 dout(7) << *dir << dendl;
1975
1976 set<CInode*> to_eval;
1977
1978 set<CDir*> bounds;
1979 cache->get_subtree_bounds(dir, bounds);
1980
1981 // remove exporting pins
1982 std::deque<CDir*> rq;
1983 rq.push_back(dir);
1984 while (!rq.empty()) {
1985 CDir *t = rq.front();
1986 rq.pop_front();
1987 t->abort_export();
1988 for (auto &p : *t) {
1989 CDentry *dn = p.second;
1990 dn->abort_export();
1991 if (!dn->get_linkage()->is_primary())
1992 continue;
1993 CInode *in = dn->get_linkage()->get_inode();
1994 in->abort_export();
1995 if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
1996 in->state_clear(CInode::STATE_EVALSTALECAPS);
1997 to_eval.insert(in);
1998 }
1999 if (in->is_dir()) {
2000 auto&& dirs = in->get_nested_dirfrags();
2001 for (const auto& dir : dirs) {
2002 rq.push_back(dir);
2003 }
2004 }
2005 }
2006 }
2007
2008 // unpin bounds
2009 for (auto bd : bounds) {
2010 bd->put(CDir::PIN_EXPORTBOUND);
2011 bd->state_clear(CDir::STATE_EXPORTBOUND);
2012 }
2013
2014 // notify bystanders
2015 export_notify_abort(dir, stat, bounds);
2016
2017 // unfreeze tree, with possible subtree merge.
2018 cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
2019
2020 // process delayed expires
2021 cache->process_delayed_expire(dir);
2022
2023 dir->unfreeze_tree();
2024 cache->try_subtree_merge(dir);
2025
2026 // revoke/resume stale caps
2027 for (auto in : to_eval) {
2028 bool need_issue = false;
2029 for (auto &p : in->client_caps) {
2030 Capability *cap = &p.second;
2031 if (!cap->is_stale()) {
2032 need_issue = true;
2033 break;
2034 }
2035 }
2036 if (need_issue &&
2037 (!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
2038 mds->locker->issue_caps(in);
2039 }
2040
2041 cache->show_cache();
2042 }
2043
2044
2045 /*
2046 * once i get the ack and have journaled the EExport,
2047 * send notifies (if any); otherwise go straight to finish.
2049 */
2050 void Migrator::export_logged_finish(CDir *dir)
2051 {
2052 dout(7) << *dir << dendl;
2053
2054 export_state_t& stat = export_state[dir];
2055
2056 // send notifies
2057 set<CDir*> bounds;
2058 cache->get_subtree_bounds(dir, bounds);
2059
2060 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
2061 p != stat.notify_ack_waiting.end();
2062 ++p) {
2063 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
2064 pair<int,int>(mds->get_nodeid(), stat.peer),
2065 pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
2066
2067 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2068 notify->get_bounds().push_back((*i)->dirfrag());
2069
2070 mds->send_message_mds(notify, *p);
2071 }
2072
2073 // wait for notifyacks
2074 stat.state = EXPORT_NOTIFYING;
2075 assert (g_conf()->mds_kill_export_at != 11);
2076
2077 // no notifies to wait for?
2078 if (stat.notify_ack_waiting.empty()) {
2079 export_finish(dir); // skip notify/notify_ack stage.
2080 } else {
2081 // notify peer to send cap import messages to clients
2082 if (!mds->is_cluster_degraded() ||
2083 mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
2084 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), false, stat.tid), stat.peer);
2085 } else {
2086 dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
2087 }
2088 }
2089 }
2090
2091 /*
2092 * warning:
2093 * i'll get an ack from each bystander.
2094 * when i get them all, do the export.
2095 * notify:
2096 * i'll get an ack from each bystander.
2097 * when i get them all, unfreeze and send the finish.
2098 */
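/*
 * on the last outstanding ack:
 *   EXPORT_WARNING    -> export_go()
 *   EXPORT_NOTIFYING  -> export_finish()
 *   EXPORT_CANCELLING -> export_cancel_finish()
 *   IMPORT_ABORTING   -> import_reverse_unfreeze()  (we are the importer)
 */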
2099 void Migrator::handle_export_notify_ack(const cref_t<MExportDirNotifyAck> &m)
2100 {
2101 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
2102 mds_rank_t dest(m->get_source().num());
2103 ceph_assert(dir);
2104 mds_rank_t from = mds_rank_t(m->get_source().num());
2105
2106 mds->hit_export_target(dest, -1);
2107
2108 auto export_state_entry = export_state.find(dir);
2109 if (export_state_entry != export_state.end()) {
2110 export_state_t& stat = export_state_entry->second;
2111 if (stat.state == EXPORT_WARNING &&
2112 stat.warning_ack_waiting.erase(from)) {
2113 // exporting. process warning.
2114 dout(7) << "from " << m->get_source()
2115 << ": exporting, processing warning on " << *dir << dendl;
2116 if (stat.warning_ack_waiting.empty())
2117 export_go(dir); // start export.
2118 } else if (stat.state == EXPORT_NOTIFYING &&
2119 stat.notify_ack_waiting.erase(from)) {
2120 // exporting. process notify.
2121 dout(7) << "from " << m->get_source()
2122 << ": exporting, processing notify on " << *dir << dendl;
2123 if (stat.notify_ack_waiting.empty())
2124 export_finish(dir);
2125 } else if (stat.state == EXPORT_CANCELLING &&
2126 m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
2127 stat.notify_ack_waiting.erase(from)) {
2128 dout(7) << "from " << m->get_source()
2129 << ": cancelling export, processing notify on " << *dir << dendl;
2130 if (stat.notify_ack_waiting.empty()) {
2131 export_cancel_finish(export_state_entry);
2132 }
2133 }
2134 }
2135 else {
2136 auto import_state_entry = import_state.find(dir->dirfrag());
2137 if (import_state_entry != import_state.end()) {
2138 import_state_t& stat = import_state_entry->second;
2139 if (stat.state == IMPORT_ABORTING) {
2140 // reversing import
2141 dout(7) << "from " << m->get_source()
2142 << ": aborting import on " << *dir << dendl;
2143 ceph_assert(stat.bystanders.count(from));
2144 stat.bystanders.erase(from);
2145 if (stat.bystanders.empty())
2146 import_reverse_unfreeze(dir);
2147 }
2148 }
2149 }
2150 }
2151
2152 void Migrator::export_finish(CDir *dir)
2153 {
2154 dout(3) << *dir << dendl;
2155
2156 assert (g_conf()->mds_kill_export_at != 12);
2157 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
2158 if (it == export_state.end()) {
2159 dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
2160 return;
2161 }
2162
2163 // send finish/commit to new auth
2164 if (!mds->is_cluster_degraded() ||
2165 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
2166 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), true, it->second.tid), it->second.peer);
2167 } else {
2168 dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
2169 }
2170 ceph_assert(g_conf()->mds_kill_export_at != 13);
2171
2172 // finish export (adjust local cache state)
2173 int num_dentries = 0;
2174 MDSContext::vec finished;
2175 finish_export_dir(dir, it->second.peer,
2176 it->second.peer_imported, finished, &num_dentries);
2177
2178 ceph_assert(!dir->is_auth());
2179 cache->adjust_subtree_auth(dir, it->second.peer);
2180
2181 // unpin bounds
2182 set<CDir*> bounds;
2183 cache->get_subtree_bounds(dir, bounds);
2184 for (set<CDir*>::iterator p = bounds.begin();
2185 p != bounds.end();
2186 ++p) {
2187 CDir *bd = *p;
2188 bd->put(CDir::PIN_EXPORTBOUND);
2189 bd->state_clear(CDir::STATE_EXPORTBOUND);
2190 }
2191
2192 if (dir->state_test(CDir::STATE_AUXSUBTREE))
2193 dir->state_clear(CDir::STATE_AUXSUBTREE);
2194
2195 // discard delayed expires
2196 cache->discard_delayed_expire(dir);
2197
2198 dout(7) << "unfreezing" << dendl;
2199
2200 // unfreeze tree, with possible subtree merge.
2201 // (we do this _after_ removing EXPORTBOUND pins, to allow merges)
2202 dir->unfreeze_tree();
2203 cache->try_subtree_merge(dir);
2204
2205 // no more auth subtree? clear scatter dirty
2206 if (!dir->get_inode()->is_auth() &&
2207 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2208 dir->get_inode()->clear_scatter_dirty();
2209 // wake up scatter_nudge waiters
2210 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, finished);
2211 }
2212
2213 if (!finished.empty())
2214 mds->queue_waiters(finished);
2215
2216 MutationRef mut = std::move(it->second.mut);
2217 auto parent = std::move(it->second.parent);
2218 // remove from exporting list, clean up state
2219 total_exporting_size -= it->second.approx_size;
2220 export_state.erase(it);
2221
2222 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
2223 dir->clear_exporting();
2224
2225 cache->show_subtrees();
2226 audit();
2227
2228 cache->trim(num_dentries); // try trimming exported dentries
2229
2230 // send pending import_maps?
2231 mds->mdcache->maybe_send_pending_resolves();
2232
2233 // drop locks, unpin path
2234 if (mut) {
2235 mds->locker->drop_locks(mut.get());
2236 mut->cleanup();
2237 }
2238
2239 if (parent)
2240 child_export_finish(parent, true);
2241
2242 maybe_do_queued_export();
2243 }
2244
2245
2246
2247 class C_MDS_ExportDiscover : public MigratorContext {
2248 public:
2249 C_MDS_ExportDiscover(Migrator *mig, const cref_t<MExportDirDiscover>& m) : MigratorContext(mig), m(m) {}
2250 void finish(int r) override {
2251 mig->handle_export_discover(m, true);
2252 }
2253 private:
2254 cref_t<MExportDirDiscover> m;
2255 };
2256
2257 class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
2258 public:
2259 C_MDS_ExportDiscoverFactory(Migrator *mig, cref_t<MExportDirDiscover> m) : mig(mig), m(m) {}
2260 MDSContext *build() {
2261 return new C_MDS_ExportDiscover(mig, m);
2262 }
2263 private:
2264 Migrator *mig;
2265 cref_t<MExportDirDiscover> m;
2266 };
2267
2268 // ==========================================================
2269 // IMPORT
2270
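/*
 * import state progression (see import_state_t):
 *   DISCOVERING -> DISCOVERED -> PREPPING -> PREPPED
 *     -> LOGGINGSTART -> ACKING -> FINISHING -> done,
 * with ABORTING entered via import_reverse() on failure.
 */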
2271 void Migrator::handle_export_discover(const cref_t<MExportDirDiscover> &m, bool started)
2272 {
2273 mds_rank_t from = m->get_source_mds();
2274 ceph_assert(from != mds->get_nodeid());
2275
2276 dout(7) << m->get_path() << dendl;
2277
2278 // note import state
2279 dirfrag_t df = m->get_dirfrag();
2280
2281 if (!mds->is_active()) {
2282 dout(7) << " not active, send NACK " << dendl;
2283 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid(), false), from);
2284 return;
2285 }
2286
2287 // only start discovering on this message once.
2288 import_state_t *p_state;
2289 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2290 if (!started) {
2291 ceph_assert(it == import_state.end());
2292 p_state = &import_state[df];
2293 p_state->state = IMPORT_DISCOVERING;
2294 p_state->peer = from;
2295 p_state->tid = m->get_tid();
2296 } else {
2297 // am i retrying after ancient path_traverse results?
2298 if (it == import_state.end() ||
2299 it->second.peer != from ||
2300 it->second.tid != m->get_tid()) {
2301 dout(7) << " dropping obsolete message" << dendl;
2302 return;
2303 }
2304 ceph_assert(it->second.state == IMPORT_DISCOVERING);
2305 p_state = &it->second;
2306 }
2307
2308 C_MDS_ExportDiscoverFactory cf(this, m);
2309 if (!mds->mdcache->is_open()) {
2310 dout(10) << " waiting for root" << dendl;
2311 mds->mdcache->wait_for_open(cf.build());
2312 return;
2313 }
2314
2315 assert (g_conf()->mds_kill_import_at != 1);
2316
2317 // do we have it?
2318 CInode *in = cache->get_inode(m->get_dirfrag().ino);
2319 if (!in) {
2320 // must discover it!
2321 filepath fpath(m->get_path());
2322 vector<CDentry*> trace;
2323 MDRequestRef null_ref;
2324 int r = cache->path_traverse(null_ref, cf, fpath,
2325 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_PATH_LOCKED,
2326 &trace);
2327 if (r > 0) return;
2328 if (r < 0) {
2329 dout(7) << "failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
2330 ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
2331 }
2332
2333 ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
2334 }
2335
2336 // yay
2337 dout(7) << "have " << df << " inode " << *in << dendl;
2338
2339 p_state->state = IMPORT_DISCOVERED;
2340
2341 // pin inode in the cache (for now)
2342 ceph_assert(in->is_dir());
2343 in->get(CInode::PIN_IMPORTING);
2344
2345 // reply
2346 dout(7) << " sending export_discover_ack on " << *in << dendl;
2347 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid()), p_state->peer);
2348 assert (g_conf()->mds_kill_import_at != 2);
2349 }
2350
2351 void Migrator::import_reverse_discovering(dirfrag_t df)
2352 {
2353 import_state.erase(df);
2354 }
2355
2356 void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2357 {
2358 // unpin base
2359 diri->put(CInode::PIN_IMPORTING);
2360 import_state.erase(df);
2361 }
2362
2363 void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
2364 {
2365 set<CDir*> bounds;
2366 cache->map_dirfrag_set(stat.bound_ls, bounds);
2367 import_remove_pins(dir, bounds);
2368 import_reverse_final(dir);
2369 }
2370
2371 void Migrator::handle_export_cancel(const cref_t<MExportDirCancel> &m)
2372 {
2373 dout(7) << "on " << m->get_dirfrag() << dendl;
2374 dirfrag_t df = m->get_dirfrag();
2375 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2376 if (it == import_state.end()) {
2377 ceph_abort_msg("got export_cancel in weird state");
2378 } else if (it->second.state == IMPORT_DISCOVERING) {
2379 import_reverse_discovering(df);
2380 } else if (it->second.state == IMPORT_DISCOVERED) {
2381 CInode *in = cache->get_inode(df.ino);
2382 ceph_assert(in);
2383 import_reverse_discovered(df, in);
2384 } else if (it->second.state == IMPORT_PREPPING) {
2385 CDir *dir = mds->mdcache->get_dirfrag(df);
2386 ceph_assert(dir);
2387 import_reverse_prepping(dir, it->second);
2388 } else if (it->second.state == IMPORT_PREPPED) {
2389 CDir *dir = mds->mdcache->get_dirfrag(df);
2390 ceph_assert(dir);
2391 set<CDir*> bounds;
2392 cache->get_subtree_bounds(dir, bounds);
2393 import_remove_pins(dir, bounds);
2394 // adjust auth back to the exporter
2395 cache->adjust_subtree_auth(dir, it->second.peer);
2396 import_reverse_unfreeze(dir);
2397 } else {
2398 ceph_abort_msg("got export_cancel in weird state");
2399 }
2400 }
2401
2402 class C_MDS_ExportPrep : public MigratorContext {
2403 public:
2404 C_MDS_ExportPrep(Migrator *mig, const cref_t<MExportDirPrep>& m) : MigratorContext(mig), m(m) {}
2405 void finish(int r) override {
2406 mig->handle_export_prep(m, true);
2407 }
2408 private:
2409 cref_t<MExportDirPrep> m;
2410 };
2411
2412 class C_MDS_ExportPrepFactory : public MDSContextFactory {
2413 public:
2414 C_MDS_ExportPrepFactory(Migrator *mig, cref_t<MExportDirPrep> m) : mig(mig), m(m) {}
2415 MDSContext *build() {
2416 return new C_MDS_ExportPrep(mig, m);
2417 }
2418 private:
2419 Migrator *mig;
2420 cref_t<MExportDirPrep> m;
2421 };
2422
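/*
 * a trace starts with the dirfrag it leads to and a start char:
 *   'd' - we already have the dirfrag
 *   'f' - we have the inode; decode a replica of its dirfrag
 *   '-' - nothing to anchor on
 * then (dentry, inode[, dir]) replica records repeat until the
 * buffer is exhausted.
 */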
2423 void Migrator::decode_export_prep_trace(bufferlist::const_iterator& blp, mds_rank_t oldauth, MDSContext::vec& finished)
2424 {
2425 DECODE_START(1, blp);
2426 dirfrag_t df;
2427 decode(df, blp);
2428 char start;
2429 decode(start, blp);
2430 dout(10) << " trace from " << df << " start " << start << dendl;
2431
2432 CDir *cur = nullptr;
2433 if (start == 'd') {
2434 cur = cache->get_dirfrag(df);
2435 ceph_assert(cur);
2436 dout(10) << " had " << *cur << dendl;
2437 } else if (start == 'f') {
2438 CInode *in = cache->get_inode(df.ino);
2439 ceph_assert(in);
2440 dout(10) << " had " << *in << dendl;
2441 cache->decode_replica_dir(cur, blp, in, oldauth, finished);
2442 dout(10) << " added " << *cur << dendl;
2443 } else if (start == '-') {
2444 // nothing
2445 } else
2446 ceph_abort_msg("unrecognized start char");
2447
2448 while (!blp.end()) {
2449 CDentry *dn = nullptr;
2450 cache->decode_replica_dentry(dn, blp, cur, finished);
2451 dout(10) << " added " << *dn << dendl;
2452 CInode *in = nullptr;
2453 cache->decode_replica_inode(in, blp, dn, finished);
2454 dout(10) << " added " << *in << dendl;
2455 if (blp.end())
2456 break;
2457 cache->decode_replica_dir(cur, blp, in, oldauth, finished);
2458 dout(10) << " added " << *cur << dendl;
2459 }
2460
2461 DECODE_FINISH(blp);
2462 }
2463
2464 void Migrator::handle_export_prep(const cref_t<MExportDirPrep> &m, bool did_assim)
2465 {
2466 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
2467 ceph_assert(oldauth != mds->get_nodeid());
2468
2469 CDir *dir;
2470 CInode *diri;
2471 MDSContext::vec finished;
2472
2473 // assimilate root dir.
2474 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
2475 if (!did_assim) {
2476 ceph_assert(it != import_state.end());
2477 ceph_assert(it->second.state == IMPORT_DISCOVERED);
2478 ceph_assert(it->second.peer == oldauth);
2479 diri = cache->get_inode(m->get_dirfrag().ino);
2480 ceph_assert(diri);
2481 auto p = m->basedir.cbegin();
2482 cache->decode_replica_dir(dir, p, diri, oldauth, finished);
2483 dout(7) << "on " << *dir << " (first pass)" << dendl;
2484 } else {
2485 if (it == import_state.end() ||
2486 it->second.peer != oldauth ||
2487 it->second.tid != m->get_tid()) {
2488 dout(7) << "obsolete message, dropping" << dendl;
2489 return;
2490 }
2491 ceph_assert(it->second.state == IMPORT_PREPPING);
2492 ceph_assert(it->second.peer == oldauth);
2493
2494 dir = cache->get_dirfrag(m->get_dirfrag());
2495 ceph_assert(dir);
2496 dout(7) << "on " << *dir << " (subsequent pass)" << dendl;
2497 diri = dir->get_inode();
2498 }
2499 ceph_assert(dir->is_auth() == false);
2500
2501 cache->show_subtrees();
2502
2503 // build import bound map
2504 map<inodeno_t, fragset_t> import_bound_fragset;
2505 for (const auto &bound : m->get_bounds()) {
2506 dout(10) << " bound " << bound << dendl;
2507 import_bound_fragset[bound.ino].insert_raw(bound.frag);
2508 }
2509 // assimilate contents?
2510 if (!did_assim) {
2511 dout(7) << "doing assim on " << *dir << dendl;
2512
2513 // change import state
2514 it->second.state = IMPORT_PREPPING;
2515 it->second.bound_ls = m->get_bounds();
2516 it->second.bystanders = m->get_bystanders();
2517 ceph_assert(g_conf()->mds_kill_import_at != 3);
2518
2519 // bystander list
2520 dout(7) << "bystanders are " << it->second.bystanders << dendl;
2521
2522 // move pin to dir
2523 diri->put(CInode::PIN_IMPORTING);
2524 dir->get(CDir::PIN_IMPORTING);
2525 dir->state_set(CDir::STATE_IMPORTING);
2526
2527 // assimilate traces to exports
2528 // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
2529 for (const auto &bl : m->traces) {
2530 auto blp = bl.cbegin();
2531 decode_export_prep_trace(blp, oldauth, finished);
2532 }
2533
2534 // make bound sticky
2535 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2536 p != import_bound_fragset.end();
2537 ++p) {
2538 p->second.simplify();
2539 CInode *in = cache->get_inode(p->first);
2540 ceph_assert(in);
2541 in->get_stickydirs();
2542 dout(7) << " set stickydirs on bound inode " << *in << dendl;
2543 }
2544
2545 } else {
2546 dout(7) << " not doing assim on " << *dir << dendl;
2547 }
2548
2549 MDSGatherBuilder gather(g_ceph_context);
2550
2551 if (!finished.empty())
2552 mds->queue_waiters(finished);
2553
2554
2555 bool success = true;
2556 if (mds->is_active()) {
2557 // open all bounds
2558 set<CDir*> import_bounds;
2559 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2560 p != import_bound_fragset.end();
2561 ++p) {
2562 CInode *in = cache->get_inode(p->first);
2563 ceph_assert(in);
2564
2565 // map fragset into a frag_t list, based on the inode fragtree
2566 frag_vec_t leaves;
2567 for (const auto& frag : p->second) {
2568 in->dirfragtree.get_leaves_under(frag, leaves);
2569 }
2570 dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << leaves << dendl;
2571
2572 for (const auto& leaf : leaves) {
2573 CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, leaf));
2574 if (!bound) {
2575 dout(7) << " opening bounding dirfrag " << leaf << " on " << *in << dendl;
2576 cache->open_remote_dirfrag(in, leaf, gather.new_sub());
2577 continue;
2578 }
2579
2580 if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
2581 dout(7) << " pinning import bound " << *bound << dendl;
2582 bound->get(CDir::PIN_IMPORTBOUND);
2583 bound->state_set(CDir::STATE_IMPORTBOUND);
2584 } else {
2585 dout(7) << " already pinned import bound " << *bound << dendl;
2586 }
2587 import_bounds.insert(bound);
2588 }
2589 }
2590
2591 if (gather.has_subs()) {
2592 C_MDS_ExportPrepFactory cf(this, m);
2593 gather.set_finisher(cf.build());
2594 gather.activate();
2595 return;
2596 }
2597
2598 dout(7) << " all ready, noting auth and freezing import region" << dendl;
2599
2600 if (!mds->mdcache->is_readonly() &&
2601 // for pinning scatter gather. loner has a higher chance to get wrlock
2602 diri->filelock.can_wrlock(diri->get_loner()) &&
2603 diri->nestlock.can_wrlock(diri->get_loner())) {
2604 it->second.mut = new MutationImpl();
2605 // force some locks. hacky.
2606 mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
2607 mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
2608
2609 // note that i am an ambiguous auth for this subtree.
2610 // specify bounds, since the exporter explicitly defines the region.
2611 cache->adjust_bounded_subtree_auth(dir, import_bounds,
2612 pair<int,int>(oldauth, mds->get_nodeid()));
2613 cache->verify_subtree_bounds(dir, import_bounds);
2614 // freeze.
2615 dir->_freeze_tree();
2616 // note new state
2617 it->second.state = IMPORT_PREPPED;
2618 } else {
2619 dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
2620 success = false;
2621 }
2622 } else {
2623 dout(7) << " not active, failing. " << *dir << dendl;
2624 success = false;
2625 }
2626
2627 if (!success)
2628 import_reverse_prepping(dir, it->second);
2629
2630 // reply, noting success or failure
2631 dout(7) << " sending export_prep_ack on " << *dir << dendl;
2632 mds->send_message(make_message<MExportDirPrepAck>(dir->dirfrag(), success, m->get_tid()), m->get_connection());
2633
2634 ceph_assert(g_conf()->mds_kill_import_at != 4);
2635 }
2636
2637
2638
2639
2640 class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
2641 dirfrag_t df;
2642 CDir *dir;
2643 mds_rank_t from;
2644 public:
2645 map<client_t,pair<Session*,uint64_t> > imported_session_map;
2646
2647 C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
2648 MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
2649 dir->get(CDir::PIN_PTRWAITER);
2650 }
2651 void finish(int r) override {
2652 mig->import_logged_start(df, dir, from, imported_session_map);
2653 dir->put(CDir::PIN_PTRWAITER);
2654 }
2655 };
2656
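/*
 * decode the exported metadata into our cache while building an
 * EImportStart that carries the imported sessions and bounds. the
 * subtree auth becomes (me, exporter) -- ambiguous until the event
 * commits -- and import_logged_start() then acks the exporter.
 */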
2657 void Migrator::handle_export_dir(const cref_t<MExportDir> &m)
2658 {
2659 assert (g_conf()->mds_kill_import_at != 5);
2660 CDir *dir = cache->get_dirfrag(m->dirfrag);
2661 ceph_assert(dir);
2662
2663 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
2664 dout(7) << "importing " << *dir << " from " << oldauth << dendl;
2665
2666 ceph_assert(!dir->is_auth());
2667 ceph_assert(dir->freeze_tree_state);
2668
2669 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
2670 ceph_assert(it != import_state.end());
2671 ceph_assert(it->second.state == IMPORT_PREPPED);
2672 ceph_assert(it->second.tid == m->get_tid());
2673 ceph_assert(it->second.peer == oldauth);
2674
2675 if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
2676 dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());
2677
2678 cache->show_subtrees();
2679
2680 C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);
2681
2682 // start the journal entry
2683 EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
2684 mds->mdlog->start_entry(le);
2685
2686 le->metablob.add_dir_context(dir);
2687
2688 // adjust auth (list us _first_)
2689 cache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);
2690
2691 // new client sessions, open these after we journal
2692 // include imported sessions in EImportStart
2693 auto cmp = m->client_map.cbegin();
2694 map<client_t,entity_inst_t> client_map;
2695 map<client_t,client_metadata_t> client_metadata_map;
2696 decode(client_map, cmp);
2697 decode(client_metadata_map, cmp);
2698 ceph_assert(cmp.end());
2699 le->cmapv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
2700 onlogged->imported_session_map);
2701 encode(client_map, le->client_map, mds->mdsmap->get_up_features());
2702 encode(client_metadata_map, le->client_map);
2703
2704 auto blp = m->export_data.cbegin();
2705 int num_imported_inodes = 0;
2706 while (!blp.end()) {
2707 decode_import_dir(blp,
2708 oldauth,
2709 dir, // import root
2710 le,
2711 mds->mdlog->get_current_segment(),
2712 it->second.peer_exports,
2713 it->second.updated_scatterlocks,
2714 num_imported_inodes);
2715 }
2716 dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
2717
2718 // include bounds in EImportStart
2719 set<CDir*> import_bounds;
2720 for (const auto &bound : m->bounds) {
2721 CDir *bd = cache->get_dirfrag(bound);
2722 ceph_assert(bd);
2723 le->metablob.add_dir(bd, false); // note that parent metadata is already in the event
2724 import_bounds.insert(bd);
2725 }
2726 cache->verify_subtree_bounds(dir, import_bounds);
2727
2728 // adjust popularity
2729 mds->balancer->add_import(dir);
2730
2731 dout(7) << "did " << *dir << dendl;
2732
2733 // note state
2734 it->second.state = IMPORT_LOGGINGSTART;
2735 assert (g_conf()->mds_kill_import_at != 6);
2736
2737 // log it
2738 mds->mdlog->submit_entry(le, onlogged);
2739 mds->mdlog->flush();
2740
2741 // some stats
2742 if (mds->logger) {
2743 mds->logger->inc(l_mds_imported);
2744 mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
2745 }
2746 }
2747
2748
2749 /*
2750 * this is an import helper
2751 * called by import_finish, and import_reverse and friends.
2752 */
2753 void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2754 {
2755 import_state_t& stat = import_state[dir->dirfrag()];
2756 // root
2757 dir->put(CDir::PIN_IMPORTING);
2758 dir->state_clear(CDir::STATE_IMPORTING);
2759
2760 // bounding inodes
2761 set<inodeno_t> did;
2762 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2763 p != stat.bound_ls.end();
2764 ++p) {
2765 if (did.count(p->ino))
2766 continue;
2767 did.insert(p->ino);
2768 CInode *in = cache->get_inode(p->ino);
2769 ceph_assert(in);
2770 in->put_stickydirs();
2771 }
2772
2773 if (stat.state == IMPORT_PREPPING) {
2774 for (auto bd : bounds) {
2775 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2776 bd->put(CDir::PIN_IMPORTBOUND);
2777 bd->state_clear(CDir::STATE_IMPORTBOUND);
2778 }
2779 }
2780 } else if (stat.state >= IMPORT_PREPPED) {
2781 // bounding dirfrags
2782 for (auto bd : bounds) {
2783 ceph_assert(bd->state_test(CDir::STATE_IMPORTBOUND));
2784 bd->put(CDir::PIN_IMPORTBOUND);
2785 bd->state_clear(CDir::STATE_IMPORTBOUND);
2786 }
2787 }
2788 }
2789
2790 class C_MDC_QueueContexts : public MigratorContext {
2791 public:
2792 MDSContext::vec contexts;
2793 C_MDC_QueueContexts(Migrator *m) : MigratorContext(m) {}
2794 void finish(int r) override {
2795 // execute contexts immediately after 'this' context
2796 get_mds()->queue_waiters_front(contexts);
2797 }
2798 };
2799
2800 /*
2801 * note: this does the full work of reversing an import and cleaning up
2802 * state.
2803 * called by both handle_mds_failure and by handle_resolve (if we are
2804 * a survivor coping with an exporter failure+recovery).
2805 */
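/*
 * in IMPORT_ACKING we may already have added client caps, so those
 * are stripped again before the failed EImportFinish is journaled.
 */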
2806 void Migrator::import_reverse(CDir *dir)
2807 {
2808 dout(7) << *dir << dendl;
2809
2810 import_state_t& stat = import_state[dir->dirfrag()];
2811 stat.state = IMPORT_ABORTING;
2812
2813 set<CDir*> bounds;
2814 cache->get_subtree_bounds(dir, bounds);
2815
2816 // remove pins
2817 import_remove_pins(dir, bounds);
2818
2819 // update auth, with possible subtree merge.
2820 ceph_assert(dir->is_subtree_root());
2821 if (mds->is_resolve())
2822 cache->trim_non_auth_subtree(dir);
2823
2824 cache->adjust_subtree_auth(dir, stat.peer);
2825
2826 auto fin = new C_MDC_QueueContexts(this);
2827 if (!dir->get_inode()->is_auth() &&
2828 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2829 dir->get_inode()->clear_scatter_dirty();
2830 // wake up scatter_nudge waiters
2831 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2832 }
2833
2834 int num_dentries = 0;
2835 // adjust auth bits.
2836 std::deque<CDir*> q;
2837 q.push_back(dir);
2838 while (!q.empty()) {
2839 CDir *cur = q.front();
2840 q.pop_front();
2841
2842 // dir
2843 cur->abort_import();
2844
2845 for (auto &p : *cur) {
2846 CDentry *dn = p.second;
2847
2848 // dentry
2849 dn->state_clear(CDentry::STATE_AUTH);
2850 dn->clear_replica_map();
2851 dn->set_replica_nonce(CDentry::EXPORT_NONCE);
2852 if (dn->is_dirty())
2853 dn->mark_clean();
2854
2855 // inode?
2856 if (dn->get_linkage()->is_primary()) {
2857 CInode *in = dn->get_linkage()->get_inode();
2858 in->state_clear(CInode::STATE_AUTH);
2859 in->clear_replica_map();
2860 in->set_replica_nonce(CInode::EXPORT_NONCE);
2861 if (in->is_dirty())
2862 in->mark_clean();
2863 in->clear_dirty_rstat();
2864 if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
2865 in->clear_scatter_dirty();
2866 in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2867 }
2868
2869 in->clear_dirty_parent();
2870
2871 in->authlock.clear_gather();
2872 in->linklock.clear_gather();
2873 in->dirfragtreelock.clear_gather();
2874 in->filelock.clear_gather();
2875
2876 in->clear_file_locks();
2877
2878 // non-bounding dir?
2879 auto&& dfs = in->get_dirfrags();
2880 for (const auto& dir : dfs) {
2881 if (bounds.count(dir) == 0)
2882 q.push_back(dir);
2883 }
2884 }
2885
2886 cache->touch_dentry_bottom(dn); // move dentry to tail of LRU
2887 ++num_dentries;
2888 }
2889 }
2890
2891 dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
2892
2893 if (stat.state == IMPORT_ACKING) {
2894 // remove imported caps
2895 for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
2896 p != stat.peer_exports.end();
2897 ++p) {
2898 CInode *in = p->first;
2899 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
2900 q != p->second.end();
2901 ++q) {
2902 Capability *cap = in->get_client_cap(q->first);
2903 if (!cap) {
2904 ceph_assert(!stat.session_map.count(q->first));
2905 continue;
2906 }
2907 if (cap->is_importing())
2908 in->remove_client_cap(q->first);
2909 }
2910 in->put(CInode::PIN_IMPORTINGCAPS);
2911 }
2912 for (auto& p : stat.session_map) {
2913 Session *session = p.second.first;
2914 session->dec_importing();
2915 }
2916 }
2917
2918 // log our failure
2919 mds->mdlog->start_submit_entry(new EImportFinish(dir, false));
2920
2921 cache->trim(num_dentries); // try trimming dentries
2922
2923 // notify bystanders; wait in aborting state
2924 import_notify_abort(dir, bounds);
2925 }
2926
2927 void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2928 {
2929 dout(7) << *dir << dendl;
2930
2931 import_state_t& stat = import_state[dir->dirfrag()];
2932 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2933 p != stat.bystanders.end();
2934 ++p) {
2935 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, false,
2936 pair<int,int>(stat.peer, mds->get_nodeid()),
2937 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
2938 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2939 notify->get_bounds().push_back((*i)->dirfrag());
2940 mds->send_message_mds(notify, *p);
2941 }
2942 }
2943
2944 void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
2945 {
2946 dout(7) << *dir << dendl;
2947
2948 import_state_t& stat = import_state[dir->dirfrag()];
2949 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2950 p != stat.bystanders.end(); ) {
2951 if (mds->is_cluster_degraded() &&
2952 !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
2953 // this can happen if both exporter and bystander fail in the same mdsmap epoch
2954 stat.bystanders.erase(p++);
2955 continue;
2956 }
2957 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
2958 mds_authority_t(stat.peer, mds->get_nodeid()),
2959 mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
2960 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2961 notify->get_bounds().push_back((*i)->dirfrag());
2962 mds->send_message_mds(notify, *p);
2963 ++p;
2964 }
2965 if (stat.bystanders.empty()) {
2966 dout(7) << "no bystanders, finishing reverse now" << dendl;
2967 import_reverse_unfreeze(dir);
2968 } else {
2969 assert (g_conf()->mds_kill_import_at != 10);
2970 }
2971 }
2972
2973 void Migrator::import_reverse_unfreeze(CDir *dir)
2974 {
2975 dout(7) << *dir << dendl;
2976 ceph_assert(!dir->is_auth());
2977 cache->discard_delayed_expire(dir);
2978 dir->unfreeze_tree();
2979 if (dir->is_subtree_root())
2980 cache->try_subtree_merge(dir);
2981 import_reverse_final(dir);
2982 }
2983
2984 void Migrator::import_reverse_final(CDir *dir)
2985 {
2986 dout(7) << *dir << dendl;
2987
2988 // clean up
2989 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
2990 ceph_assert(it != import_state.end());
2991
2992 MutationRef mut = it->second.mut;
2993 import_state.erase(it);
2994
2995 // send pending import_maps?
2996 mds->mdcache->maybe_send_pending_resolves();
2997
2998 if (mut) {
2999 mds->locker->drop_locks(mut.get());
3000 mut->cleanup();
3001 }
3002
3003 cache->show_subtrees();
3004 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3005 }
3006
3007
3008
3009
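/*
 * EImportStart is durable: open the imported client sessions for
 * real, note the caps we imported, and ack the exporter so it can
 * journal its EExport. cap import messages to the clients are
 * deferred until import_finish().
 */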
3010 void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
3011 map<client_t,pair<Session*,uint64_t> >& imported_session_map)
3012 {
3013 dout(7) << *dir << dendl;
3014
3015 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
3016 if (it == import_state.end() ||
3017 it->second.state != IMPORT_LOGGINGSTART) {
3018 dout(7) << "import " << df << " must have aborted" << dendl;
3019 mds->server->finish_force_open_sessions(imported_session_map);
3020 return;
3021 }
3022
3023 // note state
3024 it->second.state = IMPORT_ACKING;
3025
3026 assert (g_conf()->mds_kill_import_at != 7);
3027
3028 // force open client sessions and finish cap import
3029 mds->server->finish_force_open_sessions(imported_session_map, false);
3030
3031 map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
3032 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3033 p != it->second.peer_exports.end();
3034 ++p) {
3035 // parameter 'peer' is NONE, delay sending cap import messages to client
3036 finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
3037 p->second, imported_caps[p->first->ino()]);
3038 }
3039
3040 it->second.session_map.swap(imported_session_map);
3041
3042 // send notifies etc.
3043 dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
3044
3045 // test surviving observer of a failed migration that did not complete
3046 //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);
3047
3048 auto ack = make_message<MExportDirAck>(dir->dirfrag(), it->second.tid);
3049 encode(imported_caps, ack->imported_caps);
3050
3051 mds->send_message_mds(ack, from);
3052 assert (g_conf()->mds_kill_import_at != 8);
3053
3054 cache->show_subtrees();
3055 }
3056
3057 void Migrator::handle_export_finish(const cref_t<MExportDirFinish> &m)
3058 {
3059 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
3060 ceph_assert(dir);
3061 dout(7) << *dir << (m->is_last() ? " last" : "") << dendl;
3062
3063 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
3064 ceph_assert(it != import_state.end());
3065 ceph_assert(it->second.tid == m->get_tid());
3066
3067 import_finish(dir, false, m->is_last());
3068 }
3069
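/*
 * import_finish is two-phase: the first MExportDirFinish (last ==
 * false) merges the imported caps and moves ACKING -> FINISHING;
 * the second (last == true) journals EImportFinish, drops the
 * bound pins and unfreezes the tree.
 */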
3070 void Migrator::import_finish(CDir *dir, bool notify, bool last)
3071 {
3072 dout(7) << *dir << dendl;
3073
3074 map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
3075 ceph_assert(it != import_state.end());
3076 ceph_assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);
3077
3078 if (it->second.state == IMPORT_ACKING) {
3079 ceph_assert(dir->is_auth());
3080 cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
3081 }
3082
3083 // log finish
3084 ceph_assert(g_conf()->mds_kill_import_at != 9);
3085
3086 if (it->second.state == IMPORT_ACKING) {
3087 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3088 p != it->second.peer_exports.end();
3089 ++p) {
3090 CInode *in = p->first;
3091 ceph_assert(in->is_auth());
3092 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
3093 q != p->second.end();
3094 ++q) {
3095 auto r = it->second.session_map.find(q->first);
3096 if (r == it->second.session_map.end())
3097 continue;
3098
3099 Session *session = r->second.first;
3100 Capability *cap = in->get_client_cap(q->first);
3101 ceph_assert(cap);
3102 cap->merge(q->second, true);
3103 cap->clear_importing();
3104 mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
3105 q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
3106 }
3107 p->second.clear();
3108 in->replica_caps_wanted = 0;
3109 }
3110 for (auto& p : it->second.session_map) {
3111 Session *session = p.second.first;
3112 session->dec_importing();
3113 }
3114 }
3115
3116 if (!last) {
3117 ceph_assert(it->second.state == IMPORT_ACKING);
3118 it->second.state = IMPORT_FINISHING;
3119 return;
3120 }
3121
3122 // remove pins
3123 set<CDir*> bounds;
3124 cache->get_subtree_bounds(dir, bounds);
3125
3126 if (notify)
3127 import_notify_finish(dir, bounds);
3128
3129 import_remove_pins(dir, bounds);
3130
3131 map<CInode*, map<client_t,Capability::Export> > peer_exports;
3132 it->second.peer_exports.swap(peer_exports);
3133
3134 // clear import state (we're done!)
3135 MutationRef mut = it->second.mut;
3136 import_state.erase(it);
3137
3138 mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
3139
3140 // process delayed expires
3141 cache->process_delayed_expire(dir);
3142
3143 // unfreeze tree, with possible subtree merge.
3144 dir->unfreeze_tree();
3145 cache->try_subtree_merge(dir);
3146
3147 cache->show_subtrees();
3148 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3149
3150 if (mut) {
3151 mds->locker->drop_locks(mut.get());
3152 mut->cleanup();
3153 }
3154
3155 // re-eval imported caps
3156 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
3157 p != peer_exports.end();
3158 ++p) {
3159 if (p->first->is_auth())
3160 mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
3161 p->first->put(CInode::PIN_IMPORTINGCAPS);
3162 }
3163
3164 // send pending import_maps?
3165 mds->mdcache->maybe_send_pending_resolves();
3166
3167 // did i just import mydir?
3168 if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
3169 cache->populate_mydir();
3170
3171 // is it empty?
3172 if (dir->get_num_head_items() == 0 &&
3173 !dir->inode->is_auth()) {
3174 // reexport!
3175 export_empty_import(dir);
3176 }
3177 }
3178
3179
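/*
 * decode one inode record: (ino, last) locates or creates the CInode,
 * then its state and cap exports are decoded and the inode is linked
 * into the dentry. dirty scatterlocks are queued so they get journaled
 * once the import actually finishes.
 */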
3180 void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
3181 mds_rank_t oldauth, LogSegment *ls,
3182 map<CInode*, map<client_t,Capability::Export> >& peer_exports,
3183 list<ScatterLock*>& updated_scatterlocks)
3184 {
3185 CInode *in;
3186 bool added = false;
3187 DECODE_START(1, blp);
3188 dout(15) << " on " << *dn << dendl;
3189
3190 inodeno_t ino;
3191 snapid_t last;
3192 decode(ino, blp);
3193 decode(last, blp);
3194
3195 in = cache->get_inode(ino, last);
3196 if (!in) {
3197 in = new CInode(mds->mdcache, true, 1, last);
3198 added = true;
3199 }
3200
3201 // state after link -- or not! -sage
3202 in->decode_import(blp, ls); // cap imports are noted for later action
3203
3204 // caps
3205 decode_import_inode_caps(in, true, blp, peer_exports);
3206
3207 DECODE_FINISH(blp);
3208
3209 // link before state -- or not! -sage
3210 if (dn->get_linkage()->get_inode() != in) {
3211 ceph_assert(!dn->get_linkage()->get_inode());
3212 dn->dir->link_primary_inode(dn, in);
3213 }
3214
3215 if (in->is_dir())
3216 dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);
3217
3218 // add inode?
3219 if (added) {
3220 cache->add_inode(in);
3221 dout(10) << "added " << *in << dendl;
3222 } else {
3223 dout(10) << " had " << *in << dendl;
3224 }
3225
3226 if (in->inode.is_dirty_rstat())
3227 in->mark_dirty_rstat();
3228
3229 // clear if dirtyscattered, since we're going to journal this
3230 // but not until we _actually_ finish the import...
3231 if (in->filelock.is_dirty()) {
3232 updated_scatterlocks.push_back(&in->filelock);
3233 mds->locker->mark_updated_scatterlock(&in->filelock);
3234 }
3235
3236 if (in->dirfragtreelock.is_dirty()) {
3237 updated_scatterlocks.push_back(&in->dirfragtreelock);
3238 mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
3239 }
3240
3241 // adjust replica list
3242 //assert(!in->is_replica(oldauth)); // not true on failed export
3243 in->add_replica(oldauth, CInode::EXPORT_NONCE);
3244 if (in->is_replica(mds->get_nodeid()))
3245 in->remove_replica(mds->get_nodeid());
3246
3247 if (in->snaplock.is_stable() &&
3248 in->snaplock.get_state() != LOCK_SYNC)
3249 mds->locker->try_eval(&in->snaplock, NULL);
3250
3251 if (in->policylock.is_stable() &&
3252 in->policylock.get_state() != LOCK_SYNC)
3253 mds->locker->try_eval(&in->policylock, NULL);
3254 }
3255
3256 void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
3257 bufferlist::const_iterator &blp,
3258 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3259 {
3260 DECODE_START(1, blp);
3261 map<client_t,Capability::Export> cap_map;
3262 decode(cap_map, blp);
3263 if (auth_cap) {
3264 mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
3265 decode(mds_wanted, blp);
3266 mds_wanted.erase(mds->get_nodeid());
3267 in->set_mds_caps_wanted(mds_wanted);
3268 }
3269 if (!cap_map.empty() ||
3270 (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
3271 peer_exports[in].swap(cap_map);
3272 in->get(CInode::PIN_IMPORTINGCAPS);
3273 }
3274 DECODE_FINISH(blp);
3275 }
3276
3277 void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
3278 const map<client_t,pair<Session*,uint64_t> >& session_map,
3279 const map<client_t,Capability::Export> &export_map,
3280 map<client_t,Capability::Import> &import_map)
3281 {
3282 for (auto& it : export_map) {
3283 dout(10) << "for client." << it.first << " on " << *in << dendl;
3284
3285 auto p = session_map.find(it.first);
3286 if (p == session_map.end()) {
3287 dout(10) << " no session for client." << it.first << dendl;
3288 (void)import_map[it.first];
3289 continue;
3290 }
3291
3292 Session *session = p->second.first;
3293
3294 Capability *cap = in->get_client_cap(it.first);
3295 if (!cap) {
3296 cap = in->add_client_cap(it.first, session);
3297 if (peer < 0)
3298 cap->mark_importing();
3299 }
3300
3301 // Always ask exporter mds to send cap export messages for auth caps.
3302 // For non-auth caps, ask exporter mds to send cap export messages to
3303 // clients who haven't opened sessions. The cap export messages will
3304 // make clients open sessions.
3305 if (auth_cap || !session->get_connection()) {
3306 Capability::Import& im = import_map[it.first];
3307 im.cap_id = cap->get_cap_id();
3308 im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
3309 im.issue_seq = cap->get_last_seq() + 1;
3310 }
3311
3312 if (peer >= 0) {
3313 cap->merge(it.second, auth_cap);
3314 mds->mdcache->do_cap_import(session, in, cap, it.second.cap_id,
3315 it.second.seq, it.second.mseq - 1, peer,
3316 auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
3317 }
3318 }
3319
3320 if (peer >= 0) {
3321 in->replica_caps_wanted = 0;
3322 in->put(CInode::PIN_IMPORTINGCAPS);
3323 }
3324 }
3325
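/*
 * consume exactly one per-dir record produced by encode_export_dir();
 * handle_export_dir() loops us until the bufferlist is exhausted.
 */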
3326 void Migrator::decode_import_dir(bufferlist::const_iterator& blp,
3327 mds_rank_t oldauth,
3328 CDir *import_root,
3329 EImportStart *le,
3330 LogSegment *ls,
3331 map<CInode*,map<client_t,Capability::Export> >& peer_exports,
3332 list<ScatterLock*>& updated_scatterlocks, int &num_imported)
3333 {
3334 DECODE_START(1, blp);
3335 // set up dir
3336 dirfrag_t df;
3337 decode(df, blp);
3338
3339 CInode *diri = cache->get_inode(df.ino);
3340 ceph_assert(diri);
3341 CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
3342 ceph_assert(dir);
3343
3344 dout(7) << *dir << dendl;
3345
3346 if (!dir->freeze_tree_state) {
3347 ceph_assert(dir->get_version() == 0);
3348 dir->freeze_tree_state = import_root->freeze_tree_state;
3349 }
3350
3351 // assimilate state
3352 dir->decode_import(blp, ls);
3353
3354 // adjust replica list
3355 //assert(!dir->is_replica(oldauth)); // not true on failed export
3356 dir->add_replica(oldauth, CDir::EXPORT_NONCE);
3357 if (dir->is_replica(mds->get_nodeid()))
3358 dir->remove_replica(mds->get_nodeid());
3359
3360 // add to journal entry
3361 if (le)
3362 le->metablob.add_import_dir(dir);
3363
3365
3366 // take all waiters on this dir
3367 // NOTE: a pass of imported data is guaranteed to get all of my waiters because
3368 // a replica's presence in my cache implies/forces its presence in the authority's.
3369 MDSContext::vec waiters;
3370 dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
3371 for (auto c : waiters)
3372 dir->add_waiter(CDir::WAIT_UNFREEZE, c); // UNFREEZE will get kicked both on success or failure
3373
3374 dout(15) << "doing contents" << dendl;
3375
3376 // contents
3377 __u32 nden;
3378 decode(nden, blp);
3379
3380 for (; nden>0; nden--) {
3381 num_imported++;
3382
3383 // dentry
3384 string dname;
3385 snapid_t last;
3386 decode(dname, blp);
3387 decode(last, blp);
3388
3389 CDentry *dn = dir->lookup_exact_snap(dname, last);
3390 if (!dn)
3391 dn = dir->add_null_dentry(dname, 1, last);
3392
3393 dn->decode_import(blp, ls);
3394
3395 dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
3396 if (dn->is_replica(mds->get_nodeid()))
3397 dn->remove_replica(mds->get_nodeid());
3398
3399 // dentry lock in unreadable state can block path traverse
3400 if (dn->lock.get_state() != LOCK_SYNC)
3401 mds->locker->try_eval(&dn->lock, NULL);
3402
3403 dout(15) << " got " << *dn << dendl;
3404
3405 // points to...
3406 char icode;
3407 decode(icode, blp);
3408
3409 if (icode == 'N') {
3410 // null dentry
3411 ceph_assert(dn->get_linkage()->is_null());
3412
3413 // fall thru
3414 }
3415 else if (icode == 'L') {
3416 // remote link
3417 inodeno_t ino;
3418 unsigned char d_type;
3419 decode(ino, blp);
3420 decode(d_type, blp);
3421 if (dn->get_linkage()->is_remote()) {
3422 ceph_assert(dn->get_linkage()->get_remote_ino() == ino);
3423 } else {
3424 dir->link_remote_inode(dn, ino, d_type);
3425 }
3426 }
3427 else if (icode == 'I') {
3428 // inode
3429 ceph_assert(le);
3430 decode_import_inode(dn, blp, oldauth, ls,
3431 peer_exports, updated_scatterlocks);
3432 }
3433
3434 // add dentry to journal entry
3435 if (le)
3436 le->metablob.add_import_dentry(dn);
3437 }
3438
3439 #ifdef MDS_VERIFY_FRAGSTAT
3440 if (dir->is_complete())
3441 dir->verify_fragstat();
3442 #endif
3443
3444 dir->inode->maybe_export_pin();
3445
3446 dout(7) << " done " << *dir << dendl;
3447 DECODE_FINISH(blp);
3448 }
3449
3450
3451
3452
3453
3454 // authority bystander
3455
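/*
 * a bystander replicates the dirfrag but is neither exporter nor
 * importer: it just adjusts its subtree auth view to match. only the
 * post-commit notify from import_notify_finish() is sent without
 * wants_ack().
 */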
3456 void Migrator::handle_export_notify(const cref_t<MExportDirNotify> &m)
3457 {
3458 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
3459 return;
3460 }
3461
3462 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
3463
3464 mds_rank_t from = mds_rank_t(m->get_source().num());
3465 mds_authority_t old_auth = m->get_old_auth();
3466 mds_authority_t new_auth = m->get_new_auth();
3467
3468 if (!dir) {
3469 dout(7) << old_auth << " -> " << new_auth
3470 << " on missing dir " << m->get_dirfrag() << dendl;
3471 } else if (dir->authority() != old_auth) {
3472 dout(7) << "old_auth was " << dir->authority()
3473 << " != " << old_auth << " -> " << new_auth
3474 << " on " << *dir << dendl;
3475 } else {
3476 dout(7) << old_auth << " -> " << new_auth
3477 << " on " << *dir << dendl;
3478 // adjust auth
3479 set<CDir*> have;
3480 cache->map_dirfrag_set(m->get_bounds(), have);
3481 cache->adjust_bounded_subtree_auth(dir, have, new_auth);
3482
3483 // induce a merge?
3484 cache->try_subtree_merge(dir);
3485 }
3486
3487 // send ack
3488 if (m->wants_ack()) {
3489 mds->send_message_mds(make_message<MExportDirNotifyAck>(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
3490 } else {
3491 // no ack requested (the post-commit finish notify).
3492 dout(7) << "no ack requested" << dendl;
3493 }
3494 }
3495
3496 /** cap exports **/
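/*
 * cap-only migration (no subtree move): a replica sends its caps to
 * the auth mds via MExportCaps; the auth journals the client sessions
 * (ESessions), imports the caps, and replies with MExportCapsAck so
 * the replica can drop its caps and notify the clients.
 */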
3497 void Migrator::export_caps(CInode *in)
3498 {
3499 mds_rank_t dest = in->authority().first;
3500 dout(7) << "to mds." << dest << " " << *in << dendl;
3501
3502 ceph_assert(in->is_any_caps());
3503 ceph_assert(!in->is_auth());
3504 ceph_assert(!in->is_ambiguous_auth());
3505 ceph_assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
3506
3507 auto ex = make_message<MExportCaps>();
3508 ex->ino = in->ino();
3509
3510 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map, ex->client_metadata_map);
3511
3512 mds->send_message_mds(ex, dest);
3513 }
3514
3515 void Migrator::handle_export_caps_ack(const cref_t<MExportCapsAck> &ack)
3516 {
3517 mds_rank_t from = ack->get_source().num();
3518 CInode *in = cache->get_inode(ack->ino);
3519 if (in) {
3520 ceph_assert(!in->is_auth());
3521
3522 dout(10) << *ack << " from "
3523 << ack->get_source() << " on " << *in << dendl;
3524
3525 map<client_t,Capability::Import> imported_caps;
3526 map<client_t,uint64_t> caps_ids;
3527 auto blp = ack->cap_bl.cbegin();
3528 decode(imported_caps, blp);
3529 decode(caps_ids, blp);
3530
3531 for (auto& it : imported_caps) {
3532 Capability *cap = in->get_client_cap(it.first);
3533 if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
3534 continue;
3535
3536 dout(7) << " telling client." << it.first
3537 << " exported caps on " << *in << dendl;
3538 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
3539 cap->get_cap_id(), cap->get_mseq(),
3540 mds->get_osd_epoch_barrier());
3541 m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
3542 mds->send_message_client_counted(m, it.first);
3543
3544 in->remove_client_cap(it.first);
3545 }
3546
3547 mds->locker->request_inode_file_caps(in);
3548 mds->locker->try_eval(in, CEPH_CAP_LOCKS);
3549 }
3550 }
3551
3552 void Migrator::handle_gather_caps(const cref_t<MGatherCaps> &m)
3553 {
3554 CInode *in = cache->get_inode(m->ino);
3555 if (!in)
3556 return;
3557
3558 dout(10) << *m << " from " << m->get_source()
3559 << " on " << *in << dendl;
3560
3561 if (in->is_any_caps() &&
3562 !in->is_auth() &&
3563 !in->is_ambiguous_auth() &&
3564 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3565 export_caps(in);
3566 }
3567
3568 class C_M_LoggedImportCaps : public MigratorLogContext {
3569 CInode *in;
3570 mds_rank_t from;
3571 public:
3572 map<client_t,pair<Session*,uint64_t> > imported_session_map;
3573 map<CInode*, map<client_t,Capability::Export> > peer_exports;
3574
3575 C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
3576 void finish(int r) override {
3577 mig->logged_import_caps(in, from, imported_session_map, peer_exports);
3578 }
3579 };
3580
3581 void Migrator::handle_export_caps(const cref_t<MExportCaps> &ex)
3582 {
3583 dout(10) << *ex << " from " << ex->get_source() << dendl;
3584 CInode *in = cache->get_inode(ex->ino);
3585
3586 ceph_assert(in);
3587 ceph_assert(in->is_auth());
3588
3589 // FIXME
3590 if (!in->can_auth_pin()) {
3591 return;
3592 }
3593
3594 in->auth_pin(this);
3595
3596 map<client_t,entity_inst_t> client_map{ex->client_map};
3597 map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};
3598
3599 C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
3600 this, in, mds_rank_t(ex->get_source().num()));
3601
3602 version_t pv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
3603 finish->imported_session_map);
3604 // decode new caps
3605 auto blp = ex->cap_bl.cbegin();
3606 decode_import_inode_caps(in, false, blp, finish->peer_exports);
3607 ceph_assert(!finish->peer_exports.empty()); // thus, inode is pinned.
3608
3609 // journal open client sessions
3610 ESessions *le = new ESessions(pv, std::move(client_map),
3611 std::move(client_metadata_map));
3612 mds->mdlog->start_submit_entry(le, finish);
3613 mds->mdlog->flush();
3614 }
3615
3616
3617 void Migrator::logged_import_caps(CInode *in,
3618 mds_rank_t from,
3619 map<client_t,pair<Session*,uint64_t> >& imported_session_map,
3620 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3621 {
3622 dout(10) << *in << dendl;
3623 // see export_go() vs export_go_synced()
3624 ceph_assert(in->is_auth());
3625
3626 // force open client sessions and finish cap import
3627 mds->server->finish_force_open_sessions(imported_session_map);
3628
3629 auto it = peer_exports.find(in);
3630 ceph_assert(it != peer_exports.end());
3631
3632 // clients will release caps from the exporter when they receive the cap import message.
3633 map<client_t,Capability::Import> imported_caps;
3634 finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
3635 mds->locker->eval(in, CEPH_CAP_LOCKS, true);
3636
3637 if (!imported_caps.empty()) {
3638 auto ack = make_message<MExportCapsAck>(in->ino());
3639 map<client_t,uint64_t> peer_caps_ids;
3640 for (auto &p : imported_caps )
3641 peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
3642
3643 encode(imported_caps, ack->cap_bl);
3644 encode(peer_caps_ids, ack->cap_bl);
3645 mds->send_message_mds(ack, from);
3646 }
3647
3648 in->auth_unpin(this);
3649 }
3650
3651 Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), cache(c) {
3652 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
3653 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
3654 }
3655
3656 void Migrator::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
3657 {
3658 if (changed.count("mds_max_export_size"))
3659 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
3660 if (changed.count("mds_inject_migrator_session_race")) {
3661 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
3662 dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
3663 }
3664 }