]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Migrator.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mds / Migrator.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDCache.h"
17 #include "CInode.h"
18 #include "CDir.h"
19 #include "CDentry.h"
20 #include "Migrator.h"
21 #include "Locker.h"
22 #include "Server.h"
23
24 #include "MDBalancer.h"
25 #include "MDLog.h"
26 #include "MDSMap.h"
27 #include "Mutation.h"
28
29 #include "include/filepath.h"
30 #include "common/likely.h"
31
32 #include "events/EExport.h"
33 #include "events/EImportStart.h"
34 #include "events/EImportFinish.h"
35 #include "events/ESessions.h"
36
37 #include "msg/Messenger.h"
38
39 #include "messages/MClientCaps.h"
40
41 /*
42 * this is what the dir->dir_auth values look like
43 *
44 * dir_auth authbits
45 * export
46 * me me - before
47 * me, me me - still me, but preparing for export
48 * me, them me - send MExportDir (peer is preparing)
49 * them, me me - journaled EExport
50 * them them - done
51 *
52 * import:
53 * them them - before
54 * me, them me - journaled EImportStart
55 * me me - done
56 *
57 * which implies:
58 * - auth bit is set if i am listed as first _or_ second dir_auth.
59 */
60
61 #include "common/config.h"
62
63
64 #define dout_context g_ceph_context
65 #define dout_subsys ceph_subsys_mds
66 #undef dout_prefix
67 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".mig " << __func__ << " "
68
69 using namespace std;
70
/**
 * Base class for Migrator completion callbacks.
 * Holds a back-pointer to the owning Migrator so subclasses can call back
 * into it; get_mds() exposes the owning MDSRank to the MDSContext machinery.
 */
class MigratorContext : public MDSContext {
protected:
  Migrator *mig;  // owning Migrator; never null (asserted in ctor)
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorContext(Migrator *mig_) : mig(mig_) {
    ceph_assert(mig != NULL);
  }
};
82
/**
 * Base class for Migrator callbacks that complete after a journal (MDLog)
 * write.  Same Migrator back-pointer pattern as MigratorContext, but built
 * on MDSLogContextBase so completion is tied to log flush.
 */
class MigratorLogContext : public MDSLogContextBase {
protected:
  Migrator *mig;  // owning Migrator; never null (asserted in ctor)
  MDSRank *get_mds() override {
    return mig->mds;
  }
public:
  explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
    ceph_assert(mig != NULL);
  }
};
94
/**
 * Top-level message dispatcher for the Migrator.
 *
 * Routes subtree-migration messages (importer-side requests, exporter-side
 * acks, bystander notifies) and cap-migration messages to their handlers.
 * An unknown message type is fatal.
 */
void Migrator::dispatch(const cref_t<Message> &m)
{
  switch (m->get_type()) {
    // import
  case MSG_MDS_EXPORTDIRDISCOVER:
    handle_export_discover(ref_cast<MExportDirDiscover>(m));
    break;
  case MSG_MDS_EXPORTDIRPREP:
    handle_export_prep(ref_cast<MExportDirPrep>(m));
    break;
  case MSG_MDS_EXPORTDIR:
    if (unlikely(inject_session_race)) {
      // test hook: hold the MExportDir until some client connection arrives,
      // to exercise the race between session import and client connect
      dout(0) << "waiting for inject_session_race" << dendl;
      mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
    } else {
      handle_export_dir(ref_cast<MExportDir>(m));
    }
    break;
  case MSG_MDS_EXPORTDIRFINISH:
    handle_export_finish(ref_cast<MExportDirFinish>(m));
    break;
  case MSG_MDS_EXPORTDIRCANCEL:
    handle_export_cancel(ref_cast<MExportDirCancel>(m));
    break;

    // export
  case MSG_MDS_EXPORTDIRDISCOVERACK:
    handle_export_discover_ack(ref_cast<MExportDirDiscoverAck>(m));
    break;
  case MSG_MDS_EXPORTDIRPREPACK:
    handle_export_prep_ack(ref_cast<MExportDirPrepAck>(m));
    break;
  case MSG_MDS_EXPORTDIRACK:
    handle_export_ack(ref_cast<MExportDirAck>(m));
    break;
  case MSG_MDS_EXPORTDIRNOTIFYACK:
    handle_export_notify_ack(ref_cast<MExportDirNotifyAck>(m));
    break;

    // export 3rd party (dir_auth adjustments)
  case MSG_MDS_EXPORTDIRNOTIFY:
    handle_export_notify(ref_cast<MExportDirNotify>(m));
    break;

    // caps
  case MSG_MDS_EXPORTCAPS:
    handle_export_caps(ref_cast<MExportCaps>(m));
    break;
  case MSG_MDS_EXPORTCAPSACK:
    handle_export_caps_ack(ref_cast<MExportCapsAck>(m));
    break;
  case MSG_MDS_GATHERCAPS:
    handle_gather_caps(ref_cast<MGatherCaps>(m));
    break;

  default:
    derr << "migrator unknown message " << m->get_type() << dendl;
    ceph_abort_msg("migrator unknown message");
  }
}
155
156 void Migrator::export_empty_import(CDir *dir)
157 {
158 dout(7) << *dir << dendl;
159 ceph_assert(dir->is_subtree_root());
160
161 if (dir->inode->is_auth()) {
162 dout(7) << " inode is auth" << dendl;
163 return;
164 }
165 if (!dir->is_auth()) {
166 dout(7) << " not auth" << dendl;
167 return;
168 }
169 if (dir->is_freezing() || dir->is_frozen()) {
170 dout(7) << " freezing or frozen" << dendl;
171 return;
172 }
173 if (dir->get_num_head_items() > 0) {
174 dout(7) << " not actually empty" << dendl;
175 return;
176 }
177 if (dir->inode->is_root()) {
178 dout(7) << " root" << dendl;
179 return;
180 }
181
182 mds_rank_t dest = dir->inode->authority().first;
183 //if (mds->is_shutting_down()) dest = 0; // this is more efficient.
184
185 dout(7) << " really empty, exporting to " << dest << dendl;
186 assert (dest != mds->get_nodeid());
187
188 dout(7) << "exporting to mds." << dest
189 << " empty import " << *dir << dendl;
190 export_dir( dir, dest );
191 }
192
/**
 * Periodic scan for exports stuck in DISCOVERING/FREEZING.
 *
 * If a freezing subtree has made no auth-pin progress for longer than
 * mds_freeze_tree_timeout AND someone may be blocked on it (remote authpin
 * waiters, or a freezing parent), cancel the export to break the kinds of
 * cross-MDS freeze deadlocks described in the comment below.
 */
void Migrator::find_stale_export_freeze()
{
  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= g_conf()->mds_freeze_tree_timeout;


  /*
   * We could have situations like:
   *
   * - mds.0 authpins an item in subtree A
   * - mds.0 sends request to mds.1 to authpin an item in subtree B
   * - mds.0 freezes subtree A
   * - mds.1 authpins an item in subtree B
   * - mds.1 sends request to mds.0 to authpin an item in subtree A
   * - mds.1 freezes subtree B
   * - mds.1 receives the remote authpin request from mds.0
   *   (wait because subtree B is freezing)
   * - mds.0 receives the remote authpin request from mds.1
   *   (wait because subtree A is freezing)
   *
   *
   * - client request authpins items in subtree B
   * - freeze subtree B
   * - import subtree A which is parent of subtree B
   *   (authpins parent inode of subtree B, see CDir::set_dir_auth())
   * - freeze subtree A
   * - client request tries authpinning items in subtree A
   *   (wait because subtree A is freezing)
   */
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end(); ) {
    CDir* dir = p->first;
    export_state_t& stat = p->second;
    ++p;  // advance first: export_try_cancel() may erase this entry
    if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
      continue;
    ceph_assert(dir->freeze_tree_state);
    if (stat.last_cum_auth_pins != dir->freeze_tree_state->auth_pins) {
      // auth-pin count moved: the freeze is making progress, reset the clock
      stat.last_cum_auth_pins = dir->freeze_tree_state->auth_pins;
      stat.last_cum_auth_pins_change = now;
      continue;
    }
    if (stat.last_cum_auth_pins_change >= cutoff)
      continue;  // stalled, but not for long enough yet
    if (stat.num_remote_waiters > 0 ||
	(!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
      // stale AND someone is (potentially) blocked on us: give up
      export_try_cancel(dir);
    }
  }
}
244
/**
 * Abort an in-progress export of 'dir', whatever state it is in.
 *
 * Per-state teardown undoes exactly what that state had set up so far
 * (auth pins, tree freeze, bound pins, bystander warnings).  Exports at or
 * past LOGGINGFINISH are NOT cancelled here: the export has (logically)
 * already succeeded, so a destination failure is ignored and handled by the
 * normal completion path.
 *
 * @param dir         subtree root being exported (must be in export_state)
 * @param notify_peer if true, send MExportDirCancel to the importer when it
 *                    is still reachable
 */
void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
{
  dout(10) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  ceph_assert(it != export_state.end());

  int state = it->second.state;  // remember pre-cancel state for cleanup below
  switch (state) {
  case EXPORT_LOCKING:
    dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
    num_locking_exports--;
    it->second.state = EXPORT_CANCELLED;
    dir->auth_unpin(this);
    break;
  case EXPORT_DISCOVERING:
    dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    dir->auth_unpin(this);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

  case EXPORT_FREEZING:
    dout(10) << "export state=freezing : canceling freeze" << dendl;
    it->second.state = EXPORT_CANCELLED;
    dir->unfreeze_tree();  // cancel the freeze
    if (dir->is_subtree_root())
      mdcache->try_subtree_merge(dir);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

    // NOTE: state order reversal, warning comes after prepping
  case EXPORT_WARNING:
    dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
    // CANCELLING (not CANCELLED): we must wait for bystander notify acks
    it->second.state = EXPORT_CANCELLING;
    // fall-thru

  case EXPORT_PREPPING:
    if (state != EXPORT_WARNING) {
      dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
      it->second.state = EXPORT_CANCELLED;
    }

    {
      // unpin bounds
      set<CDir*> bounds;
      mdcache->get_subtree_bounds(dir, bounds);
      for (set<CDir*>::iterator q = bounds.begin();
	   q != bounds.end();
	   ++q) {
	CDir *bd = *q;
	bd->put(CDir::PIN_EXPORTBOUND);
	bd->state_clear(CDir::STATE_EXPORTBOUND);
      }
      if (state == EXPORT_WARNING) {
	// notify bystanders
	export_notify_abort(dir, it->second, bounds);
	// process delayed expires
	mdcache->process_delayed_expire(dir);
      }
    }
    dir->unfreeze_tree();
    mdcache->try_subtree_merge(dir);
    if (notify_peer &&
	(!mds->is_cluster_degraded() ||
	 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
      mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
							   it->second.tid),
			    it->second.peer);
    break;

  case EXPORT_EXPORTING:
    dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
    it->second.state = EXPORT_CANCELLING;
    export_reverse(dir, it->second);
    break;

  case EXPORT_LOGGINGFINISH:
  case EXPORT_NOTIFYING:
    dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
    // leave export_state, don't clean up now.
    break;
  case EXPORT_CANCELLING:
    break;

  default:
    ceph_abort();
  }

  // finish clean-up?
  if (it->second.state == EXPORT_CANCELLING ||
      it->second.state == EXPORT_CANCELLED) {
    MutationRef mut;
    mut.swap(it->second.mut);

    if (it->second.state == EXPORT_CANCELLED) {
      export_cancel_finish(it);  // erases 'it' from export_state
    }

    // drop locks
    if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
      // in these states the mutation is the internal EXPORTDIR MDRequest;
      // killing it releases its locks and pins
      MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
      ceph_assert(mdr);
      mdcache->request_kill(mdr);
    } else if (mut) {
      mds->locker->drop_locks(mut.get());
      mut->cleanup();
    }

    mdcache->show_subtrees();

    maybe_do_queued_export();
  }
}
370
/**
 * Final cleanup for a cancelled export: erase the export_state entry, clear
 * the EXPORTING flag, drop the abort-notify auth_pin when we went through
 * CANCELLING, and kick anything waiting on us (pending resolves and, if
 * this export was split off from a parent export, the parent).
 *
 * NOTE: 'it' is invalidated by this call (the entry is erased).
 */
void Migrator::export_cancel_finish(export_state_iterator& it)
{
  CDir *dir = it->first;
  bool unpin = (it->second.state == EXPORT_CANCELLING);
  auto parent = std::move(it->second.parent);

  total_exporting_size -= it->second.approx_size;
  export_state.erase(it);

  ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
  dir->clear_exporting();

  if (unpin) {
    // pinned by Migrator::export_notify_abort()
    dir->auth_unpin(this);
  }
  // send pending import_maps?  (these need to go out when all exports have finished.)
  mdcache->maybe_send_pending_resolves();

  if (parent)
    child_export_finish(parent, false);
}
393
394 // ==========================================================
395 // mds failure handling
396
/**
 * React to another MDS rank failing or stopping.
 *
 * Exporter side: cancel exports targeting the failed rank, plus any export
 * (to anyone) still in a pre-frozen/pre-prepped state (LOCKING/DISCOVERING/
 * FREEZING/PREPPING).  If the failed rank was merely a bystander, fabricate
 * the warning/notify acks it will never send so the export can proceed.
 *
 * Importer side: unwind each import from the failed rank according to how
 * far it had progressed; at ACKING and beyond the import is recorded as
 * ambiguous and left for exporter recovery/resolve to disambiguate.
 */
void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
{
  dout(5) << who << dendl;

  // check my exports

  // first add an extra auth_pin on any freezes, so that canceling a
  // nested freeze doesn't complete one further up the hierarchy and
  // confuse the shit out of us.  we'll remove it after canceling the
  // freeze.  this way no freeze completions run before we want them
  // to.
  std::vector<CDir*> pinned_dirs;
  for (map<CDir*,export_state_t>::iterator p = export_state.begin();
       p != export_state.end();
       ++p) {
    if (p->second.state == EXPORT_FREEZING) {
      CDir *dir = p->first;
      dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
      dir->auth_pin(this);
      pinned_dirs.push_back(dir);
    }
  }

  map<CDir*,export_state_t>::iterator p = export_state.begin();
  while (p != export_state.end()) {
    map<CDir*,export_state_t>::iterator next = p;
    ++next;  // grab next now; cancel paths below may erase p
    CDir *dir = p->first;

    // abort exports:
    //  - that are going to the failed node
    //  - that aren't frozen yet (to avoid auth_pin deadlock)
    //  - they haven't prepped yet (they may need to discover bounds to do that)
    if ((p->second.peer == who &&
	 p->second.state != EXPORT_CANCELLING) ||
	p->second.state == EXPORT_LOCKING ||
	p->second.state == EXPORT_DISCOVERING ||
	p->second.state == EXPORT_FREEZING ||
	p->second.state == EXPORT_PREPPING) {
      // the guy i'm exporting to failed, or we're just freezing.
      dout(10) << "cleaning up export state (" << p->second.state << ")"
	       << get_export_statename(p->second.state) << " of " << *dir << dendl;
      export_try_cancel(dir);
    } else if (p->second.peer != who) {
      // bystander failed.
      if (p->second.warning_ack_waiting.erase(who)) {
	if (p->second.state == EXPORT_WARNING) {
	  p->second.notify_ack_waiting.erase(who);   // they won't get a notify either.
	  // exporter waiting for warning acks, let's fake theirs.
	  dout(10) << "faking export_warning_ack from mds." << who
		   << " on " << *dir << " to mds." << p->second.peer
		   << dendl;
	  if (p->second.warning_ack_waiting.empty())
	    export_go(dir);
	}
      }
      if (p->second.notify_ack_waiting.erase(who)) {
	// exporter is waiting for notify acks, fake it
	dout(10) << "faking export_notify_ack from mds." << who
		 << " on " << *dir << " to mds." << p->second.peer
		 << dendl;
	if (p->second.state == EXPORT_NOTIFYING) {
	  if (p->second.notify_ack_waiting.empty())
	    export_finish(dir);
	} else if (p->second.state == EXPORT_CANCELLING) {
	  if (p->second.notify_ack_waiting.empty()) {
	    export_cancel_finish(p);
	  }
	}
      }
    }

    // next!
    p = next;
  }


  // check my imports
  map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
  while (q != import_state.end()) {
    map<dirfrag_t,import_state_t>::iterator next = q;
    ++next;
    dirfrag_t df = q->first;
    CInode *diri = mdcache->get_inode(df.ino);
    CDir *dir = mdcache->get_dirfrag(df);  // may be null in early import states

    if (q->second.peer == who) {
      if (dir)
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << *dir << dendl;
      else
	dout(10) << "cleaning up import state (" << q->second.state << ")"
		 << get_import_statename(q->second.state) << " of " << df << dendl;

      switch (q->second.state) {
      case IMPORT_DISCOVERING:
	dout(10) << "import state=discovering : clearing state" << dendl;
	import_reverse_discovering(df);
	break;

      case IMPORT_DISCOVERED:
	ceph_assert(diri);
	dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
	import_reverse_discovered(df, diri);
	break;

      case IMPORT_PREPPING:
	ceph_assert(dir);
	dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
	import_reverse_prepping(dir, q->second);
	break;

      case IMPORT_PREPPED:
	ceph_assert(dir);
	dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  mdcache->get_subtree_bounds(dir, bounds);
	  import_remove_pins(dir, bounds);

	  // adjust auth back to the exporter
	  mdcache->adjust_subtree_auth(dir, q->second.peer);

	  // notify bystanders ; wait in aborting state
	  q->second.state = IMPORT_ABORTING;
	  import_notify_abort(dir, bounds);
	  ceph_assert(g_conf()->mds_kill_import_at != 10);
	}
	break;

      case IMPORT_LOGGINGSTART:
	ceph_assert(dir);
	dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
	import_reverse(dir);
	break;

      case IMPORT_ACKING:
	ceph_assert(dir);
	// hrm.  make this an ambiguous import, and wait for exporter recovery to disambiguate
	dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
	{
	  set<CDir*> bounds;
	  mdcache->get_subtree_bounds(dir, bounds);
	  mdcache->add_ambiguous_import(dir, bounds);
	}
	break;

      case IMPORT_FINISHING:
	ceph_assert(dir);
	dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
	import_finish(dir, true);
	break;

      case IMPORT_ABORTING:
	ceph_assert(dir);
	dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
	break;
      }
    } else {
      auto bystanders_entry = q->second.bystanders.find(who);
      if (bystanders_entry != q->second.bystanders.end()) {
	q->second.bystanders.erase(bystanders_entry);
	if (q->second.state == IMPORT_ABORTING) {
	  ceph_assert(dir);
	  dout(10) << "faking export_notify_ack from mds." << who
		   << " on aborting import " << *dir << " from mds." << q->second.peer
		   << dendl;
	  if (q->second.bystanders.empty())
	    import_reverse_unfreeze(dir);
	}
      }
    }

    // next!
    q = next;
  }

  // drop the temporary auth_pins added above, now that cancels are done
  for (const auto& dir : pinned_dirs) {
    dout(10) << "removing temp auth_pin on " << *dir << dendl;
    dir->auth_unpin(this);
  }
}
579
580
581
582 void Migrator::show_importing()
583 {
584 dout(10) << dendl;
585 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
586 p != import_state.end();
587 ++p) {
588 CDir *dir = mdcache->get_dirfrag(p->first);
589 if (dir) {
590 dout(10) << " importing from " << p->second.peer
591 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
592 << " " << p->first << " " << *dir << dendl;
593 } else {
594 dout(10) << " importing from " << p->second.peer
595 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
596 << " " << p->first << dendl;
597 }
598 }
599 }
600
601 void Migrator::show_exporting()
602 {
603 dout(10) << dendl;
604 for (const auto& [dir, state] : export_state) {
605 dout(10) << " exporting to " << state.peer
606 << ": (" << state.state << ") " << get_export_statename(state.state)
607 << " " << dir->dirfrag() << " " << *dir << dendl;
608 }
609 }
610
611
612
613 void Migrator::audit()
614 {
615 if (!g_conf()->subsys.should_gather<ceph_subsys_mds, 5>())
616 return; // hrm.
617
618 // import_state
619 show_importing();
620 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
621 p != import_state.end();
622 ++p) {
623 if (p->second.state == IMPORT_DISCOVERING)
624 continue;
625 if (p->second.state == IMPORT_DISCOVERED) {
626 CInode *in = mdcache->get_inode(p->first.ino);
627 ceph_assert(in);
628 continue;
629 }
630 CDir *dir = mdcache->get_dirfrag(p->first);
631 ceph_assert(dir);
632 if (p->second.state == IMPORT_PREPPING)
633 continue;
634 if (p->second.state == IMPORT_ABORTING) {
635 ceph_assert(!dir->is_ambiguous_dir_auth());
636 ceph_assert(dir->get_dir_auth().first != mds->get_nodeid());
637 continue;
638 }
639 ceph_assert(dir->is_ambiguous_dir_auth());
640 ceph_assert(dir->authority().first == mds->get_nodeid() ||
641 dir->authority().second == mds->get_nodeid());
642 }
643
644 // export_state
645 show_exporting();
646 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
647 p != export_state.end();
648 ++p) {
649 CDir *dir = p->first;
650 if (p->second.state == EXPORT_LOCKING ||
651 p->second.state == EXPORT_DISCOVERING ||
652 p->second.state == EXPORT_FREEZING ||
653 p->second.state == EXPORT_CANCELLING)
654 continue;
655 ceph_assert(dir->is_ambiguous_dir_auth());
656 ceph_assert(dir->authority().first == mds->get_nodeid() ||
657 dir->authority().second == mds->get_nodeid());
658 }
659
660 // ambiguous+me subtrees should be importing|exporting
661
662 // write me
663 }
664
665
666
667
668
669 // ==========================================================
670 // EXPORT
671
672 void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
673 {
674 // enqueue
675 dout(7) << *dir << " to " << dest << dendl;
676 export_queue.push_back(pair<dirfrag_t,mds_rank_t>(dir->dirfrag(), dest));
677
678 maybe_do_queued_export();
679 }
680
/**
 * Drain the "nicely" export queue while we are under the concurrent-export
 * budget: total in-flight size stays below max_export_size * 2, keeping
 * one max-size export of headroom per export still in the LOCKING phase.
 *
 * export_dir() can call back into this function (e.g. via cancellation
 * paths ending in export_try_cancel()), so a static flag makes nested
 * invocations no-ops.
 */
void Migrator::maybe_do_queued_export()
{
  static bool running;  // reentrancy guard; see note above
  if (running)
    return;
  running = true;

  uint64_t max_total_size = max_export_size * 2;

  while (!export_queue.empty() &&
	 max_total_size > total_exporting_size &&
	 max_total_size - total_exporting_size >=
	 max_export_size * (num_locking_exports + 1)) {

    dirfrag_t df = export_queue.front().first;
    mds_rank_t dest = export_queue.front().second;
    export_queue.pop_front();

    CDir *dir = mdcache->get_dirfrag(df);
    if (!dir) continue;          // dirfrag expired from cache; skip
    if (!dir->is_auth()) continue;  // no longer ours to export; skip

    dout(7) << "nicely exporting to mds." << dest << " " << *dir << dendl;

    export_dir(dir, dest);
  }

  running = false;
}
710
711
712
713
/**
 * Completion for the subtree freeze started by an export: once the tree is
 * frozen, resume via Migrator::export_frozen().  A negative r means the
 * freeze was cancelled, in which case nothing happens.  Holds a PTRWAITER
 * pin so 'dir' stays alive while this waiter is queued.
 */
class C_MDC_ExportFreeze : public MigratorContext {
  CDir *dir;  // dir i'm exporting
  uint64_t tid;  // export tid, passed through to export_frozen()
public:
  C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
    MigratorContext(m), dir(e), tid(t) {
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (r >= 0)
      mig->export_frozen(dir, tid);
    dir->put(CDir::PIN_PTRWAITER);
  }
};
728
729
/**
 * Non-blocking attempt to take the locks an export needs, recording them
 * in 'mut'.  Returns false (without waiting) if any lock is unavailable.
 *
 * Locks taken:
 *  - rdlock on dirfragtreelock of the subtree root inode and of every
 *    would-be bound's inode (so the dirfrag layout can't change under us)
 *  - rdlock on snaplock of every inode from the root inode up to the
 *    filesystem root (via projected parents)
 *  - forced wrlocks on the root inode's filelock and nestlock (checked
 *    first via can_wrlock so the force cannot block)
 */
bool Migrator::export_try_grab_locks(CDir *dir, MutationRef& mut)
{
  CInode *diri = dir->get_inode();

  // bail early if the forced wrlocks below would not be possible
  if (!diri->filelock.can_wrlock(diri->get_loner()) ||
      !diri->nestlock.can_wrlock(diri->get_loner()))
    return false;

  MutationImpl::LockOpVec lov;

  set<CDir*> wouldbe_bounds;
  set<CInode*> bound_inodes;
  mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
  for (auto& bound : wouldbe_bounds)
    bound_inodes.insert(bound->get_inode());
  for (auto& in : bound_inodes)
    lov.add_rdlock(&in->dirfragtreelock);

  lov.add_rdlock(&diri->dirfragtreelock);

  // snaplocks up the (projected) ancestry
  CInode* in = diri;
  while (true) {
    lov.add_rdlock(&in->snaplock);
    CDentry* pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    in = pdn->get_dir()->get_inode();
  }

  if (!mds->locker->rdlock_try_set(lov, mut))
    return false;

  mds->locker->wrlock_force(&diri->filelock, mut);
  mds->locker->wrlock_force(&diri->nestlock, mut);

  return true;
}
767
768
769 /** export_dir(dir, dest)
770 * public method to initiate an export.
771 * will fail if the directory is freezing, frozen, unpinnable, or root.
772 */
void Migrator::export_dir(CDir *dir, mds_rank_t dest)
{
  ceph_assert(dir->is_auth());
  ceph_assert(dest != mds->get_nodeid());

  CDir* parent = dir->inode->get_projected_parent_dir();
  // preflight checks: refuse (with a log line) rather than fail midway
  if (!mds->is_stopping() && !dir->is_exportable(dest) && dir->get_num_head_items() > 0) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": dir is export pinned" << dendl;
    return;
  } else if (!(mds->is_active() || mds->is_stopping())) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": not active" << dendl;
    return;
  } else if (mdcache->is_readonly()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": read-only FS, no exports for now" << dendl;
    return;
  } else if (!mds->mdsmap->is_active(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": destination not active" << dendl;
    return;
  } else if (mds->is_cluster_degraded()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": cluster degraded" << dendl;
    return;
  } else if (dir->inode->is_system()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is a system directory" << dendl;
    return;
  } else if (dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is frozen" << dendl;
    return;
  } else if (dir->state_test(CDir::STATE_EXPORTING)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": already exporting" << dendl;
    return;
  } else if (parent && parent->inode->is_stray()
             && parent->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
    dout(7) << "Cannot export to mds." << dest << " " << *dir << ": in stray directory" << dendl;
    return;
  }

  if (unlikely(g_conf()->mds_thrash_exports)) {
    // create random subtree bound (which will not be exported)
    std::vector<CDir*> ls;
    for (auto p = dir->begin(); p != dir->end(); ++p) {
      auto dn = p->second;
      CDentry::linkage_t *dnl= dn->get_linkage();
      if (dnl->is_primary()) {
	CInode *in = dnl->get_inode();
	if (in->is_dir()) {
	  auto&& dirs = in->get_nested_dirfrags();
	  ls.insert(std::end(ls), std::begin(dirs), std::end(dirs));
	}
      }
    }
    if (ls.size() > 0) {
      // pick one nested dirfrag at random
      int n = rand() % ls.size();
      auto p = ls.begin();
      while (n--) ++p;
      CDir *bd = *p;
      if (!(bd->is_frozen() || bd->is_freezing())) {
	ceph_assert(bd->is_auth());
	// NOTE(review): the comment and the dout below speak of making *bd*
	// an aux subtree (a bound), yet the state/auth are applied to *dir*.
	// Suspected typo (should these operate on bd?) -- confirm against
	// upstream before changing; this branch only runs under the
	// mds_thrash_exports test option.
	dir->state_set(CDir::STATE_AUXSUBTREE);
	mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
	dout(7) << "create aux subtree " << *bd << " under " << *dir << dendl;
      }
    }
  }

  dout(4) << "Starting export to mds." << dest << " " << *dir << dendl;

  mds->hit_export_target(dest, -1);

  dir->auth_pin(this);   // released by the export state machine (cancel/finish)
  dir->mark_exporting();

  // drive locking (and the rest of the start-up) via an internal MDRequest
  MDRequestRef mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
  mdr->more()->export_dir = dir;
  mdr->pin(dir);

  ceph_assert(export_state.count(dir) == 0);
  export_state_t& stat = export_state[dir];
  num_locking_exports++;
  stat.state = EXPORT_LOCKING;
  stat.peer = dest;
  stat.tid = mdr->reqid.tid;
  stat.mut = mdr;

  mdcache->dispatch_request(mdr);
}
858
859 /*
860 * check if directory is too large to be export in whole. If it is,
861 * choose some subdirs, whose total size is suitable.
862 */
void Migrator::maybe_split_export(CDir* dir, uint64_t max_size, bool null_okay,
				  vector<pair<CDir*, size_t> >& results)
{
  // rough per-item cost estimates used to approximate export size
  static const unsigned frag_size = 800;
  static const unsigned inode_size = 1000;
  static const unsigned cap_size = 80;
  static const unsigned remote_size = 10;
  static const unsigned null_size = 1;

  // state for depth-first search
  struct LevelData {
    CDir *dir;
    CDir::dentry_key_map::iterator iter;   // resume point within dir's dentries
    size_t dirfrag_size = frag_size;       // estimated size of this dirfrag alone
    size_t subdirs_size = 0;               // total size of kept (complete) subdirs
    bool complete = true;                  // false if any nested dirfrag was skipped
    vector<CDir*> siblings;                // dirfrags still to visit at this level
    vector<pair<CDir*, size_t> > subdirs;  // candidate (subdir, size) results
    LevelData(const LevelData&) = default;
    LevelData(CDir *d) :
      dir(d), iter(d->begin()) {}
  };

  vector<LevelData> stack;
  stack.emplace_back(dir);

  size_t found_size = 0;    // total size of dirfrags accepted so far
  size_t skipped_size = 0;  // total size of dirfrags we could not merge upward

  for (;;) {
    auto& data = stack.back();
    CDir *cur = data.dir;
    auto& it = data.iter;
    auto& dirfrag_size = data.dirfrag_size;

    // scan dentries, accumulating the size estimate; descend into the first
    // eligible nested dirfrag we encounter
    while(it != cur->end()) {
      CDentry *dn = it->second;
      ++it;

      dirfrag_size += dn->name.size();
      if (dn->get_linkage()->is_null()) {
	dirfrag_size += null_size;
	continue;
      }
      if (dn->get_linkage()->is_remote()) {
	dirfrag_size += remote_size;
	continue;
      }

      CInode *in = dn->get_linkage()->get_inode();
      dirfrag_size += inode_size;
      dirfrag_size += in->get_client_caps().size() * cap_size;

      if (in->is_dir()) {
	auto ls = in->get_nested_dirfrags();
	std::reverse(ls.begin(), ls.end());

	// drop dirfrags that can't be exported right now
	bool complete = true;
	for (auto p = ls.begin(); p != ls.end(); ) {
	  if ((*p)->state_test(CDir::STATE_EXPORTING) ||
	      (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
	    complete = false;
	    p = ls.erase(p);
	  } else {
	    ++p;
	  }
	}
	if (!complete) {
	  // skip exporting dir's ancestors. because they can't get
	  // frozen (exporting dir's parent inode is auth pinned).
	  for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
	    if (!p->complete)
	      break;
	    p->complete = false;
	  }
	}
	if (!ls.empty()) {
	  // descend into the first child dirfrag; remember its siblings
	  stack.emplace_back(ls.back());
	  ls.pop_back();
	  stack.back().siblings.swap(ls);
	  break;
	}
      }
    }
    // did above loop push new dirfrag into the stack?
    if (stack.back().dir != cur)
      continue;

    // finished scanning 'cur'; fold its result into the parent level
    if (data.complete) {
      auto cur_size = data.subdirs_size + dirfrag_size;
      // we can do nothing with large dirfrag
      if (cur_size >= max_size && found_size * 2 > max_size)
	break;

      found_size += dirfrag_size;

      if (stack.size() > 1) {
	auto& parent = stack[stack.size() - 2];
	parent.subdirs.emplace_back(cur, cur_size);
	parent.subdirs_size += cur_size;
      }
    } else {
      // can't merge current dirfrag to its parent if there is skipped subdir
      results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
      skipped_size += dirfrag_size;
    }

    vector<CDir*> ls;
    ls.swap(data.siblings);

    stack.pop_back();
    if (stack.empty())
      break;

    if (found_size >= max_size)
      break;

    // next dirfrag
    if (!ls.empty()) {
      stack.emplace_back(ls.back());
      ls.pop_back();
      stack.back().siblings.swap(ls);
    }
  }

  // flush any candidates still held by unfinished levels
  for (auto& p : stack)
    results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());

  // nothing split off (and caller doesn't accept "export nothing"):
  // export the whole dir as a single unit
  if (results.empty() && (!skipped_size || !null_okay))
    results.emplace_back(dir, found_size + skipped_size);
}
994
995 class C_M_ExportDirWait : public MigratorContext {
996 MDRequestRef mdr;
997 int count;
998 public:
999 C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
1000 : MigratorContext(m), mdr(mdr), count(count) {}
1001 void finish(int r) override {
1002 mig->dispatch_export_dir(mdr, count);
1003 }
1004 };
1005
/*
 * Continue an export kicked off earlier: validate the target, take the
 * locks the export needs, and either send the initial
 * MExportDirDiscover or split an oversized subtree into smaller
 * per-dirfrag exports.  Invoked via the internal EXPORTDIR request and
 * re-invoked from C_M_ExportDirWait after waiting for a new MDSMap
 * epoch or for the dir to stabilize.
 *
 * @param mdr internal EXPORTDIR request driving this export
 * @param count number of MDSMap epochs already waited for the target
 *              to appear as an export target (gives up after 3)
 */
void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  CDir *dir = mdr->more()->export_dir;
  dout(7) << *mdr << " " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    ceph_assert(mdr->killed || mdr->aborted);
    if (mdr->aborted) {
      mdr->aborted = false;
      mdcache->request_kill(mdr);
    }
    return;
  }
  ceph_assert(it->second.state == EXPORT_LOCKING);

  if (mdr->more()->peer_error || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  mds_rank_t dest = it->second.peer;
  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(7) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }

    // drop what we hold while waiting so we don't block other requests
    mds->locker->drop_locks(mdr.get());
    mdr->drop_local_auth_pins();

    // retry once the next MDSMap epoch arrives
    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  // locks?
  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
    MutationImpl::LockOpVec lov;
    // If auth MDS of the subtree root inode is neither the exporter MDS
    // nor the importer MDS and it gathers subtree root's fragstat/neststat
    // while the subtree is exporting. It's possible that the exporter MDS
    // and the importer MDS both are auth MDS of the subtree root or both
    // are not auth MDS of the subtree root at the time they receive the
    // lock messages. So the auth MDS of the subtree root inode may get no
    // or duplicated fragstat/neststat for the subtree root dirfrag.
    lov.lock_scatter_gather(&dir->get_inode()->filelock);
    lov.lock_scatter_gather(&dir->get_inode()->nestlock);
    if (dir->get_inode()->is_auth()) {
      dir->get_inode()->filelock.set_scatter_wanted();
      dir->get_inode()->nestlock.set_scatter_wanted();
    }
    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);

    if (!mds->locker->acquire_locks(mdr, lov, nullptr, true)) {
      if (mdr->aborted)
	export_try_cancel(dir);
      return;
    }

    lov.clear();
    // bound dftlocks:
    // NOTE: We need to take an rdlock on bounding dirfrags during
    // migration for a rather irritating reason: when we export the
    // bound inode, we need to send scatterlock state for the dirfrags
    // as well, so that the new auth also gets the correct info. If we
    // race with a refragment, this info is useless, as we can't
    // redivvy it up. And it's needed for the scatterlocks to work
    // properly: when the auth is in a sync/lock state it keeps each
    // dirfrag's portion in the local (auth OR replica) dirfrag.
    set<CDir*> wouldbe_bounds;
    set<CInode*> bound_inodes;
    mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
    for (auto& bound : wouldbe_bounds)
      bound_inodes.insert(bound->get_inode());
    for (auto& in : bound_inodes)
      lov.add_rdlock(&in->dirfragtreelock);

    if (!mds->locker->rdlock_try_set(lov, mdr))
      return;

    if (!mds->locker->try_rdlock_snap_layout(dir->get_inode(), mdr))
      return;

    mdr->locking_state |= MutationImpl::ALL_LOCKED;
  }

  ceph_assert(g_conf()->mds_kill_export_at != 1);

  auto parent = it->second.parent;

  // decide whether the subtree can be exported whole or must be split
  // into smaller pieces (bounded by max_export_size)
  vector<pair<CDir*, size_t> > results;
  maybe_split_export(dir, max_export_size, (bool)parent, results);

  if (results.size() == 1 && results.front().first == dir) {
    // small enough: proceed with this dirfrag as-is
    num_locking_exports--;
    it->second.state = EXPORT_DISCOVERING;
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
    auto discover = make_message<MExportDirDiscover>(dir->dirfrag(), path,
						     mds->get_nodeid(),
						     it->second.tid);
    mds->send_message_mds(discover, dest);
    ceph_assert(g_conf()->mds_kill_export_at != 2);

    it->second.last_cum_auth_pins_change = ceph_clock_now();
    it->second.approx_size = results.front().second;
    total_exporting_size += it->second.approx_size;

    // start the freeze, but hold it up with an auth_pin.
    dir->freeze_tree();
    ceph_assert(dir->is_freezing_tree());
    dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
    return;
  }

  if (parent) {
    parent->pending_children += results.size();
  } else {
    // top of a split export: track its children under a shared base
    parent = std::make_shared<export_base_t>(dir->dirfrag(), dest,
					     results.size(), export_queue_gen);
  }

  if (results.empty()) {
    dout(7) << "subtree's children all are under exporting, retry rest parts of parent export "
	    << parent->dirfrag << dendl;
    parent->restart = true;
  } else {
    dout(7) << "subtree is too large, splitting it into: " << dendl;
  }

  for (auto& p : results) {
    CDir *sub = p.first;
    ceph_assert(sub != dir);
    dout(7) << " sub " << *sub << dendl;

    sub->auth_pin(this);
    sub->mark_exporting();

    // spawn an independent internal export request for each piece
    MDRequestRef _mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
    _mdr->more()->export_dir = sub;
    _mdr->pin(sub);

    ceph_assert(export_state.count(sub) == 0);
    auto& stat = export_state[sub];
    num_locking_exports++;
    stat.state = EXPORT_LOCKING;
    stat.peer = dest;
    stat.tid = _mdr->reqid.tid;
    stat.mut = _mdr;
    stat.parent = parent;
    mdcache->dispatch_request(_mdr);
  }

  // cancel the original one
  export_try_cancel(dir);
}
1174
1175 void Migrator::child_export_finish(std::shared_ptr<export_base_t>& parent, bool success)
1176 {
1177 if (success)
1178 parent->restart = true;
1179 if (--parent->pending_children == 0) {
1180 if (parent->restart &&
1181 parent->export_queue_gen == export_queue_gen) {
1182 CDir *origin = mdcache->get_dirfrag(parent->dirfrag);
1183 if (origin && origin->is_auth()) {
1184 dout(7) << "child_export_finish requeue " << *origin << dendl;
1185 export_queue.emplace_front(origin->dirfrag(), parent->dest);
1186 }
1187 }
1188 }
1189 }
1190
/*
 * called on receipt of MExportDirDiscoverAck
 * the importer now has the directory's _inode_ in memory, and pinned.
 *
 * On success we drop the locks held during discovery (the importer has
 * what it needs) and move to EXPORT_FREEZING; on failure the export is
 * cancelled.
 */
void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m)
{
  CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  ceph_assert(dir);

  dout(7) << "from " << m->get_source()
	  << " on " << *dir << dendl;

  // count the ack against the export-target tracking
  mds->hit_export_target(dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != dest) {
    // stale ack for an export that no longer exists or was restarted
    dout(7) << "must have aborted" << dendl;
  } else {
    ceph_assert(it->second.state == EXPORT_DISCOVERING);

    if (m->is_success()) {
      // release locks to avoid deadlock
      MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
      ceph_assert(mdr);
      mdcache->request_finish(mdr);
      it->second.mut.reset();
      // freeze the subtree
      it->second.state = EXPORT_FREEZING;
      dir->auth_unpin(this);  // release the pin that was holding up the freeze
      ceph_assert(g_conf()->mds_kill_export_at != 3);

    } else {
      dout(7) << "peer failed to discover (not active?), canceling" << dendl;
      export_try_cancel(dir, false);  // peer already knows; don't notify it
    }
  }
}
1231
1232 class C_M_ExportSessionsFlushed : public MigratorContext {
1233 CDir *dir;
1234 uint64_t tid;
1235 public:
1236 C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
1237 MigratorContext(m), dir(d), tid(t) {
1238 dir->get(CDir::PIN_PTRWAITER);
1239 }
1240 void finish(int r) override {
1241 mig->export_sessions_flushed(dir, tid);
1242 dir->put(CDir::PIN_PTRWAITER);
1243 }
1244 };
1245
1246 void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
1247 {
1248 dout(7) << *dir << dendl;
1249
1250 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1251 if (it == export_state.end() ||
1252 it->second.state == EXPORT_CANCELLING ||
1253 it->second.tid != tid) {
1254 // export must have aborted.
1255 dout(7) << "export must have aborted on " << dir << dendl;
1256 return;
1257 }
1258
1259 ceph_assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
1260 ceph_assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
1261 it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
1262 if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
1263 export_go(dir); // start export.
1264 }
1265
/*
 * Encode the spanning-tree "trace" from the export bound 'bound' up to
 * the subtree root 'dir', so the importer can replicate every
 * dentry/inode (and intermediate dirfrag) on that path.  Items already
 * emitted for an earlier bound — tracked in inodes_added /
 * dirfrags_added — are skipped, which also terminates the walk early.
 *
 * Each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
 * where df is the dirfrag the trace starts from and the start char
 * records what the first encoded item is.
 */
void Migrator::encode_export_prep_trace(bufferlist &final_bl, CDir *bound,
                                        CDir *dir, export_state_t &es,
                                        set<inodeno_t> &inodes_added,
                                        set<dirfrag_t> &dirfrags_added)
{
  ENCODE_START(1, 1, final_bl);

  dout(7) << " started to encode dir " << *bound << dendl;
  CDir *cur = bound;
  bufferlist tracebl;   // accumulated trace, built root-ward by prepending
  char start = '-';     // '-' = nothing encoded (bound == dir, or already sent)

  while (1) {
    // don't repeat inodes
    if (inodes_added.count(cur->inode->ino()))
      break;
    inodes_added.insert(cur->inode->ino());

    // prepend dentry + inode
    ceph_assert(cur->inode->is_auth());
    bufferlist bl;
    mdcache->encode_replica_dentry(cur->inode->parent, es.peer, bl);
    dout(7) << " added " << *cur->inode->parent << dendl;
    mdcache->encode_replica_inode(cur->inode, es.peer, bl, mds->mdsmap->get_up_features());
    dout(7) << " added " << *cur->inode << dendl;
    // prepend: bl (new items) followed by everything collected so far
    bl.claim_append(tracebl);
    tracebl = std::move(bl);

    cur = cur->get_parent_dir();
    // don't repeat dirfrags
    if (dirfrags_added.count(cur->dirfrag()) || cur == dir) {
      start = 'd'; // start with dentry
      break;
    }
    dirfrags_added.insert(cur->dirfrag());

    // prepend dir
    mdcache->encode_replica_dir(cur, es.peer, bl);
    dout(7) << " added " << *cur << dendl;
    bl.claim_append(tracebl);
    tracebl = std::move(bl);
    start = 'f'; // start with dirfrag
  }
  // header: the dirfrag the trace starts from, plus the start marker
  dirfrag_t df = cur->dirfrag();
  encode(df, final_bl);
  encode(start, final_bl);
  final_bl.claim_append(tracebl);

  ENCODE_FINISH(final_bl);
}
1316
/*
 * The subtree under 'dir' is now fully frozen.  Grab the remaining
 * locks, record the export bounds, and send MExportDirPrep to the
 * importer carrying the base dirfrag, the bystander list and a
 * spanning-tree trace per bound.  Also kick off a flush of every
 * client session holding caps in the subtree so cap state is stable
 * before the export proper.
 */
void Migrator::export_frozen(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != tid) {
    // stale callback: the export was cancelled or restarted
    dout(7) << "export must have aborted" << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_FREEZING);
  ceph_assert(dir->is_frozen_tree_root());

  it->second.mut = new MutationImpl();

  // ok, try to grab all my locks.
  CInode *diri = dir->get_inode();
  if ((diri->is_auth() && diri->is_frozen()) ||
      !export_try_grab_locks(dir, it->second.mut)) {
    dout(7) << "export_dir couldn't acquire all needed locks, failing. "
	    << *dir << dendl;
    export_try_cancel(dir);
    return;
  }

  if (diri->is_auth())
    it->second.mut->auth_pin(diri);

  mdcache->show_subtrees();

  // CDir::_freeze_tree() should have forced it into subtree.
  ceph_assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
  // note the bounds.
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // generate prep message, log entry.
  auto prep = make_message<MExportDirPrep>(dir->dirfrag(), it->second.tid);

  // include list of bystanders
  for (const auto &p : dir->get_replicas()) {
    if (p.first != it->second.peer) {
      dout(10) << "bystander mds." << p.first << dendl;
      prep->add_bystander(p.first);
    }
  }

  // include base dirfrag
  mdcache->encode_replica_dir(dir, it->second.peer, prep->basedir);

  /*
   * include spanning tree for all nested exports.
   * these need to be on the destination _before_ the final export so that
   * dir_auth updates on any nested exports are properly absorbed.
   * this includes inodes and dirfrags included in the subtree, but
   * only the inodes at the bounds.
   *
   * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
   */
  set<inodeno_t> inodes_added;
  set<dirfrag_t> dirfrags_added;

  // check bounds
  for (auto &bound : bounds){
    // pin it.
    bound->get(CDir::PIN_EXPORTBOUND);
    bound->state_set(CDir::STATE_EXPORTBOUND);

    dout(7) << " export bound " << *bound << dendl;
    prep->add_bound( bound->dirfrag() );

    bufferlist final_bl;
    encode_export_prep_trace(final_bl, bound, dir, it->second, inodes_added, dirfrags_added);
    prep->add_trace(final_bl);
  }

  // send.
  it->second.state = EXPORT_PREPPING;
  mds->send_message_mds(prep, it->second.peer);
  ceph_assert(g_conf()->mds_kill_export_at != 4);

  // make sure any new instantiations of caps are flushed out
  ceph_assert(it->second.warning_ack_waiting.empty());

  set<client_t> export_client_set;
  get_export_client_set(dir, export_client_set);

  MDSGatherBuilder gather(g_ceph_context);
  mds->server->flush_client_sessions(export_client_set, gather);
  if (gather.has_subs()) {
    // MDS_RANK_NONE is a placeholder "ack" for the session flush
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
    gather.activate();
  }
}
1412
1413 void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
1414 {
1415 deque<CDir*> dfs;
1416 dfs.push_back(dir);
1417 while (!dfs.empty()) {
1418 CDir *dir = dfs.front();
1419 dfs.pop_front();
1420 for (auto& p : *dir) {
1421 CDentry *dn = p.second;
1422 if (!dn->get_linkage()->is_primary())
1423 continue;
1424 CInode *in = dn->get_linkage()->get_inode();
1425 if (in->is_dir()) {
1426 // directory?
1427 auto&& ls = in->get_dirfrags();
1428 for (auto& q : ls) {
1429 if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
1430 // include nested dirfrag
1431 ceph_assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
1432 dfs.push_back(q); // it's ours, recurse (later)
1433 }
1434 }
1435 }
1436 for (auto& q : in->get_client_caps()) {
1437 client_set.insert(q.first);
1438 }
1439 }
1440 }
1441 }
1442
1443 void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
1444 {
1445 for (const auto &p : in->get_client_caps()) {
1446 client_set.insert(p.first);
1447 }
1448 }
1449
/*
 * Handle MExportDirPrepAck from the importer.  On success, warn every
 * bystander replica that the subtree's authority is becoming ambiguous
 * (us -> us,peer) and wait for their acks before starting the export;
 * on failure, cancel the export.
 */
void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
{
  CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
  mds_rank_t dest(m->get_source().num());
  ceph_assert(dir);

  dout(7) << *dir << dendl;

  // count the ack against the export-target tracking
  mds->hit_export_target(dest, -1);

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.tid != m->get_tid() ||
      it->second.peer != mds_rank_t(m->get_source().num())) {
    // export must have aborted.
    dout(7) << "export must have aborted" << dendl;
    return;
  }
  ceph_assert(it->second.state == EXPORT_PREPPING);

  if (!m->is_success()) {
    dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
    export_try_cancel(dir, false);  // peer already gave up; no need to notify it
    return;
  }

  ceph_assert(g_conf()->mds_kill_export_at != 5);
  // send warnings
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // only the session-flush placeholder (MDS_RANK_NONE) may still be pending
  ceph_assert(it->second.warning_ack_waiting.empty() ||
	      (it->second.warning_ack_waiting.size() == 1 &&
	       it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
  ceph_assert(it->second.notify_ack_waiting.empty());

  for (const auto &p : dir->get_replicas()) {
    if (p.first == it->second.peer) continue;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
      continue; // only if active
    it->second.warning_ack_waiting.insert(p.first);
    it->second.notify_ack_waiting.insert(p.first); // we'll eventually get a notifyack, too!

    // auth transition advertised to bystanders: (us, unknown) -> (us, peer)
    auto notify = make_message<MExportDirNotify>(dir->dirfrag(), it->second.tid, true,
						 mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
						 mds_authority_t(mds->get_nodeid(),it->second.peer));
    for (auto &cdir : bounds) {
      notify->get_bounds().push_back(cdir->dirfrag());
    }
    mds->send_message_mds(notify, p.first);

  }

  it->second.state = EXPORT_WARNING;

  ceph_assert(g_conf()->mds_kill_export_at != 6);
  // nobody to warn?
  if (it->second.warning_ack_waiting.empty())
    export_go(dir); // start export.
}
1511
1512
1513 class C_M_ExportGo : public MigratorContext {
1514 CDir *dir;
1515 uint64_t tid;
1516 public:
1517 C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
1518 MigratorContext(m), dir(d), tid(t) {
1519 dir->get(CDir::PIN_PTRWAITER);
1520 }
1521 void finish(int r) override {
1522 mig->export_go_synced(dir, tid);
1523 dir->put(CDir::PIN_PTRWAITER);
1524 }
1525 };
1526
1527 void Migrator::export_go(CDir *dir)
1528 {
1529 auto it = export_state.find(dir);
1530 ceph_assert(it != export_state.end());
1531 dout(7) << *dir << " to " << it->second.peer << dendl;
1532
1533 // first sync log to flush out e.g. any cap imports
1534 mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
1535 mds->mdlog->flush();
1536 }
1537
/*
 * Journal is safe; actually ship the subtree.  Marks the subtree auth
 * ambiguous (us -> us,peer), encodes the whole cache contents plus the
 * bound dirfrags into an MExportDir, and sends it to the target.
 */
void Migrator::export_go_synced(CDir *dir, uint64_t tid)
{
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() ||
      it->second.state == EXPORT_CANCELLING ||
      it->second.tid != tid) {
    // export must have aborted.
    dout(7) << "export must have aborted on " << dir << dendl;
    return;
  }
  ceph_assert(it->second.state == EXPORT_WARNING);
  mds_rank_t dest = it->second.peer;

  dout(7) << *dir << " to " << dest << dendl;

  mdcache->show_subtrees();

  it->second.state = EXPORT_EXPORTING;
  ceph_assert(g_conf()->mds_kill_export_at != 7);

  ceph_assert(dir->is_frozen_tree_root());

  // set ambiguous auth
  mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);

  // take away the popularity we're sending.
  mds->balancer->subtract_export(dir);

  // fill export message with cache data
  auto req = make_message<MExportDir>(dir->dirfrag(), it->second.tid);
  map<client_t,entity_inst_t> exported_client_map;
  map<client_t,client_metadata_t> exported_client_metadata_map;
  uint64_t num_exported_inodes = 0;
  encode_export_dir(req->export_data, dir, // recur start point
		    exported_client_map, exported_client_metadata_map,
		    num_exported_inodes);
  encode(exported_client_map, req->client_map, mds->mdsmap->get_up_features());
  encode(exported_client_metadata_map, req->client_map);

  // add bounds to message
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p)
    req->add_export((*p)->dirfrag());

  // send
  mds->send_message_mds(req, dest);
  ceph_assert(g_conf()->mds_kill_export_at != 8);

  mds->hit_export_target(dest, num_exported_inodes+1);

  // stats
  if (mds->logger) mds->logger->inc(l_mds_exported);
  if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);

  mdcache->show_subtrees();
}
1597
1598
/** encode_export_inode
 * update our local state for this inode to export.
 * encode relevant state to be sent over the wire.
 * used by: encode_export_dir, file_rename (if foreign)
 *
 * @param in the (auth, non-replica) inode being exported
 * @param enc_state bufferlist the inode payload is appended to
 * @param exported_client_map out: inst for each cap-holding client
 * @param exported_client_metadata_map out: metadata for those clients
 *
 * FIXME: the separation between CInode.encode_export and these methods
 * is pretty arbitrary and dumb.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
				   map<client_t,entity_inst_t>& exported_client_map,
				   map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, enc_state);
  dout(7) << *in << dendl;
  ceph_assert(!in->is_replica(mds->get_nodeid()));

  // identity first: ino + snapid, then the inode's own export payload
  encode(in->ino(), enc_state);
  encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps
  encode_export_inode_caps(in, true, enc_state, exported_client_map, exported_client_metadata_map);
  ENCODE_FINISH(enc_state);
}
1623
/*
 * Encode the client capabilities held on 'in' for the importer, and
 * record the identity/metadata of each cap-holding client so the
 * importer can set up their sessions.  When auth_cap is set the inode
 * is additionally pinned and flagged EXPORTINGCAPS until the export is
 * finished or reversed.
 */
void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
					map<client_t,entity_inst_t>& exported_client_map,
					map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, bl);
  dout(20) << *in << dendl;
  // encode caps
  map<client_t,Capability::Export> cap_map;
  in->export_client_caps(cap_map);
  encode(cap_map, bl);
  if (auth_cap) {
    encode(in->get_mds_caps_wanted(), bl);

    in->state_set(CInode::STATE_EXPORTINGCAPS);
    in->get(CInode::PIN_EXPORTINGCAPS);
  }

  // make note of clients named by exported capabilities
  for (const auto &p : in->get_client_caps()) {
    if (exported_client_map.count(p.first))
      continue;
    // NOTE(review): 'session' is dereferenced without a null check —
    // assumes a session always exists for a cap holder; confirm caps
    // cannot outlive their session.
    Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p.first.v));
    exported_client_map[p.first] = session->info.inst;
    exported_client_metadata_map[p.first] = session->info.client_metadata;
  }
  ENCODE_FINISH(bl);
}
1651
/*
 * The export of 'in' is committed: drop the EXPORTINGCAPS pin, tell
 * every cap-holding client that its caps migrated to 'peer' (using the
 * cap ids the importer assigned, from peer_imported), then drop our
 * local caps and re-evaluate locks.
 */
void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
					map<client_t,Capability::Import>& peer_imported)
{
  dout(20) << *in << dendl;

  in->state_clear(CInode::STATE_EXPORTINGCAPS);
  in->put(CInode::PIN_EXPORTINGCAPS);

  // tell (all) clients about migrating caps..
  for (const auto &p : in->get_client_caps()) {
    const Capability *cap = &p.second;
    dout(7) << p.first
	    << " exported caps on " << *in << dendl;
    auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
				       cap->get_cap_id(), cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
    map<client_t,Capability::Import>::iterator q = peer_imported.find(p.first);
    ceph_assert(q != peer_imported.end());
    // cap_id <= 0 means the importer did not import this cap (peer -1)
    m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
		    (q->second.cap_id > 0 ? peer : -1), 0);
    mds->send_message_client_counted(m, p.first);
  }
  in->clear_client_caps_after_export();
  mds->locker->eval(in, CEPH_CAP_LOCKS);
}
1677
/*
 * Flip 'in' from auth to replica state after a committed export:
 * clean dirty state, downgrade every lock, clear the auth bit, and
 * queue any of the inode's waiters into 'finished'.
 */
void Migrator::finish_export_inode(CInode *in, mds_rank_t peer,
				   map<client_t,Capability::Import>& peer_imported,
				   MDSContext::vec& finished)
{
  dout(12) << *in << dendl;

  // clean
  if (in->is_dirty())
    in->mark_clean();

  // clear/unpin cached_by (we're no longer the authority)
  in->clear_replica_map();

  // twiddle lock states for auth -> replica transition
  in->authlock.export_twiddle();
  in->linklock.export_twiddle();
  in->dirfragtreelock.export_twiddle();
  in->filelock.export_twiddle();
  in->nestlock.export_twiddle();
  in->xattrlock.export_twiddle();
  in->snaplock.export_twiddle();
  in->flocklock.export_twiddle();
  in->policylock.export_twiddle();

  // mark auth
  ceph_assert(in->is_auth());
  in->state_clear(CInode::STATE_AUTH);
  in->replica_nonce = CInode::EXPORT_NONCE;

  in->clear_dirty_rstat();

  // no more auth subtree? clear scatter dirty
  if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
    in->clear_scatter_dirty();

  in->clear_dirty_parent();

  in->clear_clientwriteable();

  in->clear_file_locks();

  // waiters
  in->take_waiting(CInode::WAIT_ANY_MASK, finished);

  in->finish_export();

  finish_export_inode_caps(in, peer, peer_imported);
}
1726
/*
 * Recursively encode dirfrag 'dir' and everything beneath it (stopping
 * at export bounds) into 'exportbl', updating local inode state as it
 * goes (via encode_export_inode) and counting dentries in
 * num_exported.
 *
 * Per-dirfrag layout: dirfrag_t, CDir export payload, dentry count,
 * then per dentry its name/last/state followed by 'N' (null dentry), a
 * remote record (CDentry::encode_remote), or 'i' plus a versioned
 * inode+alternate_name envelope.  Nested dirfrags are appended after
 * ENCODE_FINISH of this frag.
 */
void Migrator::encode_export_dir(bufferlist& exportbl,
				 CDir *dir,
				 map<client_t,entity_inst_t>& exported_client_map,
				 map<client_t,client_metadata_t>& exported_client_metadata_map,
				 uint64_t &num_exported)
{
  // This has to be declared before ENCODE_STARTED as it will need to be referenced after ENCODE_FINISH.
  std::vector<CDir*> subdirs;

  ENCODE_START(1, 1, exportbl);
  dout(7) << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  ceph_assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  encode(nden, exportbl);

  // dentries
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    num_exported++;

    // -- dentry
    dout(7) << " exporting " << *dn << dendl;

    // dn name
    encode(dn->get_name(), exportbl);
    encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1); // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      auto& alternate_name = dn->alternate_name;
      // remote link
      CDentry::encode_remote(ino, d_type, alternate_name, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("i", 1); // inode dentry

    // nested versioned envelope so the inode payload and alternate_name
    // can evolve independently of the outer dirfrag format
    ENCODE_START(2, 1, exportbl);
    encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map); // encode, and (update state for) export
    encode(dn->alternate_name, exportbl);
    ENCODE_FINISH(exportbl);

    // directory?
    auto&& dfs = in->get_dirfrags();
    for (const auto& t : dfs) {
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
	// include nested dirfrag
	ceph_assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
	subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  ENCODE_FINISH(exportbl);
  // subdirs
  for (const auto &dir : subdirs) {
    encode_export_dir(exportbl, dir, exported_client_map, exported_client_metadata_map, num_exported);
  }
}
1814
/*
 * Flip dirfrag 'dir' — and, recursively, every nested dirfrag down to
 * the export bounds — from auth to replica state after a committed
 * export.  Collects woken waiters in 'finished' and counts processed
 * dentries in *num_dentries.
 */
void Migrator::finish_export_dir(CDir *dir, mds_rank_t peer,
				 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
				 MDSContext::vec& finished, int *num_dentries)
{
  dout(10) << *dir << dendl;

  // release open_by
  dir->clear_replica_map();

  // mark
  ceph_assert(dir->is_auth());
  dir->state_clear(CDir::STATE_AUTH);
  dir->remove_bloom();
  dir->replica_nonce = CDir::EXPORT_NONCE;

  if (dir->is_dirty())
    dir->mark_clean();

  // suck up all waiters
  dir->take_waiting(CDir::WAIT_ANY_MASK, finished); // all dir waiters

  // pop
  dir->finish_export();

  // dentries
  std::vector<CDir*> subdirs;
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    // dentry
    dn->finish_export();

    // inode?
    if (dn->get_linkage()->is_primary()) {
      finish_export_inode(in, peer, peer_imported[in->ino()], finished);

      // subdirs?
      auto&& dirs = in->get_nested_dirfrags();
      subdirs.insert(std::end(subdirs), std::begin(dirs), std::end(dirs));
    }

    mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
    ++(*num_dentries);
  }

  // subdirs
  for (const auto& dir : subdirs) {
    finish_export_dir(dir, peer, peer_imported, finished, num_dentries);
  }
}
1866
1867 class C_MDS_ExportFinishLogged : public MigratorLogContext {
1868 CDir *dir;
1869 public:
1870 C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
1871 void finish(int r) override {
1872 mig->export_logged_finish(dir);
1873 }
1874 };
1875
1876
1877 /*
1878 * i should get an export_ack from the export target.
1879 */
1880 void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
1881 {
1882 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
1883 mds_rank_t dest(m->get_source().num());
1884 ceph_assert(dir);
1885 ceph_assert(dir->is_frozen_tree_root()); // i'm exporting!
1886
1887 // yay!
1888 dout(7) << *dir << dendl;
1889
1890 mds->hit_export_target(dest, -1);
1891
1892 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1893 ceph_assert(it != export_state.end());
1894 ceph_assert(it->second.state == EXPORT_EXPORTING);
1895 ceph_assert(it->second.tid == m->get_tid());
1896
1897 auto bp = m->imported_caps.cbegin();
1898 decode(it->second.peer_imported, bp);
1899
1900 it->second.state = EXPORT_LOGGINGFINISH;
1901 ceph_assert(g_conf()->mds_kill_export_at != 9);
1902 set<CDir*> bounds;
1903 mdcache->get_subtree_bounds(dir, bounds);
1904
1905 // log completion.
1906 // include export bounds, to ensure they're in the journal.
1907 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);;
1908 mds->mdlog->start_entry(le);
1909
1910 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
1911 le->metablob.add_dir(dir, false);
1912 for (set<CDir*>::iterator p = bounds.begin();
1913 p != bounds.end();
1914 ++p) {
1915 CDir *bound = *p;
1916 le->get_bounds().insert(bound->dirfrag());
1917 le->metablob.add_dir_context(bound);
1918 le->metablob.add_dir(bound, false);
1919 }
1920
1921 // list us second, them first.
1922 // this keeps authority().first in sync with subtree auth state in the journal.
1923 mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
1924
1925 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1926 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1927 mds->mdlog->flush();
1928 ceph_assert(g_conf()->mds_kill_export_at != 10);
1929 }
1930
1931 void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
1932 {
1933 dout(7) << *dir << dendl;
1934
1935 ceph_assert(stat.state == EXPORT_CANCELLING);
1936
1937 if (stat.notify_ack_waiting.empty()) {
1938 stat.state = EXPORT_CANCELLED;
1939 return;
1940 }
1941
1942 dir->auth_pin(this);
1943
1944 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
1945 p != stat.notify_ack_waiting.end();
1946 ++p) {
1947 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
1948 pair<int,int>(mds->get_nodeid(), stat.peer),
1949 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
1950 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
1951 notify->get_bounds().push_back((*i)->dirfrag());
1952 mds->send_message_mds(notify, *p);
1953 }
1954 }
1955
/*
 * this happens if the dest fails after i send the export data but before it is acked
 * that is, we don't know they safely received and logged it, so we reverse our changes
 * and go on.
 */
/*
 * Undo a partially-completed export: restore every dirfrag, dentry and
 * inode in the subtree to its pre-export state, unpin the bounds,
 * notify bystanders that authority reverts to us, unfreeze and
 * possibly re-merge the subtree, then re-issue any caps that went
 * stale while the export was in flight.
 */
void Migrator::export_reverse(CDir *dir, export_state_t& stat)
{
  dout(7) << *dir << dendl;

  set<CInode*> to_eval;

  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // remove exporting pins
  std::deque<CDir*> rq;
  rq.push_back(dir);
  while (!rq.empty()) {
    CDir *t = rq.front();
    rq.pop_front();
    t->abort_export();
    for (auto &p : *t) {
      CDentry *dn = p.second;
      dn->abort_export();
      if (!dn->get_linkage()->is_primary())
	continue;
      CInode *in = dn->get_linkage()->get_inode();
      in->abort_export();
      if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
	in->state_clear(CInode::STATE_EVALSTALECAPS);
	to_eval.insert(in);  // revisit caps once the subtree is ours again
      }
      if (in->is_dir()) {
	auto&& dirs = in->get_nested_dirfrags();
	for (const auto& dir : dirs) {
	  rq.push_back(dir);
	}
      }
    }
  }

  // unpin bounds
  for (auto bd : bounds) {
    bd->put(CDir::PIN_EXPORTBOUND);
    bd->state_clear(CDir::STATE_EXPORTBOUND);
  }

  // notify bystanders
  export_notify_abort(dir, stat, bounds);

  // unfreeze tree, with possible subtree merge.
  mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());

  // process delayed expires
  mdcache->process_delayed_expire(dir);

  dir->unfreeze_tree();
  mdcache->try_subtree_merge(dir);

  // revoke/resume stale caps
  for (auto in : to_eval) {
    bool need_issue = false;
    for (auto &p : in->client_caps) {
      Capability *cap = &p.second;
      if (!cap->is_stale()) {
	need_issue = true;
	break;
      }
    }
    if (need_issue &&
	(!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
      mds->locker->issue_caps(in);
  }

  mdcache->show_cache();
}
2032
2033
/*
 * once i get the ack, and logged the EExportFinish(true),
 * send notifies (if any), otherwise go straight to finish.
 *
 * Bystanders are told the auth is now (peer, unknown); the peer also
 * gets an MExportDirFinish so it can start sending cap-import messages
 * to clients.  We then wait in EXPORT_NOTIFYING for the notify acks.
 */
void Migrator::export_logged_finish(CDir *dir)
{
  dout(7) << *dir << dendl;

  // NOTE: operator[] — the export entry is expected to still exist here
  export_state_t& stat = export_state[dir];

  // send notifies
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
       p != stat.notify_ack_waiting.end();
       ++p) {
    // advertised transition: (us, peer) -> (peer, unknown)
    auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
						 pair<int,int>(mds->get_nodeid(), stat.peer),
						 pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));

    for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
      notify->get_bounds().push_back((*i)->dirfrag());

    mds->send_message_mds(notify, *p);
  }

  // wait for notifyacks
  stat.state = EXPORT_NOTIFYING;
  ceph_assert(g_conf()->mds_kill_export_at != 11);

  // no notifies to wait for?
  if (stat.notify_ack_waiting.empty()) {
    export_finish(dir); // skip notify/notify_ack stage.
  } else {
    // notify peer to send cap import messages to clients
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
      mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), false, stat.tid), stat.peer);
    } else {
      dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
    }
  }
}
2079
2080 /*
2081 * warning:
2082 * i'll get an ack from each bystander.
2083 * when i get them all, do the export.
2084 * notify:
2085 * i'll get an ack from each bystander.
2086 * when i get them all, unfreeze and send the finish.
2087 */
2088 void Migrator::handle_export_notify_ack(const cref_t<MExportDirNotifyAck> &m)
2089 {
2090 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
2091 mds_rank_t dest(m->get_source().num());
2092 ceph_assert(dir);
2093 mds_rank_t from = mds_rank_t(m->get_source().num());
2094
2095 mds->hit_export_target(dest, -1);
2096
2097 auto export_state_entry = export_state.find(dir);
2098 if (export_state_entry != export_state.end()) {
2099 export_state_t& stat = export_state_entry->second;
2100 if (stat.state == EXPORT_WARNING &&
2101 stat.warning_ack_waiting.erase(from)) {
2102 // exporting. process warning.
2103 dout(7) << "from " << m->get_source()
2104 << ": exporting, processing warning on " << *dir << dendl;
2105 if (stat.warning_ack_waiting.empty())
2106 export_go(dir); // start export.
2107 } else if (stat.state == EXPORT_NOTIFYING &&
2108 stat.notify_ack_waiting.erase(from)) {
2109 // exporting. process notify.
2110 dout(7) << "from " << m->get_source()
2111 << ": exporting, processing notify on " << *dir << dendl;
2112 if (stat.notify_ack_waiting.empty())
2113 export_finish(dir);
2114 } else if (stat.state == EXPORT_CANCELLING &&
2115 m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
2116 stat.notify_ack_waiting.erase(from)) {
2117 dout(7) << "from " << m->get_source()
2118 << ": cancelling export, processing notify on " << *dir << dendl;
2119 if (stat.notify_ack_waiting.empty()) {
2120 export_cancel_finish(export_state_entry);
2121 }
2122 }
2123 }
2124 else {
2125 auto import_state_entry = import_state.find(dir->dirfrag());
2126 if (import_state_entry != import_state.end()) {
2127 import_state_t& stat = import_state_entry->second;
2128 if (stat.state == IMPORT_ABORTING) {
2129 // reversing import
2130 dout(7) << "from " << m->get_source()
2131 << ": aborting import on " << *dir << dendl;
2132 ceph_assert(stat.bystanders.count(from));
2133 stat.bystanders.erase(from);
2134 if (stat.bystanders.empty())
2135 import_reverse_unfreeze(dir);
2136 }
2137 }
2138 }
2139 }
2140
/*
 * All notify-acks are in (or there were none): commit the export.
 * Sends the final ("last") MExportDirFinish to the new auth, strips
 * local auth state from the exported subtree, unpins bounds, unfreezes,
 * and erases the export_state entry.
 *
 * If the export_state entry is already gone, the target must have
 * failed after the export became durable; nothing more to do locally.
 */
void Migrator::export_finish(CDir *dir)
{
  dout(3) << *dir << dendl;

  ceph_assert(g_conf()->mds_kill_export_at != 12);
  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end()) {
    dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
    return;
  }

  // send finish/commit to new auth
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
    mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), true, it->second.tid), it->second.peer);
  } else {
    dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
  }
  ceph_assert(g_conf()->mds_kill_export_at != 13);

  // finish export (adjust local cache state)
  int num_dentries = 0;
  MDSContext::vec finished;
  finish_export_dir(dir, it->second.peer,
                    it->second.peer_imported, finished, &num_dentries);

  ceph_assert(!dir->is_auth());
  mdcache->adjust_subtree_auth(dir, it->second.peer);

  // unpin bounds
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);
  for (set<CDir*>::iterator p = bounds.begin();
       p != bounds.end();
       ++p) {
    CDir *bd = *p;
    bd->put(CDir::PIN_EXPORTBOUND);
    bd->state_clear(CDir::STATE_EXPORTBOUND);
  }

  if (dir->state_test(CDir::STATE_AUXSUBTREE))
    dir->state_clear(CDir::STATE_AUXSUBTREE);

  // discard delayed expires
  mdcache->discard_delayed_expire(dir);

  dout(7) << "unfreezing" << dendl;

  // unfreeze tree, with possible subtree merge.
  // (we do this _after_ removing EXPORTBOUND pins, to allow merges)
  dir->unfreeze_tree();
  mdcache->try_subtree_merge(dir);

  // no more auth subtree? clear scatter dirty
  if (!dir->get_inode()->is_auth() &&
      !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
    dir->get_inode()->clear_scatter_dirty();
    // wake up scatter_nudge waiters
    dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, finished);
  }

  if (!finished.empty())
    mds->queue_waiters(finished);

  // grab mut/parent before erasing the state entry that owns them
  MutationRef mut = std::move(it->second.mut);
  auto parent = std::move(it->second.parent);
  // remove from exporting list, clean up state
  total_exporting_size -= it->second.approx_size;
  export_state.erase(it);

  ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
  dir->clear_exporting();

  mdcache->show_subtrees();
  audit();

  mdcache->trim(num_dentries); // try trimming exported dentries

  // send pending import_maps?
  mdcache->maybe_send_pending_resolves();

  // drop locks, unpin path
  if (mut) {
    mds->locker->drop_locks(mut.get());
    mut->cleanup();
  }

  // unblock any parent export that was waiting on this child export
  if (parent)
    child_export_finish(parent, true);

  maybe_do_queued_export();
}
2233
2234
2235
2236 class C_MDS_ExportDiscover : public MigratorContext {
2237 public:
2238 C_MDS_ExportDiscover(Migrator *mig, const cref_t<MExportDirDiscover>& m) : MigratorContext(mig), m(m) {}
2239 void finish(int r) override {
2240 mig->handle_export_discover(m, true);
2241 }
2242 private:
2243 cref_t<MExportDirDiscover> m;
2244 };
2245
2246 class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
2247 public:
2248 C_MDS_ExportDiscoverFactory(Migrator *mig, cref_t<MExportDirDiscover> m) : mig(mig), m(m) {}
2249 MDSContext *build() {
2250 return new C_MDS_ExportDiscover(mig, m);
2251 }
2252 private:
2253 Migrator *mig;
2254 cref_t<MExportDirDiscover> m;
2255 };
2256
2257 // ==========================================================
2258 // IMPORT
2259
/*
 * First stage of an incoming import: the exporter asks whether we can
 * take the subtree.  We record IMPORT_DISCOVERING state, make sure the
 * base inode is in our cache (discovering it by path if needed), pin
 * it, and reply with MExportDirDiscoverAck.
 *
 * @param m       the discover request from the exporting mds
 * @param started false on first delivery, true when re-dispatched after
 *                an async wait (cache open / path traverse)
 */
void Migrator::handle_export_discover(const cref_t<MExportDirDiscover> &m, bool started)
{
  mds_rank_t from = m->get_source_mds();
  ceph_assert(from != mds->get_nodeid());

  dout(7) << m->get_path() << dendl;

  // note import state
  dirfrag_t df = m->get_dirfrag();

  if (!mds->is_active()) {
    dout(7) << " not active, send NACK " << dendl;
    mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid(), false), from);
    return;
  }

  // only start discovering on this message once.
  import_state_t *p_state;
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
  if (!started) {
    ceph_assert(it == import_state.end());
    p_state = &import_state[df];
    p_state->state = IMPORT_DISCOVERING;
    p_state->peer = from;
    p_state->tid = m->get_tid();
  } else {
    // am i retrying after ancient path_traverse results?
    // (peer/tid mismatch means the import was cancelled and possibly
    // restarted while we were waiting)
    if (it == import_state.end() ||
        it->second.peer != from ||
        it->second.tid != m->get_tid()) {
      dout(7) << " dropping obsolete message" << dendl;
      return;
    }
    ceph_assert(it->second.state == IMPORT_DISCOVERING);
    p_state = &it->second;
  }

  C_MDS_ExportDiscoverFactory cf(this, m);
  if (!mdcache->is_open()) {
    dout(10) << " waiting for root" << dendl;
    mds->mdcache->wait_for_open(cf.build());
    return;
  }

  ceph_assert(g_conf()->mds_kill_import_at != 1);

  // do we have it?
  CInode *in = mdcache->get_inode(m->get_dirfrag().ino);
  if (!in) {
    // must discover it!
    filepath fpath(m->get_path());
    vector<CDentry*> trace;
    MDRequestRef null_ref;
    int r = mdcache->path_traverse(null_ref, cf, fpath,
                                   MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_PATH_LOCKED,
                                   &trace);
    if (r > 0) return;  // in progress; cf will re-dispatch us
    if (r < 0) {
      dout(7) << "failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
      ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
    }

    ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
  }

  // yay
  dout(7) << "have " << df << " inode " << *in << dendl;

  p_state->state = IMPORT_DISCOVERED;

  // pin inode in the cache (for now)
  ceph_assert(in->is_dir());
  in->get(CInode::PIN_IMPORTING);

  // reply
  dout(7) << " sending export_discover_ack on " << *in << dendl;
  mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid()), p_state->peer);
  ceph_assert(g_conf()->mds_kill_import_at != 2);
}
2339
2340 void Migrator::import_reverse_discovering(dirfrag_t df)
2341 {
2342 import_state.erase(df);
2343 }
2344
2345 void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2346 {
2347 // unpin base
2348 diri->put(CInode::PIN_IMPORTING);
2349 import_state.erase(df);
2350 }
2351
2352 void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
2353 {
2354 set<CDir*> bounds;
2355 mdcache->map_dirfrag_set(stat.bound_ls, bounds);
2356 import_remove_pins(dir, bounds);
2357 import_reverse_final(dir);
2358 }
2359
/*
 * The exporter cancelled: undo whatever import progress we have made.
 * The amount of cleanup depends on how far our import state machine
 * got; anything outside the DISCOVERING..PREPPED window is unexpected
 * and aborts.
 */
void Migrator::handle_export_cancel(const cref_t<MExportDirCancel> &m)
{
  dout(7) << "on " << m->get_dirfrag() << dendl;
  dirfrag_t df = m->get_dirfrag();
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
  if (it == import_state.end()) {
    ceph_abort_msg("got export_cancel in weird state");
  } else if (it->second.state == IMPORT_DISCOVERING) {
    import_reverse_discovering(df);
  } else if (it->second.state == IMPORT_DISCOVERED) {
    CInode *in = mdcache->get_inode(df.ino);
    ceph_assert(in);
    import_reverse_discovered(df, in);
  } else if (it->second.state == IMPORT_PREPPING) {
    CDir *dir = mdcache->get_dirfrag(df);
    ceph_assert(dir);
    import_reverse_prepping(dir, it->second);
  } else if (it->second.state == IMPORT_PREPPED) {
    CDir *dir = mdcache->get_dirfrag(df);
    ceph_assert(dir);
    set<CDir*> bounds;
    mdcache->get_subtree_bounds(dir, bounds);
    import_remove_pins(dir, bounds);
    // adjust auth back to the exporter
    mdcache->adjust_subtree_auth(dir, it->second.peer);
    import_reverse_unfreeze(dir);
  } else {
    ceph_abort_msg("got export_cancel in weird state");
  }
}
2390
2391 class C_MDS_ExportPrep : public MigratorContext {
2392 public:
2393 C_MDS_ExportPrep(Migrator *mig, const cref_t<MExportDirPrep>& m) : MigratorContext(mig), m(m) {}
2394 void finish(int r) override {
2395 mig->handle_export_prep(m, true);
2396 }
2397 private:
2398 cref_t<MExportDirPrep> m;
2399 };
2400
2401 class C_MDS_ExportPrepFactory : public MDSContextFactory {
2402 public:
2403 C_MDS_ExportPrepFactory(Migrator *mig, cref_t<MExportDirPrep> m) : mig(mig), m(m) {}
2404 MDSContext *build() {
2405 return new C_MDS_ExportPrep(mig, m);
2406 }
2407 private:
2408 Migrator *mig;
2409 cref_t<MExportDirPrep> m;
2410 };
2411
/*
 * Decode one replica trace from an MExportDirPrep and instantiate the
 * replicas in our cache.
 *
 * Wire format (see handle_export_prep):
 *   df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
 * where the start char tells us how to anchor the trace:
 *   'd' - we already have the dirfrag df
 *   'f' - we have the inode; decode its replica dirfrag first
 *   '-' - trace carries no anchor dir (bare dentry/inode chain)
 *
 * @param blp      decode cursor into the trace buffer (consumed)
 * @param oldauth  the exporting mds (recorded as replica source)
 * @param finished waiters to queue once replicas are instantiated
 */
void Migrator::decode_export_prep_trace(bufferlist::const_iterator& blp, mds_rank_t oldauth, MDSContext::vec& finished)
{
  DECODE_START(1, blp);
  dirfrag_t df;
  decode(df, blp);
  char start;
  decode(start, blp);
  dout(10) << " trace from " << df << " start " << start << dendl;

  CDir *cur = nullptr;
  if (start == 'd') {
    cur = mdcache->get_dirfrag(df);
    ceph_assert(cur);
    dout(10) << " had " << *cur << dendl;
  } else if (start == 'f') {
    CInode *in = mdcache->get_inode(df.ino);
    ceph_assert(in);
    dout(10) << " had " << *in << dendl;
    mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
    dout(10) << " added " << *cur << dendl;
  } else if (start == '-') {
    // nothing
  } else
    ceph_abort_msg("unrecognized start char");

  // walk the (dentry inode [dir])* chain until the buffer is exhausted
  while (!blp.end()) {
    CDentry *dn = nullptr;
    mdcache->decode_replica_dentry(dn, blp, cur, finished);
    dout(10) << " added " << *dn << dendl;
    CInode *in = nullptr;
    mdcache->decode_replica_inode(in, blp, dn, finished);
    dout(10) << " added " << *in << dendl;
    if (blp.end())
      break;
    mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
    dout(10) << " added " << *cur << dendl;
  }

  DECODE_FINISH(blp);
}
2452
/*
 * Second stage of an incoming import.  On the first pass we assimilate
 * the base dir, the replica traces and the bound list; on every pass we
 * try to open/pin all bound dirfrags, and once they are all present we
 * mark ourselves ambiguous auth, freeze the region, and ack.
 *
 * @param m          the prep message from the exporter
 * @param did_assim  false on first delivery; true when re-dispatched
 *                   after the bound-opening gather completed
 */
void Migrator::handle_export_prep(const cref_t<MExportDirPrep> &m, bool did_assim)
{
  mds_rank_t oldauth = mds_rank_t(m->get_source().num());
  ceph_assert(oldauth != mds->get_nodeid());

  CDir *dir;
  CInode *diri;
  MDSContext::vec finished;

  // assimilate root dir.
  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
  if (!did_assim) {
    ceph_assert(it != import_state.end());
    ceph_assert(it->second.state == IMPORT_DISCOVERED);
    ceph_assert(it->second.peer == oldauth);
    diri = mdcache->get_inode(m->get_dirfrag().ino);
    ceph_assert(diri);
    auto p = m->basedir.cbegin();
    mdcache->decode_replica_dir(dir, p, diri, oldauth, finished);
    dout(7) << "on " << *dir << " (first pass)" << dendl;
  } else {
    // peer/tid mismatch means the import was cancelled (and possibly
    // restarted) while we were waiting for bounds to open
    if (it == import_state.end() ||
        it->second.peer != oldauth ||
        it->second.tid != m->get_tid()) {
      dout(7) << "obsolete message, dropping" << dendl;
      return;
    }
    ceph_assert(it->second.state == IMPORT_PREPPING);
    ceph_assert(it->second.peer == oldauth);

    dir = mdcache->get_dirfrag(m->get_dirfrag());
    ceph_assert(dir);
    dout(7) << "on " << *dir << " (subsequent pass)" << dendl;
    diri = dir->get_inode();
  }
  ceph_assert(dir->is_auth() == false);

  mdcache->show_subtrees();

  // build import bound map (ino -> set of frags)
  map<inodeno_t, fragset_t> import_bound_fragset;
  for (const auto &bound : m->get_bounds()) {
    dout(10) << " bound " << bound << dendl;
    import_bound_fragset[bound.ino].insert_raw(bound.frag);
  }
  // assimilate contents?
  if (!did_assim) {
    dout(7) << "doing assim on " << *dir << dendl;

    // change import state
    it->second.state = IMPORT_PREPPING;
    it->second.bound_ls = m->get_bounds();
    it->second.bystanders = m->get_bystanders();
    ceph_assert(g_conf()->mds_kill_import_at != 3);

    // bystander list
    dout(7) << "bystanders are " << it->second.bystanders << dendl;

    // move pin from the base inode (taken at discover) to the dirfrag
    diri->put(CInode::PIN_IMPORTING);
    dir->get(CDir::PIN_IMPORTING);
    dir->state_set(CDir::STATE_IMPORTING);

    // assimilate traces to exports
    // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
    for (const auto &bl : m->traces) {
      auto blp = bl.cbegin();
      decode_export_prep_trace(blp, oldauth, finished);
    }

    // make bound sticky
    for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
         p != import_bound_fragset.end();
         ++p) {
      p->second.simplify();
      CInode *in = mdcache->get_inode(p->first);
      ceph_assert(in);
      in->get_stickydirs();
      dout(7) << " set stickydirs on bound inode " << *in << dendl;
    }

  } else {
    dout(7) << " not doing assim on " << *dir << dendl;
  }

  MDSGatherBuilder gather(g_ceph_context);

  if (!finished.empty())
    mds->queue_waiters(finished);


  bool success = true;
  if (mds->is_active()) {
    // open all bounds
    set<CDir*> import_bounds;
    for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
         p != import_bound_fragset.end();
         ++p) {
      CInode *in = mdcache->get_inode(p->first);
      ceph_assert(in);

      // map fragset into a frag_t list, based on the inode fragtree
      frag_vec_t leaves;
      for (const auto& frag : p->second) {
        in->dirfragtree.get_leaves_under(frag, leaves);
      }
      dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << leaves << dendl;

      for (const auto& leaf : leaves) {
        CDir *bound = mdcache->get_dirfrag(dirfrag_t(p->first, leaf));
        if (!bound) {
          dout(7) << " opening bounding dirfrag " << leaf << " on " << *in << dendl;
          mdcache->open_remote_dirfrag(in, leaf, gather.new_sub());
          continue;
        }

        if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
          dout(7) << " pinning import bound " << *bound << dendl;
          bound->get(CDir::PIN_IMPORTBOUND);
          bound->state_set(CDir::STATE_IMPORTBOUND);
        } else {
          dout(7) << " already pinned import bound " << *bound << dendl;
        }
        import_bounds.insert(bound);
      }
    }

    // some bounds still opening: park this message and retry when done
    if (gather.has_subs()) {
      C_MDS_ExportPrepFactory cf(this, m);
      gather.set_finisher(cf.build());
      gather.activate();
      return;
    }

    dout(7) << " all ready, noting auth and freezing import region" << dendl;

    if (!mdcache->is_readonly() &&
        // for pinning scatter gather. loner has a higher chance to get wrlock
        diri->filelock.can_wrlock(diri->get_loner()) &&
        diri->nestlock.can_wrlock(diri->get_loner())) {
      it->second.mut = new MutationImpl();
      // force some locks. hacky.
      mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
      mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);

      // note that i am an ambiguous auth for this subtree.
      // specify bounds, since the exporter explicitly defines the region.
      mdcache->adjust_bounded_subtree_auth(dir, import_bounds,
                                           pair<int,int>(oldauth, mds->get_nodeid()));
      mdcache->verify_subtree_bounds(dir, import_bounds);
      // freeze.
      dir->_freeze_tree();
      // note new state
      it->second.state = IMPORT_PREPPED;
    } else {
      dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
      success = false;
    }
  } else {
    dout(7) << " not active, failing. " << *dir << dendl;
    success = false;
  }

  if (!success)
    import_reverse_prepping(dir, it->second);

  // ok!
  dout(7) << " sending export_prep_ack on " << *dir << dendl;
  mds->send_message(make_message<MExportDirPrepAck>(dir->dirfrag(), success, m->get_tid()), m->get_connection());

  ceph_assert(g_conf()->mds_kill_import_at != 4);
}
2625
2626
2627
2628
// Completion for the EImportStart journal entry: once the event is
// durable, continue the import via import_logged_start().
class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
  dirfrag_t df;
  CDir *dir;
  mds_rank_t from;   // the exporting mds
public:
  // sessions force-opened before journaling; filled in by
  // handle_export_dir and consumed by import_logged_start
  map<client_t,pair<Session*,uint64_t> > imported_session_map;

  C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
    MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
    // pin the dir so the raw pointer stays valid until finish() runs
    dir->get(CDir::PIN_PTRWAITER);
  }
  void finish(int r) override {
    mig->import_logged_start(df, dir, from, imported_session_map);
    dir->put(CDir::PIN_PTRWAITER);
  }
};
2645
/*
 * Third stage of an incoming import: the exporter has shipped the
 * actual subtree data.  Decode everything into our cache, take auth
 * (ambiguously, listing us first), journal an EImportStart, and move to
 * IMPORT_LOGGINGSTART.  import_logged_start() continues once the entry
 * is durable.
 */
void Migrator::handle_export_dir(const cref_t<MExportDir> &m)
{
  ceph_assert(g_conf()->mds_kill_import_at != 5);
  CDir *dir = mdcache->get_dirfrag(m->dirfrag);
  ceph_assert(dir);

  mds_rank_t oldauth = mds_rank_t(m->get_source().num());
  dout(7) << "importing " << *dir << " from " << oldauth << dendl;

  ceph_assert(!dir->is_auth());
  ceph_assert(dir->freeze_tree_state);

  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
  ceph_assert(it != import_state.end());
  ceph_assert(it->second.state == IMPORT_PREPPED);
  ceph_assert(it->second.tid == m->get_tid());
  ceph_assert(it->second.peer == oldauth);

  if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
    dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());

  mdcache->show_subtrees();

  C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);

  // start the journal entry
  EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
  mds->mdlog->start_entry(le);

  le->metablob.add_dir_context(dir);

  // adjust auth (list us _first_)
  mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);

  // new client sessions, open these after we journal
  // include imported sessions in EImportStart
  auto cmp = m->client_map.cbegin();
  map<client_t,entity_inst_t> client_map;
  map<client_t,client_metadata_t> client_metadata_map;
  decode(client_map, cmp);
  decode(client_metadata_map, cmp);
  ceph_assert(cmp.end());
  le->cmapv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
                                                       onlogged->imported_session_map);
  encode(client_map, le->client_map, mds->mdsmap->get_up_features());
  encode(client_metadata_map, le->client_map);

  // decode the dirfrags (and their dentries/inodes) into the cache and
  // into the journal event
  auto blp = m->export_data.cbegin();
  int num_imported_inodes = 0;
  while (!blp.end()) {
    decode_import_dir(blp,
                      oldauth,
                      dir,                 // import root
                      le,
                      mds->mdlog->get_current_segment(),
                      it->second.peer_exports,
                      it->second.updated_scatterlocks,
                      num_imported_inodes);
  }
  dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;

  // include bounds in EImportStart
  set<CDir*> import_bounds;
  for (const auto &bound : m->bounds) {
    CDir *bd = mdcache->get_dirfrag(bound);
    ceph_assert(bd);
    le->metablob.add_dir(bd, false);  // note that parent metadata is already in the event
    import_bounds.insert(bd);
  }
  mdcache->verify_subtree_bounds(dir, import_bounds);

  // adjust popularity
  mds->balancer->add_import(dir);

  dout(7) << "did " << *dir << dendl;

  // note state
  it->second.state = IMPORT_LOGGINGSTART;
  ceph_assert(g_conf()->mds_kill_import_at != 6);

  // log it
  mds->mdlog->submit_entry(le, onlogged);
  mds->mdlog->flush();

  // some stats
  if (mds->logger) {
    mds->logger->inc(l_mds_imported);
    mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
  }
}
2736
2737
2738 /*
2739 * this is an import helper
2740 * called by import_finish, and import_reverse and friends.
2741 */
2742 void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2743 {
2744 import_state_t& stat = import_state[dir->dirfrag()];
2745 // root
2746 dir->put(CDir::PIN_IMPORTING);
2747 dir->state_clear(CDir::STATE_IMPORTING);
2748
2749 // bounding inodes
2750 set<inodeno_t> did;
2751 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2752 p != stat.bound_ls.end();
2753 ++p) {
2754 if (did.count(p->ino))
2755 continue;
2756 did.insert(p->ino);
2757 CInode *in = mdcache->get_inode(p->ino);
2758 ceph_assert(in);
2759 in->put_stickydirs();
2760 }
2761
2762 if (stat.state == IMPORT_PREPPING) {
2763 for (auto bd : bounds) {
2764 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2765 bd->put(CDir::PIN_IMPORTBOUND);
2766 bd->state_clear(CDir::STATE_IMPORTBOUND);
2767 }
2768 }
2769 } else if (stat.state >= IMPORT_PREPPED) {
2770 // bounding dirfrags
2771 for (auto bd : bounds) {
2772 ceph_assert(bd->state_test(CDir::STATE_IMPORTBOUND));
2773 bd->put(CDir::PIN_IMPORTBOUND);
2774 bd->state_clear(CDir::STATE_IMPORTBOUND);
2775 }
2776 }
2777 }
2778
// Context that, when finished, queues a batch of collected contexts at
// the front of the waiter queue so they run immediately afterwards.
class C_MDC_QueueContexts : public MigratorContext {
public:
  // filled in by the caller before the context is scheduled
  MDSContext::vec contexts;
  C_MDC_QueueContexts(Migrator *m) : MigratorContext(m) {}
  void finish(int r) override {
    // execute contexts immediately after 'this' context
    get_mds()->queue_waiters_front(contexts);
  }
};
2788
2789 /*
2790 * note: this does the full work of reversing an import and cleaning up
2791 * state.
2792 * called by both handle_mds_failure and by handle_resolve (if we are
2793 * a survivor coping with an exporter failure+recovery).
2794 */
/*
 * note: this does the full work of reversing an import and cleaning up
 * state.  Strips auth from everything we had assimilated, discards
 * imported caps if we got as far as IMPORT_ACKING, journals the failure,
 * and notifies bystanders (remaining in IMPORT_ABORTING until they ack).
 * called by both handle_mds_failure and by handle_resolve (if we are
 * a survivor coping with an exporter failure+recovery).
 */
void Migrator::import_reverse(CDir *dir)
{
  dout(7) << *dir << dendl;

  import_state_t& stat = import_state[dir->dirfrag()];
  stat.state = IMPORT_ABORTING;

  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // remove pins
  import_remove_pins(dir, bounds);

  // update auth, with possible subtree merge.
  ceph_assert(dir->is_subtree_root());
  if (mds->is_resolve())
    mdcache->trim_non_auth_subtree(dir);

  mdcache->adjust_subtree_auth(dir, stat.peer);

  auto fin = new C_MDC_QueueContexts(this);
  if (!dir->get_inode()->is_auth() &&
      !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
    dir->get_inode()->clear_scatter_dirty();
    // wake up scatter_nudge waiters
    dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
  }

  int num_dentries = 0;
  // adjust auth bits: breadth-first walk of the imported region,
  // stopping at the bounds
  std::deque<CDir*> q;
  q.push_back(dir);
  while (!q.empty()) {
    CDir *cur = q.front();
    q.pop_front();

    // dir
    cur->abort_import();

    for (auto &p : *cur) {
      CDentry *dn = p.second;

      // dentry
      dn->clear_auth();
      dn->clear_replica_map();
      dn->set_replica_nonce(CDentry::EXPORT_NONCE);
      if (dn->is_dirty())
        dn->mark_clean();

      // inode?
      if (dn->get_linkage()->is_primary()) {
        CInode *in = dn->get_linkage()->get_inode();
        in->state_clear(CInode::STATE_AUTH);
        in->clear_replica_map();
        in->set_replica_nonce(CInode::EXPORT_NONCE);
        if (in->is_dirty())
          in->mark_clean();
        in->clear_dirty_rstat();
        if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
          in->clear_scatter_dirty();
          in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
        }

        in->clear_dirty_parent();

        in->clear_clientwriteable();
        in->state_clear(CInode::STATE_NEEDSRECOVER);

        // we are no longer auth; drop partially-gathered lock state
        in->authlock.clear_gather();
        in->linklock.clear_gather();
        in->dirfragtreelock.clear_gather();
        in->filelock.clear_gather();

        in->clear_file_locks();

        // non-bounding dir?
        auto&& dfs = in->get_dirfrags();
        for (const auto& dir : dfs) {
          if (bounds.count(dir) == 0)
            q.push_back(dir);
        }
      }

      mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
      ++num_dentries;
    }
  }

  dir->add_waiter(CDir::WAIT_UNFREEZE, fin);

  if (stat.state == IMPORT_ACKING) {
    // remove imported caps
    for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
         p != stat.peer_exports.end();
         ++p) {
      CInode *in = p->first;
      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
        Capability *cap = in->get_client_cap(q->first);
        if (!cap) {
          ceph_assert(!stat.session_map.count(q->first));
          continue;
        }
        if (cap->is_importing())
          in->remove_client_cap(q->first);
        else
          cap->clear_clientwriteable();
      }
      in->put(CInode::PIN_IMPORTINGCAPS);
    }
    for (auto& p : stat.session_map) {
      Session *session = p.second.first;
      session->dec_importing();
    }
  }

  // log our failure
  mds->mdlog->start_submit_entry(new EImportFinish(dir, false));  // log failure

  mdcache->trim(num_dentries); // try trimming dentries

  // notify bystanders; wait in aborting state
  import_notify_abort(dir, bounds);
}
2920
2921 void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2922 {
2923 dout(7) << *dir << dendl;
2924
2925 import_state_t& stat = import_state[dir->dirfrag()];
2926 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2927 p != stat.bystanders.end();
2928 ++p) {
2929 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, false,
2930 pair<int,int>(stat.peer, mds->get_nodeid()),
2931 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
2932 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2933 notify->get_bounds().push_back((*i)->dirfrag());
2934 mds->send_message_mds(notify, *p);
2935 }
2936 }
2937
/*
 * Tell every live bystander that the import is being aborted: auth goes
 * from (exporter, us) back to (exporter, unknown).  Dead bystanders are
 * dropped from the set; if none remain we can reverse immediately,
 * otherwise handle_export_notify_ack finishes the job as acks arrive.
 */
void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
{
  dout(7) << *dir << dendl;

  import_state_t& stat = import_state[dir->dirfrag()];
  for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
       p != stat.bystanders.end(); ) {
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
      // this can happen if both exporter and bystander fail in the same mdsmap epoch
      stat.bystanders.erase(p++);
      continue;
    }
    auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
                                                 mds_authority_t(stat.peer, mds->get_nodeid()),
                                                 mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
    for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
      notify->get_bounds().push_back((*i)->dirfrag());
    mds->send_message_mds(notify, *p);
    ++p;
  }
  if (stat.bystanders.empty()) {
    dout(7) << "no bystanders, finishing reverse now" << dendl;
    import_reverse_unfreeze(dir);
  } else {
    ceph_assert(g_conf()->mds_kill_import_at != 10);
  }
}
2966
2967 void Migrator::import_reverse_unfreeze(CDir *dir)
2968 {
2969 dout(7) << *dir << dendl;
2970 ceph_assert(!dir->is_auth());
2971 mdcache->discard_delayed_expire(dir);
2972 dir->unfreeze_tree();
2973 if (dir->is_subtree_root())
2974 mdcache->try_subtree_merge(dir);
2975 import_reverse_final(dir);
2976 }
2977
2978 void Migrator::import_reverse_final(CDir *dir)
2979 {
2980 dout(7) << *dir << dendl;
2981
2982 // clean up
2983 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
2984 ceph_assert(it != import_state.end());
2985
2986 MutationRef mut = it->second.mut;
2987 import_state.erase(it);
2988
2989 // send pending import_maps?
2990 mdcache->maybe_send_pending_resolves();
2991
2992 if (mut) {
2993 mds->locker->drop_locks(mut.get());
2994 mut->cleanup();
2995 }
2996
2997 mdcache->show_subtrees();
2998 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
2999 }
3000
3001
3002
3003
/*
 * EImportStart is durable: finish opening the imported client sessions,
 * import their caps (held back until the exporter confirms — peer is
 * NONE for now), and ack the export back to the old auth.  Moves the
 * import to IMPORT_ACKING.
 *
 * @param df   dirfrag of the import root (for logging if dir is stale)
 * @param dir  the import root
 * @param from the exporting mds
 * @param imported_session_map sessions force-opened before journaling
 */
void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
                                   map<client_t,pair<Session*,uint64_t> >& imported_session_map)
{
  dout(7) << *dir << dendl;

  map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
  if (it == import_state.end() ||
      it->second.state != IMPORT_LOGGINGSTART) {
    // the import was aborted while the journal entry was in flight;
    // just release the sessions we had started to open
    dout(7) << "import " << df << " must have aborted" << dendl;
    mds->server->finish_force_open_sessions(imported_session_map);
    return;
  }

  // note state
  it->second.state = IMPORT_ACKING;

  ceph_assert(g_conf()->mds_kill_import_at != 7);

  // force open client sessions and finish cap import
  mds->server->finish_force_open_sessions(imported_session_map, false);

  map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
       p != it->second.peer_exports.end();
       ++p) {
    // parameter 'peer' is NONE, delay sending cap import messages to client
    finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
                             p->second, imported_caps[p->first->ino()]);
  }

  it->second.session_map.swap(imported_session_map);

  // send notify's etc.
  dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;

  // test surviving observer of a failed migration that did not complete
  //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);

  auto ack = make_message<MExportDirAck>(dir->dirfrag(), it->second.tid);
  encode(imported_caps, ack->imported_caps);

  mds->send_message_mds(ack, from);
  ceph_assert(g_conf()->mds_kill_import_at != 8);

  mdcache->show_subtrees();
}
3050
3051 void Migrator::handle_export_finish(const cref_t<MExportDirFinish> &m)
3052 {
3053 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
3054 ceph_assert(dir);
3055 dout(7) << *dir << (m->is_last() ? " last" : "") << dendl;
3056
3057 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
3058 ceph_assert(it != import_state.end());
3059 ceph_assert(it->second.tid == m->get_tid());
3060
3061 import_finish(dir, false, m->is_last());
3062 }
3063
/*
 * Importer: complete a successful import.  On the first pass
 * (IMPORT_ACKING) we claim full subtree authority and finish the client
 * cap imports; when 'last' is set we also journal EImportFinish, drop
 * pins/locks, unfreeze the tree, and re-evaluate imported caps.
 *
 * @param dir    imported subtree root
 * @param notify whether to send import_notify_finish() to the exporter
 *               (false when the exporter's MExportDirFinish drove us here)
 * @param last   whether this is the final finish pass; if false we only
 *               advance to IMPORT_FINISHING and return
 */
void Migrator::import_finish(CDir *dir, bool notify, bool last)
{
  dout(7) << *dir << dendl;

  map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
  ceph_assert(it != import_state.end());
  ceph_assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);

  if (it->second.state == IMPORT_ACKING) {
    // take unambiguous authority over the subtree
    ceph_assert(dir->is_auth());
    mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
  }

  // log finish
  ceph_assert(g_conf()->mds_kill_import_at != 9);  // test hook

  if (it->second.state == IMPORT_ACKING) {
    // merge exported caps into the caps created during decode, and send
    // the (previously delayed) cap import messages to clients
    for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
        p != it->second.peer_exports.end();
        ++p) {
      CInode *in = p->first;
      ceph_assert(in->is_auth());
      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
        auto r = it->second.session_map.find(q->first);
        if (r == it->second.session_map.end())
          continue;

        Session *session = r->second.first;
        Capability *cap = in->get_client_cap(q->first);
        ceph_assert(cap);
        cap->merge(q->second, true);
        cap->clear_importing();
        mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
                               q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
      }
      p->second.clear();
      in->replica_caps_wanted = 0;
    }
    // drop the per-session importing refs taken earlier
    for (auto& p : it->second.session_map) {
      Session *session = p.second.first;
      session->dec_importing();
    }
  }

  if (!last) {
    // first of two passes: wait for the exporter's final message
    ceph_assert(it->second.state == IMPORT_ACKING);
    it->second.state = IMPORT_FINISHING;
    return;
  }

  // remove pins
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  if (notify)
    import_notify_finish(dir, bounds);

  import_remove_pins(dir, bounds);

  // keep peer_exports alive past the erase for the re-eval loop below
  map<CInode*, map<client_t,Capability::Export> > peer_exports;
  it->second.peer_exports.swap(peer_exports);

  // clear import state (we're done!)
  MutationRef mut = it->second.mut;
  import_state.erase(it);

  mds->mdlog->start_submit_entry(new EImportFinish(dir, true));

  // process delayed expires
  mdcache->process_delayed_expire(dir);

  // unfreeze tree, with possible subtree merge.
  dir->unfreeze_tree();
  mdcache->try_subtree_merge(dir);

  mdcache->show_subtrees();
  //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)

  if (mut) {
    mds->locker->drop_locks(mut.get());
    mut->cleanup();
  }

  // re-eval imported caps
  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
       p != peer_exports.end();
       ++p) {
    if (p->first->is_auth())
      mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
    p->first->put(CInode::PIN_IMPORTINGCAPS);
  }

  // send pending import_maps?
  mdcache->maybe_send_pending_resolves();

  // did i just import mydir?
  if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
    mdcache->populate_mydir();

  // is it empty?
  if (dir->get_num_head_items() == 0 &&
      !dir->inode->is_auth()) {
    // reexport!
    export_empty_import(dir);
  }
}
3172
/*
 * Importer: decode one inode from the export stream and link it under
 * dentry 'dn'.  Exported client caps are recorded in 'peer_exports' for
 * later processing; dirty scatterlocks are queued in
 * 'updated_scatterlocks'.  Replica lists and eval-able locks are fixed
 * up after the decode.
 */
void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
				   mds_rank_t oldauth, LogSegment *ls,
				   map<CInode*, map<client_t,Capability::Export> >& peer_exports,
				   list<ScatterLock*>& updated_scatterlocks)
{
  CInode *in;
  bool added = false;
  DECODE_START(1, blp);
  dout(15) << " on " << *dn << dendl;

  inodeno_t ino;
  snapid_t last;
  decode(ino, blp);
  decode(last, blp);

  // reuse an existing cache entry if we already replicate this inode
  in = mdcache->get_inode(ino, last);
  if (!in) {
    // NOTE(review): first snapid hard-coded to 2 here — presumably the
    // conventional first-valid snapid; confirm against CInode ctor users
    in = new CInode(mds->mdcache, true, 2, last);
    added = true;
  }

  // state after link -- or not! -sage
  in->decode_import(blp, ls);  // cap imports are noted for later action

  // caps
  decode_import_inode_caps(in, true, blp, peer_exports);

  DECODE_FINISH(blp);

  // link before state -- or not! -sage
  if (dn->get_linkage()->get_inode() != in) {
    ceph_assert(!dn->get_linkage()->get_inode());
    dn->dir->link_primary_inode(dn, in);
  }

  if (in->is_dir())
    dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);

  // add inode?
  if (added) {
    mdcache->add_inode(in);
    dout(10) << "added " << *in << dendl;
  } else {
    dout(10) << " had " << *in << dendl;
  }

  if (in->get_inode()->is_dirty_rstat())
    in->mark_dirty_rstat();

  // any client_ranges imply clients may write to this inode
  if (!in->get_inode()->client_ranges.empty())
    in->mark_clientwriteable();

  // clear if dirtyscattered, since we're going to journal this
  // but not until we _actually_ finish the import...
  if (in->filelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->filelock);
    mds->locker->mark_updated_scatterlock(&in->filelock);
  }

  if (in->dirfragtreelock.is_dirty()) {
    updated_scatterlocks.push_back(&in->dirfragtreelock);
    mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
  }

  // adjust replica list
  //assert(!in->is_replica(oldauth)); // not true on failed export
  in->add_replica(oldauth, CInode::EXPORT_NONCE);
  if (in->is_replica(mds->get_nodeid()))
    in->remove_replica(mds->get_nodeid());

  // nudge stable-but-not-SYNC locks back toward readable states
  if (in->snaplock.is_stable() &&
      in->snaplock.get_state() != LOCK_SYNC)
    mds->locker->try_eval(&in->snaplock, NULL);

  if (in->policylock.is_stable() &&
      in->policylock.get_state() != LOCK_SYNC)
    mds->locker->try_eval(&in->policylock, NULL);
}
3251
3252 void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
3253 bufferlist::const_iterator &blp,
3254 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3255 {
3256 DECODE_START(1, blp);
3257 map<client_t,Capability::Export> cap_map;
3258 decode(cap_map, blp);
3259 if (auth_cap) {
3260 mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
3261 decode(mds_wanted, blp);
3262 mds_wanted.erase(mds->get_nodeid());
3263 in->set_mds_caps_wanted(mds_wanted);
3264 }
3265 if (!cap_map.empty() ||
3266 (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
3267 peer_exports[in].swap(cap_map);
3268 in->get(CInode::PIN_IMPORTINGCAPS);
3269 }
3270 DECODE_FINISH(blp);
3271 }
3272
/*
 * Turn the exporter's per-client cap state for one inode into local
 * Capability objects.
 *
 * @param in          inode the caps belong to
 * @param peer        exporter rank, or MDS_RANK_NONE (<0) to defer the
 *                    client-facing cap import messages
 * @param auth_cap    true if we are importing the auth copy of the caps
 * @param session_map client -> (Session, serial) for force-opened sessions
 * @param export_map  exporter's per-client cap state
 * @param import_map  out: per-client import records the exporter will
 *                    relay to clients in its cap export messages
 *
 * While walking export_map we also walk the (sorted) client_ranges in
 * lockstep: any client with a range but no imported cap means lost
 * writer state, so the inode is flagged NEEDSRECOVER.
 */
void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
				       const map<client_t,pair<Session*,uint64_t> >& session_map,
				       const map<client_t,Capability::Export> &export_map,
				       map<client_t,Capability::Import> &import_map)
{
  const auto& client_ranges = in->get_projected_inode()->client_ranges;
  auto r = client_ranges.cbegin();
  bool needs_recover = false;

  for (auto& it : export_map) {
    dout(10) << "for client." << it.first << " on " << *in << dendl;

    auto p = session_map.find(it.first);
    if (p == session_map.end()) {
      dout(10) << " no session for client." << it.first << dendl;
      // still create an (empty) entry so the exporter tells the client
      (void)import_map[it.first];
      continue;
    }

    Session *session = p->second.first;

    Capability *cap = in->get_client_cap(it.first);
    if (!cap) {
      cap = in->add_client_cap(it.first, session);
      // deferred (peer < 0) imports are finished later; mark in-flight
      if (peer < 0)
	cap->mark_importing();
    }

    if (auth_cap) {
      // clients with a range but no exported cap => recovery needed
      while (r != client_ranges.cend() && r->first < it.first) {
	needs_recover = true;
	++r;
      }
      if (r != client_ranges.cend() && r->first == it.first) {
	cap->mark_clientwriteable();
	++r;
      }
    }

    // Always ask exporter mds to send cap export messages for auth caps.
    // For non-auth caps, ask exporter mds to send cap export messages to
    // clients who haven't opened sessions. The cap export messages will
    // make clients open sessions.
    if (auth_cap || !session->get_connection()) {
      Capability::Import& im = import_map[it.first];
      im.cap_id = cap->get_cap_id();
      im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
      im.issue_seq = cap->get_last_seq() + 1;
    }

    if (peer >= 0) {
      // immediate import: merge exporter state and tell the client now
      cap->merge(it.second, auth_cap);
      mdcache->do_cap_import(session, in, cap, it.second.cap_id,
			     it.second.seq, it.second.mseq - 1, peer,
			     auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
    }
  }

  if (auth_cap) {
    // leftover ranged clients after the walk also imply lost writers
    if (r != client_ranges.cend())
      needs_recover = true;
    if (needs_recover)
      in->state_set(CInode::STATE_NEEDSRECOVER);
  }

  if (peer >= 0) {
    in->replica_caps_wanted = 0;
    in->put(CInode::PIN_IMPORTINGCAPS);
  }
}
3343
/*
 * Importer: decode one dirfrag (and its dentries/inodes) from the
 * export stream.
 *
 * @param blp          decode cursor into the export payload
 * @param oldauth      exporter rank (becomes a replica of everything here)
 * @param import_root  root dirfrag of the import (source of freeze state)
 * @param le           journal entry to record dirs/dentries in (may be null)
 * @param ls           log segment for dirty state
 * @param peer_exports out: per-inode exported client caps
 * @param updated_scatterlocks out: dirty scatterlocks found during decode
 * @param num_imported in/out: running count of decoded dentries
 */
void Migrator::decode_import_dir(bufferlist::const_iterator& blp,
				mds_rank_t oldauth,
				CDir *import_root,
				EImportStart *le,
				LogSegment *ls,
				map<CInode*,map<client_t,Capability::Export> >& peer_exports,
				list<ScatterLock*>& updated_scatterlocks, int &num_imported)
{
  DECODE_START(1, blp);
  // set up dir
  dirfrag_t df;
  decode(df, blp);

  CInode *diri = mdcache->get_inode(df.ino);
  ceph_assert(diri);
  CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
  ceph_assert(dir);

  dout(7) << *dir << dendl;

  // a freshly opened dirfrag inherits the import root's freeze state
  if (!dir->freeze_tree_state) {
    ceph_assert(dir->get_version() == 0);
    dir->freeze_tree_state = import_root->freeze_tree_state;
  }

  // assimilate state
  dir->decode_import(blp, ls);

  // adjust replica list
  //assert(!dir->is_replica(oldauth)); // not true on failed export
  dir->add_replica(oldauth, CDir::EXPORT_NONCE);
  if (dir->is_replica(mds->get_nodeid()))
    dir->remove_replica(mds->get_nodeid());

  // add to journal entry
  if (le)
    le->metablob.add_import_dir(dir);

  // take all waiters on this dir
  // NOTE: a pass of imported data is guaranteed to get all of my waiters because
  // a replica's presense in my cache implies/forces it's presense in authority's.
  MDSContext::vec waiters;
  dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
  for (auto c : waiters)
    dir->add_waiter(CDir::WAIT_UNFREEZE, c);  // UNFREEZE will get kicked both on success or failure

  dout(15) << "doing contents" << dendl;

  // contents
  __u32 nden;
  decode(nden, blp);

  for (; nden>0; nden--) {
    num_imported++;

    // dentry
    string dname;
    snapid_t last;
    decode(dname, blp);
    decode(last, blp);

    CDentry *dn = dir->lookup_exact_snap(dname, last);
    if (!dn)
      dn = dir->add_null_dentry(dname, 1, last);

    dn->decode_import(blp, ls);

    dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
    if (dn->is_replica(mds->get_nodeid()))
      dn->remove_replica(mds->get_nodeid());

    // dentry lock in unreadable state can block path traverse
    if (dn->lock.get_state() != LOCK_SYNC)
      mds->locker->try_eval(&dn->lock, NULL);

    dout(15) << " got " << *dn << dendl;

    // points to... one-byte tag: 'N' null, 'L'/'l' remote, 'I'/'i' inode
    char icode;
    decode(icode, blp);

    if (icode == 'N') {
      // null dentry
      ceph_assert(dn->get_linkage()->is_null());

      // fall thru
    }
    else if (icode == 'L' || icode == 'l') {
      // remote link
      inodeno_t ino;
      unsigned char d_type;
      mempool::mds_co::string alternate_name;

      CDentry::decode_remote(icode, ino, d_type, alternate_name, blp);

      if (dn->get_linkage()->is_remote()) {
	// already linked: decoded state must match what we have
	ceph_assert(dn->get_linkage()->get_remote_ino() == ino);
	ceph_assert(dn->get_alternate_name() == alternate_name);
      } else {
	dir->link_remote_inode(dn, ino, d_type);
	dn->set_alternate_name(std::move(alternate_name));
      }
    }
    else if (icode == 'I' || icode == 'i') {
      // inode
      ceph_assert(le);
      if (icode == 'i') {
	// newer encoding wraps the inode and carries an alternate_name
	DECODE_START(2, blp);
	decode_import_inode(dn, blp, oldauth, ls,
			    peer_exports, updated_scatterlocks);
	ceph_assert(!dn->is_projected());
	decode(dn->alternate_name, blp);
	DECODE_FINISH(blp);
      } else {
	decode_import_inode(dn, blp, oldauth, ls,
			    peer_exports, updated_scatterlocks);
      }
    }

    // add dentry to journal entry
    if (le)
      le->metablob.add_import_dentry(dn);
  }

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  dir->inode->maybe_export_pin();

  dout(7) << " done " << *dir << dendl;
  DECODE_FINISH(blp);
}
3478
3479
3480
3481
3482
3483 // authority bystander
3484
3485 void Migrator::handle_export_notify(const cref_t<MExportDirNotify> &m)
3486 {
3487 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
3488 return;
3489 }
3490
3491 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
3492
3493 mds_rank_t from = mds_rank_t(m->get_source().num());
3494 mds_authority_t old_auth = m->get_old_auth();
3495 mds_authority_t new_auth = m->get_new_auth();
3496
3497 if (!dir) {
3498 dout(7) << old_auth << " -> " << new_auth
3499 << " on missing dir " << m->get_dirfrag() << dendl;
3500 } else if (dir->authority() != old_auth) {
3501 dout(7) << "old_auth was " << dir->authority()
3502 << " != " << old_auth << " -> " << new_auth
3503 << " on " << *dir << dendl;
3504 } else {
3505 dout(7) << old_auth << " -> " << new_auth
3506 << " on " << *dir << dendl;
3507 // adjust auth
3508 set<CDir*> have;
3509 mdcache->map_dirfrag_set(m->get_bounds(), have);
3510 mdcache->adjust_bounded_subtree_auth(dir, have, new_auth);
3511
3512 // induce a merge?
3513 mdcache->try_subtree_merge(dir);
3514 }
3515
3516 // send ack
3517 if (m->wants_ack()) {
3518 mds->send_message_mds(make_message<MExportDirNotifyAck>(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
3519 } else {
3520 // aborted. no ack.
3521 dout(7) << "no ack requested" << dendl;
3522 }
3523 }
3524
3525 /** cap exports **/
3526 void Migrator::export_caps(CInode *in)
3527 {
3528 mds_rank_t dest = in->authority().first;
3529 dout(7) << "to mds." << dest << " " << *in << dendl;
3530
3531 ceph_assert(in->is_any_caps());
3532 ceph_assert(!in->is_auth());
3533 ceph_assert(!in->is_ambiguous_auth());
3534 ceph_assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
3535
3536 auto ex = make_message<MExportCaps>();
3537 ex->ino = in->ino();
3538
3539 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map, ex->client_metadata_map);
3540
3541 mds->send_message_mds(ex, dest);
3542 }
3543
/*
 * Replica side: the auth mds has accepted our exported caps.  Tell each
 * affected client that its caps moved (CEPH_CAP_OP_EXPORT with peer
 * info) and drop the local cap, then re-evaluate our caps-wanted state.
 */
void Migrator::handle_export_caps_ack(const cref_t<MExportCapsAck> &ack)
{
  mds_rank_t from = ack->get_source().num();
  CInode *in = mdcache->get_inode(ack->ino);
  if (in) {
    ceph_assert(!in->is_auth());

    dout(10) << *ack << " from "
	     << ack->get_source() << " on " << *in << dendl;

    // payload: per-client import records, then the cap ids we exported
    map<client_t,Capability::Import> imported_caps;
    map<client_t,uint64_t> caps_ids;
    auto blp = ack->cap_bl.cbegin();
    decode(imported_caps, blp);
    decode(caps_ids, blp);

    for (auto& it : imported_caps) {
      Capability *cap = in->get_client_cap(it.first);
      // skip caps that were dropped/replaced since we sent the export
      if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
	continue;

      dout(7) << " telling client." << it.first
	      << " exported caps on " << *in << dendl;
      auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
					 cap->get_cap_id(), cap->get_mseq(),
					 mds->get_osd_epoch_barrier());
      // point the client at its new cap on the auth mds
      m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
      mds->send_message_client_counted(m, it.first);

      in->remove_client_cap(it.first);
    }

    mds->locker->request_inode_file_caps(in);
    mds->locker->try_eval(in, CEPH_CAP_LOCKS);
  }
}
3580
3581 void Migrator::handle_gather_caps(const cref_t<MGatherCaps> &m)
3582 {
3583 CInode *in = mdcache->get_inode(m->ino);
3584 if (!in)
3585 return;
3586
3587 dout(10) << *m << " from " << m->get_source()
3588 << " on " << *in << dendl;
3589
3590 if (in->is_any_caps() &&
3591 !in->is_auth() &&
3592 !in->is_ambiguous_auth() &&
3593 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3594 export_caps(in);
3595 }
3596
// Log-completion context for handle_export_caps(): once the ESessions
// entry is journaled, hand the session/cap state to logged_import_caps().
class C_M_LoggedImportCaps : public MigratorLogContext {
  CInode *in;       // inode receiving the caps (auth on this rank)
  mds_rank_t from;  // exporter mds rank
public:
  // filled in by handle_export_caps() before the log entry is submitted
  map<client_t,pair<Session*,uint64_t> > imported_session_map;
  map<CInode*, map<client_t,Capability::Export> > peer_exports;

  C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
  void finish(int r) override {
    mig->logged_import_caps(in, from, imported_session_map, peer_exports);
  }
};
3609
/*
 * Auth side of a cap export: a replica is handing us its client caps
 * for one inode.  Force-open the client sessions, journal them
 * (ESessions), and finish the import in logged_import_caps() once the
 * journal entry commits.  The inode stays auth-pinned until then.
 */
void Migrator::handle_export_caps(const cref_t<MExportCaps> &ex)
{
  dout(10) << *ex << " from " << ex->get_source() << dendl;
  CInode *in = mdcache->get_inode(ex->ino);

  ceph_assert(in);
  ceph_assert(in->is_auth());

  // FIXME
  // NOTE(review): the message is silently dropped if the inode can't be
  // auth-pinned — presumably the replica retries; confirm
  if (!in->can_auth_pin()) {
    return;
  }

  in->auth_pin(this);  // released in logged_import_caps()

  map<client_t,entity_inst_t> client_map{ex->client_map};
  map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};

  C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
      this, in, mds_rank_t(ex->get_source().num()));

  version_t pv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
							  finish->imported_session_map);
  // decode new caps
  auto blp = ex->cap_bl.cbegin();
  decode_import_inode_caps(in, false, blp, finish->peer_exports);
  ceph_assert(!finish->peer_exports.empty());   // thus, inode is pinned.

  // journal open client sessions
  ESessions *le = new ESessions(pv, std::move(client_map),
				std::move(client_metadata_map));
  mds->mdlog->start_submit_entry(le, finish);
  mds->mdlog->flush();
}
3644
3645
/*
 * Auth side, after the ESessions entry from handle_export_caps() has
 * been journaled: finish the cap import, re-evaluate lock state, and
 * ack the exporter (MExportCapsAck) with the imported caps plus the
 * exporter's cap ids so it can clean up.  Drops the auth pin taken in
 * handle_export_caps().
 */
void Migrator::logged_import_caps(CInode *in,
				  mds_rank_t from,
				  map<client_t,pair<Session*,uint64_t> >& imported_session_map,
				  map<CInode*, map<client_t,Capability::Export> >& peer_exports)
{
  dout(10) << *in << dendl;
  // see export_go() vs export_go_synced()
  ceph_assert(in->is_auth());

  // force open client sessions and finish cap import
  mds->server->finish_force_open_sessions(imported_session_map);

  auto it = peer_exports.find(in);
  ceph_assert(it != peer_exports.end());

  // clients will release caps from the exporter when they receive the cap import message.
  map<client_t,Capability::Import> imported_caps;
  finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
  mds->locker->eval(in, CEPH_CAP_LOCKS, true);

  if (!imported_caps.empty()) {
    auto ack = make_message<MExportCapsAck>(in->ino());
    // echo back the exporter's cap ids so it can match/drop its caps
    map<client_t,uint64_t> peer_caps_ids;
    for (auto &p : imported_caps )
      peer_caps_ids[p.first] = it->second.at(p.first).cap_id;

    encode(imported_caps, ack->cap_bl);
    encode(peer_caps_ids, ack->cap_bl);
    mds->send_message_mds(ack, from);
  }

  in->auth_unpin(this);
}
3679
// Cache config tunables at construction; handle_conf_change() keeps
// them in sync with later runtime config updates.
Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), mdcache(c) {
  max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
  inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
}
3684
// Refresh the migrator's cached config values when the corresponding
// options change at runtime.
void Migrator::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
{
  if (changed.count("mds_max_export_size"))
    max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
  if (changed.count("mds_inject_migrator_session_race")) {
    inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
    dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
  }
}