]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Migrator.cc
import ceph 16.2.6
[ceph.git] / ceph / src / mds / Migrator.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDCache.h"
17#include "CInode.h"
18#include "CDir.h"
19#include "CDentry.h"
20#include "Migrator.h"
21#include "Locker.h"
22#include "Server.h"
23
24#include "MDBalancer.h"
25#include "MDLog.h"
26#include "MDSMap.h"
27#include "Mutation.h"
28
29#include "include/filepath.h"
28e407b8 30#include "common/likely.h"
7c673cae
FG
31
32#include "events/EExport.h"
33#include "events/EImportStart.h"
34#include "events/EImportFinish.h"
35#include "events/ESessions.h"
36
37#include "msg/Messenger.h"
38
39#include "messages/MClientCaps.h"
40
7c673cae
FG
41/*
42 * this is what the dir->dir_auth values look like
43 *
44 * dir_auth authbits
45 * export
46 * me me - before
47 * me, me me - still me, but preparing for export
48 * me, them me - send MExportDir (peer is preparing)
49 * them, me me - journaled EExport
50 * them them - done
51 *
52 * import:
53 * them them - before
54 * me, them me - journaled EImportStart
55 * me me - done
56 *
57 * which implies:
58 * - auth bit is set if i am listed as first _or_ second dir_auth.
59 */
60
61#include "common/config.h"
62
63
64#define dout_context g_ceph_context
65#define dout_subsys ceph_subsys_mds
66#undef dout_prefix
9f95a23c 67#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".mig " << __func__ << " "
7c673cae
FG
68
69
11fdf7f2 70class MigratorContext : public MDSContext {
7c673cae
FG
71protected:
72 Migrator *mig;
73 MDSRank *get_mds() override {
74 return mig->mds;
75 }
76public:
77 explicit MigratorContext(Migrator *mig_) : mig(mig_) {
11fdf7f2 78 ceph_assert(mig != NULL);
7c673cae
FG
79 }
80};
81
82class MigratorLogContext : public MDSLogContextBase {
83protected:
84 Migrator *mig;
85 MDSRank *get_mds() override {
86 return mig->mds;
87 }
88public:
89 explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
11fdf7f2 90 ceph_assert(mig != NULL);
7c673cae
FG
91 }
92};
93
9f95a23c 94void Migrator::dispatch(const cref_t<Message> &m)
7c673cae
FG
95{
96 switch (m->get_type()) {
97 // import
98 case MSG_MDS_EXPORTDIRDISCOVER:
9f95a23c 99 handle_export_discover(ref_cast<MExportDirDiscover>(m));
7c673cae
FG
100 break;
101 case MSG_MDS_EXPORTDIRPREP:
9f95a23c 102 handle_export_prep(ref_cast<MExportDirPrep>(m));
7c673cae
FG
103 break;
104 case MSG_MDS_EXPORTDIR:
28e407b8
AA
105 if (unlikely(inject_session_race)) {
106 dout(0) << "waiting for inject_session_race" << dendl;
107 mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
108 } else {
9f95a23c 109 handle_export_dir(ref_cast<MExportDir>(m));
28e407b8 110 }
7c673cae
FG
111 break;
112 case MSG_MDS_EXPORTDIRFINISH:
9f95a23c 113 handle_export_finish(ref_cast<MExportDirFinish>(m));
7c673cae
FG
114 break;
115 case MSG_MDS_EXPORTDIRCANCEL:
9f95a23c 116 handle_export_cancel(ref_cast<MExportDirCancel>(m));
7c673cae
FG
117 break;
118
119 // export
120 case MSG_MDS_EXPORTDIRDISCOVERACK:
9f95a23c 121 handle_export_discover_ack(ref_cast<MExportDirDiscoverAck>(m));
7c673cae
FG
122 break;
123 case MSG_MDS_EXPORTDIRPREPACK:
9f95a23c 124 handle_export_prep_ack(ref_cast<MExportDirPrepAck>(m));
7c673cae
FG
125 break;
126 case MSG_MDS_EXPORTDIRACK:
9f95a23c 127 handle_export_ack(ref_cast<MExportDirAck>(m));
7c673cae
FG
128 break;
129 case MSG_MDS_EXPORTDIRNOTIFYACK:
9f95a23c 130 handle_export_notify_ack(ref_cast<MExportDirNotifyAck>(m));
11fdf7f2 131 break;
7c673cae
FG
132
133 // export 3rd party (dir_auth adjustments)
134 case MSG_MDS_EXPORTDIRNOTIFY:
9f95a23c 135 handle_export_notify(ref_cast<MExportDirNotify>(m));
7c673cae
FG
136 break;
137
138 // caps
139 case MSG_MDS_EXPORTCAPS:
9f95a23c 140 handle_export_caps(ref_cast<MExportCaps>(m));
7c673cae 141 break;
1adf2230 142 case MSG_MDS_EXPORTCAPSACK:
9f95a23c 143 handle_export_caps_ack(ref_cast<MExportCapsAck>(m));
1adf2230 144 break;
7c673cae 145 case MSG_MDS_GATHERCAPS:
9f95a23c 146 handle_gather_caps(ref_cast<MGatherCaps>(m));
7c673cae
FG
147 break;
148
149 default:
150 derr << "migrator unknown message " << m->get_type() << dendl;
11fdf7f2 151 ceph_abort_msg("migrator unknown message");
7c673cae
FG
152 }
153}
154
155
156class C_MDC_EmptyImport : public MigratorContext {
157 CDir *dir;
158public:
9f95a23c
TL
159 C_MDC_EmptyImport(Migrator *m, CDir *d) :
160 MigratorContext(m), dir(d) {
161 dir->get(CDir::PIN_PTRWAITER);
162 }
7c673cae
FG
163 void finish(int r) override {
164 mig->export_empty_import(dir);
9f95a23c 165 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
166 }
167};
168
169
170void Migrator::export_empty_import(CDir *dir)
171{
9f95a23c 172 dout(7) << *dir << dendl;
11fdf7f2 173 ceph_assert(dir->is_subtree_root());
7c673cae
FG
174
175 if (dir->inode->is_auth()) {
176 dout(7) << " inode is auth" << dendl;
177 return;
178 }
179 if (!dir->is_auth()) {
180 dout(7) << " not auth" << dendl;
181 return;
182 }
183 if (dir->is_freezing() || dir->is_frozen()) {
184 dout(7) << " freezing or frozen" << dendl;
185 return;
186 }
187 if (dir->get_num_head_items() > 0) {
188 dout(7) << " not actually empty" << dendl;
189 return;
190 }
191 if (dir->inode->is_root()) {
192 dout(7) << " root" << dendl;
193 return;
194 }
195
196 mds_rank_t dest = dir->inode->authority().first;
197 //if (mds->is_shutting_down()) dest = 0; // this is more efficient.
198
199 dout(7) << " really empty, exporting to " << dest << dendl;
200 assert (dest != mds->get_nodeid());
201
202 dout(7) << "exporting to mds." << dest
203 << " empty import " << *dir << dendl;
204 export_dir( dir, dest );
205}
206
207void Migrator::find_stale_export_freeze()
208{
209 utime_t now = ceph_clock_now();
210 utime_t cutoff = now;
11fdf7f2 211 cutoff -= g_conf()->mds_freeze_tree_timeout;
7c673cae
FG
212
213
214 /*
215 * We could have situations like:
216 *
217 * - mds.0 authpins an item in subtree A
218 * - mds.0 sends request to mds.1 to authpin an item in subtree B
219 * - mds.0 freezes subtree A
220 * - mds.1 authpins an item in subtree B
221 * - mds.1 sends request to mds.0 to authpin an item in subtree A
222 * - mds.1 freezes subtree B
223 * - mds.1 receives the remote authpin request from mds.0
224 * (wait because subtree B is freezing)
225 * - mds.0 receives the remote authpin request from mds.1
226 * (wait because subtree A is freezing)
227 *
228 *
229 * - client request authpins items in subtree B
230 * - freeze subtree B
231 * - import subtree A which is parent of subtree B
232 * (authpins parent inode of subtree B, see CDir::set_dir_auth())
233 * - freeze subtree A
234 * - client request tries authpinning items in subtree A
235 * (wait because subtree A is freezing)
236 */
237 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
238 p != export_state.end(); ) {
239 CDir* dir = p->first;
240 export_state_t& stat = p->second;
241 ++p;
242 if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
243 continue;
11fdf7f2
TL
244 ceph_assert(dir->freeze_tree_state);
245 if (stat.last_cum_auth_pins != dir->freeze_tree_state->auth_pins) {
246 stat.last_cum_auth_pins = dir->freeze_tree_state->auth_pins;
7c673cae
FG
247 stat.last_cum_auth_pins_change = now;
248 continue;
249 }
250 if (stat.last_cum_auth_pins_change >= cutoff)
251 continue;
252 if (stat.num_remote_waiters > 0 ||
253 (!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
254 export_try_cancel(dir);
255 }
256 }
257}
258
259void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
260{
9f95a23c 261 dout(10) << *dir << dendl;
7c673cae
FG
262
263 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
11fdf7f2 264 ceph_assert(it != export_state.end());
7c673cae
FG
265
266 int state = it->second.state;
267 switch (state) {
268 case EXPORT_LOCKING:
269 dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
91327a77 270 num_locking_exports--;
7c673cae
FG
271 it->second.state = EXPORT_CANCELLED;
272 dir->auth_unpin(this);
273 break;
274 case EXPORT_DISCOVERING:
275 dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
276 it->second.state = EXPORT_CANCELLED;
277 dir->unfreeze_tree(); // cancel the freeze
278 dir->auth_unpin(this);
279 if (notify_peer &&
280 (!mds->is_cluster_degraded() ||
281 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
282 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
283 it->second.tid),
284 it->second.peer);
7c673cae
FG
285 break;
286
287 case EXPORT_FREEZING:
288 dout(10) << "export state=freezing : canceling freeze" << dendl;
289 it->second.state = EXPORT_CANCELLED;
290 dir->unfreeze_tree(); // cancel the freeze
224ce89b 291 if (dir->is_subtree_root())
f67539c2 292 mdcache->try_subtree_merge(dir);
7c673cae
FG
293 if (notify_peer &&
294 (!mds->is_cluster_degraded() ||
295 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
296 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
297 it->second.tid),
298 it->second.peer);
7c673cae
FG
299 break;
300
301 // NOTE: state order reversal, warning comes after prepping
302 case EXPORT_WARNING:
303 dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
304 it->second.state = EXPORT_CANCELLING;
305 // fall-thru
306
307 case EXPORT_PREPPING:
308 if (state != EXPORT_WARNING) {
309 dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
310 it->second.state = EXPORT_CANCELLED;
311 }
312
313 {
314 // unpin bounds
315 set<CDir*> bounds;
f67539c2 316 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
317 for (set<CDir*>::iterator q = bounds.begin();
318 q != bounds.end();
319 ++q) {
320 CDir *bd = *q;
321 bd->put(CDir::PIN_EXPORTBOUND);
322 bd->state_clear(CDir::STATE_EXPORTBOUND);
323 }
324 if (state == EXPORT_WARNING) {
325 // notify bystanders
b32b8144 326 export_notify_abort(dir, it->second, bounds);
7c673cae 327 // process delayed expires
f67539c2 328 mdcache->process_delayed_expire(dir);
7c673cae
FG
329 }
330 }
331 dir->unfreeze_tree();
f67539c2 332 mdcache->try_subtree_merge(dir);
7c673cae
FG
333 if (notify_peer &&
334 (!mds->is_cluster_degraded() ||
335 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
336 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
337 it->second.tid),
338 it->second.peer);
7c673cae
FG
339 break;
340
341 case EXPORT_EXPORTING:
342 dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
343 it->second.state = EXPORT_CANCELLING;
b32b8144 344 export_reverse(dir, it->second);
7c673cae
FG
345 break;
346
347 case EXPORT_LOGGINGFINISH:
348 case EXPORT_NOTIFYING:
349 dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
350 // leave export_state, don't clean up now.
351 break;
352 case EXPORT_CANCELLING:
353 break;
354
355 default:
356 ceph_abort();
357 }
358
359 // finish clean-up?
360 if (it->second.state == EXPORT_CANCELLING ||
361 it->second.state == EXPORT_CANCELLED) {
362 MutationRef mut;
363 mut.swap(it->second.mut);
364
365 if (it->second.state == EXPORT_CANCELLED) {
91327a77 366 export_cancel_finish(it);
7c673cae
FG
367 }
368
369 // drop locks
370 if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
371 MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
11fdf7f2 372 ceph_assert(mdr);
f67539c2 373 mdcache->request_kill(mdr);
7c673cae
FG
374 } else if (mut) {
375 mds->locker->drop_locks(mut.get());
376 mut->cleanup();
377 }
378
f67539c2 379 mdcache->show_subtrees();
7c673cae
FG
380
381 maybe_do_queued_export();
382 }
383}
384
91327a77 385void Migrator::export_cancel_finish(export_state_iterator& it)
7c673cae 386{
91327a77
AA
387 CDir *dir = it->first;
388 bool unpin = (it->second.state == EXPORT_CANCELLING);
389 auto parent = std::move(it->second.parent);
390
391 total_exporting_size -= it->second.approx_size;
392 export_state.erase(it);
393
11fdf7f2 394 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
1adf2230 395 dir->clear_exporting();
7c673cae 396
91327a77
AA
397 if (unpin) {
398 // pinned by Migrator::export_notify_abort()
399 dir->auth_unpin(this);
400 }
7c673cae 401 // send pending import_maps? (these need to go out when all exports have finished.)
f67539c2 402 mdcache->maybe_send_pending_resolves();
91327a77
AA
403
404 if (parent)
405 child_export_finish(parent, false);
7c673cae
FG
406}
407
408// ==========================================================
409// mds failure handling
410
411void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
412{
9f95a23c 413 dout(5) << who << dendl;
7c673cae
FG
414
415 // check my exports
416
417 // first add an extra auth_pin on any freezes, so that canceling a
418 // nested freeze doesn't complete one further up the hierarchy and
419 // confuse the shit out of us. we'll remove it after canceling the
420 // freeze. this way no freeze completions run before we want them
421 // to.
9f95a23c 422 std::vector<CDir*> pinned_dirs;
7c673cae
FG
423 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
424 p != export_state.end();
425 ++p) {
426 if (p->second.state == EXPORT_FREEZING) {
427 CDir *dir = p->first;
428 dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
429 dir->auth_pin(this);
430 pinned_dirs.push_back(dir);
431 }
432 }
433
434 map<CDir*,export_state_t>::iterator p = export_state.begin();
435 while (p != export_state.end()) {
436 map<CDir*,export_state_t>::iterator next = p;
437 ++next;
438 CDir *dir = p->first;
439
440 // abort exports:
441 // - that are going to the failed node
442 // - that aren't frozen yet (to avoid auth_pin deadlock)
443 // - they havne't prepped yet (they may need to discover bounds to do that)
444 if ((p->second.peer == who &&
445 p->second.state != EXPORT_CANCELLING) ||
446 p->second.state == EXPORT_LOCKING ||
447 p->second.state == EXPORT_DISCOVERING ||
448 p->second.state == EXPORT_FREEZING ||
449 p->second.state == EXPORT_PREPPING) {
450 // the guy i'm exporting to failed, or we're just freezing.
451 dout(10) << "cleaning up export state (" << p->second.state << ")"
452 << get_export_statename(p->second.state) << " of " << *dir << dendl;
453 export_try_cancel(dir);
454 } else if (p->second.peer != who) {
455 // bystander failed.
456 if (p->second.warning_ack_waiting.erase(who)) {
457 if (p->second.state == EXPORT_WARNING) {
458 p->second.notify_ack_waiting.erase(who); // they won't get a notify either.
459 // exporter waiting for warning acks, let's fake theirs.
460 dout(10) << "faking export_warning_ack from mds." << who
461 << " on " << *dir << " to mds." << p->second.peer
462 << dendl;
463 if (p->second.warning_ack_waiting.empty())
464 export_go(dir);
465 }
466 }
467 if (p->second.notify_ack_waiting.erase(who)) {
468 // exporter is waiting for notify acks, fake it
469 dout(10) << "faking export_notify_ack from mds." << who
470 << " on " << *dir << " to mds." << p->second.peer
471 << dendl;
472 if (p->second.state == EXPORT_NOTIFYING) {
473 if (p->second.notify_ack_waiting.empty())
474 export_finish(dir);
475 } else if (p->second.state == EXPORT_CANCELLING) {
476 if (p->second.notify_ack_waiting.empty()) {
91327a77 477 export_cancel_finish(p);
7c673cae
FG
478 }
479 }
480 }
481 }
482
483 // next!
484 p = next;
485 }
486
487
488 // check my imports
489 map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
490 while (q != import_state.end()) {
491 map<dirfrag_t,import_state_t>::iterator next = q;
492 ++next;
493 dirfrag_t df = q->first;
f67539c2
TL
494 CInode *diri = mdcache->get_inode(df.ino);
495 CDir *dir = mdcache->get_dirfrag(df);
7c673cae
FG
496
497 if (q->second.peer == who) {
498 if (dir)
499 dout(10) << "cleaning up import state (" << q->second.state << ")"
500 << get_import_statename(q->second.state) << " of " << *dir << dendl;
501 else
502 dout(10) << "cleaning up import state (" << q->second.state << ")"
503 << get_import_statename(q->second.state) << " of " << df << dendl;
504
505 switch (q->second.state) {
506 case IMPORT_DISCOVERING:
507 dout(10) << "import state=discovering : clearing state" << dendl;
508 import_reverse_discovering(df);
509 break;
510
511 case IMPORT_DISCOVERED:
11fdf7f2 512 ceph_assert(diri);
7c673cae
FG
513 dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
514 import_reverse_discovered(df, diri);
515 break;
516
517 case IMPORT_PREPPING:
11fdf7f2 518 ceph_assert(dir);
7c673cae 519 dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
b32b8144 520 import_reverse_prepping(dir, q->second);
7c673cae
FG
521 break;
522
523 case IMPORT_PREPPED:
11fdf7f2 524 ceph_assert(dir);
7c673cae
FG
525 dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
526 {
527 set<CDir*> bounds;
f67539c2 528 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
529 import_remove_pins(dir, bounds);
530
531 // adjust auth back to the exporter
f67539c2 532 mdcache->adjust_subtree_auth(dir, q->second.peer);
7c673cae
FG
533
534 // notify bystanders ; wait in aborting state
b32b8144 535 q->second.state = IMPORT_ABORTING;
7c673cae 536 import_notify_abort(dir, bounds);
11fdf7f2 537 ceph_assert(g_conf()->mds_kill_import_at != 10);
7c673cae
FG
538 }
539 break;
540
541 case IMPORT_LOGGINGSTART:
11fdf7f2 542 ceph_assert(dir);
7c673cae
FG
543 dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
544 import_reverse(dir);
545 break;
546
547 case IMPORT_ACKING:
11fdf7f2 548 ceph_assert(dir);
7c673cae
FG
549 // hrm. make this an ambiguous import, and wait for exporter recovery to disambiguate
550 dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
551 {
552 set<CDir*> bounds;
f67539c2
TL
553 mdcache->get_subtree_bounds(dir, bounds);
554 mdcache->add_ambiguous_import(dir, bounds);
7c673cae
FG
555 }
556 break;
557
558 case IMPORT_FINISHING:
11fdf7f2 559 ceph_assert(dir);
7c673cae
FG
560 dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
561 import_finish(dir, true);
562 break;
563
564 case IMPORT_ABORTING:
11fdf7f2 565 ceph_assert(dir);
7c673cae
FG
566 dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
567 break;
568 }
569 } else {
570 auto bystanders_entry = q->second.bystanders.find(who);
571 if (bystanders_entry != q->second.bystanders.end()) {
572 q->second.bystanders.erase(bystanders_entry);
573 if (q->second.state == IMPORT_ABORTING) {
11fdf7f2 574 ceph_assert(dir);
7c673cae
FG
575 dout(10) << "faking export_notify_ack from mds." << who
576 << " on aborting import " << *dir << " from mds." << q->second.peer
577 << dendl;
224ce89b 578 if (q->second.bystanders.empty())
7c673cae 579 import_reverse_unfreeze(dir);
7c673cae
FG
580 }
581 }
582 }
583
584 // next!
585 q = next;
586 }
587
9f95a23c 588 for (const auto& dir : pinned_dirs) {
7c673cae
FG
589 dout(10) << "removing temp auth_pin on " << *dir << dendl;
590 dir->auth_unpin(this);
7c673cae
FG
591 }
592}
593
594
595
596void Migrator::show_importing()
597{
9f95a23c 598 dout(10) << dendl;
7c673cae
FG
599 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
600 p != import_state.end();
601 ++p) {
f67539c2 602 CDir *dir = mdcache->get_dirfrag(p->first);
7c673cae
FG
603 if (dir) {
604 dout(10) << " importing from " << p->second.peer
605 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
606 << " " << p->first << " " << *dir << dendl;
607 } else {
608 dout(10) << " importing from " << p->second.peer
609 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
610 << " " << p->first << dendl;
611 }
612 }
613}
614
615void Migrator::show_exporting()
616{
9f95a23c
TL
617 dout(10) << dendl;
618 for (const auto& [dir, state] : export_state) {
619 dout(10) << " exporting to " << state.peer
620 << ": (" << state.state << ") " << get_export_statename(state.state)
621 << " " << dir->dirfrag() << " " << *dir << dendl;
622 }
7c673cae
FG
623}
624
625
626
627void Migrator::audit()
628{
11fdf7f2 629 if (!g_conf()->subsys.should_gather<ceph_subsys_mds, 5>())
7c673cae
FG
630 return; // hrm.
631
632 // import_state
633 show_importing();
634 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
635 p != import_state.end();
636 ++p) {
637 if (p->second.state == IMPORT_DISCOVERING)
638 continue;
639 if (p->second.state == IMPORT_DISCOVERED) {
f67539c2 640 CInode *in = mdcache->get_inode(p->first.ino);
11fdf7f2 641 ceph_assert(in);
7c673cae
FG
642 continue;
643 }
f67539c2 644 CDir *dir = mdcache->get_dirfrag(p->first);
11fdf7f2 645 ceph_assert(dir);
7c673cae
FG
646 if (p->second.state == IMPORT_PREPPING)
647 continue;
648 if (p->second.state == IMPORT_ABORTING) {
11fdf7f2
TL
649 ceph_assert(!dir->is_ambiguous_dir_auth());
650 ceph_assert(dir->get_dir_auth().first != mds->get_nodeid());
7c673cae
FG
651 continue;
652 }
11fdf7f2
TL
653 ceph_assert(dir->is_ambiguous_dir_auth());
654 ceph_assert(dir->authority().first == mds->get_nodeid() ||
7c673cae
FG
655 dir->authority().second == mds->get_nodeid());
656 }
657
658 // export_state
659 show_exporting();
660 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
661 p != export_state.end();
662 ++p) {
663 CDir *dir = p->first;
664 if (p->second.state == EXPORT_LOCKING ||
665 p->second.state == EXPORT_DISCOVERING ||
666 p->second.state == EXPORT_FREEZING ||
667 p->second.state == EXPORT_CANCELLING)
668 continue;
11fdf7f2
TL
669 ceph_assert(dir->is_ambiguous_dir_auth());
670 ceph_assert(dir->authority().first == mds->get_nodeid() ||
7c673cae
FG
671 dir->authority().second == mds->get_nodeid());
672 }
673
674 // ambiguous+me subtrees should be importing|exporting
675
676 // write me
677}
678
679
680
681
682
683// ==========================================================
684// EXPORT
685
686void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
687{
688 // enqueue
9f95a23c 689 dout(7) << *dir << " to " << dest << dendl;
7c673cae
FG
690 export_queue.push_back(pair<dirfrag_t,mds_rank_t>(dir->dirfrag(), dest));
691
692 maybe_do_queued_export();
693}
694
695void Migrator::maybe_do_queued_export()
696{
697 static bool running;
698 if (running)
699 return;
700 running = true;
91327a77
AA
701
702 uint64_t max_total_size = max_export_size * 2;
703
7c673cae 704 while (!export_queue.empty() &&
91327a77
AA
705 max_total_size > total_exporting_size &&
706 max_total_size - total_exporting_size >=
707 max_export_size * (num_locking_exports + 1)) {
708
7c673cae
FG
709 dirfrag_t df = export_queue.front().first;
710 mds_rank_t dest = export_queue.front().second;
711 export_queue.pop_front();
712
f67539c2 713 CDir *dir = mdcache->get_dirfrag(df);
7c673cae
FG
714 if (!dir) continue;
715 if (!dir->is_auth()) continue;
716
9f95a23c 717 dout(7) << "nicely exporting to mds." << dest << " " << *dir << dendl;
7c673cae
FG
718
719 export_dir(dir, dest);
720 }
91327a77 721
7c673cae
FG
722 running = false;
723}
724
725
726
727
728class C_MDC_ExportFreeze : public MigratorContext {
9f95a23c 729 CDir *dir; // dir i'm exporting
7c673cae
FG
730 uint64_t tid;
731public:
732 C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
9f95a23c
TL
733 MigratorContext(m), dir(e), tid(t) {
734 dir->get(CDir::PIN_PTRWAITER);
735 }
7c673cae
FG
736 void finish(int r) override {
737 if (r >= 0)
9f95a23c
TL
738 mig->export_frozen(dir, tid);
739 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
740 }
741};
742
743
9f95a23c 744bool Migrator::export_try_grab_locks(CDir *dir, MutationRef& mut)
7c673cae 745{
9f95a23c 746 CInode *diri = dir->get_inode();
11fdf7f2 747
9f95a23c
TL
748 if (!diri->filelock.can_wrlock(diri->get_loner()) ||
749 !diri->nestlock.can_wrlock(diri->get_loner()))
750 return false;
11fdf7f2 751
9f95a23c 752 MutationImpl::LockOpVec lov;
11fdf7f2 753
9f95a23c
TL
754 set<CDir*> wouldbe_bounds;
755 set<CInode*> bound_inodes;
f67539c2 756 mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
9f95a23c
TL
757 for (auto& bound : wouldbe_bounds)
758 bound_inodes.insert(bound->get_inode());
759 for (auto& in : bound_inodes)
760 lov.add_rdlock(&in->dirfragtreelock);
761
762 lov.add_rdlock(&diri->dirfragtreelock);
763
764 CInode* in = diri;
765 while (true) {
766 lov.add_rdlock(&in->snaplock);
767 CDentry* pdn = in->get_projected_parent_dn();
768 if (!pdn)
769 break;
770 in = pdn->get_dir()->get_inode();
771 }
7c673cae 772
9f95a23c
TL
773 if (!mds->locker->rdlock_try_set(lov, mut))
774 return false;
7c673cae 775
9f95a23c
TL
776 mds->locker->wrlock_force(&diri->filelock, mut);
777 mds->locker->wrlock_force(&diri->nestlock, mut);
11fdf7f2 778
9f95a23c 779 return true;
7c673cae
FG
780}
781
782
7c673cae
FG
783/** export_dir(dir, dest)
784 * public method to initiate an export.
785 * will fail if the directory is freezing, frozen, unpinnable, or root.
786 */
787void Migrator::export_dir(CDir *dir, mds_rank_t dest)
788{
11fdf7f2
TL
789 ceph_assert(dir->is_auth());
790 ceph_assert(dest != mds->get_nodeid());
7c673cae 791
9f95a23c 792 CDir* parent = dir->inode->get_projected_parent_dir();
f67539c2 793 if (!mds->is_stopping() && !dir->is_exportable(dest) && dir->get_num_head_items() > 0) {
9f95a23c 794 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": dir is export pinned" << dendl;
81eedcae 795 return;
9f95a23c
TL
796 } else if (!(mds->is_active() || mds->is_stopping())) {
797 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": not active" << dendl;
181888fb 798 return;
f67539c2 799 } else if (mdcache->is_readonly()) {
9f95a23c 800 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": read-only FS, no exports for now" << dendl;
7c673cae 801 return;
9f95a23c
TL
802 } else if (!mds->mdsmap->is_active(dest)) {
803 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": destination not active" << dendl;
31f18b77 804 return;
9f95a23c
TL
805 } else if (mds->is_cluster_degraded()) {
806 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": cluster degraded" << dendl;
7c673cae 807 return;
9f95a23c
TL
808 } else if (dir->inode->is_system()) {
809 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is a system directory" << dendl;
7c673cae 810 return;
9f95a23c
TL
811 } else if (dir->is_frozen() || dir->is_freezing()) {
812 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is frozen" << dendl;
7c673cae 813 return;
9f95a23c
TL
814 } else if (dir->state_test(CDir::STATE_EXPORTING)) {
815 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": already exporting" << dendl;
816 return;
817 } else if (parent && parent->inode->is_stray()
818 && parent->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
819 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": in stray directory" << dendl;
7c673cae
FG
820 return;
821 }
822
9f95a23c 823 if (unlikely(g_conf()->mds_thrash_exports)) {
7c673cae 824 // create random subtree bound (which will not be exported)
9f95a23c 825 std::vector<CDir*> ls;
7c673cae
FG
826 for (auto p = dir->begin(); p != dir->end(); ++p) {
827 auto dn = p->second;
828 CDentry::linkage_t *dnl= dn->get_linkage();
829 if (dnl->is_primary()) {
830 CInode *in = dnl->get_inode();
9f95a23c
TL
831 if (in->is_dir()) {
832 auto&& dirs = in->get_nested_dirfrags();
833 ls.insert(std::end(ls), std::begin(dirs), std::end(dirs));
834 }
7c673cae
FG
835 }
836 }
837 if (ls.size() > 0) {
838 int n = rand() % ls.size();
839 auto p = ls.begin();
840 while (n--) ++p;
841 CDir *bd = *p;
842 if (!(bd->is_frozen() || bd->is_freezing())) {
11fdf7f2 843 ceph_assert(bd->is_auth());
7c673cae 844 dir->state_set(CDir::STATE_AUXSUBTREE);
f67539c2 845 mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
9f95a23c 846 dout(7) << "create aux subtree " << *bd << " under " << *dir << dendl;
7c673cae
FG
847 }
848 }
849 }
850
9f95a23c
TL
851 dout(4) << "Starting export to mds." << dest << " " << *dir << dendl;
852
11fdf7f2 853 mds->hit_export_target(dest, -1);
7c673cae
FG
854
855 dir->auth_pin(this);
1adf2230 856 dir->mark_exporting();
7c673cae 857
f67539c2 858 MDRequestRef mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
7c673cae 859 mdr->more()->export_dir = dir;
9f95a23c 860 mdr->pin(dir);
7c673cae 861
11fdf7f2 862 ceph_assert(export_state.count(dir) == 0);
7c673cae 863 export_state_t& stat = export_state[dir];
91327a77 864 num_locking_exports++;
7c673cae
FG
865 stat.state = EXPORT_LOCKING;
866 stat.peer = dest;
867 stat.tid = mdr->reqid.tid;
868 stat.mut = mdr;
869
f67539c2 870 mdcache->dispatch_request(mdr);
7c673cae
FG
871}
872
91327a77
AA
873/*
874 * check if directory is too large to be export in whole. If it is,
875 * choose some subdirs, whose total size is suitable.
876 */
877void Migrator::maybe_split_export(CDir* dir, uint64_t max_size, bool null_okay,
878 vector<pair<CDir*, size_t> >& results)
7c673cae 879{
91327a77
AA
880 static const unsigned frag_size = 800;
881 static const unsigned inode_size = 1000;
882 static const unsigned cap_size = 80;
883 static const unsigned remote_size = 10;
884 static const unsigned null_size = 1;
885
886 // state for depth-first search
887 struct LevelData {
888 CDir *dir;
889 CDir::dentry_key_map::iterator iter;
890 size_t dirfrag_size = frag_size;
891 size_t subdirs_size = 0;
892 bool complete = true;
893 vector<CDir*> siblings;
894 vector<pair<CDir*, size_t> > subdirs;
895 LevelData(const LevelData&) = default;
896 LevelData(CDir *d) :
897 dir(d), iter(d->begin()) {}
898 };
899
900 vector<LevelData> stack;
901 stack.emplace_back(dir);
902
903 size_t found_size = 0;
904 size_t skipped_size = 0;
905
906 for (;;) {
907 auto& data = stack.back();
908 CDir *cur = data.dir;
909 auto& it = data.iter;
910 auto& dirfrag_size = data.dirfrag_size;
911
912 while(it != cur->end()) {
913 CDentry *dn = it->second;
914 ++it;
915
916 dirfrag_size += dn->name.size();
917 if (dn->get_linkage()->is_null()) {
918 dirfrag_size += null_size;
919 continue;
920 }
921 if (dn->get_linkage()->is_remote()) {
922 dirfrag_size += remote_size;
923 continue;
924 }
925
926 CInode *in = dn->get_linkage()->get_inode();
927 dirfrag_size += inode_size;
928 dirfrag_size += in->get_client_caps().size() * cap_size;
929
930 if (in->is_dir()) {
9f95a23c 931 auto ls = in->get_nested_dirfrags();
91327a77
AA
932 std::reverse(ls.begin(), ls.end());
933
934 bool complete = true;
935 for (auto p = ls.begin(); p != ls.end(); ) {
936 if ((*p)->state_test(CDir::STATE_EXPORTING) ||
937 (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
938 complete = false;
939 p = ls.erase(p);
940 } else {
941 ++p;
942 }
943 }
944 if (!complete) {
945 // skip exporting dir's ancestors. because they can't get
946 // frozen (exporting dir's parent inode is auth pinned).
947 for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
948 if (!p->complete)
949 break;
950 p->complete = false;
951 }
952 }
953 if (!ls.empty()) {
954 stack.emplace_back(ls.back());
955 ls.pop_back();
956 stack.back().siblings.swap(ls);
957 break;
958 }
959 }
960 }
961 // did above loop push new dirfrag into the stack?
962 if (stack.back().dir != cur)
963 continue;
964
965 if (data.complete) {
966 auto cur_size = data.subdirs_size + dirfrag_size;
967 // we can do nothing with large dirfrag
968 if (cur_size >= max_size && found_size * 2 > max_size)
969 break;
7c673cae 970
91327a77
AA
971 found_size += dirfrag_size;
972
973 if (stack.size() > 1) {
974 auto& parent = stack[stack.size() - 2];
975 parent.subdirs.emplace_back(cur, cur_size);
976 parent.subdirs_size += cur_size;
977 }
978 } else {
979 // can't merge current dirfrag to its parent if there is skipped subdir
980 results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
981 skipped_size += dirfrag_size;
982 }
983
984 vector<CDir*> ls;
985 ls.swap(data.siblings);
986
987 stack.pop_back();
988 if (stack.empty())
989 break;
990
991 if (found_size >= max_size)
992 break;
993
994 // next dirfrag
995 if (!ls.empty()) {
996 stack.emplace_back(ls.back());
997 ls.pop_back();
998 stack.back().siblings.swap(ls);
999 }
1000 }
1001
1002 for (auto& p : stack)
1003 results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());
1004
1005 if (results.empty() && (!skipped_size || !null_okay))
1006 results.emplace_back(dir, found_size + skipped_size);
1007}
1008
1009class C_M_ExportDirWait : public MigratorContext {
1010 MDRequestRef mdr;
1011 int count;
1012public:
1013 C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
1014 : MigratorContext(m), mdr(mdr), count(count) {}
1015 void finish(int r) override {
1016 mig->dispatch_export_dir(mdr, count);
1017 }
1018};
1019
/**
 * Advance an export that is in EXPORT_LOCKING state for the internal
 * EXPORTDIR request @p mdr (target dirfrag: mdr->more()->export_dir).
 *
 * This is (re-)entered via C_M_ExportDirWait. In order it:
 *  1. returns if export_state has no matching entry (missing, or tid
 *     mismatch), killing the request if it had been marked aborted;
 *  2. cancels the export on peer error or if the dir is freezing/frozen;
 *  3. waits for an MDSMap in which the destination is an export target,
 *     giving up once @p count exceeds 3;
 *  4. waits on WAIT_CREATED while the dir's inode has no parent dentry;
 *  5. takes the scatter, dirfragtree and snap/layout locks (only until
 *     MutationImpl::ALL_LOCKED is recorded);
 *  6. asks maybe_split_export() whether to split: if not, sends
 *     MExportDirDiscover and starts freezing the tree; otherwise starts one
 *     child export per returned dirfrag and cancels this one.
 *
 * @param mdr   internal request driving this export
 * @param count retry counter for step 3 (incremented on each MDSMap wait)
 */
void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  CDir *dir = mdr->more()->export_dir;
  dout(7) << *mdr << " " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    ceph_assert(mdr->killed || mdr->aborted);
    if (mdr->aborted) {
      mdr->aborted = false;
      mdcache->request_kill(mdr);
    }
    return;
  }
  ceph_assert(it->second.state == EXPORT_LOCKING);

  if (mdr->more()->peer_error || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  mds_rank_t dest = it->second.peer;
  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(7) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }

    // Release everything held so far before sleeping on the next MDSMap.
    mds->locker->drop_locks(mdr.get());
    mdr->drop_local_auth_pins();

    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  // locks?
  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
    MutationImpl::LockOpVec lov;
    // If auth MDS of the subtree root inode is neither the exporter MDS
    // nor the importer MDS and it gathers subtree root's fragstat/neststat
    // while the subtree is exporting. It's possible that the exporter MDS
    // and the importer MDS both are auth MDS of the subtree root or both
    // are not auth MDS of the subtree root at the time they receive the
    // lock messages. So the auth MDS of the subtree root inode may get no
    // or duplicated fragstat/neststat for the subtree root dirfrag.
    lov.lock_scatter_gather(&dir->get_inode()->filelock);
    lov.lock_scatter_gather(&dir->get_inode()->nestlock);
    if (dir->get_inode()->is_auth()) {
      dir->get_inode()->filelock.set_scatter_wanted();
      dir->get_inode()->nestlock.set_scatter_wanted();
    }
    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);

    // On failure, the export is cancelled only if the request was aborted;
    // otherwise we just return (NOTE(review): presumably acquire_locks()
    // has arranged for the request to be retried — confirm).
    if (!mds->locker->acquire_locks(mdr, lov, nullptr, true)) {
      if (mdr->aborted)
        export_try_cancel(dir);
      return;
    }

    lov.clear();
    // bound dftlocks:
    // NOTE: We need to take an rdlock on bounding dirfrags during
    //  migration for a rather irritating reason: when we export the
    //  bound inode, we need to send scatterlock state for the dirfrags
    //  as well, so that the new auth also gets the correct info.  If we
    //  race with a refragment, this info is useless, as we can't
    //  redivvy it up.  And it's needed for the scatterlocks to work
    //  properly: when the auth is in a sync/lock state it keeps each
    //  dirfrag's portion in the local (auth OR replica) dirfrag.
    set<CDir*> wouldbe_bounds;
    set<CInode*> bound_inodes;
    mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
    for (auto& bound : wouldbe_bounds)
      bound_inodes.insert(bound->get_inode());
    for (auto& in : bound_inodes)
      lov.add_rdlock(&in->dirfragtreelock);

    if (!mds->locker->rdlock_try_set(lov, mdr))
      return;

    if (!mds->locker->try_rdlock_snap_layout(dir->get_inode(), mdr))
      return;

    // Record success so a re-dispatch skips the locking phase.
    mdr->locking_state |= MutationImpl::ALL_LOCKED;
  }

  ceph_assert(g_conf()->mds_kill_export_at != 1);

  // Non-null when this export is itself a child of an earlier split.
  auto parent = it->second.parent;

  // The bool is passed as maybe_split_export()'s null_okay argument.
  vector<pair<CDir*, size_t> > results;
  maybe_split_export(dir, max_export_size, (bool)parent, results);

  if (results.size() == 1 && results.front().first == dir) {
    // No split: export the whole subtree as one unit.
    num_locking_exports--;
    it->second.state = EXPORT_DISCOVERING;
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
    auto discover = make_message<MExportDirDiscover>(dir->dirfrag(), path,
                                                     mds->get_nodeid(),
                                                     it->second.tid);
    mds->send_message_mds(discover, dest);
    ceph_assert(g_conf()->mds_kill_export_at != 2);

    it->second.last_cum_auth_pins_change = ceph_clock_now();
    it->second.approx_size = results.front().second;
    total_exporting_size += it->second.approx_size;

    // start the freeze, but hold it up with an auth_pin.
    dir->freeze_tree();
    ceph_assert(dir->is_freezing_tree());
    dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
    return;
  }

  // Split: account the new children against the (possibly new)
  // export_base_t; child_export_finish() decrements pending_children.
  if (parent) {
    parent->pending_children += results.size();
  } else {
    parent = std::make_shared<export_base_t>(dir->dirfrag(), dest,
                                             results.size(), export_queue_gen);
  }

  if (results.empty()) {
    dout(7) << "subtree's children all are under exporting, retry rest parts of parent export "
            << parent->dirfrag << dendl;
    parent->restart = true;
  } else {
    dout(7) << "subtree is too large, splitting it into: " << dendl;
  }

  // Start an independent internal EXPORTDIR request per sub-dirfrag.
  for (auto& p : results) {
    CDir *sub = p.first;
    ceph_assert(sub != dir);
    dout(7) << " sub " << *sub << dendl;

    sub->auth_pin(this);
    sub->mark_exporting();

    MDRequestRef _mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
    _mdr->more()->export_dir = sub;
    _mdr->pin(sub);

    ceph_assert(export_state.count(sub) == 0);
    auto& stat = export_state[sub];
    num_locking_exports++;
    stat.state = EXPORT_LOCKING;
    stat.peer = dest;
    stat.tid = _mdr->reqid.tid;
    stat.mut = _mdr;
    stat.parent = parent;
    mdcache->dispatch_request(_mdr);
  }

  // cancel the original one
  export_try_cancel(dir);
}
1188
91327a77
AA
1189void Migrator::child_export_finish(std::shared_ptr<export_base_t>& parent, bool success)
1190{
1191 if (success)
1192 parent->restart = true;
1193 if (--parent->pending_children == 0) {
1194 if (parent->restart &&
1195 parent->export_queue_gen == export_queue_gen) {
f67539c2 1196 CDir *origin = mdcache->get_dirfrag(parent->dirfrag);
91327a77
AA
1197 if (origin && origin->is_auth()) {
1198 dout(7) << "child_export_finish requeue " << *origin << dendl;
1199 export_queue.emplace_front(origin->dirfrag(), parent->dest);
1200 }
1201 }
1202 }
7c673cae
FG
1203}
1204
1205/*
1206 * called on receipt of MExportDirDiscoverAck
1207 * the importer now has the directory's _inode_ in memory, and pinned.
7c673cae 1208 */
9f95a23c 1209void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m)
7c673cae 1210{
f67539c2 1211 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1212 mds_rank_t dest(m->get_source().num());
11fdf7f2 1213 ceph_assert(dir);
7c673cae 1214
9f95a23c 1215 dout(7) << "from " << m->get_source()
7c673cae
FG
1216 << " on " << *dir << dendl;
1217
11fdf7f2 1218 mds->hit_export_target(dest, -1);
7c673cae
FG
1219
1220 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1221 if (it == export_state.end() ||
1222 it->second.tid != m->get_tid() ||
1223 it->second.peer != dest) {
1224 dout(7) << "must have aborted" << dendl;
1225 } else {
11fdf7f2 1226 ceph_assert(it->second.state == EXPORT_DISCOVERING);
c07f9fc5
FG
1227
1228 if (m->is_success()) {
1229 // release locks to avoid deadlock
1230 MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
11fdf7f2 1231 ceph_assert(mdr);
f67539c2 1232 mdcache->request_finish(mdr);
c07f9fc5
FG
1233 it->second.mut.reset();
1234 // freeze the subtree
1235 it->second.state = EXPORT_FREEZING;
1236 dir->auth_unpin(this);
11fdf7f2 1237 ceph_assert(g_conf()->mds_kill_export_at != 3);
c07f9fc5
FG
1238
1239 } else {
1240 dout(7) << "peer failed to discover (not active?), canceling" << dendl;
1241 export_try_cancel(dir, false);
1242 }
7c673cae 1243 }
7c673cae
FG
1244}
1245
1246class C_M_ExportSessionsFlushed : public MigratorContext {
1247 CDir *dir;
1248 uint64_t tid;
1249public:
9f95a23c
TL
1250 C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
1251 MigratorContext(m), dir(d), tid(t) {
1252 dir->get(CDir::PIN_PTRWAITER);
7c673cae
FG
1253 }
1254 void finish(int r) override {
1255 mig->export_sessions_flushed(dir, tid);
9f95a23c 1256 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
1257 }
1258};
1259
1260void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
1261{
9f95a23c 1262 dout(7) << *dir << dendl;
7c673cae
FG
1263
1264 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1265 if (it == export_state.end() ||
1266 it->second.state == EXPORT_CANCELLING ||
1267 it->second.tid != tid) {
1268 // export must have aborted.
1269 dout(7) << "export must have aborted on " << dir << dendl;
1270 return;
1271 }
1272
11fdf7f2
TL
1273 ceph_assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
1274 ceph_assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
7c673cae
FG
1275 it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
1276 if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
1277 export_go(dir); // start export.
1278}
1279
/**
 * Encode the replica "trace" linking the export root @p dir to one subtree
 * bound @p bound, for inclusion in MExportDirPrep.
 *
 * Walking upward from @p bound, each step prepends the current inode's
 * parent dentry and the inode itself. Unless the parent dirfrag is @p dir or
 * was already emitted, it then prepends that parent dirfrag. The walk stops
 * early at any inode or dirfrag already recorded in @p inodes_added /
 * @p dirfrags_added, so ancestors shared between bounds are sent only once.
 * Both sets are updated.
 *
 * Encoded (ENCODE_START v1) as:
 *   dirfrag_t df  -- dirfrag the trace hangs off
 *   char start    -- '-' nothing encoded (bound's inode already sent),
 *                    'd' trace starts with a dentry,
 *                    'f' trace starts with a dirfrag
 *   trace bytes
 *
 * @param final_bl        output buffer (appended to)
 * @param bound           subtree bound dirfrag the trace ends at
 * @param dir             export root dirfrag (never encoded here)
 * @param es              export state; es.peer is the replica target
 * @param inodes_added    inodes already encoded in earlier traces
 * @param dirfrags_added  dirfrags already encoded in earlier traces
 */
void Migrator::encode_export_prep_trace(bufferlist &final_bl, CDir *bound,
                                        CDir *dir, export_state_t &es,
                                        set<inodeno_t> &inodes_added,
                                        set<dirfrag_t> &dirfrags_added)
{
  ENCODE_START(1, 1, final_bl);

  dout(7) << " started to encode dir " << *bound << dendl;
  CDir *cur = bound;
  bufferlist tracebl;
  char start = '-';

  while (1) {
    // don't repeat inodes
    if (inodes_added.count(cur->inode->ino()))
      break;
    inodes_added.insert(cur->inode->ino());

    // prepend dentry + inode
    ceph_assert(cur->inode->is_auth());
    bufferlist bl;
    mdcache->encode_replica_dentry(cur->inode->parent, es.peer, bl);
    dout(7) << " added " << *cur->inode->parent << dendl;
    mdcache->encode_replica_inode(cur->inode, es.peer, bl, mds->mdsmap->get_up_features());
    dout(7) << " added " << *cur->inode << dendl;
    // Prepend: new bytes first, previously built trace after them.
    bl.claim_append(tracebl);
    tracebl = std::move(bl);

    cur = cur->get_parent_dir();
    // don't repeat dirfrags
    if (dirfrags_added.count(cur->dirfrag()) || cur == dir) {
      start = 'd';  // start with dentry
      break;
    }
    dirfrags_added.insert(cur->dirfrag());

    // prepend dir
    // NOTE(review): bl was moved-from above and is reused here; this
    // relies on a moved-from bufferlist being empty — confirm.
    mdcache->encode_replica_dir(cur, es.peer, bl);
    dout(7) << " added " << *cur << dendl;
    bl.claim_append(tracebl);
    tracebl = std::move(bl);
    start = 'f';  // start with dirfrag
  }
  dirfrag_t df = cur->dirfrag();
  encode(df, final_bl);
  encode(start, final_bl);
  final_bl.claim_append(tracebl);

  ENCODE_FINISH(final_bl);
}
1330
7c673cae
FG
1331void Migrator::export_frozen(CDir *dir, uint64_t tid)
1332{
9f95a23c 1333 dout(7) << *dir << dendl;
7c673cae
FG
1334
1335 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1336 if (it == export_state.end() || it->second.tid != tid) {
1337 dout(7) << "export must have aborted" << dendl;
1338 return;
1339 }
1340
11fdf7f2
TL
1341 ceph_assert(it->second.state == EXPORT_FREEZING);
1342 ceph_assert(dir->is_frozen_tree_root());
7c673cae 1343
9f95a23c 1344 it->second.mut = new MutationImpl();
7c673cae
FG
1345
1346 // ok, try to grab all my locks.
9f95a23c 1347 CInode *diri = dir->get_inode();
7c673cae 1348 if ((diri->is_auth() && diri->is_frozen()) ||
9f95a23c 1349 !export_try_grab_locks(dir, it->second.mut)) {
7c673cae
FG
1350 dout(7) << "export_dir couldn't acquire all needed locks, failing. "
1351 << *dir << dendl;
91327a77 1352 export_try_cancel(dir);
7c673cae
FG
1353 return;
1354 }
1355
7c673cae
FG
1356 if (diri->is_auth())
1357 it->second.mut->auth_pin(diri);
7c673cae 1358
f67539c2 1359 mdcache->show_subtrees();
7c673cae 1360
224ce89b 1361 // CDir::_freeze_tree() should have forced it into subtree.
11fdf7f2 1362 ceph_assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
7c673cae 1363 // note the bounds.
7c673cae 1364 set<CDir*> bounds;
f67539c2 1365 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
1366
1367 // generate prep message, log entry.
9f95a23c 1368 auto prep = make_message<MExportDirPrep>(dir->dirfrag(), it->second.tid);
7c673cae
FG
1369
1370 // include list of bystanders
181888fb
FG
1371 for (const auto &p : dir->get_replicas()) {
1372 if (p.first != it->second.peer) {
1373 dout(10) << "bystander mds." << p.first << dendl;
1374 prep->add_bystander(p.first);
7c673cae
FG
1375 }
1376 }
1377
1378 // include base dirfrag
f67539c2 1379 mdcache->encode_replica_dir(dir, it->second.peer, prep->basedir);
7c673cae
FG
1380
1381 /*
1382 * include spanning tree for all nested exports.
1383 * these need to be on the destination _before_ the final export so that
1384 * dir_auth updates on any nested exports are properly absorbed.
1385 * this includes inodes and dirfrags included in the subtree, but
1386 * only the inodes at the bounds.
1387 *
1388 * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
1389 */
1390 set<inodeno_t> inodes_added;
1391 set<dirfrag_t> dirfrags_added;
1392
1393 // check bounds
9f95a23c 1394 for (auto &bound : bounds){
7c673cae 1395 // pin it.
91327a77
AA
1396 bound->get(CDir::PIN_EXPORTBOUND);
1397 bound->state_set(CDir::STATE_EXPORTBOUND);
9f95a23c 1398
7c673cae
FG
1399 dout(7) << " export bound " << *bound << dendl;
1400 prep->add_bound( bound->dirfrag() );
9f95a23c 1401
7c673cae 1402 bufferlist final_bl;
9f95a23c 1403 encode_export_prep_trace(final_bl, bound, dir, it->second, inodes_added, dirfrags_added);
7c673cae
FG
1404 prep->add_trace(final_bl);
1405 }
1406
1407 // send.
1408 it->second.state = EXPORT_PREPPING;
1409 mds->send_message_mds(prep, it->second.peer);
11fdf7f2 1410 assert (g_conf()->mds_kill_export_at != 4);
7c673cae
FG
1411
1412 // make sure any new instantiations of caps are flushed out
11fdf7f2 1413 ceph_assert(it->second.warning_ack_waiting.empty());
7c673cae 1414
91327a77
AA
1415 set<client_t> export_client_set;
1416 get_export_client_set(dir, export_client_set);
1417
7c673cae
FG
1418 MDSGatherBuilder gather(g_ceph_context);
1419 mds->server->flush_client_sessions(export_client_set, gather);
1420 if (gather.has_subs()) {
1421 it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
1422 gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
1423 gather.activate();
1424 }
1425}
1426
91327a77 1427void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
7c673cae 1428{
91327a77 1429 deque<CDir*> dfs;
7c673cae
FG
1430 dfs.push_back(dir);
1431 while (!dfs.empty()) {
1432 CDir *dir = dfs.front();
1433 dfs.pop_front();
91327a77 1434 for (auto& p : *dir) {
94b18763 1435 CDentry *dn = p.second;
91327a77 1436 if (!dn->get_linkage()->is_primary())
7c673cae
FG
1437 continue;
1438 CInode *in = dn->get_linkage()->get_inode();
1439 if (in->is_dir()) {
1440 // directory?
9f95a23c 1441 auto&& ls = in->get_dirfrags();
91327a77
AA
1442 for (auto& q : ls) {
1443 if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
7c673cae 1444 // include nested dirfrag
11fdf7f2 1445 ceph_assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
91327a77 1446 dfs.push_back(q); // it's ours, recurse (later)
7c673cae
FG
1447 }
1448 }
1449 }
91327a77
AA
1450 for (auto& q : in->get_client_caps()) {
1451 client_set.insert(q.first);
b32b8144 1452 }
7c673cae
FG
1453 }
1454 }
1455}
1456
1457void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
1458{
11fdf7f2
TL
1459 for (const auto &p : in->get_client_caps()) {
1460 client_set.insert(p.first);
1461 }
7c673cae
FG
1462}
1463
9f95a23c 1464void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
7c673cae 1465{
f67539c2 1466 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1467 mds_rank_t dest(m->get_source().num());
11fdf7f2 1468 ceph_assert(dir);
7c673cae 1469
9f95a23c 1470 dout(7) << *dir << dendl;
7c673cae 1471
11fdf7f2 1472 mds->hit_export_target(dest, -1);
7c673cae
FG
1473
1474 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1475 if (it == export_state.end() ||
1476 it->second.tid != m->get_tid() ||
1477 it->second.peer != mds_rank_t(m->get_source().num())) {
1478 // export must have aborted.
1479 dout(7) << "export must have aborted" << dendl;
7c673cae
FG
1480 return;
1481 }
11fdf7f2 1482 ceph_assert(it->second.state == EXPORT_PREPPING);
7c673cae
FG
1483
1484 if (!m->is_success()) {
c07f9fc5 1485 dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
7c673cae 1486 export_try_cancel(dir, false);
7c673cae
FG
1487 return;
1488 }
1489
11fdf7f2 1490 assert (g_conf()->mds_kill_export_at != 5);
7c673cae
FG
1491 // send warnings
1492 set<CDir*> bounds;
f67539c2 1493 mdcache->get_subtree_bounds(dir, bounds);
7c673cae 1494
11fdf7f2 1495 ceph_assert(it->second.warning_ack_waiting.empty() ||
7c673cae
FG
1496 (it->second.warning_ack_waiting.size() == 1 &&
1497 it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
11fdf7f2 1498 ceph_assert(it->second.notify_ack_waiting.empty());
7c673cae 1499
181888fb
FG
1500 for (const auto &p : dir->get_replicas()) {
1501 if (p.first == it->second.peer) continue;
7c673cae 1502 if (mds->is_cluster_degraded() &&
181888fb 1503 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
7c673cae 1504 continue; // only if active
181888fb
FG
1505 it->second.warning_ack_waiting.insert(p.first);
1506 it->second.notify_ack_waiting.insert(p.first); // we'll eventually get a notifyack, too!
7c673cae 1507
9f95a23c 1508 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), it->second.tid, true,
11fdf7f2
TL
1509 mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
1510 mds_authority_t(mds->get_nodeid(),it->second.peer));
1511 for (auto &cdir : bounds) {
1512 notify->get_bounds().push_back(cdir->dirfrag());
1513 }
181888fb 1514 mds->send_message_mds(notify, p.first);
7c673cae
FG
1515
1516 }
1517
1518 it->second.state = EXPORT_WARNING;
1519
11fdf7f2 1520 ceph_assert(g_conf()->mds_kill_export_at != 6);
7c673cae
FG
1521 // nobody to warn?
1522 if (it->second.warning_ack_waiting.empty())
1523 export_go(dir); // start export.
7c673cae
FG
1524}
1525
1526
1527class C_M_ExportGo : public MigratorContext {
1528 CDir *dir;
1529 uint64_t tid;
1530public:
1531 C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
1532 MigratorContext(m), dir(d), tid(t) {
9f95a23c
TL
1533 dir->get(CDir::PIN_PTRWAITER);
1534 }
7c673cae
FG
1535 void finish(int r) override {
1536 mig->export_go_synced(dir, tid);
9f95a23c 1537 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
1538 }
1539};
1540
1541void Migrator::export_go(CDir *dir)
1542{
b32b8144 1543 auto it = export_state.find(dir);
11fdf7f2 1544 ceph_assert(it != export_state.end());
9f95a23c 1545 dout(7) << *dir << " to " << it->second.peer << dendl;
7c673cae
FG
1546
1547 // first sync log to flush out e.g. any cap imports
b32b8144 1548 mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
7c673cae
FG
1549 mds->mdlog->flush();
1550}
1551
1552void Migrator::export_go_synced(CDir *dir, uint64_t tid)
1553{
1554 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1555 if (it == export_state.end() ||
1556 it->second.state == EXPORT_CANCELLING ||
1557 it->second.tid != tid) {
1558 // export must have aborted.
1559 dout(7) << "export must have aborted on " << dir << dendl;
1560 return;
1561 }
11fdf7f2 1562 ceph_assert(it->second.state == EXPORT_WARNING);
7c673cae
FG
1563 mds_rank_t dest = it->second.peer;
1564
9f95a23c 1565 dout(7) << *dir << " to " << dest << dendl;
7c673cae 1566
f67539c2 1567 mdcache->show_subtrees();
7c673cae
FG
1568
1569 it->second.state = EXPORT_EXPORTING;
11fdf7f2 1570 ceph_assert(g_conf()->mds_kill_export_at != 7);
7c673cae 1571
11fdf7f2 1572 ceph_assert(dir->is_frozen_tree_root());
7c673cae
FG
1573
1574 // set ambiguous auth
f67539c2 1575 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);
7c673cae
FG
1576
1577 // take away the popularity we're sending.
11fdf7f2 1578 mds->balancer->subtract_export(dir);
7c673cae
FG
1579
1580 // fill export message with cache data
9f95a23c 1581 auto req = make_message<MExportDir>(dir->dirfrag(), it->second.tid);
7c673cae 1582 map<client_t,entity_inst_t> exported_client_map;
11fdf7f2 1583 map<client_t,client_metadata_t> exported_client_metadata_map;
9f95a23c
TL
1584 uint64_t num_exported_inodes = 0;
1585 encode_export_dir(req->export_data, dir, // recur start point
1586 exported_client_map, exported_client_metadata_map,
1587 num_exported_inodes);
11fdf7f2
TL
1588 encode(exported_client_map, req->client_map, mds->mdsmap->get_up_features());
1589 encode(exported_client_metadata_map, req->client_map);
7c673cae
FG
1590
1591 // add bounds to message
1592 set<CDir*> bounds;
f67539c2 1593 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
1594 for (set<CDir*>::iterator p = bounds.begin();
1595 p != bounds.end();
1596 ++p)
1597 req->add_export((*p)->dirfrag());
1598
1599 // send
1600 mds->send_message_mds(req, dest);
11fdf7f2 1601 ceph_assert(g_conf()->mds_kill_export_at != 8);
7c673cae 1602
11fdf7f2 1603 mds->hit_export_target(dest, num_exported_inodes+1);
7c673cae
FG
1604
1605 // stats
1606 if (mds->logger) mds->logger->inc(l_mds_exported);
1607 if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);
1608
f67539c2 1609 mdcache->show_subtrees();
7c673cae
FG
1610}
1611
1612
/** encode_export_inode
 * update our local state for this inode to export.
 * encode relevant state to be sent over the wire.
 * used by: encode_export_dir, file_rename (if foreign)
 *
 * Encoded (ENCODE_START v1) as: ino, last, the CInode::encode_export()
 * payload, then the caps blob from encode_export_inode_caps() with
 * auth_cap=true. That call also sets STATE_EXPORTINGCAPS and takes
 * PIN_EXPORTINGCAPS on the inode, and records the sessions of the clients
 * named by its caps in the two exported_client_* maps.
 *
 * The inode must not be a replica on this MDS (asserted).
 *
 * FIXME: the separation between CInode.encode_export and these methods
 * is pretty arbitrary and dumb.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
                                   map<client_t,entity_inst_t>& exported_client_map,
                                   map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, enc_state);
  dout(7) << *in << dendl;
  ceph_assert(!in->is_replica(mds->get_nodeid()));

  encode(in->ino(), enc_state);
  encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps
  encode_export_inode_caps(in, true, enc_state, exported_client_map, exported_client_metadata_map);
  ENCODE_FINISH(enc_state);
}
1637
1638void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
11fdf7f2
TL
1639 map<client_t,entity_inst_t>& exported_client_map,
1640 map<client_t,client_metadata_t>& exported_client_metadata_map)
7c673cae 1641{
9f95a23c
TL
1642 ENCODE_START(1, 1, bl);
1643 dout(20) << *in << dendl;
7c673cae
FG
1644 // encode caps
1645 map<client_t,Capability::Export> cap_map;
1646 in->export_client_caps(cap_map);
11fdf7f2 1647 encode(cap_map, bl);
7c673cae 1648 if (auth_cap) {
11fdf7f2 1649 encode(in->get_mds_caps_wanted(), bl);
7c673cae
FG
1650
1651 in->state_set(CInode::STATE_EXPORTINGCAPS);
1652 in->get(CInode::PIN_EXPORTINGCAPS);
1653 }
1654
1655 // make note of clients named by exported capabilities
11fdf7f2
TL
1656 for (const auto &p : in->get_client_caps()) {
1657 if (exported_client_map.count(p.first))
1658 continue;
1659 Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p.first.v));
1660 exported_client_map[p.first] = session->info.inst;
1661 exported_client_metadata_map[p.first] = session->info.client_metadata;
1662 }
9f95a23c 1663 ENCODE_FINISH(bl);
7c673cae
FG
1664}
1665
1666void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
1667 map<client_t,Capability::Import>& peer_imported)
1668{
9f95a23c 1669 dout(20) << *in << dendl;
7c673cae
FG
1670
1671 in->state_clear(CInode::STATE_EXPORTINGCAPS);
1672 in->put(CInode::PIN_EXPORTINGCAPS);
1673
1674 // tell (all) clients about migrating caps..
11fdf7f2
TL
1675 for (const auto &p : in->get_client_caps()) {
1676 const Capability *cap = &p.second;
9f95a23c 1677 dout(7) << p.first
7c673cae 1678 << " exported caps on " << *in << dendl;
9f95a23c
TL
1679 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
1680 cap->get_cap_id(), cap->get_mseq(),
1681 mds->get_osd_epoch_barrier());
11fdf7f2
TL
1682 map<client_t,Capability::Import>::iterator q = peer_imported.find(p.first);
1683 ceph_assert(q != peer_imported.end());
28e407b8
AA
1684 m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
1685 (q->second.cap_id > 0 ? peer : -1), 0);
11fdf7f2 1686 mds->send_message_client_counted(m, p.first);
7c673cae
FG
1687 }
1688 in->clear_client_caps_after_export();
1689 mds->locker->eval(in, CEPH_CAP_LOCKS);
1690}
1691
/**
 * Turn an exported inode from auth into a replica on this MDS once the
 * export has been acknowledged.
 *
 * Steps:
 *  - Cleans the inode and clears its replica map.
 *  - Applies export_twiddle() to each of its locks and drops STATE_AUTH,
 *    setting replica_nonce to CInode::EXPORT_NONCE.
 *  - Clears dirty rstat, dirty parent, clientwriteable and file locks.
 *    Scatter-dirty state is cleared only if no subtree root dirfrag of this
 *    inode is still ours.
 *  - Moves all waiters into @p finished.
 *  - Calls finish_export(), then finish_export_inode_caps() to notify
 *    clients.
 *
 * @param peer           importer rank (passed to the caps notification)
 * @param peer_imported  importer's per-client imported cap info for this inode
 * @param finished       output: waiters taken from the inode
 */
void Migrator::finish_export_inode(CInode *in, mds_rank_t peer,
                                   map<client_t,Capability::Import>& peer_imported,
                                   MDSContext::vec& finished)
{
  dout(12) << *in << dendl;

  // clean
  if (in->is_dirty())
    in->mark_clean();

  // clear/unpin cached_by (we're no longer the authority)
  in->clear_replica_map();

  // twiddle lock states for auth -> replica transition
  in->authlock.export_twiddle();
  in->linklock.export_twiddle();
  in->dirfragtreelock.export_twiddle();
  in->filelock.export_twiddle();
  in->nestlock.export_twiddle();
  in->xattrlock.export_twiddle();
  in->snaplock.export_twiddle();
  in->flocklock.export_twiddle();
  in->policylock.export_twiddle();

  // mark auth
  ceph_assert(in->is_auth());
  in->state_clear(CInode::STATE_AUTH);
  in->replica_nonce = CInode::EXPORT_NONCE;

  in->clear_dirty_rstat();

  // no more auth subtree? clear scatter dirty
  if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
    in->clear_scatter_dirty();

  in->clear_dirty_parent();

  in->clear_clientwriteable();

  in->clear_file_locks();

  // waiters
  in->take_waiting(CInode::WAIT_ANY_MASK, finished);

  in->finish_export();

  // Must run after the waiters are collected and finish_export() is done;
  // it sends the CEPH_CAP_OP_EXPORT messages to clients.
  finish_export_inode_caps(in, peer, peer_imported);
}
1740
/**
 * Recursively encode the dirfrag @p dir and every nested dirfrag it owns
 * (those not flagged STATE_EXPORTBOUND) into @p exportbl.
 *
 * Per dirfrag (ENCODE_START v1):
 *   dirfrag_t, CDir::encode_export() payload, __u32 dentry count, then for
 *   each dentry: name, last, CDentry::encode_export() payload, followed by:
 *     "N"                     null dentry
 *     CDentry::encode_remote  remote link (ino, d_type, alternate_name)
 *     "i" + ENCODE_START v2 { encode_export_inode(), alternate_name }
 *                             primary link
 * Nested dirfrags are appended after this dirfrag's ENCODE_FINISH, in the
 * order their inodes were encountered.
 *
 * @param exported_client_map / exported_client_metadata_map
 *        filled with session info for clients holding caps on exported inodes
 * @param num_exported incremented once per dentry, including null and remote
 *        dentries (NOTE(review): the caller names this num_exported_inodes)
 */
void Migrator::encode_export_dir(bufferlist& exportbl,
                                 CDir *dir,
                                 map<client_t,entity_inst_t>& exported_client_map,
                                 map<client_t,client_metadata_t>& exported_client_metadata_map,
                                 uint64_t &num_exported)
{
  // This has to be declared before ENCODE_STARTED as it will need to be referenced after ENCODE_FINISH.
  std::vector<CDir*> subdirs;

  ENCODE_START(1, 1, exportbl);
  dout(7) << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  // No projected (not-yet-journaled) changes may be pending on the dirfrag.
  ceph_assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  encode(nden, exportbl);

  // dentries
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    num_exported++;

    // -- dentry
    dout(7) << " exporting " << *dn << dendl;

    // dn name
    encode(dn->get_name(), exportbl);
    encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1);  // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      auto& alternate_name = dn->alternate_name;
      // remote link
      CDentry::encode_remote(ino, d_type, alternate_name, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("i", 1);    // inode dentry

    ENCODE_START(2, 1, exportbl);
    encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map);  // encode, and (update state for) export
    encode(dn->alternate_name, exportbl);
    ENCODE_FINISH(exportbl);

    // directory?
    auto&& dfs = in->get_dirfrags();
    for (const auto& t : dfs) {
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
        // include nested dirfrag
        ceph_assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
        subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  ENCODE_FINISH(exportbl);
  // subdirs
  for (const auto &dir : subdirs) {
    encode_export_dir(exportbl, dir, exported_client_map, exported_client_metadata_map, num_exported);
  }
}
1828
/**
 * Recursively turn an exported dirfrag (and every nested dirfrag returned
 * by get_nested_dirfrags() of its primary inodes) from auth into replica
 * state once the export is acknowledged.
 *
 * For @p dir:
 *  - Clears the replica map, drops STATE_AUTH and the bloom filter, and sets
 *    replica_nonce to CDir::EXPORT_NONCE.
 *  - Marks it clean and moves all its waiters into @p finished.
 *  - Calls finish_export() on the dirfrag and on each dentry, and
 *    finish_export_inode() on each primary inode.
 *  - Moves each dentry to the LRU tail via touch_dentry_bottom().
 *
 * @param peer           importer rank
 * @param peer_imported  importer's imported caps, keyed by ino then client;
 *                       operator[] is used, so a missing ino gets an empty map
 * @param finished       output: waiters collected from dirfrags and inodes
 * @param num_dentries   incremented once per dentry visited (all levels)
 */
void Migrator::finish_export_dir(CDir *dir, mds_rank_t peer,
                                 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
                                 MDSContext::vec& finished, int *num_dentries)
{
  dout(10) << *dir << dendl;

  // release open_by
  dir->clear_replica_map();

  // mark
  ceph_assert(dir->is_auth());
  dir->state_clear(CDir::STATE_AUTH);
  dir->remove_bloom();
  dir->replica_nonce = CDir::EXPORT_NONCE;

  if (dir->is_dirty())
    dir->mark_clean();

  // suck up all waiters
  dir->take_waiting(CDir::WAIT_ANY_MASK, finished);    // all dir waiters

  // pop
  dir->finish_export();

  // dentries
  std::vector<CDir*> subdirs;
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    // dentry
    dn->finish_export();

    // inode?
    if (dn->get_linkage()->is_primary()) {
      finish_export_inode(in, peer, peer_imported[in->ino()], finished);

      // subdirs?
      auto&& dirs = in->get_nested_dirfrags();
      subdirs.insert(std::end(subdirs), std::begin(dirs), std::end(dirs));
    }

    mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
    ++(*num_dentries);
  }

  // subdirs
  for (const auto& dir : subdirs) {
    finish_export_dir(dir, peer, peer_imported, finished, num_dentries);
  }
}
1880
1881class C_MDS_ExportFinishLogged : public MigratorLogContext {
1882 CDir *dir;
1883public:
1884 C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
1885 void finish(int r) override {
1886 mig->export_logged_finish(dir);
1887 }
1888};
1889
1890
1891/*
1892 * i should get an export_ack from the export target.
7c673cae 1893 */
9f95a23c 1894void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
7c673cae 1895{
f67539c2 1896 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1897 mds_rank_t dest(m->get_source().num());
11fdf7f2
TL
1898 ceph_assert(dir);
1899 ceph_assert(dir->is_frozen_tree_root()); // i'm exporting!
7c673cae
FG
1900
1901 // yay!
9f95a23c 1902 dout(7) << *dir << dendl;
7c673cae 1903
11fdf7f2 1904 mds->hit_export_target(dest, -1);
7c673cae
FG
1905
1906 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
11fdf7f2
TL
1907 ceph_assert(it != export_state.end());
1908 ceph_assert(it->second.state == EXPORT_EXPORTING);
1909 ceph_assert(it->second.tid == m->get_tid());
7c673cae 1910
11fdf7f2
TL
1911 auto bp = m->imported_caps.cbegin();
1912 decode(it->second.peer_imported, bp);
7c673cae
FG
1913
1914 it->second.state = EXPORT_LOGGINGFINISH;
11fdf7f2 1915 assert (g_conf()->mds_kill_export_at != 9);
7c673cae 1916 set<CDir*> bounds;
f67539c2 1917 mdcache->get_subtree_bounds(dir, bounds);
7c673cae 1918
7c673cae
FG
1919 // log completion.
1920 // include export bounds, to ensure they're in the journal.
31f18b77 1921 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);;
7c673cae
FG
1922 mds->mdlog->start_entry(le);
1923
1924 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
31f18b77 1925 le->metablob.add_dir(dir, false);
7c673cae
FG
1926 for (set<CDir*>::iterator p = bounds.begin();
1927 p != bounds.end();
1928 ++p) {
1929 CDir *bound = *p;
1930 le->get_bounds().insert(bound->dirfrag());
1931 le->metablob.add_dir_context(bound);
1932 le->metablob.add_dir(bound, false);
1933 }
1934
31f18b77
FG
1935 // list us second, them first.
1936 // this keeps authority().first in sync with subtree auth state in the journal.
f67539c2 1937 mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
31f18b77 1938
7c673cae
FG
1939 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1940 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1941 mds->mdlog->flush();
11fdf7f2 1942 assert (g_conf()->mds_kill_export_at != 10);
7c673cae
FG
1943}
1944
b32b8144 1945void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
7c673cae 1946{
9f95a23c 1947 dout(7) << *dir << dendl;
7c673cae 1948
11fdf7f2 1949 ceph_assert(stat.state == EXPORT_CANCELLING);
7c673cae
FG
1950
1951 if (stat.notify_ack_waiting.empty()) {
1952 stat.state = EXPORT_CANCELLED;
1953 return;
1954 }
1955
1956 dir->auth_pin(this);
1957
1958 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
1959 p != stat.notify_ack_waiting.end();
1960 ++p) {
9f95a23c 1961 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
1962 pair<int,int>(mds->get_nodeid(), stat.peer),
1963 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
7c673cae
FG
1964 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
1965 notify->get_bounds().push_back((*i)->dirfrag());
1966 mds->send_message_mds(notify, *p);
1967 }
1968}
1969
1970/*
1971 * this happens if hte dest failes after i send teh export data but before it is acked
1972 * that is, we don't know they safely received and logged it, so we reverse our changes
1973 * and go on.
1974 */
b32b8144 1975void Migrator::export_reverse(CDir *dir, export_state_t& stat)
7c673cae 1976{
9f95a23c 1977 dout(7) << *dir << dendl;
7c673cae
FG
1978
1979 set<CInode*> to_eval;
1980
1981 set<CDir*> bounds;
f67539c2 1982 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
1983
1984 // remove exporting pins
9f95a23c 1985 std::deque<CDir*> rq;
7c673cae
FG
1986 rq.push_back(dir);
1987 while (!rq.empty()) {
1988 CDir *t = rq.front();
1989 rq.pop_front();
1990 t->abort_export();
94b18763
FG
1991 for (auto &p : *t) {
1992 CDentry *dn = p.second;
1993 dn->abort_export();
1994 if (!dn->get_linkage()->is_primary())
7c673cae 1995 continue;
94b18763 1996 CInode *in = dn->get_linkage()->get_inode();
7c673cae
FG
1997 in->abort_export();
1998 if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
1999 in->state_clear(CInode::STATE_EVALSTALECAPS);
2000 to_eval.insert(in);
2001 }
9f95a23c
TL
2002 if (in->is_dir()) {
2003 auto&& dirs = in->get_nested_dirfrags();
2004 for (const auto& dir : dirs) {
2005 rq.push_back(dir);
2006 }
2007 }
7c673cae
FG
2008 }
2009 }
2010
2011 // unpin bounds
b32b8144 2012 for (auto bd : bounds) {
7c673cae
FG
2013 bd->put(CDir::PIN_EXPORTBOUND);
2014 bd->state_clear(CDir::STATE_EXPORTBOUND);
2015 }
2016
7c673cae 2017 // notify bystanders
b32b8144 2018 export_notify_abort(dir, stat, bounds);
7c673cae 2019
224ce89b 2020 // unfreeze tree, with possible subtree merge.
f67539c2 2021 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
224ce89b 2022
7c673cae 2023 // process delayed expires
f67539c2 2024 mdcache->process_delayed_expire(dir);
224ce89b 2025
7c673cae 2026 dir->unfreeze_tree();
f67539c2 2027 mdcache->try_subtree_merge(dir);
7c673cae
FG
2028
2029 // revoke/resume stale caps
2030 for (auto in : to_eval) {
2031 bool need_issue = false;
11fdf7f2
TL
2032 for (auto &p : in->client_caps) {
2033 Capability *cap = &p.second;
a8e16298 2034 if (!cap->is_stale()) {
7c673cae 2035 need_issue = true;
a8e16298 2036 break;
7c673cae
FG
2037 }
2038 }
2039 if (need_issue &&
2040 (!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
2041 mds->locker->issue_caps(in);
2042 }
2043
f67539c2 2044 mdcache->show_cache();
7c673cae
FG
2045}
2046
2047
2048/*
2049 * once i get the ack, and logged the EExportFinish(true),
2050 * send notifies (if any), otherwise go straight to finish.
2051 *
2052 */
2053void Migrator::export_logged_finish(CDir *dir)
2054{
9f95a23c 2055 dout(7) << *dir << dendl;
7c673cae
FG
2056
2057 export_state_t& stat = export_state[dir];
2058
2059 // send notifies
2060 set<CDir*> bounds;
f67539c2 2061 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2062
2063 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
2064 p != stat.notify_ack_waiting.end();
2065 ++p) {
9f95a23c 2066 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
2067 pair<int,int>(mds->get_nodeid(), stat.peer),
2068 pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
7c673cae
FG
2069
2070 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2071 notify->get_bounds().push_back((*i)->dirfrag());
2072
2073 mds->send_message_mds(notify, *p);
2074 }
2075
2076 // wait for notifyacks
2077 stat.state = EXPORT_NOTIFYING;
11fdf7f2 2078 assert (g_conf()->mds_kill_export_at != 11);
7c673cae
FG
2079
2080 // no notifies to wait for?
2081 if (stat.notify_ack_waiting.empty()) {
2082 export_finish(dir); // skip notify/notify_ack stage.
2083 } else {
2084 // notify peer to send cap import messages to clients
2085 if (!mds->is_cluster_degraded() ||
2086 mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
9f95a23c 2087 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), false, stat.tid), stat.peer);
7c673cae
FG
2088 } else {
2089 dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
2090 }
2091 }
2092}
2093
2094/*
2095 * warning:
2096 * i'll get an ack from each bystander.
2097 * when i get them all, do the export.
2098 * notify:
2099 * i'll get an ack from each bystander.
2100 * when i get them all, unfreeze and send the finish.
7c673cae 2101 */
9f95a23c 2102void Migrator::handle_export_notify_ack(const cref_t<MExportDirNotifyAck> &m)
7c673cae 2103{
f67539c2 2104 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 2105 mds_rank_t dest(m->get_source().num());
11fdf7f2 2106 ceph_assert(dir);
7c673cae
FG
2107 mds_rank_t from = mds_rank_t(m->get_source().num());
2108
11fdf7f2 2109 mds->hit_export_target(dest, -1);
7c673cae
FG
2110
2111 auto export_state_entry = export_state.find(dir);
2112 if (export_state_entry != export_state.end()) {
2113 export_state_t& stat = export_state_entry->second;
2114 if (stat.state == EXPORT_WARNING &&
2115 stat.warning_ack_waiting.erase(from)) {
2116 // exporting. process warning.
9f95a23c 2117 dout(7) << "from " << m->get_source()
7c673cae
FG
2118 << ": exporting, processing warning on " << *dir << dendl;
2119 if (stat.warning_ack_waiting.empty())
2120 export_go(dir); // start export.
2121 } else if (stat.state == EXPORT_NOTIFYING &&
2122 stat.notify_ack_waiting.erase(from)) {
2123 // exporting. process notify.
9f95a23c 2124 dout(7) << "from " << m->get_source()
7c673cae
FG
2125 << ": exporting, processing notify on " << *dir << dendl;
2126 if (stat.notify_ack_waiting.empty())
2127 export_finish(dir);
2128 } else if (stat.state == EXPORT_CANCELLING &&
2129 m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
2130 stat.notify_ack_waiting.erase(from)) {
9f95a23c 2131 dout(7) << "from " << m->get_source()
7c673cae
FG
2132 << ": cancelling export, processing notify on " << *dir << dendl;
2133 if (stat.notify_ack_waiting.empty()) {
91327a77 2134 export_cancel_finish(export_state_entry);
7c673cae
FG
2135 }
2136 }
2137 }
2138 else {
2139 auto import_state_entry = import_state.find(dir->dirfrag());
2140 if (import_state_entry != import_state.end()) {
2141 import_state_t& stat = import_state_entry->second;
2142 if (stat.state == IMPORT_ABORTING) {
2143 // reversing import
9f95a23c 2144 dout(7) << "from " << m->get_source()
7c673cae 2145 << ": aborting import on " << *dir << dendl;
11fdf7f2 2146 ceph_assert(stat.bystanders.count(from));
7c673cae
FG
2147 stat.bystanders.erase(from);
2148 if (stat.bystanders.empty())
2149 import_reverse_unfreeze(dir);
2150 }
2151 }
2152 }
7c673cae
FG
2153}
2154
2155void Migrator::export_finish(CDir *dir)
2156{
9f95a23c 2157 dout(3) << *dir << dendl;
7c673cae 2158
11fdf7f2 2159 assert (g_conf()->mds_kill_export_at != 12);
7c673cae
FG
2160 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
2161 if (it == export_state.end()) {
2162 dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
2163 return;
2164 }
2165
2166 // send finish/commit to new auth
2167 if (!mds->is_cluster_degraded() ||
2168 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
9f95a23c 2169 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), true, it->second.tid), it->second.peer);
7c673cae
FG
2170 } else {
2171 dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
2172 }
11fdf7f2 2173 ceph_assert(g_conf()->mds_kill_export_at != 13);
7c673cae
FG
2174
2175 // finish export (adjust local cache state)
2176 int num_dentries = 0;
11fdf7f2
TL
2177 MDSContext::vec finished;
2178 finish_export_dir(dir, it->second.peer,
224ce89b
WB
2179 it->second.peer_imported, finished, &num_dentries);
2180
11fdf7f2 2181 ceph_assert(!dir->is_auth());
f67539c2 2182 mdcache->adjust_subtree_auth(dir, it->second.peer);
224ce89b 2183
7c673cae
FG
2184 // unpin bounds
2185 set<CDir*> bounds;
f67539c2 2186 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2187 for (set<CDir*>::iterator p = bounds.begin();
2188 p != bounds.end();
2189 ++p) {
2190 CDir *bd = *p;
2191 bd->put(CDir::PIN_EXPORTBOUND);
2192 bd->state_clear(CDir::STATE_EXPORTBOUND);
2193 }
2194
2195 if (dir->state_test(CDir::STATE_AUXSUBTREE))
2196 dir->state_clear(CDir::STATE_AUXSUBTREE);
2197
224ce89b 2198 // discard delayed expires
f67539c2 2199 mdcache->discard_delayed_expire(dir);
224ce89b 2200
9f95a23c 2201 dout(7) << "unfreezing" << dendl;
224ce89b
WB
2202
2203 // unfreeze tree, with possible subtree merge.
7c673cae 2204 // (we do this _after_ removing EXPORTBOUND pins, to allow merges)
224ce89b 2205 dir->unfreeze_tree();
f67539c2 2206 mdcache->try_subtree_merge(dir);
7c673cae
FG
2207
2208 // no more auth subtree? clear scatter dirty
2209 if (!dir->get_inode()->is_auth() &&
2210 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2211 dir->get_inode()->clear_scatter_dirty();
2212 // wake up scatter_nudge waiters
224ce89b 2213 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, finished);
7c673cae
FG
2214 }
2215
224ce89b
WB
2216 if (!finished.empty())
2217 mds->queue_waiters(finished);
7c673cae 2218
91327a77
AA
2219 MutationRef mut = std::move(it->second.mut);
2220 auto parent = std::move(it->second.parent);
7c673cae 2221 // remove from exporting list, clean up state
91327a77 2222 total_exporting_size -= it->second.approx_size;
7c673cae 2223 export_state.erase(it);
91327a77 2224
11fdf7f2 2225 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
1adf2230 2226 dir->clear_exporting();
7c673cae 2227
f67539c2 2228 mdcache->show_subtrees();
7c673cae
FG
2229 audit();
2230
f67539c2 2231 mdcache->trim(num_dentries); // try trimming exported dentries
7c673cae
FG
2232
2233 // send pending import_maps?
f67539c2 2234 mdcache->maybe_send_pending_resolves();
7c673cae
FG
2235
2236 // drop locks, unpin path
2237 if (mut) {
2238 mds->locker->drop_locks(mut.get());
2239 mut->cleanup();
2240 }
91327a77
AA
2241
2242 if (parent)
2243 child_export_finish(parent, true);
2244
7c673cae
FG
2245 maybe_do_queued_export();
2246}
2247
2248
2249
11fdf7f2
TL
2250class C_MDS_ExportDiscover : public MigratorContext {
2251public:
9f95a23c 2252 C_MDS_ExportDiscover(Migrator *mig, const cref_t<MExportDirDiscover>& m) : MigratorContext(mig), m(m) {}
11fdf7f2
TL
2253 void finish(int r) override {
2254 mig->handle_export_discover(m, true);
2255 }
2256private:
9f95a23c 2257 cref_t<MExportDirDiscover> m;
11fdf7f2 2258};
7c673cae 2259
11fdf7f2
TL
2260class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
2261public:
9f95a23c 2262 C_MDS_ExportDiscoverFactory(Migrator *mig, cref_t<MExportDirDiscover> m) : mig(mig), m(m) {}
11fdf7f2
TL
2263 MDSContext *build() {
2264 return new C_MDS_ExportDiscover(mig, m);
2265 }
2266private:
2267 Migrator *mig;
9f95a23c 2268 cref_t<MExportDirDiscover> m;
11fdf7f2 2269};
7c673cae
FG
2270
2271// ==========================================================
2272// IMPORT
2273
9f95a23c 2274void Migrator::handle_export_discover(const cref_t<MExportDirDiscover> &m, bool started)
7c673cae
FG
2275{
2276 mds_rank_t from = m->get_source_mds();
11fdf7f2 2277 ceph_assert(from != mds->get_nodeid());
7c673cae 2278
9f95a23c 2279 dout(7) << m->get_path() << dendl;
7c673cae
FG
2280
2281 // note import state
2282 dirfrag_t df = m->get_dirfrag();
c07f9fc5
FG
2283
2284 if (!mds->is_active()) {
2285 dout(7) << " not active, send NACK " << dendl;
9f95a23c 2286 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid(), false), from);
c07f9fc5
FG
2287 return;
2288 }
2289
7c673cae 2290 // only start discovering on this message once.
b32b8144 2291 import_state_t *p_state;
7c673cae 2292 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
11fdf7f2
TL
2293 if (!started) {
2294 ceph_assert(it == import_state.end());
b32b8144
FG
2295 p_state = &import_state[df];
2296 p_state->state = IMPORT_DISCOVERING;
2297 p_state->peer = from;
2298 p_state->tid = m->get_tid();
7c673cae
FG
2299 } else {
2300 // am i retrying after ancient path_traverse results?
2301 if (it == import_state.end() ||
2302 it->second.peer != from ||
2303 it->second.tid != m->get_tid()) {
2304 dout(7) << " dropping obsolete message" << dendl;
7c673cae
FG
2305 return;
2306 }
11fdf7f2 2307 ceph_assert(it->second.state == IMPORT_DISCOVERING);
b32b8144 2308 p_state = &it->second;
7c673cae
FG
2309 }
2310
11fdf7f2 2311 C_MDS_ExportDiscoverFactory cf(this, m);
f67539c2 2312 if (!mdcache->is_open()) {
9f95a23c 2313 dout(10) << " waiting for root" << dendl;
11fdf7f2 2314 mds->mdcache->wait_for_open(cf.build());
7c673cae
FG
2315 return;
2316 }
2317
11fdf7f2 2318 assert (g_conf()->mds_kill_import_at != 1);
7c673cae
FG
2319
2320 // do we have it?
f67539c2 2321 CInode *in = mdcache->get_inode(m->get_dirfrag().ino);
7c673cae
FG
2322 if (!in) {
2323 // must discover it!
2324 filepath fpath(m->get_path());
2325 vector<CDentry*> trace;
2326 MDRequestRef null_ref;
f67539c2
TL
2327 int r = mdcache->path_traverse(null_ref, cf, fpath,
2328 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_PATH_LOCKED,
2329 &trace);
7c673cae
FG
2330 if (r > 0) return;
2331 if (r < 0) {
9f95a23c 2332 dout(7) << "failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
7c673cae
FG
2333 ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
2334 }
2335
2336 ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
2337 }
2338
2339 // yay
9f95a23c 2340 dout(7) << "have " << df << " inode " << *in << dendl;
7c673cae 2341
b32b8144 2342 p_state->state = IMPORT_DISCOVERED;
7c673cae
FG
2343
2344 // pin inode in the cache (for now)
11fdf7f2 2345 ceph_assert(in->is_dir());
7c673cae
FG
2346 in->get(CInode::PIN_IMPORTING);
2347
2348 // reply
2349 dout(7) << " sending export_discover_ack on " << *in << dendl;
9f95a23c 2350 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid()), p_state->peer);
11fdf7f2 2351 assert (g_conf()->mds_kill_import_at != 2);
7c673cae
FG
2352}
2353
2354void Migrator::import_reverse_discovering(dirfrag_t df)
2355{
2356 import_state.erase(df);
2357}
2358
2359void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2360{
2361 // unpin base
2362 diri->put(CInode::PIN_IMPORTING);
2363 import_state.erase(df);
2364}
2365
b32b8144 2366void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
7c673cae
FG
2367{
2368 set<CDir*> bounds;
f67539c2 2369 mdcache->map_dirfrag_set(stat.bound_ls, bounds);
7c673cae
FG
2370 import_remove_pins(dir, bounds);
2371 import_reverse_final(dir);
2372}
2373
9f95a23c 2374void Migrator::handle_export_cancel(const cref_t<MExportDirCancel> &m)
7c673cae 2375{
9f95a23c 2376 dout(7) << "on " << m->get_dirfrag() << dendl;
7c673cae
FG
2377 dirfrag_t df = m->get_dirfrag();
2378 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2379 if (it == import_state.end()) {
11fdf7f2 2380 ceph_abort_msg("got export_cancel in weird state");
7c673cae
FG
2381 } else if (it->second.state == IMPORT_DISCOVERING) {
2382 import_reverse_discovering(df);
2383 } else if (it->second.state == IMPORT_DISCOVERED) {
f67539c2 2384 CInode *in = mdcache->get_inode(df.ino);
11fdf7f2 2385 ceph_assert(in);
7c673cae
FG
2386 import_reverse_discovered(df, in);
2387 } else if (it->second.state == IMPORT_PREPPING) {
f67539c2 2388 CDir *dir = mdcache->get_dirfrag(df);
11fdf7f2 2389 ceph_assert(dir);
b32b8144 2390 import_reverse_prepping(dir, it->second);
7c673cae 2391 } else if (it->second.state == IMPORT_PREPPED) {
f67539c2 2392 CDir *dir = mdcache->get_dirfrag(df);
11fdf7f2 2393 ceph_assert(dir);
7c673cae 2394 set<CDir*> bounds;
f67539c2 2395 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2396 import_remove_pins(dir, bounds);
2397 // adjust auth back to the exportor
f67539c2 2398 mdcache->adjust_subtree_auth(dir, it->second.peer);
7c673cae
FG
2399 import_reverse_unfreeze(dir);
2400 } else {
11fdf7f2 2401 ceph_abort_msg("got export_cancel in weird state");
7c673cae 2402 }
7c673cae
FG
2403}
2404
11fdf7f2
TL
2405class C_MDS_ExportPrep : public MigratorContext {
2406public:
9f95a23c 2407 C_MDS_ExportPrep(Migrator *mig, const cref_t<MExportDirPrep>& m) : MigratorContext(mig), m(m) {}
11fdf7f2
TL
2408 void finish(int r) override {
2409 mig->handle_export_prep(m, true);
2410 }
2411private:
9f95a23c 2412 cref_t<MExportDirPrep> m;
11fdf7f2
TL
2413};
2414
2415class C_MDS_ExportPrepFactory : public MDSContextFactory {
2416public:
9f95a23c 2417 C_MDS_ExportPrepFactory(Migrator *mig, cref_t<MExportDirPrep> m) : mig(mig), m(m) {}
11fdf7f2
TL
2418 MDSContext *build() {
2419 return new C_MDS_ExportPrep(mig, m);
2420 }
2421private:
2422 Migrator *mig;
9f95a23c 2423 cref_t<MExportDirPrep> m;
11fdf7f2
TL
2424};
2425
9f95a23c
TL
2426void Migrator::decode_export_prep_trace(bufferlist::const_iterator& blp, mds_rank_t oldauth, MDSContext::vec& finished)
2427{
2428 DECODE_START(1, blp);
2429 dirfrag_t df;
2430 decode(df, blp);
2431 char start;
2432 decode(start, blp);
2433 dout(10) << " trace from " << df << " start " << start << dendl;
2434
2435 CDir *cur = nullptr;
2436 if (start == 'd') {
f67539c2 2437 cur = mdcache->get_dirfrag(df);
9f95a23c
TL
2438 ceph_assert(cur);
2439 dout(10) << " had " << *cur << dendl;
2440 } else if (start == 'f') {
f67539c2 2441 CInode *in = mdcache->get_inode(df.ino);
9f95a23c
TL
2442 ceph_assert(in);
2443 dout(10) << " had " << *in << dendl;
f67539c2 2444 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
9f95a23c
TL
2445 dout(10) << " added " << *cur << dendl;
2446 } else if (start == '-') {
2447 // nothing
2448 } else
2449 ceph_abort_msg("unrecognized start char");
2450
2451 while (!blp.end()) {
2452 CDentry *dn = nullptr;
f67539c2 2453 mdcache->decode_replica_dentry(dn, blp, cur, finished);
9f95a23c
TL
2454 dout(10) << " added " << *dn << dendl;
2455 CInode *in = nullptr;
f67539c2 2456 mdcache->decode_replica_inode(in, blp, dn, finished);
9f95a23c
TL
2457 dout(10) << " added " << *in << dendl;
2458 if (blp.end())
2459 break;
f67539c2 2460 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
9f95a23c
TL
2461 dout(10) << " added " << *cur << dendl;
2462 }
2463
2464 DECODE_FINISH(blp);
2465}
2466
2467void Migrator::handle_export_prep(const cref_t<MExportDirPrep> &m, bool did_assim)
7c673cae
FG
2468{
2469 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
11fdf7f2 2470 ceph_assert(oldauth != mds->get_nodeid());
7c673cae
FG
2471
2472 CDir *dir;
2473 CInode *diri;
11fdf7f2 2474 MDSContext::vec finished;
7c673cae
FG
2475
2476 // assimilate root dir.
2477 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
11fdf7f2
TL
2478 if (!did_assim) {
2479 ceph_assert(it != import_state.end());
2480 ceph_assert(it->second.state == IMPORT_DISCOVERED);
2481 ceph_assert(it->second.peer == oldauth);
f67539c2 2482 diri = mdcache->get_inode(m->get_dirfrag().ino);
11fdf7f2
TL
2483 ceph_assert(diri);
2484 auto p = m->basedir.cbegin();
f67539c2 2485 mdcache->decode_replica_dir(dir, p, diri, oldauth, finished);
9f95a23c 2486 dout(7) << "on " << *dir << " (first pass)" << dendl;
7c673cae
FG
2487 } else {
2488 if (it == import_state.end() ||
2489 it->second.peer != oldauth ||
2490 it->second.tid != m->get_tid()) {
9f95a23c 2491 dout(7) << "obsolete message, dropping" << dendl;
7c673cae
FG
2492 return;
2493 }
11fdf7f2
TL
2494 ceph_assert(it->second.state == IMPORT_PREPPING);
2495 ceph_assert(it->second.peer == oldauth);
7c673cae 2496
f67539c2 2497 dir = mdcache->get_dirfrag(m->get_dirfrag());
11fdf7f2 2498 ceph_assert(dir);
9f95a23c 2499 dout(7) << "on " << *dir << " (subsequent pass)" << dendl;
7c673cae
FG
2500 diri = dir->get_inode();
2501 }
11fdf7f2 2502 ceph_assert(dir->is_auth() == false);
7c673cae 2503
f67539c2 2504 mdcache->show_subtrees();
7c673cae
FG
2505
2506 // build import bound map
2507 map<inodeno_t, fragset_t> import_bound_fragset;
11fdf7f2
TL
2508 for (const auto &bound : m->get_bounds()) {
2509 dout(10) << " bound " << bound << dendl;
9f95a23c 2510 import_bound_fragset[bound.ino].insert_raw(bound.frag);
7c673cae 2511 }
7c673cae 2512 // assimilate contents?
11fdf7f2 2513 if (!did_assim) {
7c673cae 2514 dout(7) << "doing assim on " << *dir << dendl;
7c673cae
FG
2515
2516 // change import state
2517 it->second.state = IMPORT_PREPPING;
2518 it->second.bound_ls = m->get_bounds();
2519 it->second.bystanders = m->get_bystanders();
11fdf7f2 2520 ceph_assert(g_conf()->mds_kill_import_at != 3);
7c673cae
FG
2521
2522 // bystander list
2523 dout(7) << "bystanders are " << it->second.bystanders << dendl;
2524
2525 // move pin to dir
2526 diri->put(CInode::PIN_IMPORTING);
2527 dir->get(CDir::PIN_IMPORTING);
2528 dir->state_set(CDir::STATE_IMPORTING);
2529
2530 // assimilate traces to exports
2531 // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
11fdf7f2 2532 for (const auto &bl : m->traces) {
9f95a23c
TL
2533 auto blp = bl.cbegin();
2534 decode_export_prep_trace(blp, oldauth, finished);
7c673cae
FG
2535 }
2536
2537 // make bound sticky
2538 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2539 p != import_bound_fragset.end();
2540 ++p) {
9f95a23c 2541 p->second.simplify();
f67539c2 2542 CInode *in = mdcache->get_inode(p->first);
11fdf7f2 2543 ceph_assert(in);
7c673cae
FG
2544 in->get_stickydirs();
2545 dout(7) << " set stickydirs on bound inode " << *in << dendl;
2546 }
2547
2548 } else {
2549 dout(7) << " not doing assim on " << *dir << dendl;
2550 }
2551
81eedcae 2552 MDSGatherBuilder gather(g_ceph_context);
11fdf7f2 2553
7c673cae
FG
2554 if (!finished.empty())
2555 mds->queue_waiters(finished);
2556
2557
c07f9fc5
FG
2558 bool success = true;
2559 if (mds->is_active()) {
2560 // open all bounds
2561 set<CDir*> import_bounds;
2562 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2563 p != import_bound_fragset.end();
2564 ++p) {
f67539c2 2565 CInode *in = mdcache->get_inode(p->first);
11fdf7f2 2566 ceph_assert(in);
7c673cae 2567
c07f9fc5 2568 // map fragset into a frag_t list, based on the inode fragtree
11fdf7f2
TL
2569 frag_vec_t leaves;
2570 for (const auto& frag : p->second) {
2571 in->dirfragtree.get_leaves_under(frag, leaves);
2572 }
2573 dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << leaves << dendl;
c07f9fc5 2574
11fdf7f2 2575 for (const auto& leaf : leaves) {
f67539c2 2576 CDir *bound = mdcache->get_dirfrag(dirfrag_t(p->first, leaf));
c07f9fc5 2577 if (!bound) {
11fdf7f2 2578 dout(7) << " opening bounding dirfrag " << leaf << " on " << *in << dendl;
f67539c2 2579 mdcache->open_remote_dirfrag(in, leaf, gather.new_sub());
81eedcae 2580 continue;
c07f9fc5 2581 }
7c673cae 2582
c07f9fc5
FG
2583 if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
2584 dout(7) << " pinning import bound " << *bound << dendl;
2585 bound->get(CDir::PIN_IMPORTBOUND);
2586 bound->state_set(CDir::STATE_IMPORTBOUND);
2587 } else {
2588 dout(7) << " already pinned import bound " << *bound << dendl;
2589 }
2590 import_bounds.insert(bound);
7c673cae 2591 }
7c673cae 2592 }
7c673cae 2593
81eedcae
TL
2594 if (gather.has_subs()) {
2595 C_MDS_ExportPrepFactory cf(this, m);
2596 gather.set_finisher(cf.build());
2597 gather.activate();
2598 return;
2599 }
2600
c07f9fc5
FG
2601 dout(7) << " all ready, noting auth and freezing import region" << dendl;
2602
f67539c2 2603 if (!mdcache->is_readonly() &&
9f95a23c
TL
2604 // for pinning scatter gather. loner has a higher chance to get wrlock
2605 diri->filelock.can_wrlock(diri->get_loner()) &&
2606 diri->nestlock.can_wrlock(diri->get_loner())) {
c07f9fc5
FG
2607 it->second.mut = new MutationImpl();
2608 // force some locks. hacky.
2609 mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
2610 mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
2611
2612 // note that i am an ambiguous auth for this subtree.
2613 // specify bounds, since the exporter explicitly defines the region.
f67539c2 2614 mdcache->adjust_bounded_subtree_auth(dir, import_bounds,
c07f9fc5 2615 pair<int,int>(oldauth, mds->get_nodeid()));
f67539c2 2616 mdcache->verify_subtree_bounds(dir, import_bounds);
c07f9fc5
FG
2617 // freeze.
2618 dir->_freeze_tree();
2619 // note new state
2620 it->second.state = IMPORT_PREPPED;
2621 } else {
2622 dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
2623 success = false;
2624 }
7c673cae 2625 } else {
c07f9fc5 2626 dout(7) << " not active, failing. " << *dir << dendl;
7c673cae 2627 success = false;
7c673cae
FG
2628 }
2629
c07f9fc5 2630 if (!success)
b32b8144 2631 import_reverse_prepping(dir, it->second);
c07f9fc5 2632
7c673cae
FG
2633 // ok!
2634 dout(7) << " sending export_prep_ack on " << *dir << dendl;
9f95a23c 2635 mds->send_message(make_message<MExportDirPrepAck>(dir->dirfrag(), success, m->get_tid()), m->get_connection());
7c673cae 2636
11fdf7f2 2637 ceph_assert(g_conf()->mds_kill_import_at != 4);
7c673cae
FG
2638}
2639
2640
2641
2642
2643class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
2644 dirfrag_t df;
2645 CDir *dir;
2646 mds_rank_t from;
2647public:
28e407b8 2648 map<client_t,pair<Session*,uint64_t> > imported_session_map;
7c673cae
FG
2649
2650 C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
2651 MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
9f95a23c 2652 dir->get(CDir::PIN_PTRWAITER);
7c673cae
FG
2653 }
2654 void finish(int r) override {
28e407b8 2655 mig->import_logged_start(df, dir, from, imported_session_map);
9f95a23c 2656 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
2657 }
2658};
2659
9f95a23c 2660void Migrator::handle_export_dir(const cref_t<MExportDir> &m)
7c673cae 2661{
11fdf7f2 2662 assert (g_conf()->mds_kill_import_at != 5);
f67539c2 2663 CDir *dir = mdcache->get_dirfrag(m->dirfrag);
11fdf7f2 2664 ceph_assert(dir);
31f18b77
FG
2665
2666 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
9f95a23c 2667 dout(7) << "importing " << *dir << " from " << oldauth << dendl;
31f18b77 2668
11fdf7f2
TL
2669 ceph_assert(!dir->is_auth());
2670 ceph_assert(dir->freeze_tree_state);
7c673cae
FG
2671
2672 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
11fdf7f2
TL
2673 ceph_assert(it != import_state.end());
2674 ceph_assert(it->second.state == IMPORT_PREPPED);
2675 ceph_assert(it->second.tid == m->get_tid());
2676 ceph_assert(it->second.peer == oldauth);
7c673cae
FG
2677
2678 if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
2679 dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());
2680
f67539c2 2681 mdcache->show_subtrees();
7c673cae 2682
31f18b77 2683 C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);
7c673cae
FG
2684
2685 // start the journal entry
31f18b77 2686 EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
7c673cae
FG
2687 mds->mdlog->start_entry(le);
2688
2689 le->metablob.add_dir_context(dir);
2690
2691 // adjust auth (list us _first_)
f67539c2 2692 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);
7c673cae
FG
2693
2694 // new client sessions, open these after we journal
2695 // include imported sessions in EImportStart
11fdf7f2 2696 auto cmp = m->client_map.cbegin();
28e407b8 2697 map<client_t,entity_inst_t> client_map;
11fdf7f2 2698 map<client_t,client_metadata_t> client_metadata_map;
28e407b8 2699 decode(client_map, cmp);
11fdf7f2
TL
2700 decode(client_metadata_map, cmp);
2701 ceph_assert(cmp.end());
2702 le->cmapv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
2703 onlogged->imported_session_map);
28e407b8 2704 encode(client_map, le->client_map, mds->mdsmap->get_up_features());
11fdf7f2 2705 encode(client_metadata_map, le->client_map);
7c673cae 2706
11fdf7f2 2707 auto blp = m->export_data.cbegin();
7c673cae
FG
2708 int num_imported_inodes = 0;
2709 while (!blp.end()) {
9f95a23c
TL
2710 decode_import_dir(blp,
2711 oldauth,
2712 dir, // import root
2713 le,
2714 mds->mdlog->get_current_segment(),
2715 it->second.peer_exports,
2716 it->second.updated_scatterlocks,
2717 num_imported_inodes);
7c673cae
FG
2718 }
2719 dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
2720
2721 // include bounds in EImportStart
2722 set<CDir*> import_bounds;
11fdf7f2 2723 for (const auto &bound : m->bounds) {
f67539c2 2724 CDir *bd = mdcache->get_dirfrag(bound);
11fdf7f2 2725 ceph_assert(bd);
7c673cae
FG
2726 le->metablob.add_dir(bd, false); // note that parent metadata is already in the event
2727 import_bounds.insert(bd);
2728 }
f67539c2 2729 mdcache->verify_subtree_bounds(dir, import_bounds);
7c673cae
FG
2730
2731 // adjust popularity
11fdf7f2 2732 mds->balancer->add_import(dir);
7c673cae 2733
9f95a23c 2734 dout(7) << "did " << *dir << dendl;
7c673cae
FG
2735
2736 // note state
2737 it->second.state = IMPORT_LOGGINGSTART;
11fdf7f2 2738 assert (g_conf()->mds_kill_import_at != 6);
7c673cae
FG
2739
2740 // log it
2741 mds->mdlog->submit_entry(le, onlogged);
2742 mds->mdlog->flush();
2743
2744 // some stats
2745 if (mds->logger) {
2746 mds->logger->inc(l_mds_imported);
2747 mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
2748 }
7c673cae
FG
2749}
2750
2751
2752/*
2753 * this is an import helper
2754 * called by import_finish, and import_reverse and friends.
2755 */
2756void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2757{
2758 import_state_t& stat = import_state[dir->dirfrag()];
2759 // root
2760 dir->put(CDir::PIN_IMPORTING);
2761 dir->state_clear(CDir::STATE_IMPORTING);
2762
2763 // bounding inodes
2764 set<inodeno_t> did;
2765 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2766 p != stat.bound_ls.end();
2767 ++p) {
2768 if (did.count(p->ino))
2769 continue;
2770 did.insert(p->ino);
f67539c2 2771 CInode *in = mdcache->get_inode(p->ino);
11fdf7f2 2772 ceph_assert(in);
7c673cae
FG
2773 in->put_stickydirs();
2774 }
2775
2776 if (stat.state == IMPORT_PREPPING) {
2777 for (auto bd : bounds) {
2778 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2779 bd->put(CDir::PIN_IMPORTBOUND);
2780 bd->state_clear(CDir::STATE_IMPORTBOUND);
2781 }
2782 }
2783 } else if (stat.state >= IMPORT_PREPPED) {
2784 // bounding dirfrags
2785 for (auto bd : bounds) {
11fdf7f2 2786 ceph_assert(bd->state_test(CDir::STATE_IMPORTBOUND));
7c673cae
FG
2787 bd->put(CDir::PIN_IMPORTBOUND);
2788 bd->state_clear(CDir::STATE_IMPORTBOUND);
2789 }
2790 }
2791}
2792
91327a77
AA
2793class C_MDC_QueueContexts : public MigratorContext {
2794public:
11fdf7f2 2795 MDSContext::vec contexts;
91327a77
AA
2796 C_MDC_QueueContexts(Migrator *m) : MigratorContext(m) {}
2797 void finish(int r) override {
2798 // execute contexts immediately after 'this' context
2799 get_mds()->queue_waiters_front(contexts);
2800 }
2801};
7c673cae
FG
2802
2803/*
2804 * note: this does teh full work of reversing and import and cleaning up
2805 * state.
2806 * called by both handle_mds_failure and by handle_resolve (if we are
2807 * a survivor coping with an exporter failure+recovery).
2808 */
2809void Migrator::import_reverse(CDir *dir)
2810{
9f95a23c 2811 dout(7) << *dir << dendl;
7c673cae
FG
2812
2813 import_state_t& stat = import_state[dir->dirfrag()];
2814 stat.state = IMPORT_ABORTING;
2815
2816 set<CDir*> bounds;
f67539c2 2817 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2818
2819 // remove pins
2820 import_remove_pins(dir, bounds);
2821
2822 // update auth, with possible subtree merge.
11fdf7f2 2823 ceph_assert(dir->is_subtree_root());
7c673cae 2824 if (mds->is_resolve())
f67539c2 2825 mdcache->trim_non_auth_subtree(dir);
7c673cae 2826
f67539c2 2827 mdcache->adjust_subtree_auth(dir, stat.peer);
7c673cae 2828
91327a77 2829 auto fin = new C_MDC_QueueContexts(this);
7c673cae
FG
2830 if (!dir->get_inode()->is_auth() &&
2831 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2832 dir->get_inode()->clear_scatter_dirty();
2833 // wake up scatter_nudge waiters
2834 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2835 }
2836
2837 int num_dentries = 0;
2838 // adjust auth bits.
9f95a23c 2839 std::deque<CDir*> q;
7c673cae
FG
2840 q.push_back(dir);
2841 while (!q.empty()) {
2842 CDir *cur = q.front();
2843 q.pop_front();
2844
2845 // dir
11fdf7f2 2846 cur->abort_import();
7c673cae 2847
94b18763
FG
2848 for (auto &p : *cur) {
2849 CDentry *dn = p.second;
7c673cae
FG
2850
2851 // dentry
2852 dn->state_clear(CDentry::STATE_AUTH);
2853 dn->clear_replica_map();
2854 dn->set_replica_nonce(CDentry::EXPORT_NONCE);
2855 if (dn->is_dirty())
2856 dn->mark_clean();
2857
2858 // inode?
2859 if (dn->get_linkage()->is_primary()) {
2860 CInode *in = dn->get_linkage()->get_inode();
2861 in->state_clear(CDentry::STATE_AUTH);
2862 in->clear_replica_map();
2863 in->set_replica_nonce(CInode::EXPORT_NONCE);
2864 if (in->is_dirty())
2865 in->mark_clean();
2866 in->clear_dirty_rstat();
2867 if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
2868 in->clear_scatter_dirty();
2869 in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2870 }
2871
2872 in->clear_dirty_parent();
2873
f91f0fd5
TL
2874 in->clear_clientwriteable();
2875 in->state_clear(CInode::STATE_NEEDSRECOVER);
2876
7c673cae
FG
2877 in->authlock.clear_gather();
2878 in->linklock.clear_gather();
2879 in->dirfragtreelock.clear_gather();
2880 in->filelock.clear_gather();
2881
2882 in->clear_file_locks();
2883
2884 // non-bounding dir?
9f95a23c
TL
2885 auto&& dfs = in->get_dirfrags();
2886 for (const auto& dir : dfs) {
2887 if (bounds.count(dir) == 0)
2888 q.push_back(dir);
2889 }
7c673cae
FG
2890 }
2891
f67539c2 2892 mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
7c673cae
FG
2893 ++num_dentries;
2894 }
2895 }
2896
2897 dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
2898
2899 if (stat.state == IMPORT_ACKING) {
2900 // remove imported caps
2901 for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
28e407b8
AA
2902 p != stat.peer_exports.end();
2903 ++p) {
7c673cae
FG
2904 CInode *in = p->first;
2905 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
28e407b8
AA
2906 q != p->second.end();
2907 ++q) {
7c673cae 2908 Capability *cap = in->get_client_cap(q->first);
28e407b8 2909 if (!cap) {
11fdf7f2 2910 ceph_assert(!stat.session_map.count(q->first));
28e407b8
AA
2911 continue;
2912 }
7c673cae
FG
2913 if (cap->is_importing())
2914 in->remove_client_cap(q->first);
f91f0fd5
TL
2915 else
2916 cap->clear_clientwriteable();
7c673cae
FG
2917 }
2918 in->put(CInode::PIN_IMPORTINGCAPS);
2919 }
28e407b8
AA
2920 for (auto& p : stat.session_map) {
2921 Session *session = p.second.first;
7c673cae
FG
2922 session->dec_importing();
2923 }
2924 }
2925
2926 // log our failure
2927 mds->mdlog->start_submit_entry(new EImportFinish(dir, false)); // log failure
2928
f67539c2 2929 mdcache->trim(num_dentries); // try trimming dentries
7c673cae
FG
2930
2931 // notify bystanders; wait in aborting state
2932 import_notify_abort(dir, bounds);
2933}
2934
2935void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2936{
9f95a23c 2937 dout(7) << *dir << dendl;
7c673cae
FG
2938
2939 import_state_t& stat = import_state[dir->dirfrag()];
2940 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2941 p != stat.bystanders.end();
2942 ++p) {
9f95a23c 2943 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, false,
11fdf7f2
TL
2944 pair<int,int>(stat.peer, mds->get_nodeid()),
2945 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
7c673cae
FG
2946 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2947 notify->get_bounds().push_back((*i)->dirfrag());
2948 mds->send_message_mds(notify, *p);
2949 }
2950}
2951
2952void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
2953{
9f95a23c 2954 dout(7) << *dir << dendl;
7c673cae
FG
2955
2956 import_state_t& stat = import_state[dir->dirfrag()];
2957 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2958 p != stat.bystanders.end(); ) {
2959 if (mds->is_cluster_degraded() &&
2960 !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
2961 // this can happen if both exporter and bystander fail in the same mdsmap epoch
2962 stat.bystanders.erase(p++);
2963 continue;
2964 }
9f95a23c 2965 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
2966 mds_authority_t(stat.peer, mds->get_nodeid()),
2967 mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
7c673cae
FG
2968 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2969 notify->get_bounds().push_back((*i)->dirfrag());
2970 mds->send_message_mds(notify, *p);
2971 ++p;
2972 }
2973 if (stat.bystanders.empty()) {
2974 dout(7) << "no bystanders, finishing reverse now" << dendl;
2975 import_reverse_unfreeze(dir);
2976 } else {
11fdf7f2 2977 assert (g_conf()->mds_kill_import_at != 10);
7c673cae
FG
2978 }
2979}
2980
2981void Migrator::import_reverse_unfreeze(CDir *dir)
2982{
9f95a23c 2983 dout(7) << *dir << dendl;
11fdf7f2 2984 ceph_assert(!dir->is_auth());
f67539c2 2985 mdcache->discard_delayed_expire(dir);
224ce89b
WB
2986 dir->unfreeze_tree();
2987 if (dir->is_subtree_root())
f67539c2 2988 mdcache->try_subtree_merge(dir);
7c673cae
FG
2989 import_reverse_final(dir);
2990}
2991
2992void Migrator::import_reverse_final(CDir *dir)
2993{
9f95a23c 2994 dout(7) << *dir << dendl;
7c673cae
FG
2995
2996 // clean up
2997 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
11fdf7f2 2998 ceph_assert(it != import_state.end());
7c673cae
FG
2999
3000 MutationRef mut = it->second.mut;
3001 import_state.erase(it);
3002
3003 // send pending import_maps?
f67539c2 3004 mdcache->maybe_send_pending_resolves();
7c673cae
FG
3005
3006 if (mut) {
3007 mds->locker->drop_locks(mut.get());
3008 mut->cleanup();
3009 }
3010
f67539c2 3011 mdcache->show_subtrees();
7c673cae
FG
3012 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3013}
3014
3015
3016
3017
3018void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
28e407b8 3019 map<client_t,pair<Session*,uint64_t> >& imported_session_map)
7c673cae 3020{
9f95a23c
TL
3021 dout(7) << *dir << dendl;
3022
7c673cae
FG
3023 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
3024 if (it == import_state.end() ||
3025 it->second.state != IMPORT_LOGGINGSTART) {
3026 dout(7) << "import " << df << " must have aborted" << dendl;
28e407b8 3027 mds->server->finish_force_open_sessions(imported_session_map);
7c673cae
FG
3028 return;
3029 }
3030
7c673cae
FG
3031 // note state
3032 it->second.state = IMPORT_ACKING;
3033
11fdf7f2 3034 assert (g_conf()->mds_kill_import_at != 7);
7c673cae
FG
3035
3036 // force open client sessions and finish cap import
28e407b8 3037 mds->server->finish_force_open_sessions(imported_session_map, false);
7c673cae
FG
3038
3039 map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
3040 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3041 p != it->second.peer_exports.end();
3042 ++p) {
3043 // parameter 'peer' is NONE, delay sending cap import messages to client
28e407b8
AA
3044 finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
3045 p->second, imported_caps[p->first->ino()]);
7c673cae 3046 }
28e407b8
AA
3047
3048 it->second.session_map.swap(imported_session_map);
7c673cae
FG
3049
3050 // send notify's etc.
3051 dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
3052
3053 // test surviving observer of a failed migration that did not complete
3054 //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);
3055
9f95a23c 3056 auto ack = make_message<MExportDirAck>(dir->dirfrag(), it->second.tid);
11fdf7f2 3057 encode(imported_caps, ack->imported_caps);
7c673cae
FG
3058
3059 mds->send_message_mds(ack, from);
11fdf7f2 3060 assert (g_conf()->mds_kill_import_at != 8);
7c673cae 3061
f67539c2 3062 mdcache->show_subtrees();
7c673cae
FG
3063}
3064
9f95a23c 3065void Migrator::handle_export_finish(const cref_t<MExportDirFinish> &m)
7c673cae 3066{
f67539c2 3067 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
11fdf7f2 3068 ceph_assert(dir);
9f95a23c 3069 dout(7) << *dir << (m->is_last() ? " last" : "") << dendl;
7c673cae
FG
3070
3071 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
11fdf7f2
TL
3072 ceph_assert(it != import_state.end());
3073 ceph_assert(it->second.tid == m->get_tid());
7c673cae
FG
3074
3075 import_finish(dir, false, m->is_last());
7c673cae
FG
3076}
3077
3078void Migrator::import_finish(CDir *dir, bool notify, bool last)
3079{
9f95a23c 3080 dout(7) << *dir << dendl;
7c673cae
FG
3081
3082 map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
11fdf7f2
TL
3083 ceph_assert(it != import_state.end());
3084 ceph_assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);
7c673cae 3085
224ce89b 3086 if (it->second.state == IMPORT_ACKING) {
11fdf7f2 3087 ceph_assert(dir->is_auth());
f67539c2 3088 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
224ce89b
WB
3089 }
3090
7c673cae 3091 // log finish
11fdf7f2 3092 ceph_assert(g_conf()->mds_kill_import_at != 9);
7c673cae
FG
3093
3094 if (it->second.state == IMPORT_ACKING) {
3095 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3096 p != it->second.peer_exports.end();
3097 ++p) {
3098 CInode *in = p->first;
11fdf7f2 3099 ceph_assert(in->is_auth());
7c673cae
FG
3100 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
3101 q != p->second.end();
3102 ++q) {
28e407b8
AA
3103 auto r = it->second.session_map.find(q->first);
3104 if (r == it->second.session_map.end())
3105 continue;
3106
3107 Session *session = r->second.first;
7c673cae 3108 Capability *cap = in->get_client_cap(q->first);
11fdf7f2 3109 ceph_assert(cap);
7c673cae
FG
3110 cap->merge(q->second, true);
3111 cap->clear_importing();
f67539c2 3112 mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
7c673cae
FG
3113 q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
3114 }
3115 p->second.clear();
3116 in->replica_caps_wanted = 0;
3117 }
28e407b8
AA
3118 for (auto& p : it->second.session_map) {
3119 Session *session = p.second.first;
7c673cae
FG
3120 session->dec_importing();
3121 }
3122 }
3123
3124 if (!last) {
11fdf7f2 3125 ceph_assert(it->second.state == IMPORT_ACKING);
7c673cae
FG
3126 it->second.state = IMPORT_FINISHING;
3127 return;
3128 }
3129
3130 // remove pins
3131 set<CDir*> bounds;
f67539c2 3132 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
3133
3134 if (notify)
3135 import_notify_finish(dir, bounds);
3136
3137 import_remove_pins(dir, bounds);
3138
3139 map<CInode*, map<client_t,Capability::Export> > peer_exports;
3140 it->second.peer_exports.swap(peer_exports);
3141
3142 // clear import state (we're done!)
3143 MutationRef mut = it->second.mut;
3144 import_state.erase(it);
3145
7c673cae
FG
3146 mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
3147
7c673cae 3148 // process delayed expires
f67539c2 3149 mdcache->process_delayed_expire(dir);
7c673cae 3150
224ce89b 3151 // unfreeze tree, with possible subtree merge.
7c673cae 3152 dir->unfreeze_tree();
f67539c2 3153 mdcache->try_subtree_merge(dir);
224ce89b 3154
f67539c2 3155 mdcache->show_subtrees();
7c673cae
FG
3156 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3157
3158 if (mut) {
3159 mds->locker->drop_locks(mut.get());
3160 mut->cleanup();
3161 }
3162
3163 // re-eval imported caps
3164 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
3165 p != peer_exports.end();
3166 ++p) {
3167 if (p->first->is_auth())
3168 mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
3169 p->first->put(CInode::PIN_IMPORTINGCAPS);
3170 }
3171
3172 // send pending import_maps?
f67539c2 3173 mdcache->maybe_send_pending_resolves();
7c673cae
FG
3174
3175 // did i just import mydir?
3176 if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
f67539c2 3177 mdcache->populate_mydir();
7c673cae
FG
3178
3179 // is it empty?
3180 if (dir->get_num_head_items() == 0 &&
3181 !dir->inode->is_auth()) {
3182 // reexport!
3183 export_empty_import(dir);
3184 }
3185}
3186
11fdf7f2 3187void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
7c673cae
FG
3188 mds_rank_t oldauth, LogSegment *ls,
3189 map<CInode*, map<client_t,Capability::Export> >& peer_exports,
3190 list<ScatterLock*>& updated_scatterlocks)
9f95a23c
TL
3191{
3192 CInode *in;
3193 bool added = false;
3194 DECODE_START(1, blp);
3195 dout(15) << " on " << *dn << dendl;
7c673cae
FG
3196
3197 inodeno_t ino;
3198 snapid_t last;
11fdf7f2
TL
3199 decode(ino, blp);
3200 decode(last, blp);
7c673cae 3201
f67539c2 3202 in = mdcache->get_inode(ino, last);
7c673cae 3203 if (!in) {
f67539c2 3204 in = new CInode(mds->mdcache, true, 2, last);
7c673cae
FG
3205 added = true;
3206 }
3207
3208 // state after link -- or not! -sage
3209 in->decode_import(blp, ls); // cap imports are noted for later action
3210
3211 // caps
3212 decode_import_inode_caps(in, true, blp, peer_exports);
3213
9f95a23c
TL
3214 DECODE_FINISH(blp);
3215
7c673cae
FG
3216 // link before state -- or not! -sage
3217 if (dn->get_linkage()->get_inode() != in) {
11fdf7f2 3218 ceph_assert(!dn->get_linkage()->get_inode());
7c673cae
FG
3219 dn->dir->link_primary_inode(dn, in);
3220 }
28e407b8
AA
3221
3222 if (in->is_dir())
3223 dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);
7c673cae
FG
3224
3225 // add inode?
3226 if (added) {
f67539c2 3227 mdcache->add_inode(in);
7c673cae
FG
3228 dout(10) << "added " << *in << dendl;
3229 } else {
3230 dout(10) << " had " << *in << dendl;
3231 }
3232
f67539c2 3233 if (in->get_inode()->is_dirty_rstat())
7c673cae 3234 in->mark_dirty_rstat();
f91f0fd5 3235
f67539c2 3236 if (!in->get_inode()->client_ranges.empty())
f91f0fd5 3237 in->mark_clientwriteable();
7c673cae
FG
3238
3239 // clear if dirtyscattered, since we're going to journal this
3240 // but not until we _actually_ finish the import...
3241 if (in->filelock.is_dirty()) {
3242 updated_scatterlocks.push_back(&in->filelock);
3243 mds->locker->mark_updated_scatterlock(&in->filelock);
3244 }
3245
3246 if (in->dirfragtreelock.is_dirty()) {
3247 updated_scatterlocks.push_back(&in->dirfragtreelock);
3248 mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
3249 }
3250
3251 // adjust replica list
3252 //assert(!in->is_replica(oldauth)); // not true on failed export
3253 in->add_replica(oldauth, CInode::EXPORT_NONCE);
3254 if (in->is_replica(mds->get_nodeid()))
3255 in->remove_replica(mds->get_nodeid());
11fdf7f2
TL
3256
3257 if (in->snaplock.is_stable() &&
3258 in->snaplock.get_state() != LOCK_SYNC)
3259 mds->locker->try_eval(&in->snaplock, NULL);
9f95a23c
TL
3260
3261 if (in->policylock.is_stable() &&
3262 in->policylock.get_state() != LOCK_SYNC)
3263 mds->locker->try_eval(&in->policylock, NULL);
7c673cae
FG
3264}
3265
3266void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
11fdf7f2 3267 bufferlist::const_iterator &blp,
7c673cae
FG
3268 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3269{
9f95a23c 3270 DECODE_START(1, blp);
7c673cae 3271 map<client_t,Capability::Export> cap_map;
11fdf7f2
TL
3272 decode(cap_map, blp);
3273 if (auth_cap) {
3274 mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
3275 decode(mds_wanted, blp);
3276 mds_wanted.erase(mds->get_nodeid());
3277 in->set_mds_caps_wanted(mds_wanted);
3278 }
7c673cae 3279 if (!cap_map.empty() ||
b32b8144 3280 (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
7c673cae
FG
3281 peer_exports[in].swap(cap_map);
3282 in->get(CInode::PIN_IMPORTINGCAPS);
3283 }
9f95a23c 3284 DECODE_FINISH(blp);
7c673cae
FG
3285}
3286
3287void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
28e407b8
AA
3288 const map<client_t,pair<Session*,uint64_t> >& session_map,
3289 const map<client_t,Capability::Export> &export_map,
7c673cae
FG
3290 map<client_t,Capability::Import> &import_map)
3291{
f91f0fd5
TL
3292 const auto& client_ranges = in->get_projected_inode()->client_ranges;
3293 auto r = client_ranges.cbegin();
3294 bool needs_recover = false;
3295
28e407b8 3296 for (auto& it : export_map) {
9f95a23c 3297 dout(10) << "for client." << it.first << " on " << *in << dendl;
28e407b8
AA
3298
3299 auto p = session_map.find(it.first);
3300 if (p == session_map.end()) {
3301 dout(10) << " no session for client." << it.first << dendl;
3302 (void)import_map[it.first];
3303 continue;
3304 }
7c673cae 3305
28e407b8
AA
3306 Session *session = p->second.first;
3307
3308 Capability *cap = in->get_client_cap(it.first);
7c673cae 3309 if (!cap) {
28e407b8 3310 cap = in->add_client_cap(it.first, session);
7c673cae
FG
3311 if (peer < 0)
3312 cap->mark_importing();
3313 }
3314
f91f0fd5
TL
3315 if (auth_cap) {
3316 while (r != client_ranges.cend() && r->first < it.first) {
3317 needs_recover = true;
3318 ++r;
3319 }
3320 if (r != client_ranges.cend() && r->first == it.first) {
3321 cap->mark_clientwriteable();
3322 ++r;
3323 }
3324 }
3325
1adf2230
AA
3326 // Always ask exporter mds to send cap export messages for auth caps.
3327 // For non-auth caps, ask exporter mds to send cap export messages to
3328 // clients who haven't opened sessions. The cap export messages will
3329 // make clients open sessions.
11fdf7f2 3330 if (auth_cap || !session->get_connection()) {
1adf2230
AA
3331 Capability::Import& im = import_map[it.first];
3332 im.cap_id = cap->get_cap_id();
3333 im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
3334 im.issue_seq = cap->get_last_seq() + 1;
3335 }
7c673cae
FG
3336
3337 if (peer >= 0) {
28e407b8 3338 cap->merge(it.second, auth_cap);
f67539c2 3339 mdcache->do_cap_import(session, in, cap, it.second.cap_id,
28e407b8 3340 it.second.seq, it.second.mseq - 1, peer,
7c673cae
FG
3341 auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
3342 }
3343 }
3344
f91f0fd5
TL
3345 if (auth_cap) {
3346 if (r != client_ranges.cend())
3347 needs_recover = true;
3348 if (needs_recover)
3349 in->state_set(CInode::STATE_NEEDSRECOVER);
3350 }
3351
7c673cae
FG
3352 if (peer >= 0) {
3353 in->replica_caps_wanted = 0;
3354 in->put(CInode::PIN_IMPORTINGCAPS);
3355 }
3356}
3357
9f95a23c 3358void Migrator::decode_import_dir(bufferlist::const_iterator& blp,
7c673cae
FG
3359 mds_rank_t oldauth,
3360 CDir *import_root,
3361 EImportStart *le,
3362 LogSegment *ls,
3363 map<CInode*,map<client_t,Capability::Export> >& peer_exports,
9f95a23c 3364 list<ScatterLock*>& updated_scatterlocks, int &num_imported)
7c673cae 3365{
9f95a23c 3366 DECODE_START(1, blp);
7c673cae
FG
3367 // set up dir
3368 dirfrag_t df;
11fdf7f2 3369 decode(df, blp);
7c673cae 3370
f67539c2 3371 CInode *diri = mdcache->get_inode(df.ino);
11fdf7f2 3372 ceph_assert(diri);
7c673cae 3373 CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
11fdf7f2 3374 ceph_assert(dir);
7c673cae 3375
9f95a23c 3376 dout(7) << *dir << dendl;
7c673cae 3377
11fdf7f2
TL
3378 if (!dir->freeze_tree_state) {
3379 ceph_assert(dir->get_version() == 0);
3380 dir->freeze_tree_state = import_root->freeze_tree_state;
3381 }
3382
7c673cae 3383 // assimilate state
11fdf7f2 3384 dir->decode_import(blp, ls);
7c673cae
FG
3385
3386 // adjust replica list
3387 //assert(!dir->is_replica(oldauth)); // not true on failed export
3388 dir->add_replica(oldauth, CDir::EXPORT_NONCE);
3389 if (dir->is_replica(mds->get_nodeid()))
3390 dir->remove_replica(mds->get_nodeid());
3391
3392 // add to journal entry
3393 if (le)
3394 le->metablob.add_import_dir(dir);
3395
3396 int num_imported = 0;
3397
3398 // take all waiters on this dir
3399 // NOTE: a pass of imported data is guaranteed to get all of my waiters because
3400 // a replica's presense in my cache implies/forces it's presense in authority's.
11fdf7f2 3401 MDSContext::vec waiters;
7c673cae 3402 dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
11fdf7f2
TL
3403 for (auto c : waiters)
3404 dir->add_waiter(CDir::WAIT_UNFREEZE, c); // UNFREEZE will get kicked both on success or failure
7c673cae
FG
3405
3406 dout(15) << "doing contents" << dendl;
3407
3408 // contents
3409 __u32 nden;
11fdf7f2 3410 decode(nden, blp);
7c673cae
FG
3411
3412 for (; nden>0; nden--) {
3413 num_imported++;
3414
3415 // dentry
3416 string dname;
3417 snapid_t last;
11fdf7f2
TL
3418 decode(dname, blp);
3419 decode(last, blp);
7c673cae
FG
3420
3421 CDentry *dn = dir->lookup_exact_snap(dname, last);
3422 if (!dn)
3423 dn = dir->add_null_dentry(dname, 1, last);
3424
3425 dn->decode_import(blp, ls);
3426
3427 dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
3428 if (dn->is_replica(mds->get_nodeid()))
3429 dn->remove_replica(mds->get_nodeid());
3430
3431 // dentry lock in unreadable state can block path traverse
3432 if (dn->lock.get_state() != LOCK_SYNC)
3433 mds->locker->try_eval(&dn->lock, NULL);
3434
9f95a23c 3435 dout(15) << " got " << *dn << dendl;
7c673cae
FG
3436
3437 // points to...
3438 char icode;
11fdf7f2 3439 decode(icode, blp);
7c673cae
FG
3440
3441 if (icode == 'N') {
3442 // null dentry
11fdf7f2 3443 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
3444
3445 // fall thru
3446 }
f67539c2 3447 else if (icode == 'L' || icode == 'l') {
7c673cae
FG
3448 // remote link
3449 inodeno_t ino;
3450 unsigned char d_type;
f67539c2
TL
3451 mempool::mds_co::string alternate_name;
3452
3453 CDentry::decode_remote(icode, ino, d_type, alternate_name, blp);
3454
7c673cae 3455 if (dn->get_linkage()->is_remote()) {
11fdf7f2 3456 ceph_assert(dn->get_linkage()->get_remote_ino() == ino);
f67539c2 3457 ceph_assert(dn->get_alternate_name() == alternate_name);
7c673cae
FG
3458 } else {
3459 dir->link_remote_inode(dn, ino, d_type);
f67539c2 3460 dn->set_alternate_name(std::move(alternate_name));
7c673cae
FG
3461 }
3462 }
f67539c2 3463 else if (icode == 'I' || icode == 'i') {
7c673cae 3464 // inode
11fdf7f2 3465 ceph_assert(le);
f67539c2
TL
3466 if (icode == 'i') {
3467 DECODE_START(2, blp);
3468 decode_import_inode(dn, blp, oldauth, ls,
3469 peer_exports, updated_scatterlocks);
3470 ceph_assert(!dn->is_projected());
3471 decode(dn->alternate_name, blp);
3472 DECODE_FINISH(blp);
3473 } else {
3474 decode_import_inode(dn, blp, oldauth, ls,
3475 peer_exports, updated_scatterlocks);
3476 }
7c673cae
FG
3477 }
3478
3479 // add dentry to journal entry
3480 if (le)
3481 le->metablob.add_import_dentry(dn);
3482 }
3483
3484#ifdef MDS_VERIFY_FRAGSTAT
3485 if (dir->is_complete())
3486 dir->verify_fragstat();
3487#endif
3488
3489 dir->inode->maybe_export_pin();
3490
9f95a23c
TL
3491 dout(7) << " done " << *dir << dendl;
3492 DECODE_FINISH(blp);
7c673cae
FG
3493}
3494
3495
3496
3497
3498
3499// authority bystander
3500
9f95a23c 3501void Migrator::handle_export_notify(const cref_t<MExportDirNotify> &m)
7c673cae
FG
3502{
3503 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
7c673cae
FG
3504 return;
3505 }
3506
f67539c2 3507 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae
FG
3508
3509 mds_rank_t from = mds_rank_t(m->get_source().num());
3510 mds_authority_t old_auth = m->get_old_auth();
3511 mds_authority_t new_auth = m->get_new_auth();
3512
3513 if (!dir) {
9f95a23c 3514 dout(7) << old_auth << " -> " << new_auth
7c673cae
FG
3515 << " on missing dir " << m->get_dirfrag() << dendl;
3516 } else if (dir->authority() != old_auth) {
9f95a23c 3517 dout(7) << "old_auth was " << dir->authority()
7c673cae
FG
3518 << " != " << old_auth << " -> " << new_auth
3519 << " on " << *dir << dendl;
3520 } else {
9f95a23c 3521 dout(7) << old_auth << " -> " << new_auth
7c673cae
FG
3522 << " on " << *dir << dendl;
3523 // adjust auth
3524 set<CDir*> have;
f67539c2
TL
3525 mdcache->map_dirfrag_set(m->get_bounds(), have);
3526 mdcache->adjust_bounded_subtree_auth(dir, have, new_auth);
7c673cae
FG
3527
3528 // induce a merge?
f67539c2 3529 mdcache->try_subtree_merge(dir);
7c673cae
FG
3530 }
3531
3532 // send ack
3533 if (m->wants_ack()) {
9f95a23c 3534 mds->send_message_mds(make_message<MExportDirNotifyAck>(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
7c673cae
FG
3535 } else {
3536 // aborted. no ack.
9f95a23c 3537 dout(7) << "no ack requested" << dendl;
7c673cae 3538 }
7c673cae
FG
3539}
3540
3541/** cap exports **/
3542void Migrator::export_caps(CInode *in)
3543{
3544 mds_rank_t dest = in->authority().first;
9f95a23c 3545 dout(7) << "to mds." << dest << " " << *in << dendl;
7c673cae 3546
11fdf7f2
TL
3547 ceph_assert(in->is_any_caps());
3548 ceph_assert(!in->is_auth());
3549 ceph_assert(!in->is_ambiguous_auth());
3550 ceph_assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
7c673cae 3551
9f95a23c 3552 auto ex = make_message<MExportCaps>();
7c673cae
FG
3553 ex->ino = in->ino();
3554
11fdf7f2 3555 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map, ex->client_metadata_map);
7c673cae
FG
3556
3557 mds->send_message_mds(ex, dest);
3558}
3559
9f95a23c 3560void Migrator::handle_export_caps_ack(const cref_t<MExportCapsAck> &ack)
1adf2230
AA
3561{
3562 mds_rank_t from = ack->get_source().num();
f67539c2 3563 CInode *in = mdcache->get_inode(ack->ino);
1adf2230 3564 if (in) {
11fdf7f2 3565 ceph_assert(!in->is_auth());
1adf2230 3566
9f95a23c 3567 dout(10) << *ack << " from "
1adf2230
AA
3568 << ack->get_source() << " on " << *in << dendl;
3569
3570 map<client_t,Capability::Import> imported_caps;
3571 map<client_t,uint64_t> caps_ids;
11fdf7f2
TL
3572 auto blp = ack->cap_bl.cbegin();
3573 decode(imported_caps, blp);
3574 decode(caps_ids, blp);
1adf2230
AA
3575
3576 for (auto& it : imported_caps) {
3577 Capability *cap = in->get_client_cap(it.first);
3578 if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
3579 continue;
3580
9f95a23c 3581 dout(7) << " telling client." << it.first
1adf2230 3582 << " exported caps on " << *in << dendl;
9f95a23c 3583 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
1adf2230
AA
3584 cap->get_cap_id(), cap->get_mseq(),
3585 mds->get_osd_epoch_barrier());
3586 m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
3587 mds->send_message_client_counted(m, it.first);
3588
3589 in->remove_client_cap(it.first);
3590 }
3591
3592 mds->locker->request_inode_file_caps(in);
3593 mds->locker->try_eval(in, CEPH_CAP_LOCKS);
3594 }
1adf2230
AA
3595}
3596
9f95a23c 3597void Migrator::handle_gather_caps(const cref_t<MGatherCaps> &m)
7c673cae 3598{
f67539c2 3599 CInode *in = mdcache->get_inode(m->ino);
7c673cae 3600 if (!in)
11fdf7f2 3601 return;
7c673cae 3602
9f95a23c 3603 dout(10) << *m << " from " << m->get_source()
1adf2230
AA
3604 << " on " << *in << dendl;
3605
7c673cae
FG
3606 if (in->is_any_caps() &&
3607 !in->is_auth() &&
3608 !in->is_ambiguous_auth() &&
3609 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3610 export_caps(in);
7c673cae
FG
3611}
3612
3613class C_M_LoggedImportCaps : public MigratorLogContext {
3614 CInode *in;
3615 mds_rank_t from;
3616public:
28e407b8 3617 map<client_t,pair<Session*,uint64_t> > imported_session_map;
7c673cae 3618 map<CInode*, map<client_t,Capability::Export> > peer_exports;
7c673cae
FG
3619
3620 C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
3621 void finish(int r) override {
28e407b8 3622 mig->logged_import_caps(in, from, imported_session_map, peer_exports);
7c673cae
FG
3623 }
3624};
3625
9f95a23c 3626void Migrator::handle_export_caps(const cref_t<MExportCaps> &ex)
7c673cae 3627{
9f95a23c 3628 dout(10) << *ex << " from " << ex->get_source() << dendl;
f67539c2 3629 CInode *in = mdcache->get_inode(ex->ino);
7c673cae 3630
11fdf7f2
TL
3631 ceph_assert(in);
3632 ceph_assert(in->is_auth());
7c673cae
FG
3633
3634 // FIXME
28e407b8 3635 if (!in->can_auth_pin()) {
7c673cae 3636 return;
28e407b8
AA
3637 }
3638
181888fb 3639 in->auth_pin(this);
7c673cae 3640
11fdf7f2
TL
3641 map<client_t,entity_inst_t> client_map{ex->client_map};
3642 map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};
28e407b8 3643
7c673cae
FG
3644 C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
3645 this, in, mds_rank_t(ex->get_source().num()));
7c673cae 3646
11fdf7f2 3647 version_t pv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
28e407b8 3648 finish->imported_session_map);
7c673cae 3649 // decode new caps
11fdf7f2 3650 auto blp = ex->cap_bl.cbegin();
7c673cae 3651 decode_import_inode_caps(in, false, blp, finish->peer_exports);
11fdf7f2 3652 ceph_assert(!finish->peer_exports.empty()); // thus, inode is pinned.
7c673cae
FG
3653
3654 // journal open client sessions
11fdf7f2
TL
3655 ESessions *le = new ESessions(pv, std::move(client_map),
3656 std::move(client_metadata_map));
7c673cae
FG
3657 mds->mdlog->start_submit_entry(le, finish);
3658 mds->mdlog->flush();
7c673cae
FG
3659}
3660
3661
3662void Migrator::logged_import_caps(CInode *in,
3663 mds_rank_t from,
28e407b8
AA
3664 map<client_t,pair<Session*,uint64_t> >& imported_session_map,
3665 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
7c673cae 3666{
9f95a23c 3667 dout(10) << *in << dendl;
7c673cae 3668 // see export_go() vs export_go_synced()
11fdf7f2 3669 ceph_assert(in->is_auth());
7c673cae
FG
3670
3671 // force open client sessions and finish cap import
28e407b8 3672 mds->server->finish_force_open_sessions(imported_session_map);
7c673cae 3673
28e407b8 3674 auto it = peer_exports.find(in);
11fdf7f2 3675 ceph_assert(it != peer_exports.end());
28e407b8 3676
7c673cae 3677 // clients will release caps from the exporter when they receive the cap import message.
1adf2230 3678 map<client_t,Capability::Import> imported_caps;
28e407b8 3679 finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
7c673cae 3680 mds->locker->eval(in, CEPH_CAP_LOCKS, true);
1adf2230
AA
3681
3682 if (!imported_caps.empty()) {
9f95a23c 3683 auto ack = make_message<MExportCapsAck>(in->ino());
1adf2230
AA
3684 map<client_t,uint64_t> peer_caps_ids;
3685 for (auto &p : imported_caps )
3686 peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
3687
11fdf7f2
TL
3688 encode(imported_caps, ack->cap_bl);
3689 encode(peer_caps_ids, ack->cap_bl);
1adf2230
AA
3690 mds->send_message_mds(ack, from);
3691 }
3692
181888fb 3693 in->auth_unpin(this);
7c673cae 3694}
28e407b8 3695
f67539c2 3696Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), mdcache(c) {
11fdf7f2
TL
3697 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
3698 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
91327a77
AA
3699}
3700
92f5a8d4 3701void Migrator::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
28e407b8 3702{
91327a77 3703 if (changed.count("mds_max_export_size"))
11fdf7f2 3704 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
28e407b8 3705 if (changed.count("mds_inject_migrator_session_race")) {
92f5a8d4 3706 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
28e407b8
AA
3707 dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
3708 }
3709}