]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Migrator.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mds / Migrator.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDCache.h"
17 #include "CInode.h"
18 #include "CDir.h"
19 #include "CDentry.h"
20 #include "Migrator.h"
21 #include "Locker.h"
22 #include "Server.h"
23
24 #include "MDBalancer.h"
25 #include "MDLog.h"
26 #include "MDSMap.h"
27 #include "Mutation.h"
28
29 #include "include/filepath.h"
30 #include "common/likely.h"
31
32 #include "events/EExport.h"
33 #include "events/EImportStart.h"
34 #include "events/EImportFinish.h"
35 #include "events/ESessions.h"
36
37 #include "msg/Messenger.h"
38
39 #include "messages/MClientCaps.h"
40
41 /*
42 * this is what the dir->dir_auth values look like
43 *
44 * dir_auth authbits
45 * export
46 * me me - before
47 * me, me me - still me, but preparing for export
48 * me, them me - send MExportDir (peer is preparing)
49 * them, me me - journaled EExport
50 * them them - done
51 *
52 * import:
53 * them them - before
54 * me, them me - journaled EImportStart
55 * me me - done
56 *
57 * which implies:
58 * - auth bit is set if i am listed as first _or_ second dir_auth.
59 */
60
61 #include "common/config.h"
62
63
64 #define dout_context g_ceph_context
65 #define dout_subsys ceph_subsys_mds
66 #undef dout_prefix
67 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".mig " << __func__ << " "
68
69 using namespace std;
70
71 class MigratorContext : public MDSContext {
72 protected:
73 Migrator *mig;
74 MDSRank *get_mds() override {
75 return mig->mds;
76 }
77 public:
78 explicit MigratorContext(Migrator *mig_) : mig(mig_) {
79 ceph_assert(mig != NULL);
80 }
81 };
82
83 class MigratorLogContext : public MDSLogContextBase {
84 protected:
85 Migrator *mig;
86 MDSRank *get_mds() override {
87 return mig->mds;
88 }
89 public:
90 explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
91 ceph_assert(mig != NULL);
92 }
93 };
94
/*
 * Top-level dispatcher for all subtree-migration message traffic.
 * Routes each message type to its handler; the import/export/caps
 * groupings below mirror the roles this rank can play in a migration.
 * Unknown message types are a fatal protocol error (ceph_abort_msg).
 */
void Migrator::dispatch(const cref_t<Message> &m)
{
  switch (m->get_type()) {
    // import: messages we receive as the importing (destination) rank
  case MSG_MDS_EXPORTDIRDISCOVER:
    handle_export_discover(ref_cast<MExportDirDiscover>(m));
    break;
  case MSG_MDS_EXPORTDIRPREP:
    handle_export_prep(ref_cast<MExportDirPrep>(m));
    break;
  case MSG_MDS_EXPORTDIR:
    // test hook: optionally delay the export payload until a client
    // connection arrives, to exercise session-race handling
    if (unlikely(inject_session_race)) {
      dout(0) << "waiting for inject_session_race" << dendl;
      mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
    } else {
      handle_export_dir(ref_cast<MExportDir>(m));
    }
    break;
  case MSG_MDS_EXPORTDIRFINISH:
    handle_export_finish(ref_cast<MExportDirFinish>(m));
    break;
  case MSG_MDS_EXPORTDIRCANCEL:
    handle_export_cancel(ref_cast<MExportDirCancel>(m));
    break;

    // export: acks we receive as the exporting (source) rank
  case MSG_MDS_EXPORTDIRDISCOVERACK:
    handle_export_discover_ack(ref_cast<MExportDirDiscoverAck>(m));
    break;
  case MSG_MDS_EXPORTDIRPREPACK:
    handle_export_prep_ack(ref_cast<MExportDirPrepAck>(m));
    break;
  case MSG_MDS_EXPORTDIRACK:
    handle_export_ack(ref_cast<MExportDirAck>(m));
    break;
  case MSG_MDS_EXPORTDIRNOTIFYACK:
    handle_export_notify_ack(ref_cast<MExportDirNotifyAck>(m));
    break;

    // export 3rd party (dir_auth adjustments): we are a bystander rank
  case MSG_MDS_EXPORTDIRNOTIFY:
    handle_export_notify(ref_cast<MExportDirNotify>(m));
    break;

    // caps: client-capability migration between ranks
  case MSG_MDS_EXPORTCAPS:
    handle_export_caps(ref_cast<MExportCaps>(m));
    break;
  case MSG_MDS_EXPORTCAPSACK:
    handle_export_caps_ack(ref_cast<MExportCapsAck>(m));
    break;
  case MSG_MDS_GATHERCAPS:
    handle_gather_caps(ref_cast<MGatherCaps>(m));
    break;

  default:
    derr << "migrator unknown message " << m->get_type() << dendl;
    ceph_abort_msg("migrator unknown message");
  }
}
155
156
157 class C_MDC_EmptyImport : public MigratorContext {
158 CDir *dir;
159 public:
160 C_MDC_EmptyImport(Migrator *m, CDir *d) :
161 MigratorContext(m), dir(d) {
162 dir->get(CDir::PIN_PTRWAITER);
163 }
164 void finish(int r) override {
165 mig->export_empty_import(dir);
166 dir->put(CDir::PIN_PTRWAITER);
167 }
168 };
169
170
171 void Migrator::export_empty_import(CDir *dir)
172 {
173 dout(7) << *dir << dendl;
174 ceph_assert(dir->is_subtree_root());
175
176 if (dir->inode->is_auth()) {
177 dout(7) << " inode is auth" << dendl;
178 return;
179 }
180 if (!dir->is_auth()) {
181 dout(7) << " not auth" << dendl;
182 return;
183 }
184 if (dir->is_freezing() || dir->is_frozen()) {
185 dout(7) << " freezing or frozen" << dendl;
186 return;
187 }
188 if (dir->get_num_head_items() > 0) {
189 dout(7) << " not actually empty" << dendl;
190 return;
191 }
192 if (dir->inode->is_root()) {
193 dout(7) << " root" << dendl;
194 return;
195 }
196
197 mds_rank_t dest = dir->inode->authority().first;
198 //if (mds->is_shutting_down()) dest = 0; // this is more efficient.
199
200 dout(7) << " really empty, exporting to " << dest << dendl;
201 assert (dest != mds->get_nodeid());
202
203 dout(7) << "exporting to mds." << dest
204 << " empty import " << *dir << dendl;
205 export_dir( dir, dest );
206 }
207
/*
 * Scan in-progress exports and cancel any that have been stuck in the
 * DISCOVERING/FREEZING phase past mds_freeze_tree_timeout while remote
 * waiters (or a freezing parent) suggest a distributed deadlock.
 */
void Migrator::find_stale_export_freeze()
{
  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= g_conf()->mds_freeze_tree_timeout;


  /*
   * We could have situations like:
   *
   * - mds.0 authpins an item in subtree A
   * - mds.0 sends request to mds.1 to authpin an item in subtree B
   * - mds.0 freezes subtree A
   * - mds.1 authpins an item in subtree B
   * - mds.1 sends request to mds.0 to authpin an item in subtree A
   * - mds.1 freezes subtree B
   * - mds.1 receives the remote authpin request from mds.0
   *   (wait because subtree B is freezing)
   * - mds.0 receives the remote authpin request from mds.1
   *   (wait because subtree A is freezing)
   *
   *
   * - client request authpins items in subtree B
   * - freeze subtree B
   * - import subtree A which is parent of subtree B
   *   (authpins parent inode of subtree B, see CDir::set_dir_auth())
   * - freeze subtree A
   * - client request tries authpinning items in subtree A
   *   (wait because subtree A is freezing)
   */
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end(); ) {
    CDir* dir = p->first;
    export_state_t& stat = p->second;
    // advance before any potential cancel: export_try_cancel() may
    // erase the current entry from export_state
    ++p;
    if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
      continue;
    ceph_assert(dir->freeze_tree_state);
    // auth-pin count still changing -> freeze is making progress; reset timer
    if (stat.last_cum_auth_pins != dir->freeze_tree_state->auth_pins) {
      stat.last_cum_auth_pins = dir->freeze_tree_state->auth_pins;
      stat.last_cum_auth_pins_change = now;
      continue;
    }
    // not yet stale
    if (stat.last_cum_auth_pins_change >= cutoff)
      continue;
    // stale AND someone remote is waiting on us (or our parent is also
    // freezing): likely the deadlock pattern above, so cancel
    if (stat.num_remote_waiters > 0 ||
	(!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
      export_try_cancel(dir);
    }
  }
}
259
/**
 * Abort an in-progress export of @p dir, undoing whatever the current
 * export state has already established (freeze, bound pins, bystander
 * notifies) and, when @p notify_peer is set and the peer is reachable,
 * telling the importer to cancel as well.
 *
 * Past EXPORT_EXPORTING the migration has effectively committed, so
 * LOGGINGFINISH/NOTIFYING states ignore the request and let the export
 * complete normally.
 */
void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
{
  dout(10) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  ceph_assert(it != export_state.end());

  int state = it->second.state;
  switch (state) {
  case EXPORT_LOCKING:
    dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
    num_locking_exports--;
    it->second.state = EXPORT_CANCELLED;
    dir->auth_unpin(this);
    break;
  case EXPORT_DISCOVERING:
    dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    dir->auth_unpin(this);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

  case EXPORT_FREEZING:
    dout(10) << "export state=freezing : canceling freeze" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    // the freeze may have split the subtree off; try to merge it back
    if (dir->is_subtree_root())
      mdcache->try_subtree_merge(dir);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

    // NOTE: state order reversal, warning comes after prepping
  case EXPORT_WARNING:
    dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
    // bystanders were already warned, so we must gather notify acks
    // before final cleanup: go to CANCELLING, not CANCELLED
    it->second.state = EXPORT_CANCELLING;
    // fall-thru

  case EXPORT_PREPPING:
    if (state != EXPORT_WARNING) {
      dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
      it->second.state = EXPORT_CANCELLED;
    }

    {
      // unpin bounds
      set<CDir*> bounds;
      mdcache->get_subtree_bounds(dir, bounds);
      for (set<CDir*>::iterator q = bounds.begin();
	   q != bounds.end();
	   ++q) {
	CDir *bd = *q;
	bd->put(CDir::PIN_EXPORTBOUND);
	bd->state_clear(CDir::STATE_EXPORTBOUND);
      }
      if (state == EXPORT_WARNING) {
	// notify bystanders
	export_notify_abort(dir, it->second, bounds);
	// process delayed expires
	mdcache->process_delayed_expire(dir);
      }
    }
    dir->unfreeze_tree();
    mdcache->try_subtree_merge(dir);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

  case EXPORT_EXPORTING:
    dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
    it->second.state = EXPORT_CANCELLING;
    export_reverse(dir, it->second);
    break;

  case EXPORT_LOGGINGFINISH:
  case EXPORT_NOTIFYING:
    dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
    // leave export_state, don't clean up now.
    break;
  case EXPORT_CANCELLING:
    break;

  default:
    ceph_abort();
  }

  // finish clean-up?
  if (it->second.state == EXPORT_CANCELLING ||
      it->second.state == EXPORT_CANCELLED) {
    MutationRef mut;
    mut.swap(it->second.mut);

    // CANCELLED means no acks outstanding; we can tear down the state
    // entry immediately (this erases 'it')
    if (it->second.state == EXPORT_CANCELLED) {
      export_cancel_finish(it);
    }

    // drop locks
    if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
      // early states ride on an internal MDRequest; kill it
      MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
      ceph_assert(mdr);
      mdcache->request_kill(mdr);
    } else if (mut) {
      mds->locker->drop_locks(mut.get());
      mut->cleanup();
    }

    mdcache->show_subtrees();

    // capacity may have freed up; kick the export queue
    maybe_do_queued_export();
  }
}
385
/**
 * Final teardown of a cancelled export: erase its export_state entry,
 * release the EXPORTING state/pins on the dirfrag, and propagate the
 * (failed) completion to a parent export if this was a split child.
 *
 * @param it valid iterator into export_state; invalidated by this call.
 */
void Migrator::export_cancel_finish(export_state_iterator& it)
{
  CDir *dir = it->first;
  bool unpin = (it->second.state == EXPORT_CANCELLING);
  // capture what we need before erase() invalidates 'it'
  auto parent = std::move(it->second.parent);

  total_exporting_size -= it->second.approx_size;
  export_state.erase(it);

  ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
  dir->clear_exporting();

  if (unpin) {
    // pinned by Migrator::export_notify_abort()
    dir->auth_unpin(this);
  }
  // send pending import_maps?  (these need to go out when all exports have finished.)
  mdcache->maybe_send_pending_resolves();

  if (parent)
    child_export_finish(parent, false);
}
408
409 // ==========================================================
410 // mds failure handling
411
/**
 * React to another rank failing or stopping.
 *
 * For exports: cancel anything targeting the failed rank or not yet
 * frozen/prepped; for surviving exports where the failed rank was only
 * a bystander, fake its warning/notify acks so the export can proceed.
 * For imports: roll back or complete each import from the failed rank
 * depending on how far it had progressed; fake notify acks from failed
 * bystanders on aborting imports.
 */
void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
{
  dout(5) << who << dendl;

  // check my exports

  // first add an extra auth_pin on any freezes, so that canceling a
  // nested freeze doesn't complete one further up the hierarchy and
  // confuse the shit out of us.  we'll remove it after canceling the
  // freeze.  this way no freeze completions run before we want them
  // to.
  std::vector<CDir*> pinned_dirs;
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    if (p->second.state == EXPORT_FREEZING) {
      CDir *dir = p->first;
      dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
      dir->auth_pin(this);
      pinned_dirs.push_back(dir);
    }
  }

  map<CDir*,export_state_t>::iterator p = export_state.begin();
  while (p != export_state.end()) {
    // grab 'next' up front: export_try_cancel()/export_cancel_finish()
    // below may erase the current entry
    map<CDir*,export_state_t>::iterator next = p;
    ++next;
    CDir *dir = p->first;

    // abort exports:
    //  - that are going to the failed node
    //  - that aren't frozen yet (to avoid auth_pin deadlock)
    //  - they haven't prepped yet (they may need to discover bounds to do that)
    if ((p->second.peer == who &&
	 p->second.state != EXPORT_CANCELLING) ||
	p->second.state == EXPORT_LOCKING ||
	p->second.state == EXPORT_DISCOVERING ||
	p->second.state == EXPORT_FREEZING ||
	p->second.state == EXPORT_PREPPING) {
      // the guy i'm exporting to failed, or we're just freezing.
      dout(10) << "cleaning up export state (" << p->second.state << ")"
	       << get_export_statename(p->second.state) << " of " << *dir << dendl;
      export_try_cancel(dir);
    } else if (p->second.peer != who) {
      // bystander failed.
      if (p->second.warning_ack_waiting.erase(who)) {
	if (p->second.state == EXPORT_WARNING) {
	  p->second.notify_ack_waiting.erase(who);   // they won't get a notify either.
	  // exporter waiting for warning acks, let's fake theirs.
	  dout(10) << "faking export_warning_ack from mds." << who
		   << " on " << *dir << " to mds." << p->second.peer
		   << dendl;
	  if (p->second.warning_ack_waiting.empty())
	    export_go(dir);
	}
      }
      if (p->second.notify_ack_waiting.erase(who)) {
	// exporter is waiting for notify acks, fake it
	dout(10) << "faking export_notify_ack from mds." << who
		 << " on " << *dir << " to mds." << p->second.peer
		 << dendl;
	if (p->second.state == EXPORT_NOTIFYING) {
	  if (p->second.notify_ack_waiting.empty())
	    export_finish(dir);
	} else if (p->second.state == EXPORT_CANCELLING) {
	  if (p->second.notify_ack_waiting.empty()) {
	    export_cancel_finish(p);
	  }
	}
      }
    }

    // next!
    p = next;
  }


  // check my imports
  map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
  while (q != import_state.end()) {
    // same erase-safe iteration pattern as the export loop above
    map<dirfrag_t,import_state_t>::iterator next = q;
    ++next;
    dirfrag_t df = q->first;
    CInode *diri = mdcache->get_inode(df.ino);
    CDir *dir = mdcache->get_dirfrag(df);

    if (q->second.peer == who) {
      if (dir)
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << *dir << dendl;
      else
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << df << dendl;

      switch (q->second.state) {
      case IMPORT_DISCOVERING:
	dout(10) << "import state=discovering : clearing state" << dendl;
	import_reverse_discovering(df);
	break;

      case IMPORT_DISCOVERED:
	ceph_assert(diri);
	dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
	import_reverse_discovered(df, diri);
	break;

      case IMPORT_PREPPING:
	ceph_assert(dir);
	dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
	import_reverse_prepping(dir, q->second);
	break;

      case IMPORT_PREPPED:
	ceph_assert(dir);
	dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  mdcache->get_subtree_bounds(dir, bounds);
	  import_remove_pins(dir, bounds);

	  // adjust auth back to the exporter
	  mdcache->adjust_subtree_auth(dir, q->second.peer);

	  // notify bystanders ; wait in aborting state
	  q->second.state = IMPORT_ABORTING;
	  import_notify_abort(dir, bounds);
	  ceph_assert(g_conf()->mds_kill_import_at != 10);
	}
	break;

      case IMPORT_LOGGINGSTART:
	ceph_assert(dir);
	dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
	import_reverse(dir);
	break;

      case IMPORT_ACKING:
	ceph_assert(dir);
	// hrm.  make this an ambiguous import, and wait for exporter recovery to disambiguate
	dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  mdcache->get_subtree_bounds(dir, bounds);
	  mdcache->add_ambiguous_import(dir, bounds);
	}
	break;

      case IMPORT_FINISHING:
	ceph_assert(dir);
	// already past the point of no return; finish the import locally
	dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
	import_finish(dir, true);
	break;

      case IMPORT_ABORTING:
	ceph_assert(dir);
	dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
	break;
      }
    } else {
      auto bystanders_entry = q->second.bystanders.find(who);
      if (bystanders_entry != q->second.bystanders.end()) {
	q->second.bystanders.erase(bystanders_entry);
	if (q->second.state == IMPORT_ABORTING) {
	  ceph_assert(dir);
	  dout(10) << "faking export_notify_ack from mds." << who
		   << " on aborting import " << *dir << " from mds." << q->second.peer
		   << dendl;
	  if (q->second.bystanders.empty())
	    import_reverse_unfreeze(dir);
	}
      }
    }

    // next!
    q = next;
  }

  // drop the temporary freeze pins taken at the top
  for (const auto& dir : pinned_dirs) {
    dout(10) << "removing temp auth_pin on " << *dir << dendl;
    dir->auth_unpin(this);
  }
}
594
595
596
/**
 * Debug dump of all in-progress imports at dout level 10.
 * Uses a structured-binding range-for for consistency with
 * show_exporting(); output is unchanged.
 */
void Migrator::show_importing()
{
  dout(10) << dendl;
  for (const auto& [df, stat] : import_state) {
    CDir *dir = mdcache->get_dirfrag(df);
    if (dir) {
      dout(10) << " importing from " << stat.peer
	       << ": (" << stat.state << ") " << get_import_statename(stat.state)
	       << " " << df << " " << *dir << dendl;
    } else {
      // dirfrag may not be instantiated yet (e.g. still discovering)
      dout(10) << " importing from " << stat.peer
	       << ": (" << stat.state << ") " << get_import_statename(stat.state)
	       << " " << df << dendl;
    }
  }
}
615
/**
 * Debug dump of all in-progress exports at dout level 10.
 */
void Migrator::show_exporting()
{
  dout(10) << dendl;
  for (const auto& entry : export_state) {
    CDir *dir = entry.first;
    const export_state_t& stat = entry.second;
    dout(10) << " exporting to " << stat.peer
	     << ": (" << stat.state << ") " << get_export_statename(stat.state)
	     << " " << dir->dirfrag() << " " << *dir << dendl;
  }
}
625
626
627
/**
 * Sanity-check migration invariants (debug level >= 5 only):
 * each import/export past the early phases must leave its dirfrag with
 * ambiguous dir_auth that includes this rank, except ABORTING imports
 * whose auth has already been handed back.
 */
void Migrator::audit()
{
  if (!g_conf()->subsys.should_gather<ceph_subsys_mds, 5>())
    return;  // hrm.

  // import_state
  show_importing();
  for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
       p != import_state.end();
       ++p) {
    // DISCOVERING: nothing instantiated yet, nothing to check
    if (p->second.state == IMPORT_DISCOVERING)
      continue;
    if (p->second.state == IMPORT_DISCOVERED) {
      // only the base inode exists so far
      CInode *in = mdcache->get_inode(p->first.ino);
      ceph_assert(in);
      continue;
    }
    CDir *dir = mdcache->get_dirfrag(p->first);
    ceph_assert(dir);
    if (p->second.state == IMPORT_PREPPING)
      continue;
    if (p->second.state == IMPORT_ABORTING) {
      // auth has been adjusted back to the exporter already
      ceph_assert(!dir->is_ambiguous_dir_auth());
      ceph_assert(dir->get_dir_auth().first != mds->get_nodeid());
      continue;
    }
    ceph_assert(dir->is_ambiguous_dir_auth());
    ceph_assert(dir->authority().first  == mds->get_nodeid() ||
		dir->authority().second == mds->get_nodeid());
  }

  // export_state
  show_exporting();
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    CDir *dir = p->first;
    // early/cancelling exports haven't (or no longer) set ambiguous auth
    if (p->second.state == EXPORT_LOCKING ||
	p->second.state == EXPORT_DISCOVERING ||
	p->second.state == EXPORT_FREEZING ||
	p->second.state == EXPORT_CANCELLING)
      continue;
    ceph_assert(dir->is_ambiguous_dir_auth());
    ceph_assert(dir->authority().first  == mds->get_nodeid() ||
		dir->authority().second == mds->get_nodeid());
  }

  // ambiguous+me subtrees should be importing|exporting

  // write me
}
679
680
681
682
683
684 // ==========================================================
685 // EXPORT
686
/**
 * Queue an export of @p dir to @p dest and let the throttled queue
 * runner decide when (and whether) it actually starts.
 */
void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
{
  // enqueue
  dout(7) << *dir << " to " << dest << dendl;
  export_queue.emplace_back(dir->dirfrag(), dest);

  maybe_do_queued_export();
}
695
/**
 * Drain the nicely-queued exports while total in-flight export size
 * stays under budget (2 * max_export_size, with headroom reserved per
 * export still in the LOCKING phase).
 *
 * NOTE: guarded by a function-local 'running' flag because export_dir()
 * can recurse back into this function (e.g. via cancellation paths);
 * the flag makes re-entrant calls no-ops rather than nesting the loop.
 */
void Migrator::maybe_do_queued_export()
{
  static bool running;
  if (running)
    return;
  running = true;

  uint64_t max_total_size = max_export_size * 2;

  while (!export_queue.empty() &&
	 max_total_size > total_exporting_size &&
	 max_total_size - total_exporting_size >=
	 max_export_size * (num_locking_exports + 1)) {

    dirfrag_t df = export_queue.front().first;
    mds_rank_t dest = export_queue.front().second;
    export_queue.pop_front();

    // the dirfrag may have been trimmed or lost auth since it was queued
    CDir *dir = mdcache->get_dirfrag(df);
    if (!dir) continue;
    if (!dir->is_auth()) continue;

    dout(7) << "nicely exporting to mds." << dest << " " << *dir << dendl;

    export_dir(dir, dest);
  }

  running = false;
}
725
726
727
728
// Fires when the subtree freeze for an export completes; resumes the
// export via export_frozen(). Holds a PTRWAITER pin on the dirfrag
// while queued. A negative r means the wait was aborted, in which case
// we only drop the pin.
class C_MDC_ExportFreeze : public MigratorContext {
  CDir *dir;      // dir i'm exporting
  uint64_t tid;   // export tid, to detect a restarted export
public:
  C_MDC_ExportFreeze(Migrator *mig, CDir *d, uint64_t t)
    : MigratorContext(mig), dir(d), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (r >= 0)
      mig->export_frozen(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};
743
744
/**
 * Non-blocking attempt to take the locks needed before exporting @p dir:
 * rdlocks on the dirfragtree locks of the base and would-be bound inodes,
 * rdlocks on snaplocks up the ancestry, and forced wrlocks on the base
 * inode's filelock/nestlock.
 *
 * @return true if all locks were acquired into @p mut, false (with no
 *         locks held beyond what rdlock_try_set left) otherwise.
 */
bool Migrator::export_try_grab_locks(CDir *dir, MutationRef& mut)
{
  CInode *diri = dir->get_inode();

  // cheap early-out: if the wrlocks below can't possibly be taken,
  // don't bother building the lock vector
  if (!diri->filelock.can_wrlock(diri->get_loner()) ||
      !diri->nestlock.can_wrlock(diri->get_loner()))
    return false;

  MutationImpl::LockOpVec lov;

  // rdlock dirfragtreelock on every would-be bound's inode so the
  // fragment trees can't change under the export
  set<CDir*> wouldbe_bounds;
  set<CInode*> bound_inodes;
  mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
  for (auto& bound : wouldbe_bounds)
    bound_inodes.insert(bound->get_inode());
  for (auto& in : bound_inodes)
    lov.add_rdlock(&in->dirfragtreelock);

  lov.add_rdlock(&diri->dirfragtreelock);

  // rdlock snaplocks from the base inode up to the root so the snap
  // context is stable for the whole exported path
  CInode* in = diri;
  while (true) {
    lov.add_rdlock(&in->snaplock);
    CDentry* pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    in = pdn->get_dir()->get_inode();
  }

  if (!mds->locker->rdlock_try_set(lov, mut))
    return false;

  // can_wrlock() was checked above, so these force-wrlocks succeed
  mds->locker->wrlock_force(&diri->filelock, mut);
  mds->locker->wrlock_force(&diri->nestlock, mut);

  return true;
}
782
783
/** export_dir(dir, dest)
 * public method to initiate an export.
 * will fail if the directory is freezing, frozen, unpinnable, or root.
 *
 * On success this only *starts* the export: it records an
 * EXPORT_LOCKING entry in export_state and kicks off an internal
 * CEPH_MDS_OP_EXPORTDIR request; the rest of the state machine runs
 * from dispatch_export_dir() and the message handlers.
 */
void Migrator::export_dir(CDir *dir, mds_rank_t dest)
{
  ceph_assert(dir->is_auth());
  ceph_assert(dest != mds->get_nodeid());

  // guard chain: each condition below independently disqualifies the
  // export and logs why at level 7
  CDir* parent = dir->inode->get_projected_parent_dir();
  if (!mds->is_stopping() && !dir->is_exportable(dest) && dir->get_num_head_items() > 0) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": dir is export pinned" << dendl;
    return;
  } else if (!(mds->is_active() || mds->is_stopping())) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": not active" << dendl;
    return;
  } else if (mdcache->is_readonly()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": read-only FS, no exports for now" << dendl;
    return;
  } else if (!mds->mdsmap->is_active(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": destination not active" << dendl;
    return;
  } else if (mds->is_cluster_degraded()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": cluster degraded" << dendl;
    return;
  } else if (dir->inode->is_system()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is a system directory" << dendl;
    return;
  } else if (dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is frozen" << dendl;
    return;
  } else if (dir->state_test(CDir::STATE_EXPORTING)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": already exporting" << dendl;
    return;
  } else if (parent && parent->inode->is_stray()
             && parent->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": in stray directory" << dendl;
    return;
  }

  if (unlikely(g_conf()->mds_thrash_exports)) {
    // test-only thrashing: create random subtree bound (which will not
    // be exported) so migrations exercise the bound-handling paths
    std::vector<CDir*> ls;
    for (auto p = dir->begin(); p != dir->end(); ++p) {
      auto dn = p->second;
      CDentry::linkage_t *dnl= dn->get_linkage();
      if (dnl->is_primary()) {
	CInode *in = dnl->get_inode();
	if (in->is_dir()) {
	  auto&& dirs = in->get_nested_dirfrags();
	  ls.insert(std::end(ls), std::begin(dirs), std::end(dirs));
	}
      }
    }
    if (ls.size() > 0) {
      int n = rand() % ls.size();
      auto p = ls.begin();
      while (n--) ++p;
      CDir *bd = *p;
      if (!(bd->is_frozen() || bd->is_freezing())) {
	ceph_assert(bd->is_auth());
	dir->state_set(CDir::STATE_AUXSUBTREE);
	mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
	dout(7) << "create aux subtree " << *bd << " under " << *dir << dendl;
      }
    }
  }

  dout(4) << "Starting export to mds." << dest << " " << *dir << dendl;

  mds->hit_export_target(dest, -1);

  // pin + flag the dirfrag for the lifetime of the export attempt;
  // released by export_cancel_finish()/export_finish()
  dir->auth_pin(this);
  dir->mark_exporting();

  MDRequestRef mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
  mdr->more()->export_dir = dir;
  mdr->pin(dir);

  ceph_assert(export_state.count(dir) == 0);
  export_state_t& stat = export_state[dir];
  num_locking_exports++;
  stat.state = EXPORT_LOCKING;
  stat.peer = dest;
  stat.tid = mdr->reqid.tid;
  stat.mut = mdr;

  mdcache->dispatch_request(mdr);
}
873
874 /*
875 * check if directory is too large to be export in whole. If it is,
876 * choose some subdirs, whose total size is suitable.
877 */
878 void Migrator::maybe_split_export(CDir* dir, uint64_t max_size, bool null_okay,
879 vector<pair<CDir*, size_t> >& results)
880 {
881 static const unsigned frag_size = 800;
882 static const unsigned inode_size = 1000;
883 static const unsigned cap_size = 80;
884 static const unsigned remote_size = 10;
885 static const unsigned null_size = 1;
886
887 // state for depth-first search
888 struct LevelData {
889 CDir *dir;
890 CDir::dentry_key_map::iterator iter;
891 size_t dirfrag_size = frag_size;
892 size_t subdirs_size = 0;
893 bool complete = true;
894 vector<CDir*> siblings;
895 vector<pair<CDir*, size_t> > subdirs;
896 LevelData(const LevelData&) = default;
897 LevelData(CDir *d) :
898 dir(d), iter(d->begin()) {}
899 };
900
901 vector<LevelData> stack;
902 stack.emplace_back(dir);
903
904 size_t found_size = 0;
905 size_t skipped_size = 0;
906
907 for (;;) {
908 auto& data = stack.back();
909 CDir *cur = data.dir;
910 auto& it = data.iter;
911 auto& dirfrag_size = data.dirfrag_size;
912
913 while(it != cur->end()) {
914 CDentry *dn = it->second;
915 ++it;
916
917 dirfrag_size += dn->name.size();
918 if (dn->get_linkage()->is_null()) {
919 dirfrag_size += null_size;
920 continue;
921 }
922 if (dn->get_linkage()->is_remote()) {
923 dirfrag_size += remote_size;
924 continue;
925 }
926
927 CInode *in = dn->get_linkage()->get_inode();
928 dirfrag_size += inode_size;
929 dirfrag_size += in->get_client_caps().size() * cap_size;
930
931 if (in->is_dir()) {
932 auto ls = in->get_nested_dirfrags();
933 std::reverse(ls.begin(), ls.end());
934
935 bool complete = true;
936 for (auto p = ls.begin(); p != ls.end(); ) {
937 if ((*p)->state_test(CDir::STATE_EXPORTING) ||
938 (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
939 complete = false;
940 p = ls.erase(p);
941 } else {
942 ++p;
943 }
944 }
945 if (!complete) {
946 // skip exporting dir's ancestors. because they can't get
947 // frozen (exporting dir's parent inode is auth pinned).
948 for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
949 if (!p->complete)
950 break;
951 p->complete = false;
952 }
953 }
954 if (!ls.empty()) {
955 stack.emplace_back(ls.back());
956 ls.pop_back();
957 stack.back().siblings.swap(ls);
958 break;
959 }
960 }
961 }
962 // did above loop push new dirfrag into the stack?
963 if (stack.back().dir != cur)
964 continue;
965
966 if (data.complete) {
967 auto cur_size = data.subdirs_size + dirfrag_size;
968 // we can do nothing with large dirfrag
969 if (cur_size >= max_size && found_size * 2 > max_size)
970 break;
971
972 found_size += dirfrag_size;
973
974 if (stack.size() > 1) {
975 auto& parent = stack[stack.size() - 2];
976 parent.subdirs.emplace_back(cur, cur_size);
977 parent.subdirs_size += cur_size;
978 }
979 } else {
980 // can't merge current dirfrag to its parent if there is skipped subdir
981 results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
982 skipped_size += dirfrag_size;
983 }
984
985 vector<CDir*> ls;
986 ls.swap(data.siblings);
987
988 stack.pop_back();
989 if (stack.empty())
990 break;
991
992 if (found_size >= max_size)
993 break;
994
995 // next dirfrag
996 if (!ls.empty()) {
997 stack.emplace_back(ls.back());
998 ls.pop_back();
999 stack.back().siblings.swap(ls);
1000 }
1001 }
1002
1003 for (auto& p : stack)
1004 results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());
1005
1006 if (results.empty() && (!skipped_size || !null_okay))
1007 results.emplace_back(dir, found_size + skipped_size);
1008 }
1009
1010 class C_M_ExportDirWait : public MigratorContext {
1011 MDRequestRef mdr;
1012 int count;
1013 public:
1014 C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
1015 : MigratorContext(m), mdr(mdr), count(count) {}
1016 void finish(int r) override {
1017 mig->dispatch_export_dir(mdr, count);
1018 }
1019 };
1020
/*
 * Continue an export that is in the EXPORT_LOCKING state: verify the export
 * is still live, wait (if needed) for the destination to become a valid
 * export target and for the dir to be stable, take all required locks, and
 * then either start freezing the subtree or split it into smaller child
 * exports if it exceeds max_export_size.
 *
 * @param mdr   internal CEPH_MDS_OP_EXPORTDIR request driving this export
 * @param count how many MDSMap epochs we have already waited for the target
 */
void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  CDir *dir = mdr->more()->export_dir;
  dout(7) << *mdr << " " << *dir << dendl;

  // still the export we started?  the export_state entry may have been
  // cancelled or replaced; the tid ties this request to a specific attempt.
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    ceph_assert(mdr->killed || mdr->aborted);
    if (mdr->aborted) {
      mdr->aborted = false;
      mdcache->request_kill(mdr);
    }
    return;
  }
  ceph_assert(it->second.state == EXPORT_LOCKING);

  if (mdr->more()->peer_error || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  mds_rank_t dest = it->second.peer;
  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(7) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }

    // drop locks/pins and retry on the next MDSMap epoch
    mds->locker->drop_locks(mdr.get());
    mdr->drop_local_auth_pins();

    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  // a dir without a parent dentry is not yet linked into the namespace;
  // wait until it is before exporting it
  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  // locks?
  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
    MutationImpl::LockOpVec lov;
    // If auth MDS of the subtree root inode is neither the exporter MDS
    // nor the importer MDS and it gathers subtree root's fragstat/neststat
    // while the subtree is exporting. It's possible that the exporter MDS
    // and the importer MDS both are auth MDS of the subtree root or both
    // are not auth MDS of the subtree root at the time they receive the
    // lock messages. So the auth MDS of the subtree root inode may get no
    // or duplicated fragstat/neststat for the subtree root dirfrag.
    lov.lock_scatter_gather(&dir->get_inode()->filelock);
    lov.lock_scatter_gather(&dir->get_inode()->nestlock);
    if (dir->get_inode()->is_auth()) {
      dir->get_inode()->filelock.set_scatter_wanted();
      dir->get_inode()->nestlock.set_scatter_wanted();
    }
    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);

    if (!mds->locker->acquire_locks(mdr, lov, nullptr, true)) {
      if (mdr->aborted)
        export_try_cancel(dir);
      return;
    }

    lov.clear();
    // bound dftlocks:
    // NOTE: We need to take an rdlock on bounding dirfrags during
    //  migration for a rather irritating reason: when we export the
    //  bound inode, we need to send scatterlock state for the dirfrags
    //  as well, so that the new auth also gets the correct info.  If we
    //  race with a refragment, this info is useless, as we can't
    //  redivvy it up.  And it's needed for the scatterlocks to work
    //  properly: when the auth is in a sync/lock state it keeps each
    //  dirfrag's portion in the local (auth OR replica) dirfrag.
    set<CDir*> wouldbe_bounds;
    set<CInode*> bound_inodes;
    mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
    for (auto& bound : wouldbe_bounds)
      bound_inodes.insert(bound->get_inode());
    for (auto& in : bound_inodes)
      lov.add_rdlock(&in->dirfragtreelock);

    if (!mds->locker->rdlock_try_set(lov, mdr))
      return;

    if (!mds->locker->try_rdlock_snap_layout(dir->get_inode(), mdr))
      return;

    mdr->locking_state |= MutationImpl::ALL_LOCKED;
  }

  ceph_assert(g_conf()->mds_kill_export_at != 1);

  auto parent = it->second.parent;

  // decide whether to export the whole subtree or split it up
  vector<pair<CDir*, size_t> > results;
  maybe_split_export(dir, max_export_size, (bool)parent, results);

  if (results.size() == 1 && results.front().first == dir) {
    // small enough to export as a single subtree
    num_locking_exports--;
    it->second.state = EXPORT_DISCOVERING;
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
    auto discover = make_message<MExportDirDiscover>(dir->dirfrag(), path,
                                                     mds->get_nodeid(),
                                                     it->second.tid);
    mds->send_message_mds(discover, dest);
    ceph_assert(g_conf()->mds_kill_export_at != 2);

    it->second.last_cum_auth_pins_change = ceph_clock_now();
    it->second.approx_size = results.front().second;
    total_exporting_size += it->second.approx_size;

    // start the freeze, but hold it up with an auth_pin.
    dir->freeze_tree();
    ceph_assert(dir->is_freezing_tree());
    dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
    return;
  }

  // subtree is too large: fan out into child exports that share a single
  // export_base_t so child_export_finish() can requeue the remainder later
  if (parent) {
    parent->pending_children += results.size();
  } else {
    parent = std::make_shared<export_base_t>(dir->dirfrag(), dest,
                                             results.size(), export_queue_gen);
  }

  if (results.empty()) {
    dout(7) << "subtree's children all are under exporting, retry rest parts of parent export "
            << parent->dirfrag << dendl;
    parent->restart = true;
  } else {
    dout(7) << "subtree is too large, splitting it into: " << dendl;
  }

  for (auto& p : results) {
    CDir *sub = p.first;
    ceph_assert(sub != dir);
    dout(7) << " sub " << *sub << dendl;

    sub->auth_pin(this);
    sub->mark_exporting();

    // each sub-export gets its own internal request and export_state entry
    MDRequestRef _mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
    _mdr->more()->export_dir = sub;
    _mdr->pin(sub);

    ceph_assert(export_state.count(sub) == 0);
    auto& stat = export_state[sub];
    num_locking_exports++;
    stat.state = EXPORT_LOCKING;
    stat.peer = dest;
    stat.tid = _mdr->reqid.tid;
    stat.mut = _mdr;
    stat.parent = parent;
    mdcache->dispatch_request(_mdr);
  }

  // cancel the original one
  export_try_cancel(dir);
}
1189
1190 void Migrator::child_export_finish(std::shared_ptr<export_base_t>& parent, bool success)
1191 {
1192 if (success)
1193 parent->restart = true;
1194 if (--parent->pending_children == 0) {
1195 if (parent->restart &&
1196 parent->export_queue_gen == export_queue_gen) {
1197 CDir *origin = mdcache->get_dirfrag(parent->dirfrag);
1198 if (origin && origin->is_auth()) {
1199 dout(7) << "child_export_finish requeue " << *origin << dendl;
1200 export_queue.emplace_front(origin->dirfrag(), parent->dest);
1201 }
1202 }
1203 }
1204 }
1205
1206 /*
1207 * called on receipt of MExportDirDiscoverAck
1208 * the importer now has the directory's _inode_ in memory, and pinned.
1209 */
/*
 * Handle MExportDirDiscoverAck.  On success, release our locks (the
 * internal request is finished) and move to EXPORT_FREEZING, dropping the
 * auth_pin that was holding up the freeze started in dispatch_export_dir().
 * On failure, cancel the export.
 */
void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m)
{
  CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  ceph_assert(dir);

  dout(7) << "from " << m->get_source()
          << " on " << *dir << dendl;

  mds->hit_export_target(dest, -1);

  // the export may have been cancelled or restarted since we sent the
  // discover; tid and peer must match this attempt
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != dest) {
    dout(7) << "must have aborted" << dendl;
  } else {
    ceph_assert(it->second.state == EXPORT_DISCOVERING);

    if (m->is_success()) {
      // release locks to avoid deadlock
      // (recover the MDRequest handle from the stored mutation ref)
      MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
      ceph_assert(mdr);
      mdcache->request_finish(mdr);
      it->second.mut.reset();
      // freeze the subtree
      it->second.state = EXPORT_FREEZING;
      dir->auth_unpin(this);  // let the freeze started earlier proceed
      ceph_assert(g_conf()->mds_kill_export_at != 3);

    } else {
      dout(7) << "peer failed to discover (not active?), canceling" << dendl;
      export_try_cancel(dir, false);
    }
  }
}
1246
/**
 * Completion context run once all client sessions touching the exported
 * subtree have been flushed; resumes the export via
 * export_sessions_flushed().  Holds a PIN_PTRWAITER ref so the CDir
 * pointer stays valid until the context fires.
 */
class C_M_ExportSessionsFlushed : public MigratorContext {
  CDir *dir;
  uint64_t tid;  // ties the callback to a specific export attempt
public:
  C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
    MigratorContext(m), dir(d), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->export_sessions_flushed(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};
1260
/*
 * All client sessions named by caps in the subtree have been flushed.
 * Clear the MDS_RANK_NONE placeholder from warning_ack_waiting (it was
 * inserted by export_frozen()); if we are already in EXPORT_WARNING and
 * this was the last outstanding ack, start the export.
 */
void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
  // MDS_RANK_NONE stands in for the session-flush gather
  ceph_assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
  it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
  if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
    export_go(dir);     // start export.
}
1280
/*
 * Encode the "trace" for one export bound: the chain of replica dentries,
 * inodes and dirfrags leading from 'bound' up toward the subtree base
 * 'dir', so the importer can instantiate the bound before the export
 * arrives.
 *
 * Wire format: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
 * where 'start' records whether the trace begins with a dirfrag ('f'),
 * a dentry ('d'), or nothing ('-').
 *
 * inodes_added/dirfrags_added dedup items shared by the traces of multiple
 * bounds (the walk stops as soon as it hits something already encoded).
 */
void Migrator::encode_export_prep_trace(bufferlist &final_bl, CDir *bound,
                                        CDir *dir, export_state_t &es,
                                        set<inodeno_t> &inodes_added,
                                        set<dirfrag_t> &dirfrags_added)
{
  ENCODE_START(1, 1, final_bl);

  dout(7) << " started to encode dir " << *bound << dendl;
  CDir *cur = bound;
  bufferlist tracebl;
  char start = '-';

  while (1) {
    // don't repeat inodes
    if (inodes_added.count(cur->inode->ino()))
      break;
    inodes_added.insert(cur->inode->ino());

    // prepend dentry + inode
    ceph_assert(cur->inode->is_auth());
    bufferlist bl;
    mdcache->encode_replica_dentry(cur->inode->parent, es.peer, bl);
    dout(7) << " added " << *cur->inode->parent << dendl;
    mdcache->encode_replica_inode(cur->inode, es.peer, bl, mds->mdsmap->get_up_features());
    dout(7) << " added " << *cur->inode << dendl;
    // prepend: the new items go in front of what we accumulated so far
    bl.claim_append(tracebl);
    tracebl = std::move(bl);

    cur = cur->get_parent_dir();
    // don't repeat dirfrags
    if (dirfrags_added.count(cur->dirfrag()) || cur == dir) {
      start = 'd';  // start with dentry
      break;
    }
    dirfrags_added.insert(cur->dirfrag());

    // prepend dir
    mdcache->encode_replica_dir(cur, es.peer, bl);
    dout(7) << " added " << *cur << dendl;
    bl.claim_append(tracebl);
    tracebl = std::move(bl);
    start = 'f';  // start with dirfrag
  }
  // header: the dirfrag the trace starts from, and how it starts
  dirfrag_t df = cur->dirfrag();
  encode(df, final_bl);
  encode(start, final_bl);
  final_bl.claim_append(tracebl);

  ENCODE_FINISH(final_bl);
}
1331
/*
 * The subtree is fully frozen.  Grab the remaining locks, note the subtree
 * bounds, and send MExportDirPrep to the importer carrying the base
 * dirfrag, the bystander list, and a spanning-tree trace for every bound.
 * Also kick off a flush of all client sessions holding caps in the
 * subtree; the export cannot proceed past EXPORT_WARNING until those
 * flushes complete (see export_sessions_flushed()).
 */
void Migrator::export_frozen(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != tid) {
    dout(7) << "export must have aborted" << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_FREEZING);
  ceph_assert(dir->is_frozen_tree_root());

  it->second.mut = new MutationImpl();

  // ok, try to grab all my locks.
  CInode *diri = dir->get_inode();
  if ((diri->is_auth() && diri->is_frozen()) ||
      !export_try_grab_locks(dir, it->second.mut)) {
    dout(7) << "export_dir couldn't acquire all needed locks, failing. "
            << *dir << dendl;
    export_try_cancel(dir);
    return;
  }

  if (diri->is_auth())
    it->second.mut->auth_pin(diri);

  mdcache->show_subtrees();

  // CDir::_freeze_tree() should have forced it into subtree.
  ceph_assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
  // note the bounds.
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // generate prep message, log entry.
  auto prep = make_message<MExportDirPrep>(dir->dirfrag(), it->second.tid);

  // include list of bystanders
  for (const auto &p : dir->get_replicas()) {
    if (p.first != it->second.peer) {
      dout(10) << "bystander mds." << p.first << dendl;
      prep->add_bystander(p.first);
    }
  }

  // include base dirfrag
  mdcache->encode_replica_dir(dir, it->second.peer, prep->basedir);

  /*
   * include spanning tree for all nested exports.
   * these need to be on the destination _before_ the final export so that
   * dir_auth updates on any nested exports are properly absorbed.
   * this includes inodes and dirfrags included in the subtree, but
   * only the inodes at the bounds.
   *
   * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
   */
  set<inodeno_t> inodes_added;
  set<dirfrag_t> dirfrags_added;

  // check bounds
  for (auto &bound : bounds){
    // pin it.
    bound->get(CDir::PIN_EXPORTBOUND);
    bound->state_set(CDir::STATE_EXPORTBOUND);

    dout(7) << " export bound " << *bound << dendl;
    prep->add_bound( bound->dirfrag() );

    bufferlist final_bl;
    encode_export_prep_trace(final_bl, bound, dir, it->second, inodes_added, dirfrags_added);
    prep->add_trace(final_bl);
  }

  // send.
  it->second.state = EXPORT_PREPPING;
  mds->send_message_mds(prep, it->second.peer);
  ceph_assert(g_conf()->mds_kill_export_at != 4);

  // make sure any new instantiations of caps are flushed out
  ceph_assert(it->second.warning_ack_waiting.empty());

  set<client_t> export_client_set;
  get_export_client_set(dir, export_client_set);

  MDSGatherBuilder gather(g_ceph_context);
  mds->server->flush_client_sessions(export_client_set, gather);
  if (gather.has_subs()) {
    // MDS_RANK_NONE marks the pending session flush; it is erased in
    // export_sessions_flushed()
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
    gather.activate();
  }
}
1427
1428 void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
1429 {
1430 deque<CDir*> dfs;
1431 dfs.push_back(dir);
1432 while (!dfs.empty()) {
1433 CDir *dir = dfs.front();
1434 dfs.pop_front();
1435 for (auto& p : *dir) {
1436 CDentry *dn = p.second;
1437 if (!dn->get_linkage()->is_primary())
1438 continue;
1439 CInode *in = dn->get_linkage()->get_inode();
1440 if (in->is_dir()) {
1441 // directory?
1442 auto&& ls = in->get_dirfrags();
1443 for (auto& q : ls) {
1444 if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
1445 // include nested dirfrag
1446 ceph_assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
1447 dfs.push_back(q); // it's ours, recurse (later)
1448 }
1449 }
1450 }
1451 for (auto& q : in->get_client_caps()) {
1452 client_set.insert(q.first);
1453 }
1454 }
1455 }
1456 }
1457
1458 void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
1459 {
1460 for (const auto &p : in->get_client_caps()) {
1461 client_set.insert(p.first);
1462 }
1463 }
1464
1465 void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
1466 {
1467 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
1468 mds_rank_t dest(m->get_source().num());
1469 ceph_assert(dir);
1470
1471 dout(7) << *dir << dendl;
1472
1473 mds->hit_export_target(dest, -1);
1474
1475 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1476 if (it == export_state.end() ||
1477 it->second.tid != m->get_tid() ||
1478 it->second.peer != mds_rank_t(m->get_source().num())) {
1479 // export must have aborted.
1480 dout(7) << "export must have aborted" << dendl;
1481 return;
1482 }
1483 ceph_assert(it->second.state == EXPORT_PREPPING);
1484
1485 if (!m->is_success()) {
1486 dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
1487 export_try_cancel(dir, false);
1488 return;
1489 }
1490
1491 ceph_assert(g_conf()->mds_kill_export_at != 5);
1492 // send warnings
1493 set<CDir*> bounds;
1494 mdcache->get_subtree_bounds(dir, bounds);
1495
1496 ceph_assert(it->second.warning_ack_waiting.empty() ||
1497 (it->second.warning_ack_waiting.size() == 1 &&
1498 it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
1499 ceph_assert(it->second.notify_ack_waiting.empty());
1500
1501 for (const auto &p : dir->get_replicas()) {
1502 if (p.first == it->second.peer) continue;
1503 if (mds->is_cluster_degraded() &&
1504 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
1505 continue; // only if active
1506 it->second.warning_ack_waiting.insert(p.first);
1507 it->second.notify_ack_waiting.insert(p.first); // we'll eventually get a notifyack, too!
1508
1509 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), it->second.tid, true,
1510 mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
1511 mds_authority_t(mds->get_nodeid(),it->second.peer));
1512 for (auto &cdir : bounds) {
1513 notify->get_bounds().push_back(cdir->dirfrag());
1514 }
1515 mds->send_message_mds(notify, p.first);
1516
1517 }
1518
1519 it->second.state = EXPORT_WARNING;
1520
1521 ceph_assert(g_conf()->mds_kill_export_at != 6);
1522 // nobody to warn?
1523 if (it->second.warning_ack_waiting.empty())
1524 export_go(dir); // start export.
1525 }
1526
1527
/**
 * Completion context: fires once the journal has been safely flushed,
 * then performs the actual export via export_go_synced().  Holds a
 * PIN_PTRWAITER ref so the CDir pointer stays valid.
 */
class C_M_ExportGo : public MigratorContext {
  CDir *dir;
  uint64_t tid;  // ties the callback to a specific export attempt
public:
  C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
    MigratorContext(m), dir(d), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->export_go_synced(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};
1541
/*
 * All warning acks are in: flush the journal and continue in
 * export_go_synced() once it is safe on disk.
 */
void Migrator::export_go(CDir *dir)
{
  auto it = export_state.find(dir);
  ceph_assert(it != export_state.end());
  dout(7) << *dir << " to " << it->second.peer << dendl;

  // first sync log to flush out e.g. any cap imports
  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
  mds->mdlog->flush();
}
1552
/*
 * Journal is synced; actually ship the subtree.  Mark subtree auth
 * ambiguous (me, dest), encode the full subtree contents plus bounds into
 * an MExportDir message, and send it.  State becomes EXPORT_EXPORTING.
 */
void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }
  ceph_assert(it->second.state == EXPORT_WARNING);
  mds_rank_t dest = it->second.peer;

  dout(7) << *dir << " to " << dest << dendl;

  mdcache->show_subtrees();

  it->second.state = EXPORT_EXPORTING;
  ceph_assert(g_conf()->mds_kill_export_at != 7);

  ceph_assert(dir->is_frozen_tree_root());

  // set ambiguous auth
  mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);

  // take away the popularity we're sending.
  mds->balancer->subtract_export(dir);

  // fill export message with cache data
  auto req = make_message<MExportDir>(dir->dirfrag(), it->second.tid);
  map<client_t,entity_inst_t> exported_client_map;
  map<client_t,client_metadata_t> exported_client_metadata_map;
  uint64_t num_exported_inodes = 0;
  encode_export_dir(req->export_data, dir, // recur start point
                    exported_client_map, exported_client_metadata_map,
                    num_exported_inodes);
  encode(exported_client_map, req->client_map, mds->mdsmap->get_up_features());
  encode(exported_client_metadata_map, req->client_map);

  // add bounds to message
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p)
    req->add_export((*p)->dirfrag());

  // send
  mds->send_message_mds(req, dest);
  ceph_assert(g_conf()->mds_kill_export_at != 8);

  mds->hit_export_target(dest, num_exported_inodes+1);

  // stats
  if (mds->logger) mds->logger->inc(l_mds_exported);
  if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);

  mdcache->show_subtrees();
}
1612
1613
1614 /** encode_export_inode
1615 * update our local state for this inode to export.
1616 * encode relevant state to be sent over the wire.
1617 * used by: encode_export_dir, file_rename (if foreign)
1618 *
1619 * FIXME: the separation between CInode.encode_export and these methods
1620 * is pretty arbitrary and dumb.
1621 */
/*
 * Encode one inode (identity, exportable state, then its caps) for
 * migration.  The inode must be auth here and not replicated from
 * ourselves.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
                                   map<client_t,entity_inst_t>& exported_client_map,
                                   map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, enc_state);
  dout(7) << *in << dendl;
  ceph_assert(!in->is_replica(mds->get_nodeid()));

  // identity first, then the inode's exportable state
  encode(in->ino(), enc_state);
  encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps
  encode_export_inode_caps(in, true, enc_state, exported_client_map, exported_client_metadata_map);
  ENCODE_FINISH(enc_state);
}
1638
/*
 * Encode the client capabilities on 'in' for migration.  When auth_cap is
 * set this inode's auth is moving: also encode mds_caps_wanted and pin the
 * inode EXPORTINGCAPS until finish_export_inode_caps() runs.  Records the
 * inst/metadata of every client named by a cap so the importer can set up
 * sessions.
 */
void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
                                        map<client_t,entity_inst_t>& exported_client_map,
                                        map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, bl);
  dout(20) << *in << dendl;
  // encode caps
  map<client_t,Capability::Export> cap_map;
  in->export_client_caps(cap_map);
  encode(cap_map, bl);
  if (auth_cap) {
    encode(in->get_mds_caps_wanted(), bl);

    in->state_set(CInode::STATE_EXPORTINGCAPS);
    in->get(CInode::PIN_EXPORTINGCAPS);
  }

  // make note of clients named by exported capabilities
  // NOTE(review): assumes a live session exists for every cap holder
  // (session is dereferenced unchecked) — confirm against SessionMap
  for (const auto &p : in->get_client_caps()) {
    if (exported_client_map.count(p.first))
      continue;
    Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p.first.v));
    exported_client_map[p.first] = session->info.inst;
    exported_client_metadata_map[p.first] = session->info.client_metadata;
  }
  ENCODE_FINISH(bl);
}
1666
/*
 * The export has been acked: drop the EXPORTINGCAPS pin and tell each
 * client (CEPH_CAP_OP_EXPORT) where its caps migrated, using the
 * importer-assigned ids recorded in peer_imported.
 */
void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
                                        map<client_t,Capability::Import>& peer_imported)
{
  dout(20) << *in << dendl;

  in->state_clear(CInode::STATE_EXPORTINGCAPS);
  in->put(CInode::PIN_EXPORTINGCAPS);

  // tell (all) clients about migrating caps..
  for (const auto &p : in->get_client_caps()) {
    const Capability *cap = &p.second;
    dout(7) << p.first
            << " exported caps on " << *in << dendl;
    auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
                                       cap->get_cap_id(), cap->get_mseq(),
                                       mds->get_osd_epoch_barrier());
    map<client_t,Capability::Import>::iterator q = peer_imported.find(p.first);
    ceph_assert(q != peer_imported.end());
    // a non-positive cap_id means the peer did not import this cap
    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
                    (q->second.cap_id > 0 ? peer : -1), 0);
    mds->send_message_client_counted(m, p.first);
  }
  in->clear_client_caps_after_export();
  mds->locker->eval(in, CEPH_CAP_LOCKS);
}
1692
/*
 * Demote 'in' from auth to replica after a successful export: clean dirty
 * state, switch every lock into replica form, clear auth-only state, and
 * hand accumulated waiters back through 'finished'.
 */
void Migrator::finish_export_inode(CInode *in, mds_rank_t peer,
                                   map<client_t,Capability::Import>& peer_imported,
                                   MDSContext::vec& finished)
{
  dout(12) << *in << dendl;

  // clean
  if (in->is_dirty())
    in->mark_clean();

  // clear/unpin cached_by (we're no longer the authority)
  in->clear_replica_map();

  // twiddle lock states for auth -> replica transition
  in->authlock.export_twiddle();
  in->linklock.export_twiddle();
  in->dirfragtreelock.export_twiddle();
  in->filelock.export_twiddle();
  in->nestlock.export_twiddle();
  in->xattrlock.export_twiddle();
  in->snaplock.export_twiddle();
  in->flocklock.export_twiddle();
  in->policylock.export_twiddle();

  // mark auth
  ceph_assert(in->is_auth());
  in->state_clear(CInode::STATE_AUTH);
  in->replica_nonce = CInode::EXPORT_NONCE;

  in->clear_dirty_rstat();

  // no more auth subtree? clear scatter dirty
  if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
    in->clear_scatter_dirty();

  in->clear_dirty_parent();

  in->clear_clientwriteable();

  in->clear_file_locks();

  // waiters
  in->take_waiting(CInode::WAIT_ANY_MASK, finished);

  in->finish_export();

  finish_export_inode_caps(in, peer, peer_imported);
}
1741
/*
 * Recursively encode the contents of 'dir' into exportbl: the dirfrag
 * header, then every dentry (null / remote / primary+inode).  Nested
 * dirfrags that are not export bounds are collected and encoded after this
 * dirfrag's own ENCODE_FINISH.  num_exported counts encoded dentries.
 */
void Migrator::encode_export_dir(bufferlist& exportbl,
                                 CDir *dir,
                                 map<client_t,entity_inst_t>& exported_client_map,
                                 map<client_t,client_metadata_t>& exported_client_metadata_map,
                                 uint64_t &num_exported)
{
  // This has to be declared before ENCODE_STARTED as it will need to be referenced after ENCODE_FINISH.
  std::vector<CDir*> subdirs;

  ENCODE_START(1, 1, exportbl);
  dout(7) << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  ceph_assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  encode(nden, exportbl);

  // dentries
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    num_exported++;

    // -- dentry
    dout(7) << " exporting " << *dn << dendl;

    // dn name
    encode(dn->get_name(), exportbl);
    encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1);  // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      auto& alternate_name = dn->alternate_name;
      // remote link
      CDentry::encode_remote(ino, d_type, alternate_name, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("i", 1);  // inode dentry

    // versioned sub-blob: inode state (v1) + alternate_name (added in v2)
    ENCODE_START(2, 1, exportbl);
    encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map);  // encode, and (update state for) export
    encode(dn->alternate_name, exportbl);
    ENCODE_FINISH(exportbl);

    // directory?
    auto&& dfs = in->get_dirfrags();
    for (const auto& t : dfs) {
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
        // include nested dirfrag
        ceph_assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
        subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  ENCODE_FINISH(exportbl);
  // subdirs
  for (const auto &dir : subdirs) {
    encode_export_dir(exportbl, dir, exported_client_map, exported_client_metadata_map, num_exported);
  }
}
1829
/*
 * Recursively demote 'dir' and everything beneath it from auth to replica
 * after a successful export: clear replica maps and dirty state, finish
 * each dentry/inode, collect waiters into 'finished', and count processed
 * dentries into *num_dentries.
 */
void Migrator::finish_export_dir(CDir *dir, mds_rank_t peer,
                                 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
                                 MDSContext::vec& finished, int *num_dentries)
{
  dout(10) << *dir << dendl;

  // release open_by
  dir->clear_replica_map();

  // mark
  ceph_assert(dir->is_auth());
  dir->state_clear(CDir::STATE_AUTH);
  dir->remove_bloom();
  dir->replica_nonce = CDir::EXPORT_NONCE;

  if (dir->is_dirty())
    dir->mark_clean();

  // suck up all waiters
  dir->take_waiting(CDir::WAIT_ANY_MASK, finished);    // all dir waiters

  // pop
  dir->finish_export();

  // dentries
  std::vector<CDir*> subdirs;
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    // dentry
    dn->finish_export();

    // inode?
    if (dn->get_linkage()->is_primary()) {
      finish_export_inode(in, peer, peer_imported[in->ino()], finished);

      // subdirs?
      auto&& dirs = in->get_nested_dirfrags();
      subdirs.insert(std::end(subdirs), std::begin(dirs), std::end(dirs));
    }

    mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
    ++(*num_dentries);
  }

  // subdirs
  for (const auto& dir : subdirs) {
    finish_export_dir(dir, peer, peer_imported, finished, num_dentries);
  }
}
1881
/**
 * Journal context: once the EExport entry is safely logged, continue with
 * export_logged_finish() (notify bystanders, then finish the export).
 */
class C_MDS_ExportFinishLogged : public MigratorLogContext {
  CDir *dir;
public:
  C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
  void finish(int r) override {
    mig->export_logged_finish(dir);
  }
};
1890
1891
1892 /*
1893 * i should get an export_ack from the export target.
1894 */
1895 void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
1896 {
1897 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
1898 mds_rank_t dest(m->get_source().num());
1899 ceph_assert(dir);
1900 ceph_assert(dir->is_frozen_tree_root()); // i'm exporting!
1901
1902 // yay!
1903 dout(7) << *dir << dendl;
1904
1905 mds->hit_export_target(dest, -1);
1906
1907 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1908 ceph_assert(it != export_state.end());
1909 ceph_assert(it->second.state == EXPORT_EXPORTING);
1910 ceph_assert(it->second.tid == m->get_tid());
1911
1912 auto bp = m->imported_caps.cbegin();
1913 decode(it->second.peer_imported, bp);
1914
1915 it->second.state = EXPORT_LOGGINGFINISH;
1916 ceph_assert(g_conf()->mds_kill_export_at != 9);
1917 set<CDir*> bounds;
1918 mdcache->get_subtree_bounds(dir, bounds);
1919
1920 // log completion.
1921 // include export bounds, to ensure they're in the journal.
1922 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);;
1923 mds->mdlog->start_entry(le);
1924
1925 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
1926 le->metablob.add_dir(dir, false);
1927 for (set<CDir*>::iterator p = bounds.begin();
1928 p != bounds.end();
1929 ++p) {
1930 CDir *bound = *p;
1931 le->get_bounds().insert(bound->dirfrag());
1932 le->metablob.add_dir_context(bound);
1933 le->metablob.add_dir(bound, false);
1934 }
1935
1936 // list us second, them first.
1937 // this keeps authority().first in sync with subtree auth state in the journal.
1938 mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
1939
1940 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1941 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1942 mds->mdlog->flush();
1943 ceph_assert(g_conf()->mds_kill_export_at != 10);
1944 }
1945
1946 void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
1947 {
1948 dout(7) << *dir << dendl;
1949
1950 ceph_assert(stat.state == EXPORT_CANCELLING);
1951
1952 if (stat.notify_ack_waiting.empty()) {
1953 stat.state = EXPORT_CANCELLED;
1954 return;
1955 }
1956
1957 dir->auth_pin(this);
1958
1959 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
1960 p != stat.notify_ack_waiting.end();
1961 ++p) {
1962 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
1963 pair<int,int>(mds->get_nodeid(), stat.peer),
1964 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
1965 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
1966 notify->get_bounds().push_back((*i)->dirfrag());
1967 mds->send_message_mds(notify, *p);
1968 }
1969 }
1970
1971 /*
1972 * this happens if the dest failes after i send the export data but before it is acked
1973 * that is, we don't know they safely received and logged it, so we reverse our changes
1974 * and go on.
1975 */
void Migrator::export_reverse(CDir *dir, export_state_t& stat)
{
  dout(7) << *dir << dendl;

  set<CInode*> to_eval;  // inodes whose stale caps need re-evaluation below

  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // remove exporting pins
  // (walk the whole subtree, aborting the export on every dirfrag,
  //  dentry, and primary inode)
  std::deque<CDir*> rq;
  rq.push_back(dir);
  while (!rq.empty()) {
    CDir *t = rq.front();
    rq.pop_front();
    t->abort_export();
    for (auto &p : *t) {
      CDentry *dn = p.second;
      dn->abort_export();
      if (!dn->get_linkage()->is_primary())
        continue;
      CInode *in = dn->get_linkage()->get_inode();
      in->abort_export();
      if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
        in->state_clear(CInode::STATE_EVALSTALECAPS);
        to_eval.insert(in);
      }
      if (in->is_dir()) {
        auto&& dirs = in->get_nested_dirfrags();
        for (const auto& dir : dirs) {
          rq.push_back(dir);
        }
      }
    }
  }

  // unpin bounds
  for (auto bd : bounds) {
    bd->put(CDir::PIN_EXPORTBOUND);
    bd->state_clear(CDir::STATE_EXPORTBOUND);
  }

  // notify bystanders
  export_notify_abort(dir, stat, bounds);

  // unfreeze tree, with possible subtree merge.
  mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());

  // process delayed expires
  mdcache->process_delayed_expire(dir);

  dir->unfreeze_tree();
  mdcache->try_subtree_merge(dir);

  // revoke/resume stale caps
  for (auto in : to_eval) {
    bool need_issue = false;
    for (auto &p : in->client_caps) {
      Capability *cap = &p.second;
      if (!cap->is_stale()) {
        need_issue = true;
        break;
      }
    }
    if (need_issue &&
        (!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
      mds->locker->issue_caps(in);
  }

  mdcache->show_cache();
}
2047
2048
2049 /*
2050 * once i get the ack, and logged the EExportFinish(true),
2051 * send notifies (if any), otherwise go straight to finish.
2052 *
2053 */
2054 void Migrator::export_logged_finish(CDir *dir)
2055 {
2056 dout(7) << *dir << dendl;
2057
2058 export_state_t& stat = export_state[dir];
2059
2060 // send notifies
2061 set<CDir*> bounds;
2062 mdcache->get_subtree_bounds(dir, bounds);
2063
2064 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
2065 p != stat.notify_ack_waiting.end();
2066 ++p) {
2067 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
2068 pair<int,int>(mds->get_nodeid(), stat.peer),
2069 pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
2070
2071 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2072 notify->get_bounds().push_back((*i)->dirfrag());
2073
2074 mds->send_message_mds(notify, *p);
2075 }
2076
2077 // wait for notifyacks
2078 stat.state = EXPORT_NOTIFYING;
2079 ceph_assert(g_conf()->mds_kill_export_at != 11);
2080
2081 // no notifies to wait for?
2082 if (stat.notify_ack_waiting.empty()) {
2083 export_finish(dir); // skip notify/notify_ack stage.
2084 } else {
2085 // notify peer to send cap import messages to clients
2086 if (!mds->is_cluster_degraded() ||
2087 mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
2088 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), false, stat.tid), stat.peer);
2089 } else {
2090 dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
2091 }
2092 }
2093 }
2094
2095 /*
2096 * warning:
2097 * i'll get an ack from each bystander.
2098 * when i get them all, do the export.
2099 * notify:
2100 * i'll get an ack from each bystander.
2101 * when i get them all, unfreeze and send the finish.
2102 */
2103 void Migrator::handle_export_notify_ack(const cref_t<MExportDirNotifyAck> &m)
2104 {
2105 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
 // NOTE(review): 'dest' and 'from' are the same rank computed twice;
 // kept as-is, but one local would suffice
2106 mds_rank_t dest(m->get_source().num());
2107 ceph_assert(dir);
2108 mds_rank_t from = mds_rank_t(m->get_source().num());
2109
 // one fewer outstanding message to this target (balancer accounting)
2110 mds->hit_export_target(dest, -1);
2111
 // the ack may belong to an export we are running, or to an import we
 // are aborting (we are a bystander of our own reversal); check both.
2112 auto export_state_entry = export_state.find(dir);
2113 if (export_state_entry != export_state.end()) {
2114 export_state_t& stat = export_state_entry->second;
2115 if (stat.state == EXPORT_WARNING &&
2116 stat.warning_ack_waiting.erase(from)) {
2117 // exporting. process warning.
2118 dout(7) << "from " << m->get_source()
2119 << ": exporting, processing warning on " << *dir << dendl;
2120 if (stat.warning_ack_waiting.empty())
2121 export_go(dir); // start export.
2122 } else if (stat.state == EXPORT_NOTIFYING &&
2123 stat.notify_ack_waiting.erase(from)) {
2124 // exporting. process notify.
2125 dout(7) << "from " << m->get_source()
2126 << ": exporting, processing notify on " << *dir << dendl;
2127 if (stat.notify_ack_waiting.empty())
2128 export_finish(dir);
2129 } else if (stat.state == EXPORT_CANCELLING &&
2130 m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
2131 stat.notify_ack_waiting.erase(from)) {
2132 dout(7) << "from " << m->get_source()
2133 << ": cancelling export, processing notify on " << *dir << dendl;
2134 if (stat.notify_ack_waiting.empty()) {
 // last bystander acked the cancel; tear down export state
2135 export_cancel_finish(export_state_entry);
2136 }
2137 }
2138 }
2139 else {
2140 auto import_state_entry = import_state.find(dir->dirfrag());
2141 if (import_state_entry != import_state.end()) {
2142 import_state_t& stat = import_state_entry->second;
2143 if (stat.state == IMPORT_ABORTING) {
2144 // reversing import
2145 dout(7) << "from " << m->get_source()
2146 << ": aborting import on " << *dir << dendl;
2147 ceph_assert(stat.bystanders.count(from));
2148 stat.bystanders.erase(from);
 // all bystanders have acked the abort: safe to unfreeze
2149 if (stat.bystanders.empty())
2150 import_reverse_unfreeze(dir);
2151 }
2152 }
2153 }
2154 }
2155
 /*
 * Final stage of a successful export: tell the importer it is fully
 * committed, strip local export state, unfreeze, and clean up.
 */
2156 void Migrator::export_finish(CDir *dir)
2157 {
2158 dout(3) << *dir << dendl;
2159
2160 ceph_assert(g_conf()->mds_kill_export_at != 12);
2161 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
2162 if (it == export_state.end()) {
2163 dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
2164 return;
2165 }
2166
2167 // send finish/commit to new auth
2168 if (!mds->is_cluster_degraded() ||
2169 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
 // 'true' = last: importer may now finish the import
2170 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), true, it->second.tid), it->second.peer);
2171 } else {
2172 dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
2173 }
2174 ceph_assert(g_conf()->mds_kill_export_at != 13);
2175
2176 // finish export (adjust local cache state)
2177 int num_dentries = 0;
2178 MDSContext::vec finished;
2179 finish_export_dir(dir, it->second.peer,
2180 it->second.peer_imported, finished, &num_dentries);
2181
2182 ceph_assert(!dir->is_auth());
2183 mdcache->adjust_subtree_auth(dir, it->second.peer);
2184
2185 // unpin bounds
2186 set<CDir*> bounds;
2187 mdcache->get_subtree_bounds(dir, bounds);
2188 for (set<CDir*>::iterator p = bounds.begin();
2189 p != bounds.end();
2190 ++p) {
2191 CDir *bd = *p;
2192 bd->put(CDir::PIN_EXPORTBOUND);
2193 bd->state_clear(CDir::STATE_EXPORTBOUND);
2194 }
2195
2196 if (dir->state_test(CDir::STATE_AUXSUBTREE))
2197 dir->state_clear(CDir::STATE_AUXSUBTREE);
2198
2199 // discard delayed expires
2200 mdcache->discard_delayed_expire(dir);
2201
2202 dout(7) << "unfreezing" << dendl;
2203
2204 // unfreeze tree, with possible subtree merge.
2205 // (we do this _after_ removing EXPORTBOUND pins, to allow merges)
2206 dir->unfreeze_tree();
2207 mdcache->try_subtree_merge(dir);
2208
2209 // no more auth subtree? clear scatter dirty
2210 if (!dir->get_inode()->is_auth() &&
2211 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2212 dir->get_inode()->clear_scatter_dirty();
2213 // wake up scatter_nudge waiters
2214 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, finished);
2215 }
2216
2217 if (!finished.empty())
2218 mds->queue_waiters(finished);
2219
 // keep the mutation/parent alive past erase() so we can clean up after
2220 MutationRef mut = std::move(it->second.mut);
2221 auto parent = std::move(it->second.parent);
2222 // remove from exporting list, clean up state
2223 total_exporting_size -= it->second.approx_size;
2224 export_state.erase(it);
2225
2226 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
2227 dir->clear_exporting();
2228
2229 mdcache->show_subtrees();
2230 audit();
2231
2232 mdcache->trim(num_dentries); // try trimming exported dentries
2233
2234 // send pending import_maps?
2235 mdcache->maybe_send_pending_resolves();
2236
2237 // drop locks, unpin path
2238 if (mut) {
2239 mds->locker->drop_locks(mut.get());
2240 mut->cleanup();
2241 }
2242
 // if this was a sub-export of a larger (fragmented) export, report up
2243 if (parent)
2244 child_export_finish(parent, true);
2245
2246 maybe_do_queued_export();
2247 }
2248
2249
2250
2251 class C_MDS_ExportDiscover : public MigratorContext {
2252 public:
2253 C_MDS_ExportDiscover(Migrator *mig, const cref_t<MExportDirDiscover>& m) : MigratorContext(mig), m(m) {}
2254 void finish(int r) override {
2255 mig->handle_export_discover(m, true);
2256 }
2257 private:
2258 cref_t<MExportDirDiscover> m;
2259 };
2260
2261 class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
2262 public:
2263 C_MDS_ExportDiscoverFactory(Migrator *mig, cref_t<MExportDirDiscover> m) : mig(mig), m(m) {}
2264 MDSContext *build() {
2265 return new C_MDS_ExportDiscover(mig, m);
2266 }
2267 private:
2268 Migrator *mig;
2269 cref_t<MExportDirDiscover> m;
2270 };
2271
2272 // ==========================================================
2273 // IMPORT
2274
 /*
 * Importer side, first protocol step: the exporter asks whether we can
 * take the subtree. Note discovery state, make sure the base inode is
 * in cache (discovering it if needed), then ack (or nack if inactive).
 * 'started' is true on retries after an async wait.
 */
2275 void Migrator::handle_export_discover(const cref_t<MExportDirDiscover> &m, bool started)
2276 {
2277 mds_rank_t from = m->get_source_mds();
2278 ceph_assert(from != mds->get_nodeid());
2279
2280 dout(7) << m->get_path() << dendl;
2281
2282 // note import state
2283 dirfrag_t df = m->get_dirfrag();
2284
2285 if (!mds->is_active()) {
2286 dout(7) << " not active, send NACK " << dendl;
2287 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid(), false), from);
2288 return;
2289 }
2290
2291 // only start discovering on this message once.
2292 import_state_t *p_state;
2293 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2294 if (!started) {
2295 ceph_assert(it == import_state.end());
2296 p_state = &import_state[df];
2297 p_state->state = IMPORT_DISCOVERING;
2298 p_state->peer = from;
2299 p_state->tid = m->get_tid();
2300 } else {
2301 // am i retrying after ancient path_traverse results?
 // drop if the import was cancelled/replaced while we were waiting
2302 if (it == import_state.end() ||
2303 it->second.peer != from ||
2304 it->second.tid != m->get_tid()) {
2305 dout(7) << " dropping obsolete message" << dendl;
2306 return;
2307 }
2308 ceph_assert(it->second.state == IMPORT_DISCOVERING);
2309 p_state = &it->second;
2310 }
2311
2312 C_MDS_ExportDiscoverFactory cf(this, m);
2313 if (!mdcache->is_open()) {
2314 dout(10) << " waiting for root" << dendl;
2315 mds->mdcache->wait_for_open(cf.build());
2316 return;
2317 }
2318
2319 ceph_assert(g_conf()->mds_kill_import_at != 1);
2320
2321 // do we have it?
2322 CInode *in = mdcache->get_inode(m->get_dirfrag().ino);
2323 if (!in) {
2324 // must discover it!
2325 filepath fpath(m->get_path());
2326 vector<CDentry*> trace;
2327 MDRequestRef null_ref;
 // r > 0: traverse is in progress and will re-invoke us via cf
2328 int r = mdcache->path_traverse(null_ref, cf, fpath,
2329 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_PATH_LOCKED,
2330 &trace);
2331 if (r > 0) return;
2332 if (r < 0) {
2333 dout(7) << "failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
2334 ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
2335 }
2336
2337 ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
2338 }
2339
2340 // yay
2341 dout(7) << "have " << df << " inode " << *in << dendl;
2342
2343 p_state->state = IMPORT_DISCOVERED;
2344
2345 // pin inode in the cache (for now)
2346 ceph_assert(in->is_dir());
2347 in->get(CInode::PIN_IMPORTING);
2348
2349 // reply
2350 dout(7) << " sending export_discover_ack on " << *in << dendl;
2351 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid()), p_state->peer);
2352 ceph_assert(g_conf()->mds_kill_import_at != 2);
2353 }
2354
2355 void Migrator::import_reverse_discovering(dirfrag_t df)
2356 {
2357 import_state.erase(df);
2358 }
2359
2360 void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2361 {
2362 // unpin base
2363 diri->put(CInode::PIN_IMPORTING);
2364 import_state.erase(df);
2365 }
2366
 // Abort an import in IMPORT_PREPPING: resolve whatever bounds are
 // already open, drop all pins taken so far, then discard the state.
 // Order matters: pins must go before import_reverse_final() erases
 // the import record.
2367 void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
2368 {
2369 set<CDir*> bounds;
2370 mdcache->map_dirfrag_set(stat.bound_ls, bounds);
2371 import_remove_pins(dir, bounds);
2372 import_reverse_final(dir);
2373 }
2374
2375 void Migrator::handle_export_cancel(const cref_t<MExportDirCancel> &m)
2376 {
2377 dout(7) << "on " << m->get_dirfrag() << dendl;
2378 dirfrag_t df = m->get_dirfrag();
2379 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2380 if (it == import_state.end()) {
2381 ceph_abort_msg("got export_cancel in weird state");
2382 } else if (it->second.state == IMPORT_DISCOVERING) {
2383 import_reverse_discovering(df);
2384 } else if (it->second.state == IMPORT_DISCOVERED) {
2385 CInode *in = mdcache->get_inode(df.ino);
2386 ceph_assert(in);
2387 import_reverse_discovered(df, in);
2388 } else if (it->second.state == IMPORT_PREPPING) {
2389 CDir *dir = mdcache->get_dirfrag(df);
2390 ceph_assert(dir);
2391 import_reverse_prepping(dir, it->second);
2392 } else if (it->second.state == IMPORT_PREPPED) {
2393 CDir *dir = mdcache->get_dirfrag(df);
2394 ceph_assert(dir);
2395 set<CDir*> bounds;
2396 mdcache->get_subtree_bounds(dir, bounds);
2397 import_remove_pins(dir, bounds);
2398 // adjust auth back to the exportor
2399 mdcache->adjust_subtree_auth(dir, it->second.peer);
2400 import_reverse_unfreeze(dir);
2401 } else {
2402 ceph_abort_msg("got export_cancel in weird state");
2403 }
2404 }
2405
2406 class C_MDS_ExportPrep : public MigratorContext {
2407 public:
2408 C_MDS_ExportPrep(Migrator *mig, const cref_t<MExportDirPrep>& m) : MigratorContext(mig), m(m) {}
2409 void finish(int r) override {
2410 mig->handle_export_prep(m, true);
2411 }
2412 private:
2413 cref_t<MExportDirPrep> m;
2414 };
2415
2416 class C_MDS_ExportPrepFactory : public MDSContextFactory {
2417 public:
2418 C_MDS_ExportPrepFactory(Migrator *mig, cref_t<MExportDirPrep> m) : mig(mig), m(m) {}
2419 MDSContext *build() {
2420 return new C_MDS_ExportPrep(mig, m);
2421 }
2422 private:
2423 Migrator *mig;
2424 cref_t<MExportDirPrep> m;
2425 };
2426
 /*
 * Decode one prep trace and replicate its objects into our cache.
 * Wire format (must be decoded in exactly this order):
 * df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
 * 'd' = base dirfrag already cached; 'f' = only the inode is cached,
 * its dirfrag follows; '-' = nothing to anchor, trace starts empty.
 */
2427 void Migrator::decode_export_prep_trace(bufferlist::const_iterator& blp, mds_rank_t oldauth, MDSContext::vec& finished)
2428 {
2429 DECODE_START(1, blp);
2430 dirfrag_t df;
2431 decode(df, blp);
2432 char start;
2433 decode(start, blp);
2434 dout(10) << " trace from " << df << " start " << start << dendl;
2435
2436 CDir *cur = nullptr;
2437 if (start == 'd') {
2438 cur = mdcache->get_dirfrag(df);
2439 ceph_assert(cur);
2440 dout(10) << " had " << *cur << dendl;
2441 } else if (start == 'f') {
2442 CInode *in = mdcache->get_inode(df.ino);
2443 ceph_assert(in);
2444 dout(10) << " had " << *in << dendl;
 // decode_replica_dir sets 'cur' to the (possibly new) replica dirfrag
2445 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
2446 dout(10) << " added " << *cur << dendl;
2447 } else if (start == '-') {
2448 // nothing
2449 } else
2450 ceph_abort_msg("unrecognized start char");
2451
 // walk the (dentry inode [dir])* tail, replicating as we go
2452 while (!blp.end()) {
2453 CDentry *dn = nullptr;
2454 mdcache->decode_replica_dentry(dn, blp, cur, finished);
2455 dout(10) << " added " << *dn << dendl;
2456 CInode *in = nullptr;
2457 mdcache->decode_replica_inode(in, blp, dn, finished);
2458 dout(10) << " added " << *in << dendl;
2459 if (blp.end())
2460 break;
2461 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
2462 dout(10) << " added " << *cur << dendl;
2463 }
2464
2465 DECODE_FINISH(blp);
2466 }
2467
 /*
 * Importer side, second protocol step: assimilate the base dir, the
 * trace replicas, and the bound list; open all bounding dirfrags
 * (possibly asynchronously, re-entering with did_assim=true); then
 * mark ourselves ambiguous auth, freeze the region, and ack.
 */
2468 void Migrator::handle_export_prep(const cref_t<MExportDirPrep> &m, bool did_assim)
2469 {
2470 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
2471 ceph_assert(oldauth != mds->get_nodeid());
2472
2473 CDir *dir;
2474 CInode *diri;
2475 MDSContext::vec finished;
2476
2477 // assimilate root dir.
2478 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
2479 if (!did_assim) {
2480 ceph_assert(it != import_state.end());
2481 ceph_assert(it->second.state == IMPORT_DISCOVERED);
2482 ceph_assert(it->second.peer == oldauth);
2483 diri = mdcache->get_inode(m->get_dirfrag().ino);
2484 ceph_assert(diri);
2485 auto p = m->basedir.cbegin();
2486 mdcache->decode_replica_dir(dir, p, diri, oldauth, finished);
2487 dout(7) << "on " << *dir << " (first pass)" << dendl;
2488 } else {
 // retry pass: drop if the import was cancelled/replaced meanwhile
2489 if (it == import_state.end() ||
2490 it->second.peer != oldauth ||
2491 it->second.tid != m->get_tid()) {
2492 dout(7) << "obsolete message, dropping" << dendl;
2493 return;
2494 }
2495 ceph_assert(it->second.state == IMPORT_PREPPING);
2496 ceph_assert(it->second.peer == oldauth);
2497
2498 dir = mdcache->get_dirfrag(m->get_dirfrag());
2499 ceph_assert(dir);
2500 dout(7) << "on " << *dir << " (subsequent pass)" << dendl;
2501 diri = dir->get_inode();
2502 }
2503 ceph_assert(dir->is_auth() == false);
2504
2505 mdcache->show_subtrees();
2506
2507 // build import bound map
2508 map<inodeno_t, fragset_t> import_bound_fragset;
2509 for (const auto &bound : m->get_bounds()) {
2510 dout(10) << " bound " << bound << dendl;
2511 import_bound_fragset[bound.ino].insert_raw(bound.frag);
2512 }
2513 // assimilate contents?
2514 if (!did_assim) {
2515 dout(7) << "doing assim on " << *dir << dendl;
2516
2517 // change import state
2518 it->second.state = IMPORT_PREPPING;
2519 it->second.bound_ls = m->get_bounds();
2520 it->second.bystanders = m->get_bystanders();
2521 ceph_assert(g_conf()->mds_kill_import_at != 3);
2522
2523 // bystander list
2524 dout(7) << "bystanders are " << it->second.bystanders << dendl;
2525
2526 // move pin to dir
2527 diri->put(CInode::PIN_IMPORTING);
2528 dir->get(CDir::PIN_IMPORTING);
2529 dir->state_set(CDir::STATE_IMPORTING);
2530
2531 // assimilate traces to exports
2532 // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
2533 for (const auto &bl : m->traces) {
2534 auto blp = bl.cbegin();
2535 decode_export_prep_trace(blp, oldauth, finished);
2536 }
2537
2538 // make bound sticky
 // stickydirs keeps all dirfrags of each bound inode pinned in cache
2539 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2540 p != import_bound_fragset.end();
2541 ++p) {
2542 p->second.simplify();
2543 CInode *in = mdcache->get_inode(p->first);
2544 ceph_assert(in);
2545 in->get_stickydirs();
2546 dout(7) << " set stickydirs on bound inode " << *in << dendl;
2547 }
2548
2549 } else {
2550 dout(7) << " not doing assim on " << *dir << dendl;
2551 }
2552
2553 MDSGatherBuilder gather(g_ceph_context);
2554
2555 if (!finished.empty())
2556 mds->queue_waiters(finished);
2557
2558
2559 bool success = true;
2560 if (mds->is_active()) {
2561 // open all bounds
2562 set<CDir*> import_bounds;
2563 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2564 p != import_bound_fragset.end();
2565 ++p) {
2566 CInode *in = mdcache->get_inode(p->first);
2567 ceph_assert(in);
2568
2569 // map fragset into a frag_t list, based on the inode fragtree
2570 frag_vec_t leaves;
2571 for (const auto& frag : p->second) {
2572 in->dirfragtree.get_leaves_under(frag, leaves);
2573 }
2574 dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << leaves << dendl;
2575
2576 for (const auto& leaf : leaves) {
2577 CDir *bound = mdcache->get_dirfrag(dirfrag_t(p->first, leaf));
2578 if (!bound) {
 // missing bound: open it remotely, retry via the gather finisher
2579 dout(7) << " opening bounding dirfrag " << leaf << " on " << *in << dendl;
2580 mdcache->open_remote_dirfrag(in, leaf, gather.new_sub());
2581 continue;
2582 }
2583
2584 if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
2585 dout(7) << " pinning import bound " << *bound << dendl;
2586 bound->get(CDir::PIN_IMPORTBOUND);
2587 bound->state_set(CDir::STATE_IMPORTBOUND);
2588 } else {
2589 dout(7) << " already pinned import bound " << *bound << dendl;
2590 }
2591 import_bounds.insert(bound);
2592 }
2593 }
2594
2595 if (gather.has_subs()) {
 // some bounds still opening; this handler will run again
2596 C_MDS_ExportPrepFactory cf(this, m);
2597 gather.set_finisher(cf.build());
2598 gather.activate();
2599 return;
2600 }
2601
2602 dout(7) << " all ready, noting auth and freezing import region" << dendl;
2603
2604 if (!mdcache->is_readonly() &&
2605 // for pinning scatter gather. loner has a higher chance to get wrlock
2606 diri->filelock.can_wrlock(diri->get_loner()) &&
2607 diri->nestlock.can_wrlock(diri->get_loner())) {
2608 it->second.mut = new MutationImpl();
2609 // force some locks. hacky.
2610 mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
2611 mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
2612
2613 // note that i am an ambiguous auth for this subtree.
2614 // specify bounds, since the exporter explicitly defines the region.
2615 mdcache->adjust_bounded_subtree_auth(dir, import_bounds,
2616 pair<int,int>(oldauth, mds->get_nodeid()));
2617 mdcache->verify_subtree_bounds(dir, import_bounds);
2618 // freeze.
2619 dir->_freeze_tree();
2620 // note new state
2621 it->second.state = IMPORT_PREPPED;
2622 } else {
2623 dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
2624 success = false;
2625 }
2626 } else {
2627 dout(7) << " not active, failing. " << *dir << dendl;
2628 success = false;
2629 }
2630
2631 if (!success)
2632 import_reverse_prepping(dir, it->second);
2633
2634 // ok!
 // ack carries 'success'; on failure the exporter will cancel
2635 dout(7) << " sending export_prep_ack on " << *dir << dendl;
2636 mds->send_message(make_message<MExportDirPrepAck>(dir->dirfrag(), success, m->get_tid()), m->get_connection());
2637
2638 ceph_assert(g_conf()->mds_kill_import_at != 4);
2639 }
2640
2641
2642
2643
 // Journal-completion context for EImportStart: once the event is
 // safely logged, continue the import via import_logged_start().
 // The CDir* is kept valid across the log flush by a PTRWAITER pin
 // taken in the ctor and dropped in finish().
2644 class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
2645 dirfrag_t df;
2646 CDir *dir;
2647 mds_rank_t from;
2648 public:
 // sessions force-opened before journaling; handed to import_logged_start
2649 map<client_t,pair<Session*,uint64_t> > imported_session_map;
2650
2651 C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
2652 MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
2653 dir->get(CDir::PIN_PTRWAITER);
2654 }
2655 void finish(int r) override {
2656 mig->import_logged_start(df, dir, from, imported_session_map);
2657 dir->put(CDir::PIN_PTRWAITER);
2658 }
2659 };
2660
 /*
 * Importer side, payload step: receive the exported metadata, journal
 * EImportStart (with imported client sessions and bounds), decode the
 * dir contents into our cache, and flush the log. import_logged_start
 * runs when the journal entry is durable.
 */
2661 void Migrator::handle_export_dir(const cref_t<MExportDir> &m)
2662 {
2663 ceph_assert(g_conf()->mds_kill_import_at != 5);
2664 CDir *dir = mdcache->get_dirfrag(m->dirfrag);
2665 ceph_assert(dir);
2666
2667 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
2668 dout(7) << "importing " << *dir << " from " << oldauth << dendl;
2669
2670 ceph_assert(!dir->is_auth());
2671 ceph_assert(dir->freeze_tree_state);
2672
2673 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
2674 ceph_assert(it != import_state.end());
2675 ceph_assert(it->second.state == IMPORT_PREPPED);
2676 ceph_assert(it->second.tid == m->get_tid());
2677 ceph_assert(it->second.peer == oldauth);
2678
 // the import root must be a leaf in the inode's fragtree
2679 if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
2680 dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());
2681
2682 mdcache->show_subtrees();
2683
2684 C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);
2685
2686 // start the journal entry
2687 EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
2688 mds->mdlog->start_entry(le);
2689
2690 le->metablob.add_dir_context(dir);
2691
2692 // adjust auth (list us _first_)
2693 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);
2694
2695 // new client sessions, open these after we journal
2696 // include imported sessions in EImportStart
2697 auto cmp = m->client_map.cbegin();
2698 map<client_t,entity_inst_t> client_map;
2699 map<client_t,client_metadata_t> client_metadata_map;
2700 decode(client_map, cmp);
2701 decode(client_metadata_map, cmp);
2702 ceph_assert(cmp.end());
2703 le->cmapv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
2704 onlogged->imported_session_map);
2705 encode(client_map, le->client_map, mds->mdsmap->get_up_features());
2706 encode(client_metadata_map, le->client_map);
2707
 // decode the actual metadata payload, one dirfrag bundle at a time
2708 auto blp = m->export_data.cbegin();
2709 int num_imported_inodes = 0;
2710 while (!blp.end()) {
2711 decode_import_dir(blp,
2712 oldauth,
2713 dir, // import root
2714 le,
2715 mds->mdlog->get_current_segment(),
2716 it->second.peer_exports,
2717 it->second.updated_scatterlocks,
2718 num_imported_inodes);
2719 }
2720 dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
2721
2722 // include bounds in EImportStart
2723 set<CDir*> import_bounds;
2724 for (const auto &bound : m->bounds) {
2725 CDir *bd = mdcache->get_dirfrag(bound);
2726 ceph_assert(bd);
2727 le->metablob.add_dir(bd, false); // note that parent metadata is already in the event
2728 import_bounds.insert(bd);
2729 }
2730 mdcache->verify_subtree_bounds(dir, import_bounds);
2731
2732 // adjust popularity
2733 mds->balancer->add_import(dir);
2734
2735 dout(7) << "did " << *dir << dendl;
2736
2737 // note state
2738 it->second.state = IMPORT_LOGGINGSTART;
2739 ceph_assert(g_conf()->mds_kill_import_at != 6);
2740
2741 // log it
2742 mds->mdlog->submit_entry(le, onlogged);
2743 mds->mdlog->flush();
2744
2745 // some stats
2746 if (mds->logger) {
2747 mds->logger->inc(l_mds_imported);
2748 mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
2749 }
2750 }
2751
2752
2753 /*
2754 * this is an import helper
2755 * called by import_finish, and import_reverse and friends.
2756 */
2757 void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2758 {
2759 import_state_t& stat = import_state[dir->dirfrag()];
2760 // root
2761 dir->put(CDir::PIN_IMPORTING);
2762 dir->state_clear(CDir::STATE_IMPORTING);
2763
2764 // bounding inodes
2765 set<inodeno_t> did;
2766 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2767 p != stat.bound_ls.end();
2768 ++p) {
2769 if (did.count(p->ino))
2770 continue;
2771 did.insert(p->ino);
2772 CInode *in = mdcache->get_inode(p->ino);
2773 ceph_assert(in);
2774 in->put_stickydirs();
2775 }
2776
2777 if (stat.state == IMPORT_PREPPING) {
2778 for (auto bd : bounds) {
2779 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2780 bd->put(CDir::PIN_IMPORTBOUND);
2781 bd->state_clear(CDir::STATE_IMPORTBOUND);
2782 }
2783 }
2784 } else if (stat.state >= IMPORT_PREPPED) {
2785 // bounding dirfrags
2786 for (auto bd : bounds) {
2787 ceph_assert(bd->state_test(CDir::STATE_IMPORTBOUND));
2788 bd->put(CDir::PIN_IMPORTBOUND);
2789 bd->state_clear(CDir::STATE_IMPORTBOUND);
2790 }
2791 }
2792 }
2793
 // Container context: collects waiters while reversing an import and,
 // when fired, requeues them at the FRONT of the MDS finisher queue so
 // they run immediately after this context itself.
2794 class C_MDC_QueueContexts : public MigratorContext {
2795 public:
2796 MDSContext::vec contexts;
2797 C_MDC_QueueContexts(Migrator *m) : MigratorContext(m) {}
2798 void finish(int r) override {
2799 // execute contexts immediately after 'this' context
2800 get_mds()->queue_waiters_front(contexts);
2801 }
2802 };
2803
2804 /*
2805 * note: this does the full work of reversing an import and cleaning up
2806 * state.
2807 * called by both handle_mds_failure and by handle_resolve (if we are
2808 * a survivor coping with an exporter failure+recovery).
2809 */
2810 void Migrator::import_reverse(CDir *dir)
2811 {
2812 dout(7) << *dir << dendl;
2813
2814 import_state_t& stat = import_state[dir->dirfrag()];
2815 stat.state = IMPORT_ABORTING;
2816
2817 set<CDir*> bounds;
2818 mdcache->get_subtree_bounds(dir, bounds);
2819
2820 // remove pins
2821 import_remove_pins(dir, bounds);
2822
2823 // update auth, with possible subtree merge.
2824 ceph_assert(dir->is_subtree_root());
2825 if (mds->is_resolve())
2826 mdcache->trim_non_auth_subtree(dir);
2827
2828 mdcache->adjust_subtree_auth(dir, stat.peer);
2829
 // collect waiters to requeue once the tree is unfrozen
2830 auto fin = new C_MDC_QueueContexts(this);
2831 if (!dir->get_inode()->is_auth() &&
2832 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2833 dir->get_inode()->clear_scatter_dirty();
2834 // wake up scatter_nudge waiters
2835 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2836 }
2837
2838 int num_dentries = 0;
2839 // adjust auth bits.
 // breadth-first walk: strip auth/replica/dirty state from every dir,
 // dentry, and primary inode inside the aborted import region
2840 std::deque<CDir*> q;
2841 q.push_back(dir);
2842 while (!q.empty()) {
2843 CDir *cur = q.front();
2844 q.pop_front();
2845
2846 // dir
2847 cur->abort_import();
2848
2849 for (auto &p : *cur) {
2850 CDentry *dn = p.second;
2851
2852 // dentry
2853 dn->state_clear(CDentry::STATE_AUTH);
2854 dn->clear_replica_map();
2855 dn->set_replica_nonce(CDentry::EXPORT_NONCE);
2856 if (dn->is_dirty())
2857 dn->mark_clean();
2858
2859 // inode?
2860 if (dn->get_linkage()->is_primary()) {
2861 CInode *in = dn->get_linkage()->get_inode();
 // NOTE(review): CDentry::STATE_AUTH used on a CInode — presumably
 // both aliases of the shared MDSCacheObject STATE_AUTH bit, but
 // CInode::STATE_AUTH would read better; confirm.
2862 in->state_clear(CDentry::STATE_AUTH);
2863 in->clear_replica_map();
2864 in->set_replica_nonce(CInode::EXPORT_NONCE);
2865 if (in->is_dirty())
2866 in->mark_clean();
2867 in->clear_dirty_rstat();
2868 if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
2869 in->clear_scatter_dirty();
2870 in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2871 }
2872
2873 in->clear_dirty_parent();
2874
2875 in->clear_clientwriteable();
2876 in->state_clear(CInode::STATE_NEEDSRECOVER);
2877
 // drop any partially-gathered lock state from the import
2878 in->authlock.clear_gather();
2879 in->linklock.clear_gather();
2880 in->dirfragtreelock.clear_gather();
2881 in->filelock.clear_gather();
2882
2883 in->clear_file_locks();
2884
2885 // non-bounding dir?
2886 auto&& dfs = in->get_dirfrags();
2887 for (const auto& dir : dfs) {
2888 if (bounds.count(dir) == 0)
2889 q.push_back(dir);
2890 }
2891 }
2892
2893 mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
2894 ++num_dentries;
2895 }
2896 }
2897
2898 dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
2899
 // NOTE(review): stat.state was unconditionally set to IMPORT_ABORTING
 // above and no callee in between appears to change it, so this branch
 // looks unreachable from here — verify against upstream intent.
2900 if (stat.state == IMPORT_ACKING) {
2901 // remove imported caps
2902 for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
2903 p != stat.peer_exports.end();
2904 ++p) {
2905 CInode *in = p->first;
2906 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
2907 q != p->second.end();
2908 ++q) {
2909 Capability *cap = in->get_client_cap(q->first);
2910 if (!cap) {
2911 ceph_assert(!stat.session_map.count(q->first));
2912 continue;
2913 }
2914 if (cap->is_importing())
2915 in->remove_client_cap(q->first);
2916 else
2917 cap->clear_clientwriteable();
2918 }
2919 in->put(CInode::PIN_IMPORTINGCAPS);
2920 }
2921 for (auto& p : stat.session_map) {
2922 Session *session = p.second.first;
2923 session->dec_importing();
2924 }
2925 }
2926
2927 // log our failure
2928 mds->mdlog->start_submit_entry(new EImportFinish(dir, false)); // log failure
2929
2930 mdcache->trim(num_dentries); // try trimming dentries
2931
2932 // notify bystanders; wait in aborting state
2933 import_notify_abort(dir, bounds);
2934 }
2935
2936 void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2937 {
2938 dout(7) << *dir << dendl;
2939
2940 import_state_t& stat = import_state[dir->dirfrag()];
2941 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2942 p != stat.bystanders.end();
2943 ++p) {
2944 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, false,
2945 pair<int,int>(stat.peer, mds->get_nodeid()),
2946 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
2947 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2948 notify->get_bounds().push_back((*i)->dirfrag());
2949 mds->send_message_mds(notify, *p);
2950 }
2951 }
2952
2953 void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
2954 {
2955 dout(7) << *dir << dendl;
2956
2957 import_state_t& stat = import_state[dir->dirfrag()];
2958 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2959 p != stat.bystanders.end(); ) {
2960 if (mds->is_cluster_degraded() &&
2961 !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
2962 // this can happen if both exporter and bystander fail in the same mdsmap epoch
2963 stat.bystanders.erase(p++);
2964 continue;
2965 }
2966 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
2967 mds_authority_t(stat.peer, mds->get_nodeid()),
2968 mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
2969 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2970 notify->get_bounds().push_back((*i)->dirfrag());
2971 mds->send_message_mds(notify, *p);
2972 ++p;
2973 }
2974 if (stat.bystanders.empty()) {
2975 dout(7) << "no bystanders, finishing reverse now" << dendl;
2976 import_reverse_unfreeze(dir);
2977 } else {
2978 ceph_assert(g_conf()->mds_kill_import_at != 10);
2979 }
2980 }
2981
 // Last mechanical step of reversing an import: discard expires that
 // were delayed while frozen, unfreeze the tree (possibly merging the
 // subtree back), then clean up the import record.
2982 void Migrator::import_reverse_unfreeze(CDir *dir)
2983 {
2984 dout(7) << *dir << dendl;
2985 ceph_assert(!dir->is_auth());
2986 mdcache->discard_delayed_expire(dir);
2987 dir->unfreeze_tree();
2988 if (dir->is_subtree_root())
2989 mdcache->try_subtree_merge(dir);
2990 import_reverse_final(dir);
2991 }
2992
2993 void Migrator::import_reverse_final(CDir *dir)
2994 {
2995 dout(7) << *dir << dendl;
2996
2997 // clean up
2998 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
2999 ceph_assert(it != import_state.end());
3000
3001 MutationRef mut = it->second.mut;
3002 import_state.erase(it);
3003
3004 // send pending import_maps?
3005 mdcache->maybe_send_pending_resolves();
3006
3007 if (mut) {
3008 mds->locker->drop_locks(mut.get());
3009 mut->cleanup();
3010 }
3011
3012 mdcache->show_subtrees();
3013 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3014 }
3015
3016
3017
3018
 /*
 * EImportStart is durable: move to IMPORT_ACKING, materialize the
 * force-opened client sessions, finish importing caps, and ack to the
 * old auth. Cap-import messages to clients are deferred until the
 * exporter confirms with MExportDirFinish.
 */
3019 void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
3020 map<client_t,pair<Session*,uint64_t> >& imported_session_map)
3021 {
3022 dout(7) << *dir << dendl;
3023
3024 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
3025 if (it == import_state.end() ||
3026 it->second.state != IMPORT_LOGGINGSTART) {
 // import was reversed while we were journaling; undo session opens
3027 dout(7) << "import " << df << " must have aborted" << dendl;
3028 mds->server->finish_force_open_sessions(imported_session_map);
3029 return;
3030 }
3031
3032 // note state
3033 it->second.state = IMPORT_ACKING;
3034
3035 ceph_assert(g_conf()->mds_kill_import_at != 7);
3036
3037 // force open client sessions and finish cap import
3038 mds->server->finish_force_open_sessions(imported_session_map, false);
3039
3040 map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
3041 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3042 p != it->second.peer_exports.end();
3043 ++p) {
3044 // parameter 'peer' is NONE, delay sending cap import messages to client
3045 finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
3046 p->second, imported_caps[p->first->ino()]);
3047 }
3048
3049 it->second.session_map.swap(imported_session_map);
3050
3051 // send notify's etc.
3052 dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
3053
3054 // test surviving observer of a failed migration that did not complete
3055 //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);
3056
 // ack carries the cap import map so the exporter can journal EExport
3057 auto ack = make_message<MExportDirAck>(dir->dirfrag(), it->second.tid);
3058 encode(imported_caps, ack->imported_caps);
3059
3060 mds->send_message_mds(ack, from);
3061 ceph_assert(g_conf()->mds_kill_import_at != 8);
3062
3063 mdcache->show_subtrees();
3064 }
3065
3066 void Migrator::handle_export_finish(const cref_t<MExportDirFinish> &m)
3067 {
3068 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
3069 ceph_assert(dir);
3070 dout(7) << *dir << (m->is_last() ? " last" : "") << dendl;
3071
3072 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
3073 ceph_assert(it != import_state.end());
3074 ceph_assert(it->second.tid == m->get_tid());
3075
3076 import_finish(dir, false, m->is_last());
3077 }
3078
/*
 * Commit an import.  May be called twice: the first call (state
 * IMPORT_ACKING) makes the subtree unambiguously ours and exposes the
 * imported caps to clients; only the 'last' call journals EImportFinish,
 * unfreezes the tree, and tears down the import record.
 *
 * @param notify whether bystanders still need an import_notify_finish()
 * @param last whether this is the final finish for the import
 */
void Migrator::import_finish(CDir *dir, bool notify, bool last)
{
  dout(7) << *dir << dendl;

  map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
  ceph_assert(it != import_state.end());
  ceph_assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);

  if (it->second.state == IMPORT_ACKING) {
    // first pass: claim sole authority over the subtree
    ceph_assert(dir->is_auth());
    mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
  }

  // log finish
  ceph_assert(g_conf()->mds_kill_import_at != 9);  // crash-test hook

  if (it->second.state == IMPORT_ACKING) {
    // first pass: merge the exporter's cap state into our live Capability
    // objects and tell the clients about their new cap home
    for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
	 p != it->second.peer_exports.end();
	 ++p) {
      CInode *in = p->first;
      ceph_assert(in->is_auth());
      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	// only clients whose sessions were successfully (re)opened
	auto r = it->second.session_map.find(q->first);
	if (r == it->second.session_map.end())
	  continue;

	Session *session = r->second.first;
	Capability *cap = in->get_client_cap(q->first);
	ceph_assert(cap);
	cap->merge(q->second, true);
	cap->clear_importing();  // set while peer was NONE in import_logged_start()
	mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
			       q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
      }
      p->second.clear();
      in->replica_caps_wanted = 0;
    }
    // balance the dec_importing() against the force-opened sessions
    for (auto& p : it->second.session_map) {
      Session *session = p.second.first;
      session->dec_importing();
    }
  }

  if (!last) {
    // first of two finishes: park in IMPORT_FINISHING and wait for the rest
    ceph_assert(it->second.state == IMPORT_ACKING);
    it->second.state = IMPORT_FINISHING;
    return;
  }

  // remove pins
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  if (notify)
    import_notify_finish(dir, bounds);

  import_remove_pins(dir, bounds);

  // keep the cap exports alive past the erase below; re-evaluated at the end
  map<CInode*, map<client_t,Capability::Export> > peer_exports;
  it->second.peer_exports.swap(peer_exports);

  // clear import state (we're done!)
  MutationRef mut = it->second.mut;
  import_state.erase(it);

  mds->mdlog->start_submit_entry(new EImportFinish(dir, true));

  // process delayed expires
  mdcache->process_delayed_expire(dir);

  // unfreeze tree, with possible subtree merge.
  dir->unfreeze_tree();
  mdcache->try_subtree_merge(dir);

  mdcache->show_subtrees();
  //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)

  if (mut) {
    mds->locker->drop_locks(mut.get());
    mut->cleanup();
  }

  // re-eval imported caps
  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
       p != peer_exports.end();
       ++p) {
    if (p->first->is_auth())
      mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
    p->first->put(CInode::PIN_IMPORTINGCAPS);  // pinned in decode_import_inode_caps()
  }

  // send pending import_maps?
  mdcache->maybe_send_pending_resolves();

  // did i just import mydir?
  if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
    mdcache->populate_mydir();

  // is it empty?
  if (dir->get_num_head_items() == 0 &&
      !dir->inode->is_auth()) {
    // reexport!
    export_empty_import(dir);
  }
}
3187
/*
 * Decode a primary inode from an export payload and link it under 'dn'.
 * Client caps found in the payload are only recorded in 'peer_exports';
 * they are materialized later by finish_import_inode_caps().
 */
void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
				   mds_rank_t oldauth, LogSegment *ls,
				   map<CInode*, map<client_t,Capability::Export> >& peer_exports,
				   list<ScatterLock*>& updated_scatterlocks)
{
  CInode *in;
  bool added = false;
  DECODE_START(1, blp);
  dout(15) << " on " << *dn << dendl;

  inodeno_t ino;
  snapid_t last;
  decode(ino, blp);
  decode(last, blp);

  // reuse an existing cache entry (e.g. a replica we already had) if present
  in = mdcache->get_inode(ino, last);
  if (!in) {
    in = new CInode(mds->mdcache, true, 2, last);
    added = true;
  }

  // state after link -- or not! -sage
  in->decode_import(blp, ls);  // cap imports are noted for later action

  // caps
  decode_import_inode_caps(in, true, blp, peer_exports);

  DECODE_FINISH(blp);

  // link before state -- or not! -sage
  if (dn->get_linkage()->get_inode() != in) {
    // the dentry must not point at some other inode
    ceph_assert(!dn->get_linkage()->get_inode());
    dn->dir->link_primary_inode(dn, in);
  }

  if (in->is_dir())
    dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);

  // add inode?
  if (added) {
    mdcache->add_inode(in);
    dout(10) << "added " << *in << dendl;
  } else {
    dout(10) << " had " << *in << dendl;
  }

  // propagate dirty-state flags carried by the decoded inode
  if (in->get_inode()->is_dirty_rstat())
    in->mark_dirty_rstat();

  if (!in->get_inode()->client_ranges.empty())
    in->mark_clientwriteable();

  // clear if dirtyscattered, since we're going to journal this
  // but not until we _actually_ finish the import...
  if (in->filelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->filelock);
    mds->locker->mark_updated_scatterlock(&in->filelock);
  }

  if (in->dirfragtreelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->dirfragtreelock);
    mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
  }

  // adjust replica list
  //assert(!in->is_replica(oldauth)); // not true on failed export
  in->add_replica(oldauth, CInode::EXPORT_NONCE);
  if (in->is_replica(mds->get_nodeid()))
    in->remove_replica(mds->get_nodeid());

  // now that we are auth, stable locks parked off SYNC may be movable
  if (in->snaplock.is_stable() &&
      in->snaplock.get_state() != LOCK_SYNC)
    mds->locker->try_eval(&in->snaplock, NULL);

  if (in->policylock.is_stable() &&
      in->policylock.get_state() != LOCK_SYNC)
    mds->locker->try_eval(&in->policylock, NULL);
}
3266
3267 void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
3268 bufferlist::const_iterator &blp,
3269 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3270 {
3271 DECODE_START(1, blp);
3272 map<client_t,Capability::Export> cap_map;
3273 decode(cap_map, blp);
3274 if (auth_cap) {
3275 mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
3276 decode(mds_wanted, blp);
3277 mds_wanted.erase(mds->get_nodeid());
3278 in->set_mds_caps_wanted(mds_wanted);
3279 }
3280 if (!cap_map.empty() ||
3281 (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
3282 peer_exports[in].swap(cap_map);
3283 in->get(CInode::PIN_IMPORTINGCAPS);
3284 }
3285 DECODE_FINISH(blp);
3286 }
3287
/*
 * Instantiate Capability objects for caps exported to us on 'in'.
 *
 * @param peer rank that exported the caps, or MDS_RANK_NONE to defer the
 *        client-facing cap-import messages (used during whole-dir imports;
 *        import_finish() completes them later)
 * @param auth_cap true if we are (becoming) auth for the inode
 * @param session_map forced-open client sessions, keyed by client id
 * @param export_map caps as encoded by the exporter, keyed by client id
 * @param import_map out: per-client import records for the exporter's ack
 */
void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
					const map<client_t,pair<Session*,uint64_t> >& session_map,
					const map<client_t,Capability::Export> &export_map,
					map<client_t,Capability::Import> &import_map)
{
  // Walk client_ranges (sorted by client id) in step with export_map to
  // detect clients that hold a writeable range but exported no cap to us;
  // any such client forces NEEDSRECOVER below.
  const auto& client_ranges = in->get_projected_inode()->client_ranges;
  auto r = client_ranges.cbegin();
  bool needs_recover = false;

  for (auto& it : export_map) {
    dout(10) << "for client." << it.first << " on " << *in << dendl;

    auto p = session_map.find(it.first);
    if (p == session_map.end()) {
      // no session; still create an (empty) Import record for the ack
      dout(10) << " no session for client." << it.first << dendl;
      (void)import_map[it.first];
      continue;
    }

    Session *session = p->second.first;

    Capability *cap = in->get_client_cap(it.first);
    if (!cap) {
      cap = in->add_client_cap(it.first, session);
      if (peer < 0)
	cap->mark_importing();  // deferred import; cleared in import_finish()
    }

    if (auth_cap) {
      // clients skipped here had a range but no exported cap -> recover
      while (r != client_ranges.cend() && r->first < it.first) {
	needs_recover = true;
	++r;
      }
      if (r != client_ranges.cend() && r->first == it.first) {
	cap->mark_clientwriteable();
	++r;
      }
    }

    // Always ask exporter mds to send cap export messages for auth caps.
    // For non-auth caps, ask exporter mds to send cap export messages to
    // clients who haven't opened sessions. The cap export messages will
    // make clients open sessions.
    if (auth_cap || !session->get_connection()) {
      Capability::Import& im = import_map[it.first];
      im.cap_id = cap->get_cap_id();
      im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
      im.issue_seq = cap->get_last_seq() + 1;
    }

    if (peer >= 0) {
      // exporter is known: merge now and tell the client immediately
      cap->merge(it.second, auth_cap);
      mdcache->do_cap_import(session, in, cap, it.second.cap_id,
			     it.second.seq, it.second.mseq - 1, peer,
			     auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
    }
  }

  if (auth_cap) {
    // leftover ranges past the end of export_map were never matched either
    if (r != client_ranges.cend())
      needs_recover = true;
    if (needs_recover)
      in->state_set(CInode::STATE_NEEDSRECOVER);
  }

  if (peer >= 0) {
    in->replica_caps_wanted = 0;
    in->put(CInode::PIN_IMPORTINGCAPS);  // pinned in decode_import_inode_caps()
  }
}
3358
3359 void Migrator::decode_import_dir(bufferlist::const_iterator& blp,
3360 mds_rank_t oldauth,
3361 CDir *import_root,
3362 EImportStart *le,
3363 LogSegment *ls,
3364 map<CInode*,map<client_t,Capability::Export> >& peer_exports,
3365 list<ScatterLock*>& updated_scatterlocks, int &num_imported)
3366 {
3367 DECODE_START(1, blp);
3368 // set up dir
3369 dirfrag_t df;
3370 decode(df, blp);
3371
3372 CInode *diri = mdcache->get_inode(df.ino);
3373 ceph_assert(diri);
3374 CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
3375 ceph_assert(dir);
3376
3377 dout(7) << *dir << dendl;
3378
3379 if (!dir->freeze_tree_state) {
3380 ceph_assert(dir->get_version() == 0);
3381 dir->freeze_tree_state = import_root->freeze_tree_state;
3382 }
3383
3384 // assimilate state
3385 dir->decode_import(blp, ls);
3386
3387 // adjust replica list
3388 //assert(!dir->is_replica(oldauth)); // not true on failed export
3389 dir->add_replica(oldauth, CDir::EXPORT_NONCE);
3390 if (dir->is_replica(mds->get_nodeid()))
3391 dir->remove_replica(mds->get_nodeid());
3392
3393 // add to journal entry
3394 if (le)
3395 le->metablob.add_import_dir(dir);
3396
3397 int num_imported = 0;
3398
3399 // take all waiters on this dir
3400 // NOTE: a pass of imported data is guaranteed to get all of my waiters because
3401 // a replica's presense in my cache implies/forces it's presense in authority's.
3402 MDSContext::vec waiters;
3403 dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
3404 for (auto c : waiters)
3405 dir->add_waiter(CDir::WAIT_UNFREEZE, c); // UNFREEZE will get kicked both on success or failure
3406
3407 dout(15) << "doing contents" << dendl;
3408
3409 // contents
3410 __u32 nden;
3411 decode(nden, blp);
3412
3413 for (; nden>0; nden--) {
3414 num_imported++;
3415
3416 // dentry
3417 string dname;
3418 snapid_t last;
3419 decode(dname, blp);
3420 decode(last, blp);
3421
3422 CDentry *dn = dir->lookup_exact_snap(dname, last);
3423 if (!dn)
3424 dn = dir->add_null_dentry(dname, 1, last);
3425
3426 dn->decode_import(blp, ls);
3427
3428 dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
3429 if (dn->is_replica(mds->get_nodeid()))
3430 dn->remove_replica(mds->get_nodeid());
3431
3432 // dentry lock in unreadable state can block path traverse
3433 if (dn->lock.get_state() != LOCK_SYNC)
3434 mds->locker->try_eval(&dn->lock, NULL);
3435
3436 dout(15) << " got " << *dn << dendl;
3437
3438 // points to...
3439 char icode;
3440 decode(icode, blp);
3441
3442 if (icode == 'N') {
3443 // null dentry
3444 ceph_assert(dn->get_linkage()->is_null());
3445
3446 // fall thru
3447 }
3448 else if (icode == 'L' || icode == 'l') {
3449 // remote link
3450 inodeno_t ino;
3451 unsigned char d_type;
3452 mempool::mds_co::string alternate_name;
3453
3454 CDentry::decode_remote(icode, ino, d_type, alternate_name, blp);
3455
3456 if (dn->get_linkage()->is_remote()) {
3457 ceph_assert(dn->get_linkage()->get_remote_ino() == ino);
3458 ceph_assert(dn->get_alternate_name() == alternate_name);
3459 } else {
3460 dir->link_remote_inode(dn, ino, d_type);
3461 dn->set_alternate_name(std::move(alternate_name));
3462 }
3463 }
3464 else if (icode == 'I' || icode == 'i') {
3465 // inode
3466 ceph_assert(le);
3467 if (icode == 'i') {
3468 DECODE_START(2, blp);
3469 decode_import_inode(dn, blp, oldauth, ls,
3470 peer_exports, updated_scatterlocks);
3471 ceph_assert(!dn->is_projected());
3472 decode(dn->alternate_name, blp);
3473 DECODE_FINISH(blp);
3474 } else {
3475 decode_import_inode(dn, blp, oldauth, ls,
3476 peer_exports, updated_scatterlocks);
3477 }
3478 }
3479
3480 // add dentry to journal entry
3481 if (le)
3482 le->metablob.add_import_dentry(dn);
3483 }
3484
3485 #ifdef MDS_VERIFY_FRAGSTAT
3486 if (dir->is_complete())
3487 dir->verify_fragstat();
3488 #endif
3489
3490 dir->inode->maybe_export_pin();
3491
3492 dout(7) << " done " << *dir << dendl;
3493 DECODE_FINISH(blp);
3494 }
3495
3496
3497
3498
3499
3500 // authority bystander
3501
3502 void Migrator::handle_export_notify(const cref_t<MExportDirNotify> &m)
3503 {
3504 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
3505 return;
3506 }
3507
3508 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
3509
3510 mds_rank_t from = mds_rank_t(m->get_source().num());
3511 mds_authority_t old_auth = m->get_old_auth();
3512 mds_authority_t new_auth = m->get_new_auth();
3513
3514 if (!dir) {
3515 dout(7) << old_auth << " -> " << new_auth
3516 << " on missing dir " << m->get_dirfrag() << dendl;
3517 } else if (dir->authority() != old_auth) {
3518 dout(7) << "old_auth was " << dir->authority()
3519 << " != " << old_auth << " -> " << new_auth
3520 << " on " << *dir << dendl;
3521 } else {
3522 dout(7) << old_auth << " -> " << new_auth
3523 << " on " << *dir << dendl;
3524 // adjust auth
3525 set<CDir*> have;
3526 mdcache->map_dirfrag_set(m->get_bounds(), have);
3527 mdcache->adjust_bounded_subtree_auth(dir, have, new_auth);
3528
3529 // induce a merge?
3530 mdcache->try_subtree_merge(dir);
3531 }
3532
3533 // send ack
3534 if (m->wants_ack()) {
3535 mds->send_message_mds(make_message<MExportDirNotifyAck>(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
3536 } else {
3537 // aborted. no ack.
3538 dout(7) << "no ack requested" << dendl;
3539 }
3540 }
3541
3542 /** cap exports **/
3543 void Migrator::export_caps(CInode *in)
3544 {
3545 mds_rank_t dest = in->authority().first;
3546 dout(7) << "to mds." << dest << " " << *in << dendl;
3547
3548 ceph_assert(in->is_any_caps());
3549 ceph_assert(!in->is_auth());
3550 ceph_assert(!in->is_ambiguous_auth());
3551 ceph_assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
3552
3553 auto ex = make_message<MExportCaps>();
3554 ex->ino = in->ino();
3555
3556 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map, ex->client_metadata_map);
3557
3558 mds->send_message_mds(ex, dest);
3559 }
3560
3561 void Migrator::handle_export_caps_ack(const cref_t<MExportCapsAck> &ack)
3562 {
3563 mds_rank_t from = ack->get_source().num();
3564 CInode *in = mdcache->get_inode(ack->ino);
3565 if (in) {
3566 ceph_assert(!in->is_auth());
3567
3568 dout(10) << *ack << " from "
3569 << ack->get_source() << " on " << *in << dendl;
3570
3571 map<client_t,Capability::Import> imported_caps;
3572 map<client_t,uint64_t> caps_ids;
3573 auto blp = ack->cap_bl.cbegin();
3574 decode(imported_caps, blp);
3575 decode(caps_ids, blp);
3576
3577 for (auto& it : imported_caps) {
3578 Capability *cap = in->get_client_cap(it.first);
3579 if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
3580 continue;
3581
3582 dout(7) << " telling client." << it.first
3583 << " exported caps on " << *in << dendl;
3584 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
3585 cap->get_cap_id(), cap->get_mseq(),
3586 mds->get_osd_epoch_barrier());
3587 m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
3588 mds->send_message_client_counted(m, it.first);
3589
3590 in->remove_client_cap(it.first);
3591 }
3592
3593 mds->locker->request_inode_file_caps(in);
3594 mds->locker->try_eval(in, CEPH_CAP_LOCKS);
3595 }
3596 }
3597
3598 void Migrator::handle_gather_caps(const cref_t<MGatherCaps> &m)
3599 {
3600 CInode *in = mdcache->get_inode(m->ino);
3601 if (!in)
3602 return;
3603
3604 dout(10) << *m << " from " << m->get_source()
3605 << " on " << *in << dendl;
3606
3607 if (in->is_any_caps() &&
3608 !in->is_auth() &&
3609 !in->is_ambiguous_auth() &&
3610 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3611 export_caps(in);
3612 }
3613
// Journal-completion context for handle_export_caps(): once the ESessions
// entry for the forced-open client sessions commits, hand the decoded cap
// exports to logged_import_caps() for instantiation.
class C_M_LoggedImportCaps : public MigratorLogContext {
  CInode *in;
  mds_rank_t from;  // rank that sent the MExportCaps
public:
  // filled in by handle_export_caps() before the journal entry is submitted
  map<client_t,pair<Session*,uint64_t> > imported_session_map;
  map<CInode*, map<client_t,Capability::Export> > peer_exports;

  C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
  void finish(int r) override {
    mig->logged_import_caps(in, from, imported_session_map, peer_exports);
  }
};
3626
// Receive caps that a replica pushed to us (the auth) via export_caps().
// Client sessions are journaled first (ESessions); the caps themselves are
// instantiated by logged_import_caps() once the entry commits.
void Migrator::handle_export_caps(const cref_t<MExportCaps> &ex)
{
  dout(10) << *ex << " from " << ex->get_source() << dendl;
  CInode *in = mdcache->get_inode(ex->ino);

  ceph_assert(in);
  ceph_assert(in->is_auth());

  // FIXME
  if (!in->can_auth_pin()) {
    // NOTE(review): message is silently dropped here — presumably the
    // exporter retries later; confirm against the sender (see FIXME)
    return;
  }

  in->auth_pin(this);  // matched by auth_unpin() in logged_import_caps()

  // local copies, since the message itself is const
  map<client_t,entity_inst_t> client_map{ex->client_map};
  map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};

  C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
      this, in, mds_rank_t(ex->get_source().num()));

  version_t pv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
							  finish->imported_session_map);
  // decode new caps
  auto blp = ex->cap_bl.cbegin();
  decode_import_inode_caps(in, false, blp, finish->peer_exports);
  ceph_assert(!finish->peer_exports.empty());   // thus, inode is pinned.

  // journal open client sessions
  ESessions *le = new ESessions(pv, std::move(client_map),
				std::move(client_metadata_map));
  mds->mdlog->start_submit_entry(le, finish);
  mds->mdlog->flush();
}
3661
3662
3663 void Migrator::logged_import_caps(CInode *in,
3664 mds_rank_t from,
3665 map<client_t,pair<Session*,uint64_t> >& imported_session_map,
3666 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3667 {
3668 dout(10) << *in << dendl;
3669 // see export_go() vs export_go_synced()
3670 ceph_assert(in->is_auth());
3671
3672 // force open client sessions and finish cap import
3673 mds->server->finish_force_open_sessions(imported_session_map);
3674
3675 auto it = peer_exports.find(in);
3676 ceph_assert(it != peer_exports.end());
3677
3678 // clients will release caps from the exporter when they receive the cap import message.
3679 map<client_t,Capability::Import> imported_caps;
3680 finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
3681 mds->locker->eval(in, CEPH_CAP_LOCKS, true);
3682
3683 if (!imported_caps.empty()) {
3684 auto ack = make_message<MExportCapsAck>(in->ino());
3685 map<client_t,uint64_t> peer_caps_ids;
3686 for (auto &p : imported_caps )
3687 peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
3688
3689 encode(imported_caps, ack->cap_bl);
3690 encode(peer_caps_ids, ack->cap_bl);
3691 mds->send_message_mds(ack, from);
3692 }
3693
3694 in->auth_unpin(this);
3695 }
3696
// Pull initial values for the runtime-tunable settings; handle_conf_change()
// keeps them current afterwards.
Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), mdcache(c) {
  max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
  inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
}
3701
3702 void Migrator::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
3703 {
3704 if (changed.count("mds_max_export_size"))
3705 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
3706 if (changed.count("mds_inject_migrator_session_race")) {
3707 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
3708 dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
3709 }
3710 }