]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Migrator.cc
update sources to v12.1.0
[ceph.git] / ceph / src / mds / Migrator.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDCache.h"
17#include "CInode.h"
18#include "CDir.h"
19#include "CDentry.h"
20#include "Migrator.h"
21#include "Locker.h"
22#include "Server.h"
23
24#include "MDBalancer.h"
25#include "MDLog.h"
26#include "MDSMap.h"
27#include "Mutation.h"
28
29#include "include/filepath.h"
30
31#include "events/EExport.h"
32#include "events/EImportStart.h"
33#include "events/EImportFinish.h"
34#include "events/ESessions.h"
35
36#include "msg/Messenger.h"
37
38#include "messages/MClientCaps.h"
39
40#include "messages/MExportDirDiscover.h"
41#include "messages/MExportDirDiscoverAck.h"
42#include "messages/MExportDirCancel.h"
43#include "messages/MExportDirPrep.h"
44#include "messages/MExportDirPrepAck.h"
45#include "messages/MExportDir.h"
46#include "messages/MExportDirAck.h"
47#include "messages/MExportDirNotify.h"
48#include "messages/MExportDirNotifyAck.h"
49#include "messages/MExportDirFinish.h"
50
51#include "messages/MExportCaps.h"
52#include "messages/MExportCapsAck.h"
53#include "messages/MGatherCaps.h"
54
55
56/*
57 * this is what the dir->dir_auth values look like
58 *
59 * dir_auth authbits
60 * export
61 * me me - before
62 * me, me me - still me, but preparing for export
63 * me, them me - send MExportDir (peer is preparing)
64 * them, me me - journaled EExport
65 * them them - done
66 *
67 * import:
68 * them them - before
69 * me, them me - journaled EImportStart
70 * me me - done
71 *
72 * which implies:
73 * - auth bit is set if i am listed as first _or_ second dir_auth.
74 */
75
76#include "common/config.h"
77
78
79#define dout_context g_ceph_context
80#define dout_subsys ceph_subsys_mds
81#undef dout_prefix
82#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".migrator "
83
84
// Base class for Migrator callback contexts: routes get_mds() through
// the owning Migrator so MDSInternalContextBase machinery can find the
// rank this migrator belongs to.
class MigratorContext : public MDSInternalContextBase {
protected:
  Migrator *mig;  // owning migrator; never NULL (asserted in ctor)
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorContext(Migrator *mig_) : mig(mig_) {
    assert(mig != NULL);
  }
};
96
// Same as MigratorContext, but for journal-completion callbacks
// (MDSLogContextBase) issued by the migrator.
class MigratorLogContext : public MDSLogContextBase {
protected:
  Migrator *mig;  // owning migrator; never NULL (asserted in ctor)
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
    assert(mig != NULL);
  }
};
108
/* This function DOES put the passed message before returning*/
// Route an incoming migration message to the matching handler, grouped
// by role: importer-side, exporter-side, third-party (bystander)
// notifies, and cap migration.  Unknown message types are fatal.
void Migrator::dispatch(Message *m)
{
  switch (m->get_type()) {
    // import
  case MSG_MDS_EXPORTDIRDISCOVER:
    handle_export_discover(static_cast<MExportDirDiscover*>(m));
    break;
  case MSG_MDS_EXPORTDIRPREP:
    handle_export_prep(static_cast<MExportDirPrep*>(m));
    break;
  case MSG_MDS_EXPORTDIR:
    handle_export_dir(static_cast<MExportDir*>(m));
    break;
  case MSG_MDS_EXPORTDIRFINISH:
    handle_export_finish(static_cast<MExportDirFinish*>(m));
    break;
  case MSG_MDS_EXPORTDIRCANCEL:
    handle_export_cancel(static_cast<MExportDirCancel*>(m));
    break;

    // export
  case MSG_MDS_EXPORTDIRDISCOVERACK:
    handle_export_discover_ack(static_cast<MExportDirDiscoverAck*>(m));
    break;
  case MSG_MDS_EXPORTDIRPREPACK:
    handle_export_prep_ack(static_cast<MExportDirPrepAck*>(m));
    break;
  case MSG_MDS_EXPORTDIRACK:
    handle_export_ack(static_cast<MExportDirAck*>(m));
    break;
  case MSG_MDS_EXPORTDIRNOTIFYACK:
    handle_export_notify_ack(static_cast<MExportDirNotifyAck*>(m));
    break;

    // export 3rd party (dir_auth adjustments)
  case MSG_MDS_EXPORTDIRNOTIFY:
    handle_export_notify(static_cast<MExportDirNotify*>(m));
    break;

    // caps
  case MSG_MDS_EXPORTCAPS:
    handle_export_caps(static_cast<MExportCaps*>(m));
    break;
  case MSG_MDS_GATHERCAPS:
    handle_gather_caps(static_cast<MGatherCaps*>(m));
    break;

  default:
    derr << "migrator unknown message " << m->get_type() << dendl;
    assert(0 == "migrator unknown message");
  }
}
162
163
// Waiter that re-attempts export_empty_import() on 'dir' once whatever
// it was queued behind (e.g. an unfreeze) completes.
class C_MDC_EmptyImport : public MigratorContext {
  CDir *dir;
public:
  C_MDC_EmptyImport(Migrator *m, CDir *d) : MigratorContext(m), dir(d) {}
  void finish(int r) override {
    mig->export_empty_import(dir);
  }
};
172
173
// If 'dir' is an empty subtree we hold but whose inode belongs to
// another rank, hand the subtree back to the inode's auth MDS.  Bails
// out (with a debug log of the reason) if the dir isn't really an
// import, isn't exportable right now, or isn't actually empty.
void Migrator::export_empty_import(CDir *dir)
{
  dout(7) << "export_empty_import " << *dir << dendl;
  assert(dir->is_subtree_root());

  if (dir->inode->is_auth()) {
    // inode is ours, so this isn't a foreign import worth returning
    dout(7) << " inode is auth" << dendl;
    return;
  }
  if (!dir->is_auth()) {
    dout(7) << " not auth" << dendl;
    return;
  }
  if (dir->is_freezing() || dir->is_frozen()) {
    // mid-migration already; leave it alone
    dout(7) << " freezing or frozen" << dendl;
    return;
  }
  if (dir->get_num_head_items() > 0) {
    dout(7) << " not actually empty" << dendl;
    return;
  }
  if (dir->inode->is_root()) {
    dout(7) << " root" << dendl;
    return;
  }

  // give it back to whoever is auth for the inode
  mds_rank_t dest = dir->inode->authority().first;
  //if (mds->is_shutting_down()) dest = 0;  // this is more efficient.

  dout(7) << " really empty, exporting to " << dest << dendl;
  assert (dest != mds->get_nodeid());

  dout(7) << "exporting to mds." << dest
           << " empty import " << *dir << dendl;
  export_dir( dir, dest );
}
210
// Periodic scan: cancel exports stuck in DISCOVERING/FREEZING longer
// than mds_freeze_tree_timeout with no auth-pin progress, when remote
// requesters are waiting on us (or our parent is freezing).  Breaks the
// authpin/freeze deadlock scenarios described in the comment below.
void Migrator::find_stale_export_freeze()
{
  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= g_conf->mds_freeze_tree_timeout;


  /*
   * We could have situations like:
   *
   * - mds.0 authpins an item in subtree A
   * - mds.0 sends request to mds.1 to authpin an item in subtree B
   * - mds.0 freezes subtree A
   * - mds.1 authpins an item in subtree B
   * - mds.1 sends request to mds.0 to authpin an item in subtree A
   * - mds.1 freezes subtree B
   * - mds.1 receives the remote authpin request from mds.0
   *   (wait because subtree B is freezing)
   * - mds.0 receives the remote authpin request from mds.1
   *   (wait because subtree A is freezing)
   *
   *
   * - client request authpins items in subtree B
   * - freeze subtree B
   * - import subtree A which is parent of subtree B
   *   (authpins parent inode of subtree B, see CDir::set_dir_auth())
   * - freeze subtree A
   * - client request tries authpinning items in subtree A
   *   (wait because subtree A is freezing)
   */
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end(); ) {
    CDir* dir = p->first;
    export_state_t& stat = p->second;
    ++p;  // advance first: export_try_cancel() below may erase this entry
    if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
      continue;
    if (stat.last_cum_auth_pins != dir->get_cum_auth_pins()) {
      // pin count changed since last scan -> still making progress; reset clock
      stat.last_cum_auth_pins = dir->get_cum_auth_pins();
      stat.last_cum_auth_pins_change = now;
      continue;
    }
    if (stat.last_cum_auth_pins_change >= cutoff)
      continue;  // stalled, but not past the timeout yet
    if (stat.num_remote_waiters > 0 ||
	(!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
      export_try_cancel(dir);
    }
  }
}
261
// Abort an in-progress export of 'dir', unwinding whatever the current
// state has already done (auth_pins, tree freeze, bound pins, bystander
// warnings) and optionally telling the peer via MExportDirCancel.
// Depending on the state this either erases the export_state entry
// immediately (EXPORT_CANCELLED) or parks it in EXPORT_CANCELLING to
// wait for outstanding notify acks.
void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
{
  dout(10) << "export_try_cancel " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  assert(it != export_state.end());

  int state = it->second.state;  // remember pre-cancel state for cleanup below
  switch (state) {
  case EXPORT_LOCKING:
    dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->auth_unpin(this);  // undo pin taken in export_dir()
    break;
  case EXPORT_DISCOVERING:
    dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    dir->auth_unpin(this);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
    break;

  case EXPORT_FREEZING:
    dout(10) << "export state=freezing : canceling freeze" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
    break;

    // NOTE: state order reversal, warning comes after prepping
  case EXPORT_WARNING:
    dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
    // bystanders were already warned, so we must notify them of the abort
    // and wait for their acks: CANCELLING, not CANCELLED.
    it->second.state = EXPORT_CANCELLING;
    // fall-thru

  case EXPORT_PREPPING:
    if (state != EXPORT_WARNING) {
      dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
      it->second.state = EXPORT_CANCELLED;
    }

    {
      // unpin bounds
      set<CDir*> bounds;
      cache->get_subtree_bounds(dir, bounds);
      for (set<CDir*>::iterator q = bounds.begin();
          q != bounds.end();
          ++q) {
	CDir *bd = *q;
	bd->put(CDir::PIN_EXPORTBOUND);
	bd->state_clear(CDir::STATE_EXPORTBOUND);
      }
      if (state == EXPORT_WARNING) {
	// notify bystanders
	export_notify_abort(dir, bounds);
	// process delayed expires
	cache->process_delayed_expire(dir);
      }
    }
    dir->unfreeze_tree();
    // reclaim sole authority over the subtree and merge back if possible
    cache->adjust_subtree_auth(dir, mds->get_nodeid());
    cache->try_subtree_merge(dir);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
    break;

  case EXPORT_EXPORTING:
    dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
    it->second.state = EXPORT_CANCELLING;
    export_reverse(dir);  // full rollback of the in-flight export
    break;

  case EXPORT_LOGGINGFINISH:
  case EXPORT_NOTIFYING:
    dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
    // leave export_state, don't clean up now.
    break;
  case EXPORT_CANCELLING:
    // already cancelling; nothing more to do
    break;

  default:
    ceph_abort();
  }

  // finish clean-up?
  if (it->second.state == EXPORT_CANCELLING ||
      it->second.state == EXPORT_CANCELLED) {
    MutationRef mut;
    mut.swap(it->second.mut);

    if (it->second.state == EXPORT_CANCELLED) {
      export_state.erase(it);
      dir->state_clear(CDir::STATE_EXPORTING);
      // send pending import_maps?
      cache->maybe_send_pending_resolves();
    }

    // drop locks
    if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
      // in these states 'mut' is actually the internal MDRequest
      MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
      assert(mdr);
      if (mdr->more()->waiting_on_slave.empty())
	mds->mdcache->request_finish(mdr);
    } else if (mut) {
      mds->locker->drop_locks(mut.get());
      mut->cleanup();
    }

    cache->show_subtrees();

    maybe_do_queued_export();  // a slot may have freed up
  }
}
383
// Final cleanup for a cancelled export once all bystander notify acks
// arrived: clear the EXPORTING flag, drop the auth_pin taken by
// export_notify_abort(), and let any pending resolves go out.
void Migrator::export_cancel_finish(CDir *dir)
{
  assert(dir->state_test(CDir::STATE_EXPORTING));
  dir->state_clear(CDir::STATE_EXPORTING);

  // pinned by Migrator::export_notify_abort()
  dir->auth_unpin(this);
  // send pending import_maps?  (these need to go out when all exports have finished.)
  cache->maybe_send_pending_resolves();
}
394
395// ==========================================================
396// mds failure handling
397
// React to mds.who failing or stopping: abort/clean up every export and
// import that involved 'who' as peer, and fake warning/notify acks for
// migrations where 'who' was merely a bystander.
void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
{
  dout(5) << "handle_mds_failure_or_stop mds." << who << dendl;

  // check my exports

  // first add an extra auth_pin on any freezes, so that canceling a
  // nested freeze doesn't complete one further up the hierarchy and
  // confuse the shit out of us.  we'll remove it after canceling the
  // freeze.  this way no freeze completions run before we want them
  // to.
  list<CDir*> pinned_dirs;
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    if (p->second.state == EXPORT_FREEZING) {
      CDir *dir = p->first;
      dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
      dir->auth_pin(this);
      pinned_dirs.push_back(dir);
    }
  }

  // walk with a saved 'next' iterator: the handlers below may erase the
  // current export_state entry.
  map<CDir*,export_state_t>::iterator p = export_state.begin();
  while (p != export_state.end()) {
    map<CDir*,export_state_t>::iterator next = p;
    ++next;
    CDir *dir = p->first;

    // abort exports:
    //  - that are going to the failed node
    //  - that aren't frozen yet (to avoid auth_pin deadlock)
    //  - they haven't prepped yet (they may need to discover bounds to do that)
    if ((p->second.peer == who &&
	 p->second.state != EXPORT_CANCELLING) ||
	p->second.state == EXPORT_LOCKING ||
	p->second.state == EXPORT_DISCOVERING ||
	p->second.state == EXPORT_FREEZING ||
	p->second.state == EXPORT_PREPPING) {
      // the guy i'm exporting to failed, or we're just freezing.
      dout(10) << "cleaning up export state (" << p->second.state << ")"
	       << get_export_statename(p->second.state) << " of " << *dir << dendl;
      export_try_cancel(dir);
    } else if (p->second.peer != who) {
      // bystander failed.
      if (p->second.warning_ack_waiting.erase(who)) {
	if (p->second.state == EXPORT_WARNING) {
	  p->second.notify_ack_waiting.erase(who);   // they won't get a notify either.
	  // exporter waiting for warning acks, let's fake theirs.
	  dout(10) << "faking export_warning_ack from mds." << who
		   << " on " << *dir << " to mds." << p->second.peer
		   << dendl;
	  if (p->second.warning_ack_waiting.empty())
	    export_go(dir);
	}
      }
      if (p->second.notify_ack_waiting.erase(who)) {
	// exporter is waiting for notify acks, fake it
	dout(10) << "faking export_notify_ack from mds." << who
		 << " on " << *dir << " to mds." << p->second.peer
		 << dendl;
	if (p->second.state == EXPORT_NOTIFYING) {
	  if (p->second.notify_ack_waiting.empty())
	    export_finish(dir);
	} else if (p->second.state == EXPORT_CANCELLING) {
	  if (p->second.notify_ack_waiting.empty()) {
	    export_state.erase(p);
	    export_cancel_finish(dir);
	  }
	}
      }
    }

    // next!
    p = next;
  }


  // check my imports (same saved-next pattern; handlers may erase entries)
  map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
  while (q != import_state.end()) {
    map<dirfrag_t,import_state_t>::iterator next = q;
    ++next;
    dirfrag_t df = q->first;
    CInode *diri = mds->mdcache->get_inode(df.ino);
    CDir *dir = mds->mdcache->get_dirfrag(df);

    if (q->second.peer == who) {
      if (dir)
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << *dir << dendl;
      else
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << df << dendl;

      switch (q->second.state) {
      case IMPORT_DISCOVERING:
	dout(10) << "import state=discovering : clearing state" << dendl;
	import_reverse_discovering(df);
	break;

      case IMPORT_DISCOVERED:
	assert(diri);
	dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
	import_reverse_discovered(df, diri);
	break;

      case IMPORT_PREPPING:
	assert(dir);
	dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
	import_reverse_prepping(dir);
	break;

      case IMPORT_PREPPED:
	assert(dir);
	dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  cache->get_subtree_bounds(dir, bounds);
	  import_remove_pins(dir, bounds);

	  // adjust auth back to the exporter
	  cache->adjust_subtree_auth(dir, q->second.peer);
	  cache->try_subtree_merge(dir);

	  // notify bystanders ; wait in aborting state
	  import_state[df].state = IMPORT_ABORTING;
	  import_notify_abort(dir, bounds);
	  assert(g_conf->mds_kill_import_at != 10);
	}
	break;

      case IMPORT_LOGGINGSTART:
	assert(dir);
	dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
	import_reverse(dir);
	break;

      case IMPORT_ACKING:
	assert(dir);
	// hrm.  make this an ambiguous import, and wait for exporter recovery to disambiguate
	dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  cache->get_subtree_bounds(dir, bounds);
	  cache->add_ambiguous_import(dir, bounds);
	}
	break;

      case IMPORT_FINISHING:
	assert(dir);
	dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
	import_finish(dir, true);
	break;

      case IMPORT_ABORTING:
	assert(dir);
	dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
	break;
      }
    } else {
      // importer's peer is fine; 'who' may have been a bystander we were
      // waiting on for an abort ack.
      auto bystanders_entry = q->second.bystanders.find(who);
      if (bystanders_entry != q->second.bystanders.end()) {
	q->second.bystanders.erase(bystanders_entry);
	if (q->second.state == IMPORT_ABORTING) {
	  assert(dir);
	  dout(10) << "faking export_notify_ack from mds." << who
		   << " on aborting import " << *dir << " from mds." << q->second.peer
		   << dendl;
	  if (q->second.bystanders.empty()) {
	    import_reverse_unfreeze(dir);
	  }
	}
      }
    }

    // next!
    q = next;
  }

  // drop the temporary freeze pins added at the top
  while (!pinned_dirs.empty()) {
    CDir *dir = pinned_dirs.front();
    dout(10) << "removing temp auth_pin on " << *dir << dendl;
    dir->auth_unpin(this);
    pinned_dirs.pop_front();
  }
}
585
586
587
588void Migrator::show_importing()
589{
590 dout(10) << "show_importing" << dendl;
591 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
592 p != import_state.end();
593 ++p) {
594 CDir *dir = mds->mdcache->get_dirfrag(p->first);
595 if (dir) {
596 dout(10) << " importing from " << p->second.peer
597 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
598 << " " << p->first << " " << *dir << dendl;
599 } else {
600 dout(10) << " importing from " << p->second.peer
601 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
602 << " " << p->first << dendl;
603 }
604 }
605}
606
607void Migrator::show_exporting()
608{
609 dout(10) << "show_exporting" << dendl;
610 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
611 p != export_state.end();
612 ++p)
613 dout(10) << " exporting to " << p->second.peer
614 << ": (" << p->second.state << ") " << get_export_statename(p->second.state)
615 << " " << p->first->dirfrag() << " " << *p->first << dendl;
616}
617
618
619
620void Migrator::audit()
621{
622 if (!g_conf->subsys.should_gather(ceph_subsys_mds, 5))
623 return; // hrm.
624
625 // import_state
626 show_importing();
627 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
628 p != import_state.end();
629 ++p) {
630 if (p->second.state == IMPORT_DISCOVERING)
631 continue;
632 if (p->second.state == IMPORT_DISCOVERED) {
633 CInode *in = cache->get_inode(p->first.ino);
634 assert(in);
635 continue;
636 }
637 CDir *dir = cache->get_dirfrag(p->first);
638 assert(dir);
639 if (p->second.state == IMPORT_PREPPING)
640 continue;
641 if (p->second.state == IMPORT_ABORTING) {
642 assert(!dir->is_ambiguous_dir_auth());
643 assert(dir->get_dir_auth().first != mds->get_nodeid());
644 continue;
645 }
646 assert(dir->is_ambiguous_dir_auth());
647 assert(dir->authority().first == mds->get_nodeid() ||
648 dir->authority().second == mds->get_nodeid());
649 }
650
651 // export_state
652 show_exporting();
653 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
654 p != export_state.end();
655 ++p) {
656 CDir *dir = p->first;
657 if (p->second.state == EXPORT_LOCKING ||
658 p->second.state == EXPORT_DISCOVERING ||
659 p->second.state == EXPORT_FREEZING ||
660 p->second.state == EXPORT_CANCELLING)
661 continue;
662 assert(dir->is_ambiguous_dir_auth());
663 assert(dir->authority().first == mds->get_nodeid() ||
664 dir->authority().second == mds->get_nodeid());
665 }
666
667 // ambiguous+me subtrees should be importing|exporting
668
669 // write me
670}
671
672
673
674
675
676// ==========================================================
677// EXPORT
678
679void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
680{
681 // enqueue
682 dout(7) << "export_dir_nicely " << *dir << " to " << dest << dendl;
683 export_queue.push_back(pair<dirfrag_t,mds_rank_t>(dir->dirfrag(), dest));
684
685 maybe_do_queued_export();
686}
687
// Drain the export queue, starting queued exports as long as fewer than
// five are in flight (export_state.size() <= 4).  Queue entries whose
// dirfrag is no longer cached or no longer auth are silently dropped.
void Migrator::maybe_do_queued_export()
{
  // re-entrancy guard: export_dir() can fail synchronously and call back
  // into us via export_try_cancel().  NOTE(review): a function-local
  // static is not thread-safe — presumably this only runs under the MDS
  // dispatch lock; verify.
  static bool running;
  if (running)
    return;
  running = true;
  while (!export_queue.empty() &&
	 export_state.size() <= 4) {
    dirfrag_t df = export_queue.front().first;
    mds_rank_t dest = export_queue.front().second;
    export_queue.pop_front();

    CDir *dir = mds->mdcache->get_dirfrag(df);
    if (!dir) continue;       // dirfrag fell out of cache since queuing
    if (!dir->is_auth()) continue;  // no longer ours to export

    dout(0) << "nicely exporting to mds." << dest << " " << *dir << dendl;

    export_dir(dir, dest);
  }
  running = false;
}
710
711
712
713
// Waiter queued on CDir::WAIT_FROZEN: when the export subtree freeze
// completes (r >= 0), continue with export_frozen().  'tid' lets
// export_frozen() ignore the callback if this export was aborted and a
// new one started meanwhile.
class C_MDC_ExportFreeze : public MigratorContext {
  CDir *ex;   // dir i'm exporting
  uint64_t tid;
public:
  C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
	MigratorContext(m), ex(e), tid(t) {
          assert(ex != NULL);
        }
  void finish(int r) override {
    if (r >= 0)
      mig->export_frozen(ex, tid);
  }
};
727
728
729void Migrator::get_export_lock_set(CDir *dir, set<SimpleLock*>& locks)
730{
731 // path
732 vector<CDentry*> trace;
733 cache->make_trace(trace, dir->inode);
734 for (vector<CDentry*>::iterator it = trace.begin();
735 it != trace.end();
736 ++it)
737 locks.insert(&(*it)->lock);
738
739 // prevent scatter gather race
740 locks.insert(&dir->get_inode()->dirfragtreelock);
741
742 // bound dftlocks:
743 // NOTE: We need to take an rdlock on bounding dirfrags during
744 // migration for a rather irritating reason: when we export the
745 // bound inode, we need to send scatterlock state for the dirfrags
746 // as well, so that the new auth also gets the correct info. If we
747 // race with a refragment, this info is useless, as we can't
748 // redivvy it up. And it's needed for the scatterlocks to work
749 // properly: when the auth is in a sync/lock state it keeps each
750 // dirfrag's portion in the local (auth OR replica) dirfrag.
751 set<CDir*> wouldbe_bounds;
752 cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
753 for (set<CDir*>::iterator p = wouldbe_bounds.begin(); p != wouldbe_bounds.end(); ++p)
754 locks.insert(&(*p)->get_inode()->dirfragtreelock);
755}
756
757
// Waiter used by dispatch_export_dir() to retry after a precondition is
// met (an MDSMap epoch advance, or the dir gaining a parent dentry).
// 'count' tracks how many MDSMap epochs we've waited for the dest to
// become an export target.
class C_M_ExportDirWait : public MigratorContext {
  MDRequestRef mdr;
  int count;
public:
  C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
   : MigratorContext(m), mdr(mdr), count(count) {}
  void finish(int r) override {
    mig->dispatch_export_dir(mdr, count);
  }
};
768
769
/** export_dir(dir, dest)
 * public method to initiate an export.
 * will fail if the directory is freezing, frozen, unpinnable, or root.
 *
 * On success this auth_pins the dir, marks it EXPORTING, records an
 * EXPORT_LOCKING export_state entry, and kicks off an internal
 * CEPH_MDS_OP_EXPORTDIR request that continues in
 * dispatch_export_dir().  All early-outs just log the reason and
 * return without side effects.
 */
void Migrator::export_dir(CDir *dir, mds_rank_t dest)
{
  dout(7) << "export_dir " << *dir << " to " << dest << dendl;
  assert(dir->is_auth());
  assert(dest != mds->get_nodeid());

  if (mds->mdcache->is_readonly()) {
    dout(7) << "read-only FS, no exports for now" << dendl;
    return;
  }
  if (!mds->mdsmap->is_active(dest)) {
    dout(7) << "dest not active, no exports for now" << dendl;
    return;
  }
  if (mds->is_cluster_degraded()) {
    dout(7) << "cluster degraded, no exports for now" << dendl;
    return;
  }
  if (dir->inode->is_system()) {
    dout(7) << "i won't export system dirs (root, mdsdirs, stray, /.ceph, etc.)" << dendl;
    //ceph_abort();
    return;
  }

  // stray dir contents may only migrate to the stray-owner's own mdsdir
  if (!dir->inode->is_base() && dir->inode->get_projected_parent_dir()->inode->is_stray() &&
      dir->inode->get_projected_parent_dir()->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
    dout(7) << "i won't export anything in stray" << dendl;
    return;
  }

  if (dir->is_frozen() ||
      dir->is_freezing()) {
    dout(7) << " can't export, freezing|frozen.  wait for other exports to finish first." << dendl;
    return;
  }
  if (dir->state_test(CDir::STATE_EXPORTING)) {
    dout(7) << "already exporting" << dendl;
    return;
  }

  if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
    dout(7) << "dir is export pinned" << dendl;
    return;
  }

  if (dest == mds->get_nodeid() || !mds->mdsmap->is_up(dest)) {
    dout(7) << "cannot export: dest " << dest << " is me or is not active" << dendl;
    return;
  }

  if (g_conf->mds_thrash_exports) {
    // create random subtree bound (which will not be exported)
    list<CDir*> ls;
    for (auto p = dir->begin(); p != dir->end(); ++p) {
      auto dn = p->second;
      CDentry::linkage_t *dnl= dn->get_linkage();
      if (dnl->is_primary()) {
	CInode *in = dnl->get_inode();
	if (in->is_dir())
	  in->get_nested_dirfrags(ls);
      }
    }
    if (ls.size() > 0) {
      // pick a random nested dirfrag as the aux subtree
      int n = rand() % ls.size();
      auto p = ls.begin();
      while (n--) ++p;
      CDir *bd = *p;
      if (!(bd->is_frozen() || bd->is_freezing())) {
	assert(bd->is_auth());
	// NOTE(review): the log line below says the aux subtree is *bd,
	// but these two calls act on 'dir' — should they operate on 'bd'
	// instead?  Verify against upstream before changing.
	dir->state_set(CDir::STATE_AUXSUBTREE);
	mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
	dout(0) << "export_dir: create aux subtree " << *bd << " under " << *dir << dendl;
      }
    }
  }

  mds->hit_export_target(ceph_clock_now(), dest, -1);

  dir->auth_pin(this);  // held until the export completes or is cancelled
  dir->state_set(CDir::STATE_EXPORTING);

  MDRequestRef mdr = mds->mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
  mdr->more()->export_dir = dir;

  assert(export_state.count(dir) == 0);
  export_state_t& stat = export_state[dir];
  stat.state = EXPORT_LOCKING;
  stat.peer = dest;
  stat.tid = mdr->reqid.tid;  // ties later acks/callbacks to this attempt
  stat.mut = mdr;

  return mds->mdcache->dispatch_request(mdr);
}
867
// Continue an export started by export_dir(): wait for the dest to
// become an export target (up to 3 MDSMap epochs, tracked by 'count'),
// wait for the dir to be linked into the hierarchy, acquire the export
// lock set, then send MExportDirDiscover and begin freezing the
// subtree (state -> EXPORT_DISCOVERING).
void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  dout(7) << "dispatch_export_dir " << *mdr << dendl;

  CDir *dir = mdr->more()->export_dir;
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    mds->mdcache->request_finish(mdr);
    return;
  }
  assert(it->second.state == EXPORT_LOCKING);

  mds_rank_t dest = it->second.peer;

  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(5) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }
    // retry on the next map epoch
    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  if (mdr->aborted || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  // locks?
  set<SimpleLock*> rdlocks;
  set<SimpleLock*> xlocks;
  set<SimpleLock*> wrlocks;
  get_export_lock_set(dir, rdlocks);
  // If auth MDS of the subtree root inode is neither the exporter MDS
  // nor the importer MDS and it gathers subtree root's fragstat/neststat
  // while the subtree is exporting. It's possible that the exporter MDS
  // and the importer MDS both are auth MDS of the subtree root or both
  // are not auth MDS of the subtree root at the time they receive the
  // lock messages. So the auth MDS of the subtree root inode may get no
  // or duplicated fragstat/neststat for the subtree root dirfrag.
  wrlocks.insert(&dir->get_inode()->filelock);
  wrlocks.insert(&dir->get_inode()->nestlock);
  if (dir->get_inode()->is_auth()) {
    dir->get_inode()->filelock.set_scatter_wanted();
    dir->get_inode()->nestlock.set_scatter_wanted();
  }

  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true)) {
    if (mdr->aborted)
      export_try_cancel(dir);
    return;  // acquire_locks queued a retry for us
  }

  assert(g_conf->mds_kill_export_at != 1);
  it->second.state = EXPORT_DISCOVERING;

  // send ExportDirDiscover (ask target)
  filepath path;
  dir->inode->make_path(path);
  MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
							mds->get_nodeid(),
							it->second.tid);
  mds->send_message_mds(discover, dest);
  assert(g_conf->mds_kill_export_at != 2);

  it->second.last_cum_auth_pins_change = ceph_clock_now();

  // start the freeze, but hold it up with an auth_pin.
  dir->freeze_tree();
  assert(dir->is_freezing_tree());
  dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
}
951
/*
 * called on receipt of MExportDirDiscoverAck
 * the importer now has the directory's _inode_ in memory, and pinned.
 *
 * This function DOES put the passed message before returning
 *
 * Releases the internal MDRequest's locks (to avoid deadlock with the
 * freeze), drops the auth_pin holding the freeze back, and moves the
 * export to EXPORT_FREEZING.  Stale acks (mismatched tid/peer, or no
 * export_state entry) are ignored.
 */
void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
{
  CDir *dir = cache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  utime_t now = ceph_clock_now();
  assert(dir);

  dout(7) << "export_discover_ack from " << m->get_source()
	  << " on " << *dir << dendl;

  mds->hit_export_target(now, dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != dest) {
    // ack from an older/cancelled attempt; ignore
    dout(7) << "must have aborted" << dendl;
  } else {
    assert(it->second.state == EXPORT_DISCOVERING);
    // release locks to avoid deadlock
    MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
    assert(mdr);
    mds->mdcache->request_finish(mdr);
    it->second.mut.reset();
    // freeze the subtree
    it->second.state = EXPORT_FREEZING;
    dir->auth_unpin(this);  // allow the freeze started in dispatch_export_dir() to complete
    assert(g_conf->mds_kill_export_at != 3);
  }

  m->put();  // done
}
990
// Waiter fired when the client sessions relevant to an export have been
// flushed; continues in export_sessions_flushed().  'tid' lets the
// callback be ignored if the export was aborted and restarted.
class C_M_ExportSessionsFlushed : public MigratorContext {
  CDir *dir;
  uint64_t tid;
public:
  C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t)
   : MigratorContext(m), dir(d), tid(t) {
    assert(dir != NULL);
  }
  void finish(int r) override {
    mig->export_sessions_flushed(dir, tid);
  }
};
1003
// Called when all client sessions touched by the export have been flushed
// (via C_M_ExportSessionsFlushed).  Clears the MDS_RANK_NONE sentinel from
// warning_ack_waiting and, if we were already in EXPORT_WARNING with no
// other acks outstanding, starts the export proper.
void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
{
  dout(7) << "export_sessions_flushed " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }

  assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
  // MDS_RANK_NONE is the placeholder inserted when the session flush was kicked off
  assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
  it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
  if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
    export_go(dir);  // start export.
}
1023
// Called once the subtree rooted at 'dir' is frozen.  Tries to grab all
// locks needed for the export (cancelling the export if that fails),
// records the subtree bounds, builds and sends MExportDirPrep -- including
// a replica of the base dirfrag plus a spanning-tree trace out to each
// bound -- and finally flushes the sessions of all clients with caps in
// the subtree so newly instantiated caps are stable before we proceed.
void Migrator::export_frozen(CDir *dir, uint64_t tid)
{
  dout(7) << "export_frozen on " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != tid) {
    dout(7) << "export must have aborted" << dendl;
    return;
  }

  assert(it->second.state == EXPORT_FREEZING);
  assert(dir->is_frozen_tree_root());
  assert(dir->get_cum_auth_pins() == 0);

  CInode *diri = dir->get_inode();

  // ok, try to grab all my locks.
  set<SimpleLock*> rdlocks;
  get_export_lock_set(dir, rdlocks);
  if ((diri->is_auth() && diri->is_frozen()) ||
      !mds->locker->can_rdlock_set(rdlocks) ||
      !diri->filelock.can_wrlock(-1) ||
      !diri->nestlock.can_wrlock(-1)) {
    dout(7) << "export_dir couldn't acquire all needed locks, failing. "
	    << *dir << dendl;

    // .. unwind ..
    dir->unfreeze_tree();
    dir->state_clear(CDir::STATE_EXPORTING);

    // tell the peer the export is off
    mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);

    export_state.erase(it);
    return;
  }

  // take the locks we just verified we can get
  it->second.mut = new MutationImpl();
  if (diri->is_auth())
    it->second.mut->auth_pin(diri);
  mds->locker->rdlock_take_set(rdlocks, it->second.mut);
  mds->locker->wrlock_force(&diri->filelock, it->second.mut);
  mds->locker->wrlock_force(&diri->nestlock, it->second.mut);

  cache->show_subtrees();

  // note the bounds.
  // force it into a subtree by listing auth as <me,me>.
  cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  // generate prep message, log entry.
  MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag(), it->second.tid);

  // include list of bystanders (replicas other than the import target)
  for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
       p != dir->replicas_end();
       ++p) {
    if (p->first != it->second.peer) {
      dout(10) << "bystander mds." << p->first << dendl;
      prep->add_bystander(p->first);
    }
  }

  // include base dirfrag
  cache->replicate_dir(dir, it->second.peer, prep->basedir);

  /*
   * include spanning tree for all nested exports.
   * these need to be on the destination _before_ the final export so that
   * dir_auth updates on any nested exports are properly absorbed.
   * this includes inodes and dirfrags included in the subtree, but
   * only the inodes at the bounds.
   *
   * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
   */
  set<inodeno_t> inodes_added;
  set<dirfrag_t> dirfrags_added;

  // check bounds
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p) {
    CDir *bound = *p;

    // pin it.
    bound->get(CDir::PIN_EXPORTBOUND);
    bound->state_set(CDir::STATE_EXPORTBOUND);

    dout(7) << " export bound " << *bound << dendl;
    prep->add_bound( bound->dirfrag() );

    // trace to bound: walk upward from the bound toward 'dir', prepending
    // each (dentry, inode) pair -- and dirfrag, once past the first step --
    // so the receiver can decode the path top-down.
    bufferlist tracebl;
    CDir *cur = bound;

    char start = '-';
    while (1) {
      // don't repeat inodes
      if (inodes_added.count(cur->inode->ino()))
	break;
      inodes_added.insert(cur->inode->ino());

      // prepend dentry + inode
      assert(cur->inode->is_auth());
      bufferlist bl;
      cache->replicate_dentry(cur->inode->parent, it->second.peer, bl);
      dout(7) << "  added " << *cur->inode->parent << dendl;
      cache->replicate_inode(cur->inode, it->second.peer, bl,
			     mds->mdsmap->get_up_features());
      dout(7) << "  added " << *cur->inode << dendl;
      bl.claim_append(tracebl);
      tracebl.claim(bl);

      cur = cur->get_parent_dir();

      // don't repeat dirfrags
      if (dirfrags_added.count(cur->dirfrag()) ||
	  cur == dir) {
	start = 'd';  // start with dentry
	break;
      }
      dirfrags_added.insert(cur->dirfrag());

      // prepend dir
      cache->replicate_dir(cur, it->second.peer, bl);
      dout(7) << "  added " << *cur << dendl;
      bl.claim_append(tracebl);
      tracebl.claim(bl);

      start = 'f';  // start with dirfrag
    }
    // final trace layout: dirfrag we stopped at, start marker, then the
    // accumulated (top-down) replication data
    bufferlist final_bl;
    dirfrag_t df = cur->dirfrag();
    ::encode(df, final_bl);
    ::encode(start, final_bl);
    final_bl.claim_append(tracebl);
    prep->add_trace(final_bl);
  }

  // send.
  it->second.state = EXPORT_PREPPING;
  mds->send_message_mds(prep, it->second.peer);
  assert (g_conf->mds_kill_export_at != 4);

  // make sure any new instantiations of caps are flushed out
  assert(it->second.warning_ack_waiting.empty());

  set<client_t> export_client_set;
  get_export_client_set(dir, export_client_set);

  MDSGatherBuilder gather(g_ceph_context);
  mds->server->flush_client_sessions(export_client_set, gather);
  if (gather.has_subs()) {
    // MDS_RANK_NONE acts as a sentinel "ack" we wait for; it is erased in
    // export_sessions_flushed() once the flush completes.
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
    gather.activate();
  }
}
1183
// Collect the ids of every client holding caps anywhere in the subtree
// rooted at 'dir'.  Iteratively walks primary-linked inodes, descending
// into nested dirfrags but stopping at export bounds (STATE_EXPORTBOUND).
void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
{
  list<CDir*> dfs;
  dfs.push_back(dir);
  while (!dfs.empty()) {
    CDir *dir = dfs.front();
    dfs.pop_front();
    for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
      CDentry *dn = p->second;
      // only primary links point at inodes in this subtree
      if (!dn->get_linkage()->is_primary())
	continue;
      CInode *in = dn->get_linkage()->get_inode();
      if (in->is_dir()) {
	// directory?
	list<CDir*> ls;
	in->get_dirfrags(ls);
	for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
	  if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
	    // include nested dirfrag
	    assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
	    dfs.push_back(*q);  // it's ours, recurse (later)
	  }
	}
      }
      // record every client with a cap on this inode
      for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
	   q != in->client_caps.end();
	   ++q)
	client_set.insert(q->first);
    }
  }
}
1215
1216void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
1217{
1218 for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
1219 q != in->client_caps.end();
1220 ++q)
1221 client_set.insert(q->first);
1222}
1223
/* Handle MExportDirPrepAck from the import target.  On success, send
 * MExportDirNotify warnings to all active bystander replicas and enter
 * EXPORT_WARNING; on failure (peer couldn't get its locks), cancel the
 * export.  This function DOES put the passed message before returning. */
void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
{
  CDir *dir = cache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  utime_t now = ceph_clock_now();
  assert(dir);

  dout(7) << "export_prep_ack " << *dir << dendl;

  mds->hit_export_target(now, dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != mds_rank_t(m->get_source().num())) {
    // export must have aborted.
    dout(7) << "export must have aborted" << dendl;
    m->put();
    return;
  }
  assert(it->second.state == EXPORT_PREPPING);

  if (!m->is_success()) {
    dout(7) << "peer couldn't acquire all needed locks, canceling" << dendl;
    export_try_cancel(dir, false);
    m->put();
    return;
  }

  assert (g_conf->mds_kill_export_at != 5);
  // send warnings
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  // only the MDS_RANK_NONE session-flush sentinel may still be pending
  assert(it->second.warning_ack_waiting.empty() ||
	 (it->second.warning_ack_waiting.size() == 1 &&
	  it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
  assert(it->second.notify_ack_waiting.empty());

  for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
       p != dir->replicas_end();
       ++p) {
    if (p->first == it->second.peer) continue;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first))
      continue;  // only if active
    it->second.warning_ack_waiting.insert(p->first);
    it->second.notify_ack_waiting.insert(p->first);  // we'll eventually get a notifyack, too!

    // warn bystander: auth is becoming ambiguous (<me, peer>)
    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
						    mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
						    mds_authority_t(mds->get_nodeid(),it->second.peer));
    for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
      notify->get_bounds().push_back((*q)->dirfrag());
    mds->send_message_mds(notify, p->first);

  }

  it->second.state = EXPORT_WARNING;

  assert(g_conf->mds_kill_export_at != 6);
  // nobody to warn?
  if (it->second.warning_ack_waiting.empty())
    export_go(dir);  // start export.

  // done.
  m->put();
}
1293
1294
1295class C_M_ExportGo : public MigratorContext {
1296 CDir *dir;
1297 uint64_t tid;
1298public:
1299 C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
1300 MigratorContext(m), dir(d), tid(t) {
1301 assert(dir != NULL);
1302 }
1303 void finish(int r) override {
1304 mig->export_go_synced(dir, tid);
1305 }
1306};
1307
1308void Migrator::export_go(CDir *dir)
1309{
1310 assert(export_state.count(dir));
1311 dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
1312
1313 // first sync log to flush out e.g. any cap imports
1314 mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
1315 mds->mdlog->flush();
1316}
1317
// Journal is safe: actually perform the export.  Marks the subtree auth
// ambiguous (<me, dest>), encodes the whole subtree (dirs, dentries,
// inodes, caps) into an MExportDir message together with the bounds, and
// sends it to the destination.  Enters EXPORT_EXPORTING.
void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }
  assert(it->second.state == EXPORT_WARNING);
  mds_rank_t dest = it->second.peer;

  dout(7) << "export_go_synced " << *dir << " to " << dest << dendl;

  cache->show_subtrees();

  it->second.state = EXPORT_EXPORTING;
  assert(g_conf->mds_kill_export_at != 7);

  assert(dir->is_frozen_tree_root());
  assert(dir->get_cum_auth_pins() == 0);

  // set ambiguous auth
  cache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);

  // take away the popularity we're sending.
  utime_t now = ceph_clock_now();
  mds->balancer->subtract_export(dir, now);

  // fill export message with cache data
  MExportDir *req = new MExportDir(dir->dirfrag(), it->second.tid);
  map<client_t,entity_inst_t> exported_client_map;
  uint64_t num_exported_inodes = encode_export_dir(req->export_data,
					      dir,   // recur start point
					      exported_client_map,
					      now);
  ::encode(exported_client_map, req->client_map,
	   mds->mdsmap->get_up_features());

  // add bounds to message
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p)
    req->add_export((*p)->dirfrag());

  // send
  mds->send_message_mds(req, dest);
  assert(g_conf->mds_kill_export_at != 8);

  // +1 accounts for the base dirfrag itself on top of the exported inodes
  mds->hit_export_target(now, dest, num_exported_inodes+1);

  // stats
  if (mds->logger) mds->logger->inc(l_mds_exported);
  if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);

  cache->show_subtrees();
}
1378
1379
/** encode_export_inode
 * update our local state for this inode to export.
 * encode relevant state to be sent over the wire.
 * used by: encode_export_dir, file_rename (if foreign)
 *
 * FIXME: the separation between CInode.encode_export and these methods
 * is pretty arbitrary and dumb.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
				   map<client_t,entity_inst_t>& exported_client_map)
{
  dout(7) << "encode_export_inode " << *in << dendl;
  assert(!in->is_replica(mds->get_nodeid()));

  // relax locks?
  if (!in->is_replicated()) {
    in->replicate_relax_locks();
    dout(20) << " did replicate_relax_locks, now " << *in << dendl;
  }

  // wire format: ino, snapid (last), then the inode's own export blob
  ::encode(in->inode.ino, enc_state);
  ::encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps (auth_cap=true: this is the authoritative cap export)
  encode_export_inode_caps(in, true, enc_state, exported_client_map);
}
1407
// Encode the inode's client capabilities into 'bl'.  If this is the
// authoritative export (auth_cap), also encode mds_caps_wanted and pin the
// inode with STATE/PIN_EXPORTINGCAPS until finish_export_inode_caps runs.
// Records the entity_inst_t of every cap-holding client in
// exported_client_map so the importer can reconstruct sessions.
void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
					map<client_t,entity_inst_t>& exported_client_map)
{
  dout(20) << "encode_export_inode_caps " << *in << dendl;

  // encode caps
  map<client_t,Capability::Export> cap_map;
  in->export_client_caps(cap_map);
  ::encode(cap_map, bl);
  if (auth_cap) {
    ::encode(in->get_mds_caps_wanted(), bl);

    in->state_set(CInode::STATE_EXPORTINGCAPS);
    in->get(CInode::PIN_EXPORTINGCAPS);
  }

  // make note of clients named by exported capabilities
  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
       it != in->client_caps.end();
       ++it)
    exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first.v));
}
1430
// Finalize cap migration for one inode after the export is committed:
// unpin EXPORTINGCAPS, send each cap-holding client a CEPH_CAP_OP_EXPORT
// message pointing at the peer's imported cap (from peer_imported), then
// drop our local client caps and re-evaluate lock state.
void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
					map<client_t,Capability::Import>& peer_imported)
{
  dout(20) << "finish_export_inode_caps " << *in << dendl;

  in->state_clear(CInode::STATE_EXPORTINGCAPS);
  in->put(CInode::PIN_EXPORTINGCAPS);

  // tell (all) clients about migrating caps..
  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
       it != in->client_caps.end();
       ++it) {
    Capability *cap = it->second;
    dout(7) << "finish_export_inode_caps telling client." << it->first
	    << " exported caps on " << *in << dendl;
    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
				     cap->get_cap_id(), cap->get_mseq(), mds->get_osd_epoch_barrier());

    // every exported cap must have a matching import record from the peer
    map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
    assert(q != peer_imported.end());
    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
    mds->send_message_client_counted(m, it->first);
  }
  in->clear_client_caps_after_export();
  mds->locker->eval(in, CEPH_CAP_LOCKS);
}
1457
1458void Migrator::finish_export_inode(CInode *in, utime_t now, mds_rank_t peer,
1459 map<client_t,Capability::Import>& peer_imported,
1460 list<MDSInternalContextBase*>& finished)
1461{
1462 dout(12) << "finish_export_inode " << *in << dendl;
1463
1464 // clean
1465 if (in->is_dirty())
1466 in->mark_clean();
1467
1468 // clear/unpin cached_by (we're no longer the authority)
1469 in->clear_replica_map();
1470
1471 // twiddle lock states for auth -> replica transition
1472 in->authlock.export_twiddle();
1473 in->linklock.export_twiddle();
1474 in->dirfragtreelock.export_twiddle();
1475 in->filelock.export_twiddle();
1476 in->nestlock.export_twiddle();
1477 in->xattrlock.export_twiddle();
1478 in->snaplock.export_twiddle();
1479 in->flocklock.export_twiddle();
1480 in->policylock.export_twiddle();
1481
1482 // mark auth
1483 assert(in->is_auth());
1484 in->state_clear(CInode::STATE_AUTH);
1485 in->replica_nonce = CInode::EXPORT_NONCE;
1486
1487 in->clear_dirty_rstat();
1488
1489 // no more auth subtree? clear scatter dirty
1490 if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
1491 in->clear_scatter_dirty();
1492
1493 in->item_open_file.remove_myself();
1494
1495 in->clear_dirty_parent();
1496
1497 in->clear_file_locks();
1498
1499 // waiters
1500 in->take_waiting(CInode::WAIT_ANY_MASK, finished);
1501
1502 in->finish_export(now);
1503
1504 finish_export_inode_caps(in, peer, peer_imported);
7c673cae
FG
1505}
1506
// Recursively encode the subtree rooted at 'dir' into 'exportbl'.
// For each dirfrag: dirfrag_t, the dir's export blob, a dentry count,
// then per-dentry name/snapid/state followed by a one-byte tag:
//   'N' null dentry, 'L' remote link (+ino, d_type), 'I' primary inode
//   (full inode + caps via encode_export_inode).
// Nested dirfrags (not export bounds) are queued and encoded afterwards.
// Returns the number of dentries exported (accumulated over recursion).
uint64_t Migrator::encode_export_dir(bufferlist& exportbl,
				CDir *dir,
				map<client_t,entity_inst_t>& exported_client_map,
				utime_t now)
{
  uint64_t num_exported = 0;

  dout(7) << "encode_export_dir " << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  ::encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  ::encode(nden, exportbl);

  // dentries
  list<CDir*> subdirs;
  CDir::map_t::iterator it;
  for (it = dir->begin(); it != dir->end(); ++it) {
    CDentry *dn = it->second;
    CInode *in = dn->get_linkage()->get_inode();

    if (!dn->is_replicated())
      dn->lock.replicate_relax();

    num_exported++;

    // -- dentry
    dout(7) << "encode_export_dir exporting " << *dn << dendl;

    // dn name
    ::encode(dn->name, exportbl);
    ::encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1);  // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      // remote link
      exportbl.append("L", 1);  // remote link

      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      ::encode(ino, exportbl);
      ::encode(d_type, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("I", 1);  // inode dentry

    encode_export_inode(in, exportbl, exported_client_map);  // encode, and (update state for) export

    // directory?
    list<CDir*> dfs;
    in->get_dirfrags(dfs);
    for (list<CDir*>::iterator p = dfs.begin(); p != dfs.end(); ++p) {
      CDir *t = *p;
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
	// include nested dirfrag
	assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
	subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  // subdirs
  for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); ++it)
    num_exported += encode_export_dir(exportbl, *it, exported_client_map, now);

  return num_exported;
}
1597
// Recursively transition the exported subtree's dirfrags and dentries to
// replica state after a committed export: clear replica maps and AUTH
// bits, mark dirs clean, collect waiters into 'finished', finish each
// primary inode via finish_export_inode, and push dentries to the LRU
// tail (counting them in *num_dentries) so they can be trimmed.
void Migrator::finish_export_dir(CDir *dir, utime_t now, mds_rank_t peer,
				 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
				 list<MDSInternalContextBase*>& finished, int *num_dentries)
{
  dout(10) << "finish_export_dir " << *dir << dendl;

  // release open_by
  dir->clear_replica_map();

  // mark
  assert(dir->is_auth());
  dir->state_clear(CDir::STATE_AUTH);
  dir->remove_bloom();
  dir->replica_nonce = CDir::EXPORT_NONCE;

  if (dir->is_dirty())
    dir->mark_clean();

  // suck up all waiters
  dir->take_waiting(CDir::WAIT_ANY_MASK, finished);  // all dir waiters

  // pop
  dir->finish_export(now);

  // dentries
  list<CDir*> subdirs;
  CDir::map_t::iterator it;
  for (it = dir->begin(); it != dir->end(); ++it) {
    CDentry *dn = it->second;
    CInode *in = dn->get_linkage()->get_inode();

    // dentry
    dn->finish_export();

    // inode?
    if (dn->get_linkage()->is_primary()) {
      finish_export_inode(in, now, peer, peer_imported[in->ino()], finished);

      // subdirs?
      in->get_nested_dirfrags(subdirs);
    }

    cache->touch_dentry_bottom(dn);  // move dentry to tail of LRU
    ++(*num_dentries);
  }

  // subdirs
  for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); ++it)
    finish_export_dir(*it, now, peer, peer_imported, finished, num_dentries);
}
1648
1649class C_MDS_ExportFinishLogged : public MigratorLogContext {
1650 CDir *dir;
1651public:
1652 C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
1653 void finish(int r) override {
1654 mig->export_logged_finish(dir);
1655 }
1656};
1657
1658
1659/*
1660 * i should get an export_ack from the export target.
1661 *
1662 * This function DOES put the passed message before returning
1663 */
1664void Migrator::handle_export_ack(MExportDirAck *m)
1665{
1666 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
1667 mds_rank_t dest(m->get_source().num());
1668 utime_t now = ceph_clock_now();
1669 assert(dir);
1670 assert(dir->is_frozen_tree_root()); // i'm exporting!
1671
1672 // yay!
1673 dout(7) << "handle_export_ack " << *dir << dendl;
1674
1675 mds->hit_export_target(now, dest, -1);
1676
1677 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1678 assert(it != export_state.end());
1679 assert(it->second.state == EXPORT_EXPORTING);
1680 assert(it->second.tid == m->get_tid());
1681
1682 bufferlist::iterator bp = m->imported_caps.begin();
1683 ::decode(it->second.peer_imported, bp);
1684
1685 it->second.state = EXPORT_LOGGINGFINISH;
1686 assert (g_conf->mds_kill_export_at != 9);
1687 set<CDir*> bounds;
1688 cache->get_subtree_bounds(dir, bounds);
1689
7c673cae
FG
1690 // log completion.
1691 // include export bounds, to ensure they're in the journal.
31f18b77 1692 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);;
7c673cae
FG
1693 mds->mdlog->start_entry(le);
1694
1695 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
31f18b77 1696 le->metablob.add_dir(dir, false);
7c673cae
FG
1697 for (set<CDir*>::iterator p = bounds.begin();
1698 p != bounds.end();
1699 ++p) {
1700 CDir *bound = *p;
1701 le->get_bounds().insert(bound->dirfrag());
1702 le->metablob.add_dir_context(bound);
1703 le->metablob.add_dir(bound, false);
1704 }
1705
31f18b77
FG
1706 // list us second, them first.
1707 // this keeps authority().first in sync with subtree auth state in the journal.
1708 cache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
1709
7c673cae
FG
1710 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1711 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1712 mds->mdlog->flush();
1713 assert (g_conf->mds_kill_export_at != 10);
1714
1715 m->put();
1716}
1717
// Tell all bystanders that the export is being aborted: send each one an
// MExportDirNotify changing the subtree auth from <me, peer> back to
// <me, unknown>.  If no bystanders are waiting, move straight to
// EXPORT_CANCELLED; otherwise auth_pin the dir until the acks arrive.
void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
{
  dout(7) << "export_notify_abort " << *dir << dendl;

  export_state_t& stat = export_state[dir];
  assert(stat.state == EXPORT_CANCELLING);

  if (stat.notify_ack_waiting.empty()) {
    stat.state = EXPORT_CANCELLED;
    return;
  }

  dir->auth_pin(this);

  for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
       p != stat.notify_ack_waiting.end();
       ++p) {
    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid, true,
						    pair<int,int>(mds->get_nodeid(),stat.peer),
						    pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
    for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
      notify->get_bounds().push_back((*i)->dirfrag());
    mds->send_message_mds(notify, *p);
  }
}
1743
/*
 * this happens if the dest fails after i send the export data but before it is acked
 * that is, we don't know they safely received and logged it, so we reverse our changes
 * and go on.
 */
void Migrator::export_reverse(CDir *dir)
{
  dout(7) << "export_reverse " << *dir << dendl;

  // inodes whose stale caps need re-evaluation after the unwind
  set<CInode*> to_eval;

  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  // remove exporting pins: iteratively walk every dirfrag/dentry/inode in
  // the subtree, calling abort_export() to undo the export-time state
  list<CDir*> rq;
  rq.push_back(dir);
  while (!rq.empty()) {
    CDir *t = rq.front();
    rq.pop_front();
    t->abort_export();
    for (CDir::map_t::iterator p = t->items.begin(); p != t->items.end(); ++p) {
      p->second->abort_export();
      if (!p->second->get_linkage()->is_primary())
	continue;
      CInode *in = p->second->get_linkage()->get_inode();
      in->abort_export();
      if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
	in->state_clear(CInode::STATE_EVALSTALECAPS);
	to_eval.insert(in);
      }
      if (in->is_dir())
	in->get_nested_dirfrags(rq);
    }
  }

  // unpin bounds
  for (const auto &bd : bounds) {
    bd->put(CDir::PIN_EXPORTBOUND);
    bd->state_clear(CDir::STATE_EXPORTBOUND);
  }

  // adjust auth, with possible subtree merge.
  cache->adjust_subtree_auth(dir, mds->get_nodeid());
  cache->try_subtree_merge(dir);

  // notify bystanders
  export_notify_abort(dir, bounds);

  // process delayed expires
  cache->process_delayed_expire(dir);

  // unfreeze
  dir->unfreeze_tree();

  // revoke/resume stale caps
  for (auto in : to_eval) {
    bool need_issue = false;
    for (auto& p : in->get_client_caps()) {
      Capability *cap = p.second;
      if (cap->is_stale()) {
	mds->locker->revoke_stale_caps(cap);
      } else {
	need_issue = true;
      }
    }
    // re-issue only if locker->eval didn't already (or inode isn't auth)
    if (need_issue &&
	(!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
      mds->locker->issue_caps(in);
  }

  cache->show_cache();
}
1817
1818
/*
 * once i get the ack, and logged the EExportFinish(true),
 * send notifies (if any), otherwise go straight to finish.
 *
 * Each bystander gets an MExportDirNotify flipping auth from <me, peer>
 * to <peer, unknown>; we then wait in EXPORT_NOTIFYING for their acks.
 */
void Migrator::export_logged_finish(CDir *dir)
{
  dout(7) << "export_logged_finish " << *dir << dendl;

  export_state_t& stat = export_state[dir];

  // send notifies
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
       p != stat.notify_ack_waiting.end();
       ++p) {
    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
						    pair<int,int>(mds->get_nodeid(), stat.peer),
						    pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));

    for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
      notify->get_bounds().push_back((*i)->dirfrag());

    mds->send_message_mds(notify, *p);
  }

  // wait for notifyacks
  stat.state = EXPORT_NOTIFYING;
  assert (g_conf->mds_kill_export_at != 11);

  // no notifies to wait for?
  if (stat.notify_ack_waiting.empty()) {
    export_finish(dir);  // skip notify/notify_ack stage.
  } else {
    // notify peer to send cap import messages to clients
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
      mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), false, stat.tid), stat.peer);
    } else {
      dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
    }
  }
}
1864
/*
 * warning:
 *   i'll get an ack from each bystander.
 *   when i get them all, do the export.
 * notify:
 *   i'll get an ack from each bystander.
 *   when i get them all, unfreeze and send the finish.
 *
 * Also handles acks for a cancelling export and for an aborting import
 * (IMPORT_ABORTING), dispatching on the current state for this dirfrag.
 *
 * This function DOES put the passed message before returning
 */
void Migrator::handle_export_notify_ack(MExportDirNotifyAck *m)
{
  CDir *dir = cache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  utime_t now = ceph_clock_now();
  assert(dir);
  mds_rank_t from = mds_rank_t(m->get_source().num());

  mds->hit_export_target(now, dest, -1);

  auto export_state_entry = export_state.find(dir);
  if (export_state_entry != export_state.end()) {
    export_state_t& stat = export_state_entry->second;
    if (stat.state == EXPORT_WARNING &&
	stat.warning_ack_waiting.erase(from)) {
      // exporting. process warning.
      dout(7) << "handle_export_notify_ack from " << m->get_source()
	      << ": exporting, processing warning on " << *dir << dendl;
      if (stat.warning_ack_waiting.empty())
	export_go(dir);     // start export.
    } else if (stat.state == EXPORT_NOTIFYING &&
	       stat.notify_ack_waiting.erase(from)) {
      // exporting. process notify.
      dout(7) << "handle_export_notify_ack from " << m->get_source()
	      << ": exporting, processing notify on " << *dir << dendl;
      if (stat.notify_ack_waiting.empty())
	export_finish(dir);
    } else if (stat.state == EXPORT_CANCELLING &&
	       m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
	       stat.notify_ack_waiting.erase(from)) {
      dout(7) << "handle_export_notify_ack from " << m->get_source()
	      << ": cancelling export, processing notify on " << *dir << dendl;
      if (stat.notify_ack_waiting.empty()) {
	// all bystanders informed; finish cancelling
	export_state.erase(export_state_entry);
	export_cancel_finish(dir);
      }
    }
  }
  else {
    auto import_state_entry = import_state.find(dir->dirfrag());
    if (import_state_entry != import_state.end()) {
      import_state_t& stat = import_state_entry->second;
      if (stat.state == IMPORT_ABORTING) {
	// reversing import
	dout(7) << "handle_export_notify_ack from " << m->get_source()
		<< ": aborting import on " << *dir << dendl;
	assert(stat.bystanders.count(from));
	stat.bystanders.erase(from);
	if (stat.bystanders.empty())
	  import_reverse_unfreeze(dir);
      }
    }
  }

  m->put();
}
1931
// Final stage of a successful export: send MExportDirFinish(true) to the
// new auth (if reachable), convert the whole subtree to replica state via
// finish_export_dir, unpin the bounds, hand subtree authority to the
// peer, unfreeze, drop held locks, and try to trim the now-replica
// dentries from the cache.
void Migrator::export_finish(CDir *dir)
{
  dout(5) << "export_finish " << *dir << dendl;

  assert (g_conf->mds_kill_export_at != 12);
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end()) {
    dout(7) << "target must have failed, not sending final commit message.  export succeeded anyway." << dendl;
    return;
  }

  // send finish/commit to new auth
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
    mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), true, it->second.tid), it->second.peer);
  } else {
    dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
  }
  assert(g_conf->mds_kill_export_at != 13);

  // finish export (adjust local cache state)
  int num_dentries = 0;
  // gathers all subtree waiters; fired below once the dir unfreezes
  C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather> *fin = new C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather>(g_ceph_context);
  finish_export_dir(dir, ceph_clock_now(), it->second.peer,
		    it->second.peer_imported, fin->contexts, &num_dentries);

  // unpin bounds
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p) {
    CDir *bd = *p;
    bd->put(CDir::PIN_EXPORTBOUND);
    bd->state_clear(CDir::STATE_EXPORTBOUND);
  }

  if (dir->state_test(CDir::STATE_AUXSUBTREE))
    dir->state_clear(CDir::STATE_AUXSUBTREE);

  // adjust auth, with possible subtree merge.
  //  (we do this _after_ removing EXPORTBOUND pins, to allow merges)
  cache->adjust_subtree_auth(dir, it->second.peer);
  cache->try_subtree_merge(dir);

  // no more auth subtree? clear scatter dirty
  if (!dir->get_inode()->is_auth() &&
      !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
    dir->get_inode()->clear_scatter_dirty();
    // wake up scatter_nudge waiters
    dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
  }

  dir->add_waiter(CDir::WAIT_UNFREEZE, fin);

  // unfreeze
  dout(7) << "export_finish unfreezing" << dendl;
  dir->unfreeze_tree();

  // discard delayed expires
  cache->discard_delayed_expire(dir);

  // keep the mutation alive past erase() so we can drop its locks below
  MutationRef mut = it->second.mut;
  // remove from exporting list, clean up state
  export_state.erase(it);
  dir->state_clear(CDir::STATE_EXPORTING);

  cache->show_subtrees();
  audit();

  cache->trim(-1, num_dentries); // try trimming exported dentries

  // send pending import_maps?
  mds->mdcache->maybe_send_pending_resolves();

  // drop locks, unpin path
  if (mut) {
    mds->locker->drop_locks(mut.get());
    mut->cleanup();
  }

  maybe_do_queued_export();
}
2015
2016
2017
2018
2019
2020
2021
2022
2023// ==========================================================
2024// IMPORT
2025
/*
 * Importer-side handler for the first migration message: the exporter
 * asks us to discover (and pin) the base inode of the subtree it wants
 * to hand over.  On success we record IMPORT_DISCOVERED and send an ack.
 * This function DOES put the passed message before returning, except on
 * the retry paths that re-queue the same message via C_MDS_RetryMessage.
 */
void Migrator::handle_export_discover(MExportDirDiscover *m)
{
  mds_rank_t from = m->get_source_mds();
  assert(from != mds->get_nodeid());

  dout(7) << "handle_export_discover on " << m->get_path() << dendl;

  // note import state
  dirfrag_t df = m->get_dirfrag();
  // only start discovering on this message once.
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
  if (!m->started) {
    // first delivery of this message: create a fresh import record
    assert(it == import_state.end());
    m->started = true;
    import_state[df].state = IMPORT_DISCOVERING;
    import_state[df].peer = from;
    import_state[df].tid = m->get_tid();
  } else {
    // am i retrying after ancient path_traverse results?
    // the recorded peer/tid must still match this (possibly stale) message
    if (it == import_state.end() ||
	it->second.peer != from ||
	it->second.tid != m->get_tid()) {
      dout(7) << " dropping obsolete message" << dendl;
      m->put();
      return;
    }
    assert(it->second.state == IMPORT_DISCOVERING);
  }

  if (!mds->mdcache->is_open()) {
    // root inode not loaded yet; retry this message once the cache opens
    dout(5) << " waiting for root" << dendl;
    mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
    return;
  }

  assert (g_conf->mds_kill_import_at != 1);

  // do we have it?
  CInode *in = cache->get_inode(m->get_dirfrag().ino);
  if (!in) {
    // must discover it!
    filepath fpath(m->get_path());
    vector<CDentry*> trace;
    MDRequestRef null_ref;
    int r = cache->path_traverse(null_ref, m, NULL, fpath, &trace, NULL, MDS_TRAVERSE_DISCOVER);
    if (r > 0) return;  // discover in progress; path_traverse retains the message
    if (r < 0) {
      dout(7) << "handle_export_discover_2 failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
      ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
    }

    ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
  }

  // yay
  dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;

  import_state[df].state = IMPORT_DISCOVERED;

  // pin inode in the cache (for now)
  assert(in->is_dir());
  in->get(CInode::PIN_IMPORTING);

  // reply
  dout(7) << " sending export_discover_ack on " << *in << dendl;
  mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
  m->put();
  assert (g_conf->mds_kill_import_at != 2);
}
2095
2096void Migrator::import_reverse_discovering(dirfrag_t df)
2097{
2098 import_state.erase(df);
2099}
2100
2101void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2102{
2103 // unpin base
2104 diri->put(CInode::PIN_IMPORTING);
2105 import_state.erase(df);
2106}
2107
2108void Migrator::import_reverse_prepping(CDir *dir)
2109{
2110 set<CDir*> bounds;
2111 cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
2112 import_remove_pins(dir, bounds);
2113 import_reverse_final(dir);
2114}
2115
/* This function DOES put the passed message before returning*/
void Migrator::handle_export_cancel(MExportDirCancel *m)
{
  // The exporter aborted the migration.  How much we have to unwind
  // depends on how far our import state machine has progressed.
  dout(7) << "handle_export_cancel on " << m->get_dirfrag() << dendl;
  dirfrag_t df = m->get_dirfrag();
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
  if (it == import_state.end()) {
    assert(0 == "got export_cancel in weird state");
  } else if (it->second.state == IMPORT_DISCOVERING) {
    // nothing pinned yet; just drop the state record
    import_reverse_discovering(df);
  } else if (it->second.state == IMPORT_DISCOVERED) {
    // base inode was pinned in handle_export_discover
    CInode *in = cache->get_inode(df.ino);
    assert(in);
    import_reverse_discovered(df, in);
  } else if (it->second.state == IMPORT_PREPPING) {
    // replicas/bounds were partially assimilated; unpin them
    CDir *dir = mds->mdcache->get_dirfrag(df);
    assert(dir);
    import_reverse_prepping(dir);
  } else if (it->second.state == IMPORT_PREPPED) {
    // subtree is frozen and marked with ambiguous auth; undo all of it
    CDir *dir = mds->mdcache->get_dirfrag(df);
    assert(dir);
    set<CDir*> bounds;
    cache->get_subtree_bounds(dir, bounds);
    import_remove_pins(dir, bounds);
    // adjust auth back to the exporter
    cache->adjust_subtree_auth(dir, it->second.peer);
    cache->try_subtree_merge(dir);
    import_reverse_unfreeze(dir);
  } else {
    // cancel is never sent once the exporter has journaled EExport
    assert(0 == "got export_cancel in weird state");
  }
  m->put();
}
2149
/* This function DOES put the passed message before returning*/
/*
 * Second phase of the import: the exporter sends the base dirfrag, the
 * traces to all export bounds, and the bystander list.  On the first
 * (assimilation) pass we replicate the base dir and trace metadata; the
 * message may then be re-delivered (via open_remote_dirfrag retries)
 * until every bound dirfrag is open.  Once all bounds are pinned we mark
 * the subtree ambiguous-auth, freeze it, and ack.
 */
void Migrator::handle_export_prep(MExportDirPrep *m)
{
  mds_rank_t oldauth = mds_rank_t(m->get_source().num());
  assert(oldauth != mds->get_nodeid());

  CDir *dir;
  CInode *diri;
  list<MDSInternalContextBase*> finished;

  // assimilate root dir.
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
  if (!m->did_assim()) {
    // first pass: replicate the base dirfrag from the message payload
    assert(it != import_state.end());
    assert(it->second.state == IMPORT_DISCOVERED);
    assert(it->second.peer == oldauth);
    diri = cache->get_inode(m->get_dirfrag().ino);
    assert(diri);
    bufferlist::iterator p = m->basedir.begin();
    dir = cache->add_replica_dir(p, diri, oldauth, finished);
    dout(7) << "handle_export_prep on " << *dir << " (first pass)" << dendl;
  } else {
    // re-delivery after an open_remote_dirfrag retry; verify it still
    // matches our in-progress import
    if (it == import_state.end() ||
	it->second.peer != oldauth ||
	it->second.tid != m->get_tid()) {
      dout(7) << "handle_export_prep obsolete message, dropping" << dendl;
      m->put();
      return;
    }
    assert(it->second.state == IMPORT_PREPPING);
    assert(it->second.peer == oldauth);

    dir = cache->get_dirfrag(m->get_dirfrag());
    assert(dir);
    dout(7) << "handle_export_prep on " << *dir << " (subsequent pass)" << dendl;
    diri = dir->get_inode();
  }
  assert(dir->is_auth() == false);

  cache->show_subtrees();

  // build import bound map
  map<inodeno_t, fragset_t> import_bound_fragset;
  for (list<dirfrag_t>::iterator p = m->get_bounds().begin();
       p != m->get_bounds().end();
       ++p) {
    dout(10) << " bound " << *p << dendl;
    import_bound_fragset[p->ino].insert(p->frag);
  }

  // assimilate contents?
  if (!m->did_assim()) {
    dout(7) << "doing assim on " << *dir << dendl;
    m->mark_assim();  // only do this the first time!

    // change import state
    it->second.state = IMPORT_PREPPING;
    it->second.bound_ls = m->get_bounds();
    it->second.bystanders = m->get_bystanders();
    assert(g_conf->mds_kill_import_at != 3);

    // bystander list
    dout(7) << "bystanders are " << it->second.bystanders << dendl;

    // move pin to dir
    diri->put(CInode::PIN_IMPORTING);
    dir->get(CDir::PIN_IMPORTING);
    dir->state_set(CDir::STATE_IMPORTING);

    // assimilate traces to exports
    // each trace is:  df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
    for (list<bufferlist>::iterator p = m->traces.begin();
	 p != m->traces.end();
	 ++p) {
      bufferlist::iterator q = p->begin();
      dirfrag_t df;
      ::decode(df, q);
      char start;
      ::decode(start, q);
      dout(10) << " trace from " << df << " start " << start << " len " << p->length() << dendl;

      CDir *cur = 0;
      if (start == 'd') {
	// trace starts at a dirfrag we already have
	cur = cache->get_dirfrag(df);
	assert(cur);
	dout(10) << " had " << *cur << dendl;
      } else if (start == 'f') {
	// trace starts with a dirfrag to replicate under a known inode
	CInode *in = cache->get_inode(df.ino);
	assert(in);
	dout(10) << " had " << *in << dendl;
	cur = cache->add_replica_dir(q, in, oldauth, finished);
	dout(10) << " added " << *cur << dendl;
      } else if (start == '-') {
	// nothing
      } else
	assert(0 == "unrecognized start char");

      // walk the (dentry, inode [, dir])* chain down to the bound
      while (start != '-') {
	CDentry *dn = cache->add_replica_dentry(q, cur, finished);
	dout(10) << " added " << *dn << dendl;
	CInode *in = cache->add_replica_inode(q, dn, finished);
	dout(10) << " added " << *in << dendl;
	if (q.end())
	  break;
	cur = cache->add_replica_dir(q, in, oldauth, finished);
	dout(10) << " added " << *cur << dendl;
      }
    }

    // make bound sticky: keep bound dirfrags attached to their inodes
    // while the import is in flight
    for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
	 p != import_bound_fragset.end();
	 ++p) {
      CInode *in = cache->get_inode(p->first);
      assert(in);
      in->get_stickydirs();
      dout(7) << " set stickydirs on bound inode " << *in << dendl;
    }

  } else {
    dout(7) << " not doing assim on " << *dir << dendl;
  }

  if (!finished.empty())
    mds->queue_waiters(finished);


  // open all bounds
  set<CDir*> import_bounds;
  for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
       p != import_bound_fragset.end();
       ++p) {
    CInode *in = cache->get_inode(p->first);
    assert(in);

    // map fragset into a frag_t list, based on the inode fragtree
    list<frag_t> fglist;
    for (set<frag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
      in->dirfragtree.get_leaves_under(*q, fglist);
    dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << fglist << dendl;

    for (list<frag_t>::iterator q = fglist.begin();
	 q != fglist.end();
	 ++q) {
      CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, *q));
      if (!bound) {
	// not open yet; fetch it and re-deliver this message (do NOT put m)
	dout(7) << " opening bounding dirfrag " << *q << " on " << *in << dendl;
	cache->open_remote_dirfrag(in, *q,
				   new C_MDS_RetryMessage(mds, m));
	return;
      }

      if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
	dout(7) << " pinning import bound " << *bound << dendl;
	bound->get(CDir::PIN_IMPORTBOUND);
	bound->state_set(CDir::STATE_IMPORTBOUND);
      } else {
	dout(7) << " already pinned import bound " << *bound << dendl;
      }
      import_bounds.insert(bound);
    }
  }

  dout(7) << " all ready, noting auth and freezing import region" << dendl;

  bool success = true;
  if (!mds->mdcache->is_readonly() &&
      dir->get_inode()->filelock.can_wrlock(-1) &&
      dir->get_inode()->nestlock.can_wrlock(-1)) {
    it->second.mut = new MutationImpl();
    // force some locks.  hacky.
    mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
    mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);

    // note that i am an ambiguous auth for this subtree.
    // specify bounds, since the exporter explicitly defines the region.
    cache->adjust_bounded_subtree_auth(dir, import_bounds,
				       pair<int,int>(oldauth, mds->get_nodeid()));
    cache->verify_subtree_bounds(dir, import_bounds);
    // freeze.
    dir->_freeze_tree();
    // note new state
    it->second.state = IMPORT_PREPPED;
  } else {
    // can't take the needed locks; nack and unwind the prep work
    dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
    success = false;
    import_reverse_prepping(dir);
  }

  // ok!
  dout(7) << " sending export_prep_ack on " << *dir << dendl;
  mds->send_message(new MExportDirPrepAck(dir->dirfrag(), success, m->get_tid()), m->get_connection());

  assert(g_conf->mds_kill_import_at != 4);
  // done
  m->put();
}
2347
2348
2349
2350
// Completion for the EImportStart journal entry: once the import start is
// durable, continue with import_logged_start (open sessions, ack exporter).
class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
  dirfrag_t df;
  CDir *dir;
  mds_rank_t from;   // the exporting mds rank
public:
  // imported client sessions and their open seqs, filled in by
  // handle_export_dir before the log entry is submitted
  map<client_t,entity_inst_t> imported_client_map;
  map<client_t,uint64_t> sseqmap;

  C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
    MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
  }
  void finish(int r) override {
    mig->import_logged_start(df, dir, from, imported_client_map, sseqmap);
  }
};
2366
/* This function DOES put the passed message before returning*/
/*
 * The exporter has frozen the subtree and shipped its full contents.
 * Decode everything into our cache, journal EImportStart, and move to
 * IMPORT_LOGGINGSTART; import_logged_start() runs once the entry is
 * durable.
 */
void Migrator::handle_export_dir(MExportDir *m)
{
  assert (g_conf->mds_kill_import_at != 5);
  CDir *dir = cache->get_dirfrag(m->dirfrag);
  assert(dir);

  mds_rank_t oldauth = mds_rank_t(m->get_source().num());
  dout(7) << "handle_export_dir importing " << *dir << " from " << oldauth << dendl;

  assert(!dir->is_auth());

  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
  assert(it != import_state.end());
  assert(it->second.state == IMPORT_PREPPED);
  assert(it->second.tid == m->get_tid());
  assert(it->second.peer == oldauth);

  utime_t now = ceph_clock_now();

  // the import root must be a leaf in the inode's fragtree
  if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
    dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());

  cache->show_subtrees();

  C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);

  // start the journal entry
  EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
  mds->mdlog->start_entry(le);

  le->metablob.add_dir_context(dir);

  // adjust auth (list us _first_)
  cache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);

  // new client sessions, open these after we journal
  // include imported sessions in EImportStart
  bufferlist::iterator cmp = m->client_map.begin();
  ::decode(onlogged->imported_client_map, cmp);
  assert(cmp.end());
  le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
  le->client_map.claim(m->client_map);

  // decode the exported metadata, one dirfrag bundle at a time
  bufferlist::iterator blp = m->export_data.begin();
  int num_imported_inodes = 0;
  while (!blp.end()) {
    num_imported_inodes +=
      decode_import_dir(blp,
			oldauth,
			dir,                 // import root
			le,
			mds->mdlog->get_current_segment(),
			it->second.peer_exports,
			it->second.updated_scatterlocks,
			now);
  }
  dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;

  // include bounds in EImportStart
  set<CDir*> import_bounds;
  for (vector<dirfrag_t>::iterator p = m->bounds.begin();
       p != m->bounds.end();
       ++p) {
    CDir *bd = cache->get_dirfrag(*p);
    assert(bd);
    le->metablob.add_dir(bd, false);  // note that parent metadata is already in the event
    import_bounds.insert(bd);
  }
  cache->verify_subtree_bounds(dir, import_bounds);

  // adjust popularity
  mds->balancer->add_import(dir, now);

  dout(7) << "handle_export_dir did " << *dir << dendl;

  // note state
  it->second.state = IMPORT_LOGGINGSTART;
  assert (g_conf->mds_kill_import_at != 6);

  // log it
  mds->mdlog->submit_entry(le, onlogged);
  mds->mdlog->flush();

  // some stats
  if (mds->logger) {
    mds->logger->inc(l_mds_imported);
    mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
  }

  m->put();
}
2459
2460
2461/*
2462 * this is an import helper
2463 * called by import_finish, and import_reverse and friends.
2464 */
2465void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2466{
2467 import_state_t& stat = import_state[dir->dirfrag()];
2468 // root
2469 dir->put(CDir::PIN_IMPORTING);
2470 dir->state_clear(CDir::STATE_IMPORTING);
2471
2472 // bounding inodes
2473 set<inodeno_t> did;
2474 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2475 p != stat.bound_ls.end();
2476 ++p) {
2477 if (did.count(p->ino))
2478 continue;
2479 did.insert(p->ino);
2480 CInode *in = cache->get_inode(p->ino);
2481 assert(in);
2482 in->put_stickydirs();
2483 }
2484
2485 if (stat.state == IMPORT_PREPPING) {
2486 for (auto bd : bounds) {
2487 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2488 bd->put(CDir::PIN_IMPORTBOUND);
2489 bd->state_clear(CDir::STATE_IMPORTBOUND);
2490 }
2491 }
2492 } else if (stat.state >= IMPORT_PREPPED) {
2493 // bounding dirfrags
2494 for (auto bd : bounds) {
2495 assert(bd->state_test(CDir::STATE_IMPORTBOUND));
2496 bd->put(CDir::PIN_IMPORTBOUND);
2497 bd->state_clear(CDir::STATE_IMPORTBOUND);
2498 }
2499 }
2500}
2501
2502
2503/*
2504 * note: this does teh full work of reversing and import and cleaning up
2505 * state.
2506 * called by both handle_mds_failure and by handle_resolve (if we are
2507 * a survivor coping with an exporter failure+recovery).
2508 */
2509void Migrator::import_reverse(CDir *dir)
2510{
2511 dout(7) << "import_reverse " << *dir << dendl;
2512
2513 import_state_t& stat = import_state[dir->dirfrag()];
2514 stat.state = IMPORT_ABORTING;
2515
2516 set<CDir*> bounds;
2517 cache->get_subtree_bounds(dir, bounds);
2518
2519 // remove pins
2520 import_remove_pins(dir, bounds);
2521
2522 // update auth, with possible subtree merge.
2523 assert(dir->is_subtree_root());
2524 if (mds->is_resolve())
2525 cache->trim_non_auth_subtree(dir);
2526
2527 cache->adjust_subtree_auth(dir, stat.peer);
2528
2529 C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather> *fin = new C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather>(g_ceph_context);
2530 if (!dir->get_inode()->is_auth() &&
2531 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2532 dir->get_inode()->clear_scatter_dirty();
2533 // wake up scatter_nudge waiters
2534 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2535 }
2536
2537 int num_dentries = 0;
2538 // adjust auth bits.
2539 list<CDir*> q;
2540 q.push_back(dir);
2541 while (!q.empty()) {
2542 CDir *cur = q.front();
2543 q.pop_front();
2544
2545 // dir
2546 assert(cur->is_auth());
2547 cur->state_clear(CDir::STATE_AUTH);
2548 cur->remove_bloom();
2549 cur->clear_replica_map();
2550 cur->set_replica_nonce(CDir::EXPORT_NONCE);
2551 if (cur->is_dirty())
2552 cur->mark_clean();
2553
2554 CDir::map_t::iterator it;
2555 for (it = cur->begin(); it != cur->end(); ++it) {
2556 CDentry *dn = it->second;
2557
2558 // dentry
2559 dn->state_clear(CDentry::STATE_AUTH);
2560 dn->clear_replica_map();
2561 dn->set_replica_nonce(CDentry::EXPORT_NONCE);
2562 if (dn->is_dirty())
2563 dn->mark_clean();
2564
2565 // inode?
2566 if (dn->get_linkage()->is_primary()) {
2567 CInode *in = dn->get_linkage()->get_inode();
2568 in->state_clear(CDentry::STATE_AUTH);
2569 in->clear_replica_map();
2570 in->set_replica_nonce(CInode::EXPORT_NONCE);
2571 if (in->is_dirty())
2572 in->mark_clean();
2573 in->clear_dirty_rstat();
2574 if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
2575 in->clear_scatter_dirty();
2576 in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2577 }
2578
2579 in->clear_dirty_parent();
2580
2581 in->authlock.clear_gather();
2582 in->linklock.clear_gather();
2583 in->dirfragtreelock.clear_gather();
2584 in->filelock.clear_gather();
2585
2586 in->clear_file_locks();
2587
2588 // non-bounding dir?
2589 list<CDir*> dfs;
2590 in->get_dirfrags(dfs);
2591 for (list<CDir*>::iterator p = dfs.begin(); p != dfs.end(); ++p)
2592 if (bounds.count(*p) == 0)
2593 q.push_back(*p);
2594 }
2595
2596 cache->touch_dentry_bottom(dn); // move dentry to tail of LRU
2597 ++num_dentries;
2598 }
2599 }
2600
2601 dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
2602
2603 if (stat.state == IMPORT_ACKING) {
2604 // remove imported caps
2605 for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
2606 p != stat.peer_exports.end();
2607 ++p) {
2608 CInode *in = p->first;
2609 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
2610 q != p->second.end();
2611 ++q) {
2612 Capability *cap = in->get_client_cap(q->first);
2613 assert(cap);
2614 if (cap->is_importing())
2615 in->remove_client_cap(q->first);
2616 }
2617 in->put(CInode::PIN_IMPORTINGCAPS);
2618 }
2619 for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
2620 p != stat.client_map.end();
2621 ++p) {
2622 Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
2623 assert(session);
2624 session->dec_importing();
2625 }
2626 }
2627
2628 // log our failure
2629 mds->mdlog->start_submit_entry(new EImportFinish(dir, false)); // log failure
2630
2631 cache->try_subtree_merge(dir);
2632
2633 cache->trim(-1, num_dentries); // try trimming dentries
2634
2635 // notify bystanders; wait in aborting state
2636 import_notify_abort(dir, bounds);
2637}
2638
2639void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2640{
2641 dout(7) << "import_notify_finish " << *dir << dendl;
2642
2643 import_state_t& stat = import_state[dir->dirfrag()];
2644 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2645 p != stat.bystanders.end();
2646 ++p) {
2647 MExportDirNotify *notify =
2648 new MExportDirNotify(dir->dirfrag(), stat.tid, false,
2649 pair<int,int>(stat.peer, mds->get_nodeid()),
2650 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
2651 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2652 notify->get_bounds().push_back((*i)->dirfrag());
2653 mds->send_message_mds(notify, *p);
2654 }
2655}
2656
void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
{
  // Tell surviving bystanders the import is being aborted: auth reverts
  // from the ambiguous (peer, me) back to (peer, unknown).  We remain in
  // IMPORT_ABORTING until all of them ack (or none remain).
  dout(7) << "import_notify_abort " << *dir << dendl;

  import_state_t& stat = import_state[dir->dirfrag()];
  for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
       p != stat.bystanders.end(); ) {
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
      // this can happen if both exporter and bystander fail in the same mdsmap epoch
      stat.bystanders.erase(p++);   // post-increment keeps the iterator valid across erase
      continue;
    }
    MExportDirNotify *notify =
      new MExportDirNotify(dir->dirfrag(), stat.tid, true,
			   mds_authority_t(stat.peer, mds->get_nodeid()),
			   mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
    for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
      notify->get_bounds().push_back((*i)->dirfrag());
    mds->send_message_mds(notify, *p);
    ++p;
  }
  if (stat.bystanders.empty()) {
    // nobody to wait for; finish the abort immediately
    dout(7) << "no bystanders, finishing reverse now" << dendl;
    import_reverse_unfreeze(dir);
  } else {
    assert (g_conf->mds_kill_import_at != 10);
  }
}
2686
2687void Migrator::import_reverse_unfreeze(CDir *dir)
2688{
2689 assert(dir);
2690 dout(7) << "import_reverse_unfreeze " << *dir << dendl;
2691 dir->unfreeze_tree();
2692 cache->discard_delayed_expire(dir);
2693 import_reverse_final(dir);
2694}
2695
2696void Migrator::import_reverse_final(CDir *dir)
2697{
2698 dout(7) << "import_reverse_final " << *dir << dendl;
2699
2700 // clean up
2701 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
2702 assert(it != import_state.end());
2703
2704 MutationRef mut = it->second.mut;
2705 import_state.erase(it);
2706
2707 // send pending import_maps?
2708 mds->mdcache->maybe_send_pending_resolves();
2709
2710 if (mut) {
2711 mds->locker->drop_locks(mut.get());
2712 mut->cleanup();
2713 }
2714
2715 cache->show_subtrees();
2716 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
2717}
2718
2719
2720
2721
void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
				   map<client_t,entity_inst_t>& imported_client_map,
				   map<client_t,uint64_t>& sseqmap)
{
  // EImportStart is now durable: open the imported client sessions,
  // instantiate the imported caps, and ack the exporter.
  map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
  if (it == import_state.end() ||
      it->second.state != IMPORT_LOGGINGSTART) {
    // the import was reversed while we were journaling; just finish the
    // session bookkeeping and bail
    dout(7) << "import " << df << " must have aborted" << dendl;
    mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
    return;
  }

  dout(7) << "import_logged " << *dir << dendl;

  // note state
  it->second.state = IMPORT_ACKING;

  assert (g_conf->mds_kill_import_at != 7);

  // force open client sessions and finish cap import
  mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
  it->second.client_map.swap(imported_client_map);

  map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
       p != it->second.peer_exports.end();
       ++p) {
    // parameter 'peer' is NONE, delay sending cap import messages to client
    finish_import_inode_caps(p->first, MDS_RANK_NONE, true, p->second, imported_caps[p->first->ino()]);
  }

  // send notify's etc.
  dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;

  // test surviving observer of a failed migration that did not complete
  //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);

  // the ack carries the new cap import ids back to the exporter
  MExportDirAck *ack = new MExportDirAck(dir->dirfrag(), it->second.tid);
  ::encode(imported_caps, ack->imported_caps);

  mds->send_message_mds(ack, from);
  assert (g_conf->mds_kill_import_at != 8);

  cache->show_subtrees();
}
2767
2768/* This function DOES put the passed message before returning*/
2769void Migrator::handle_export_finish(MExportDirFinish *m)
2770{
2771 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
2772 assert(dir);
2773 dout(7) << "handle_export_finish on " << *dir << (m->is_last() ? " last" : "") << dendl;
2774
2775 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
2776 assert(it != import_state.end());
2777 assert(it->second.tid == m->get_tid());
2778
2779 import_finish(dir, false, m->is_last());
2780
2781 m->put();
2782}
2783
/*
 * Commit the import.  In IMPORT_ACKING we finish the delayed client cap
 * imports; when 'last' is set we also journal EImportFinish(success),
 * take sole authority, unfreeze, and clean up.  'notify' asks us to tell
 * the bystanders (used when finishing without an explicit MExportDirFinish).
 */
void Migrator::import_finish(CDir *dir, bool notify, bool last)
{
  dout(7) << "import_finish on " << *dir << dendl;

  map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
  assert(it != import_state.end());
  assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);

  // log finish
  assert(g_conf->mds_kill_import_at != 9);

  if (it->second.state == IMPORT_ACKING) {
    // send the cap import messages that import_logged_start deferred
    for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
	 p != it->second.peer_exports.end();
	 ++p) {
      CInode *in = p->first;
      assert(in->is_auth());
      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
	assert(session);
	Capability *cap = in->get_client_cap(q->first);
	assert(cap);
	cap->merge(q->second, true);
	cap->clear_importing();
	mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
				    q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
      }
      p->second.clear();
      in->replica_caps_wanted = 0;
    }
    // sessions no longer count as importing
    for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
	 p != it->second.client_map.end();
	 ++p) {
      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
      assert(session);
      session->dec_importing();
    }
  }

  if (!last) {
    // wait for the final MExportDirFinish before tearing down state
    assert(it->second.state == IMPORT_ACKING);
    it->second.state = IMPORT_FINISHING;
    return;
  }

  // remove pins
  set<CDir*> bounds;
  cache->get_subtree_bounds(dir, bounds);

  if (notify)
    import_notify_finish(dir, bounds);

  import_remove_pins(dir, bounds);

  // keep the caps around past state erasure so we can re-eval them below
  map<CInode*, map<client_t,Capability::Export> > peer_exports;
  it->second.peer_exports.swap(peer_exports);

  // clear import state (we're done!)
  MutationRef mut = it->second.mut;
  import_state.erase(it);

  // adjust auth, with possible subtree merge.
  cache->adjust_subtree_auth(dir, mds->get_nodeid());

  mds->mdlog->start_submit_entry(new EImportFinish(dir, true));

  cache->try_subtree_merge(dir);

  // process delayed expires
  cache->process_delayed_expire(dir);

  // ok now unfreeze (and thus kick waiters)
  dir->unfreeze_tree();
  cache->show_subtrees();
  //audit();  // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)

  if (mut) {
    mds->locker->drop_locks(mut.get());
    mut->cleanup();
  }

  // re-eval imported caps
  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
       p != peer_exports.end();
       ++p) {
    if (p->first->is_auth())
      mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
    p->first->put(CInode::PIN_IMPORTINGCAPS);
  }

  // send pending import_maps?
  mds->mdcache->maybe_send_pending_resolves();

  // did i just import mydir?
  if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
    cache->populate_mydir();

  // is it empty?
  if (dir->get_num_head_items() == 0 &&
      !dir->inode->is_auth()) {
    // reexport!
    export_empty_import(dir);
  }
}
2890
2891
/*
 * Decode one exported inode and link it under the given dentry.  Caps
 * exported with the inode are stashed in peer_exports for later handling;
 * dirty scatterlocks are queued so they get journaled once the import
 * actually commits.
 */
void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp,
				   mds_rank_t oldauth, LogSegment *ls,
				   map<CInode*, map<client_t,Capability::Export> >& peer_exports,
				   list<ScatterLock*>& updated_scatterlocks)
{
  dout(15) << "decode_import_inode on " << *dn << dendl;

  inodeno_t ino;
  snapid_t last;
  ::decode(ino, blp);
  ::decode(last, blp);

  bool added = false;
  CInode *in = cache->get_inode(ino, last);
  if (!in) {
    // not in cache yet; create and remember to register it below
    in = new CInode(mds->mdcache, true, 1, last);
    added = true;
  }

  // state after link  -- or not!  -sage
  in->decode_import(blp, ls);  // cap imports are noted for later action

  // caps
  decode_import_inode_caps(in, true, blp, peer_exports);

  // link before state  -- or not!  -sage
  if (dn->get_linkage()->get_inode() != in) {
    assert(!dn->get_linkage()->get_inode());
    dn->dir->link_primary_inode(dn, in);
  }

  // add inode?
  if (added) {
    cache->add_inode(in);
    dout(10) << "added " << *in << dendl;
  } else {
    dout(10) << " had " << *in << dendl;
  }

  if (in->inode.is_dirty_rstat())
    in->mark_dirty_rstat();

  // clear if dirtyscattered, since we're going to journal this
  //  but not until we _actually_ finish the import...
  if (in->filelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->filelock);
    mds->locker->mark_updated_scatterlock(&in->filelock);
  }

  if (in->dirfragtreelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->dirfragtreelock);
    mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
  }

  // adjust replica list
  //assert(!in->is_replica(oldauth));  // not true on failed export
  in->add_replica(oldauth, CInode::EXPORT_NONCE);
  if (in->is_replica(mds->get_nodeid()))
    in->remove_replica(mds->get_nodeid());
}
2952
2953void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
2954 bufferlist::iterator &blp,
2955 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
2956{
2957 map<client_t,Capability::Export> cap_map;
2958 ::decode(cap_map, blp);
2959 if (auth_cap)
2960 ::decode(in->get_mds_caps_wanted(), blp);
2961 if (!cap_map.empty() ||
2962 (auth_cap && !in->get_mds_caps_wanted().empty())) {
2963 peer_exports[in].swap(cap_map);
2964 in->get(CInode::PIN_IMPORTINGCAPS);
2965 }
2966}
2967
2968void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
2969 map<client_t,Capability::Export> &export_map,
2970 map<client_t,Capability::Import> &import_map)
2971{
2972 for (map<client_t,Capability::Export>::iterator it = export_map.begin();
2973 it != export_map.end();
2974 ++it) {
2975 dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl;
2976 Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v));
2977 assert(session);
2978
2979 Capability *cap = in->get_client_cap(it->first);
2980 if (!cap) {
2981 cap = in->add_client_cap(it->first, session);
2982 if (peer < 0)
2983 cap->mark_importing();
2984 }
2985
2986 Capability::Import& im = import_map[it->first];
2987 im.cap_id = cap->get_cap_id();
2988 im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
2989 im.issue_seq = cap->get_last_seq() + 1;
2990
2991 if (peer >= 0) {
2992 cap->merge(it->second, auth_cap);
2993 mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
2994 it->second.seq, it->second.mseq - 1, peer,
2995 auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
2996 }
2997 }
2998
2999 if (peer >= 0) {
3000 in->replica_caps_wanted = 0;
3001 in->put(CInode::PIN_IMPORTINGCAPS);
3002 }
3003}
3004
/**
 * Decode one exported dirfrag (and its dentries) from the import stream.
 *
 * @param blp       decode cursor into the exporter's payload
 * @param oldauth   rank we are importing from (becomes a replica of everything)
 * @param import_root  root dir of the whole import; collects waiters
 * @param le        EImportStart journal entry to populate (may be NULL for replay)
 * @param ls        log segment for dirty state
 * @param peer_exports  out: per-inode client cap exports found along the way
 * @param updated_scatterlocks  out: scatterlocks that were dirty on the exporter
 * @param now       timestamp for decode bookkeeping
 * @return number of dentries imported from this dirfrag
 */
int Migrator::decode_import_dir(bufferlist::iterator& blp,
				mds_rank_t oldauth,
				CDir *import_root,
				EImportStart *le,
				LogSegment *ls,
				map<CInode*,map<client_t,Capability::Export> >& peer_exports,
				list<ScatterLock*>& updated_scatterlocks, utime_t now)
{
  // set up dir
  dirfrag_t df;
  ::decode(df, blp);

  // the containing inode must already be in cache (imported earlier in the stream)
  CInode *diri = cache->get_inode(df.ino);
  assert(diri);
  CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
  assert(dir);

  dout(7) << "decode_import_dir " << *dir << dendl;

  // assimilate state
  dir->decode_import(blp, now, ls);

  // adjust replica list
  //assert(!dir->is_replica(oldauth));    // not true on failed export
  dir->add_replica(oldauth, CDir::EXPORT_NONCE);
  if (dir->is_replica(mds->get_nodeid()))
    dir->remove_replica(mds->get_nodeid());

  // add to journal entry
  if (le)
    le->metablob.add_import_dir(dir);

  int num_imported = 0;

  // take all waiters on this dir
  // NOTE: a pass of imported data is guaranteed to get all of my waiters because
  // a replica's presense in my cache implies/forces it's presense in authority's.
  list<MDSInternalContextBase*> waiters;

  dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
  for (list<MDSInternalContextBase*>::iterator it = waiters.begin();
       it != waiters.end();
       ++it)
    import_root->add_waiter(CDir::WAIT_UNFREEZE, *it);  // UNFREEZE will get kicked both on success or failure

  dout(15) << "doing contents" << dendl;

  // contents: a count followed by (name, snapid, dentry, linkage) records.
  // NOTE: decode order below must match the exporter's encode order exactly.
  __u32 nden;
  ::decode(nden, blp);

  for (; nden>0; nden--) {
    num_imported++;

    // dentry
    string dname;
    snapid_t last;
    ::decode(dname, blp);
    ::decode(last, blp);

    // reuse an existing dentry if we already replicate it, else create null
    CDentry *dn = dir->lookup_exact_snap(dname, last);
    if (!dn)
      dn = dir->add_null_dentry(dname, 1, last);

    dn->decode_import(blp, ls);

    dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
    if (dn->is_replica(mds->get_nodeid()))
      dn->remove_replica(mds->get_nodeid());

    // dentry lock in unreadable state can block path traverse
    if (dn->lock.get_state() != LOCK_SYNC)
      mds->locker->try_eval(&dn->lock, NULL);

    dout(15) << "decode_import_dir got " << *dn << dendl;

    // points to... linkage tag: 'N' null, 'L' remote link, 'I' primary inode
    char icode;
    ::decode(icode, blp);

    if (icode == 'N') {
      // null dentry
      assert(dn->get_linkage()->is_null());

      // fall thru
    }
    else if (icode == 'L') {
      // remote link
      inodeno_t ino;
      unsigned char d_type;
      ::decode(ino, blp);
      ::decode(d_type, blp);
      if (dn->get_linkage()->is_remote()) {
	// already linked (e.g. from a previous failed export); must agree
	assert(dn->get_linkage()->get_remote_ino() == ino);
      } else {
	dir->link_remote_inode(dn, ino, d_type);
      }
    }
    else if (icode == 'I') {
      // inode
      assert(le);
      decode_import_inode(dn, blp, oldauth, ls,
			  peer_exports, updated_scatterlocks);
    }

    // add dentry to journal entry
    if (le)
      le->metablob.add_import_dentry(dn);
  }

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  dir->inode->maybe_export_pin();

  dout(7) << "decode_import_dir done " << *dir << dendl;
  return num_imported;
}
3125
3126
3127
3128
3129
3130// authority bystander
3131
3132/* This function DOES put the passed message before returning*/
3133void Migrator::handle_export_notify(MExportDirNotify *m)
3134{
3135 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
3136 m->put();
3137 return;
3138 }
3139
3140 CDir *dir = cache->get_dirfrag(m->get_dirfrag());
3141
3142 mds_rank_t from = mds_rank_t(m->get_source().num());
3143 mds_authority_t old_auth = m->get_old_auth();
3144 mds_authority_t new_auth = m->get_new_auth();
3145
3146 if (!dir) {
3147 dout(7) << "handle_export_notify " << old_auth << " -> " << new_auth
3148 << " on missing dir " << m->get_dirfrag() << dendl;
3149 } else if (dir->authority() != old_auth) {
3150 dout(7) << "handle_export_notify old_auth was " << dir->authority()
3151 << " != " << old_auth << " -> " << new_auth
3152 << " on " << *dir << dendl;
3153 } else {
3154 dout(7) << "handle_export_notify " << old_auth << " -> " << new_auth
3155 << " on " << *dir << dendl;
3156 // adjust auth
3157 set<CDir*> have;
3158 cache->map_dirfrag_set(m->get_bounds(), have);
3159 cache->adjust_bounded_subtree_auth(dir, have, new_auth);
3160
3161 // induce a merge?
3162 cache->try_subtree_merge(dir);
3163 }
3164
3165 // send ack
3166 if (m->wants_ack()) {
3167 mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
3168 } else {
3169 // aborted. no ack.
3170 dout(7) << "handle_export_notify no ack requested" << dendl;
3171 }
3172
3173 m->put();
3174}
3175
3176/** cap exports **/
3177void Migrator::export_caps(CInode *in)
3178{
3179 mds_rank_t dest = in->authority().first;
3180 dout(7) << "export_caps to mds." << dest << " " << *in << dendl;
3181
3182 assert(in->is_any_caps());
3183 assert(!in->is_auth());
3184 assert(!in->is_ambiguous_auth());
3185 assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
3186
3187 MExportCaps *ex = new MExportCaps;
3188 ex->ino = in->ino();
3189
3190 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map);
3191
3192 mds->send_message_mds(ex, dest);
3193}
3194
3195void Migrator::handle_gather_caps(MGatherCaps *m)
3196{
3197 CInode *in = cache->get_inode(m->ino);
3198
3199 if (!in)
3200 goto out;
3201
3202 dout(10) << "handle_gather_caps " << *m << " from " << m->get_source()
3203 << " on " << *in
3204 << dendl;
3205 if (in->is_any_caps() &&
3206 !in->is_auth() &&
3207 !in->is_ambiguous_auth() &&
3208 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3209 export_caps(in);
3210
3211out:
3212 m->put();
3213}
3214
// Journal-completion context for handle_export_caps(): once the ESessions
// entry is durable, finish importing the gathered caps onto the inode.
class C_M_LoggedImportCaps : public MigratorLogContext {
  CInode *in;        // inode receiving the caps (pinned IMPORTINGCAPS by decode_import_inode_caps)
  mds_rank_t from;   // rank that exported the caps to us
public:
  map<CInode*, map<client_t,Capability::Export> > peer_exports;  // decoded cap exports, keyed by inode
  map<client_t,entity_inst_t> client_map;  // client sessions to force open
  map<client_t,uint64_t> sseqmap;          // session seqs from prepare_force_open_sessions()

  C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
  void finish(int r) override {
    mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
  }
};
3228
3229/* This function DOES put the passed message before returning*/
3230void Migrator::handle_export_caps(MExportCaps *ex)
3231{
3232 dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl;
3233 CInode *in = cache->get_inode(ex->ino);
3234
3235 assert(in);
3236 assert(in->is_auth());
3237
3238 // FIXME
3239 if (in->is_frozen())
3240 return;
3241
3242 C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
3243 this, in, mds_rank_t(ex->get_source().num()));
3244 finish->client_map = ex->client_map;
3245
3246 // decode new caps
3247 bufferlist::iterator blp = ex->cap_bl.begin();
3248 decode_import_inode_caps(in, false, blp, finish->peer_exports);
3249 assert(!finish->peer_exports.empty()); // thus, inode is pinned.
3250
3251 // journal open client sessions
3252 version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
3253
3254 ESessions *le = new ESessions(pv, ex->client_map);
3255 mds->mdlog->start_submit_entry(le, finish);
3256 mds->mdlog->flush();
3257
3258 ex->put();
3259}
3260
3261
/**
 * Journal-commit callback for handle_export_caps(): the ESessions entry is
 * durable, so actually open the client sessions and install the caps.
 *
 * @param in            inode receiving the caps (we must be auth)
 * @param from          rank that exported the caps
 * @param peer_exports  decoded cap exports (must contain an entry for @p in)
 * @param client_map    clients whose sessions were journaled open
 * @param sseqmap       session seqs from prepare_force_open_sessions()
 */
void Migrator::logged_import_caps(CInode *in,
				  mds_rank_t from,
				  map<CInode*, map<client_t,Capability::Export> >& peer_exports,
				  map<client_t,entity_inst_t>& client_map,
				  map<client_t,uint64_t>& sseqmap)
{
  dout(10) << "logged_import_caps on " << *in << dendl;
  // see export_go() vs export_go_synced()
  assert(in->is_auth());

  // force open client sessions and finish cap import
  mds->server->finish_force_open_sessions(client_map, sseqmap);

  map<client_t,Capability::Import> imported_caps;

  assert(peer_exports.count(in));
  // clients will release caps from the exporter when they receive the cap import message.
  // finish_import_inode_caps() also drops the IMPORTINGCAPS pin taken at decode.
  finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
  // re-evaluate cap-related locks now that the caps have changed
  mds->locker->eval(in, CEPH_CAP_LOCKS, true);
}