]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Migrator.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / mds / Migrator.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDCache.h"
17#include "CInode.h"
18#include "CDir.h"
19#include "CDentry.h"
20#include "Migrator.h"
21#include "Locker.h"
22#include "Server.h"
23
24#include "MDBalancer.h"
25#include "MDLog.h"
26#include "MDSMap.h"
27#include "Mutation.h"
28
29#include "include/filepath.h"
28e407b8 30#include "common/likely.h"
7c673cae
FG
31
32#include "events/EExport.h"
33#include "events/EImportStart.h"
34#include "events/EImportFinish.h"
35#include "events/ESessions.h"
36
37#include "msg/Messenger.h"
38
39#include "messages/MClientCaps.h"
40
7c673cae
FG
/*
 * this is what the dir->dir_auth values look like
 *
 *   dir_auth    authbits
 * export:
 *   me          me        - before
 *   me, me      me        - still me, but preparing for export
 *   me, them    me        - send MExportDir (peer is preparing)
 *   them, me    me        - journaled EExport
 *   them        them      - done
 *
 * import:
 *   them        them      - before
 *   me, them    me        - journaled EImportStart
 *   me          me        - done
 *
 * which implies:
 *  - auth bit is set if i am listed as first _or_ second dir_auth.
 */
60
61#include "common/config.h"
62
63
64#define dout_context g_ceph_context
65#define dout_subsys ceph_subsys_mds
66#undef dout_prefix
9f95a23c 67#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".mig " << __func__ << " "
7c673cae 68
20effc67 69using namespace std;
7c673cae 70
11fdf7f2 71class MigratorContext : public MDSContext {
7c673cae
FG
72protected:
73 Migrator *mig;
74 MDSRank *get_mds() override {
75 return mig->mds;
76 }
77public:
78 explicit MigratorContext(Migrator *mig_) : mig(mig_) {
11fdf7f2 79 ceph_assert(mig != NULL);
7c673cae
FG
80 }
81};
82
83class MigratorLogContext : public MDSLogContextBase {
84protected:
85 Migrator *mig;
86 MDSRank *get_mds() override {
87 return mig->mds;
88 }
89public:
90 explicit MigratorLogContext(Migrator *mig_) : mig(mig_) {
11fdf7f2 91 ceph_assert(mig != NULL);
7c673cae
FG
92 }
93};
94
9f95a23c 95void Migrator::dispatch(const cref_t<Message> &m)
7c673cae
FG
96{
97 switch (m->get_type()) {
98 // import
99 case MSG_MDS_EXPORTDIRDISCOVER:
9f95a23c 100 handle_export_discover(ref_cast<MExportDirDiscover>(m));
7c673cae
FG
101 break;
102 case MSG_MDS_EXPORTDIRPREP:
9f95a23c 103 handle_export_prep(ref_cast<MExportDirPrep>(m));
7c673cae
FG
104 break;
105 case MSG_MDS_EXPORTDIR:
28e407b8
AA
106 if (unlikely(inject_session_race)) {
107 dout(0) << "waiting for inject_session_race" << dendl;
108 mds->wait_for_any_client_connection(new C_MDS_RetryMessage(mds, m));
109 } else {
9f95a23c 110 handle_export_dir(ref_cast<MExportDir>(m));
28e407b8 111 }
7c673cae
FG
112 break;
113 case MSG_MDS_EXPORTDIRFINISH:
9f95a23c 114 handle_export_finish(ref_cast<MExportDirFinish>(m));
7c673cae
FG
115 break;
116 case MSG_MDS_EXPORTDIRCANCEL:
9f95a23c 117 handle_export_cancel(ref_cast<MExportDirCancel>(m));
7c673cae
FG
118 break;
119
120 // export
121 case MSG_MDS_EXPORTDIRDISCOVERACK:
9f95a23c 122 handle_export_discover_ack(ref_cast<MExportDirDiscoverAck>(m));
7c673cae
FG
123 break;
124 case MSG_MDS_EXPORTDIRPREPACK:
9f95a23c 125 handle_export_prep_ack(ref_cast<MExportDirPrepAck>(m));
7c673cae
FG
126 break;
127 case MSG_MDS_EXPORTDIRACK:
9f95a23c 128 handle_export_ack(ref_cast<MExportDirAck>(m));
7c673cae
FG
129 break;
130 case MSG_MDS_EXPORTDIRNOTIFYACK:
9f95a23c 131 handle_export_notify_ack(ref_cast<MExportDirNotifyAck>(m));
11fdf7f2 132 break;
7c673cae
FG
133
134 // export 3rd party (dir_auth adjustments)
135 case MSG_MDS_EXPORTDIRNOTIFY:
9f95a23c 136 handle_export_notify(ref_cast<MExportDirNotify>(m));
7c673cae
FG
137 break;
138
139 // caps
140 case MSG_MDS_EXPORTCAPS:
9f95a23c 141 handle_export_caps(ref_cast<MExportCaps>(m));
7c673cae 142 break;
1adf2230 143 case MSG_MDS_EXPORTCAPSACK:
9f95a23c 144 handle_export_caps_ack(ref_cast<MExportCapsAck>(m));
1adf2230 145 break;
7c673cae 146 case MSG_MDS_GATHERCAPS:
9f95a23c 147 handle_gather_caps(ref_cast<MGatherCaps>(m));
7c673cae
FG
148 break;
149
150 default:
151 derr << "migrator unknown message " << m->get_type() << dendl;
11fdf7f2 152 ceph_abort_msg("migrator unknown message");
7c673cae
FG
153 }
154}
155
7c673cae
FG
156void Migrator::export_empty_import(CDir *dir)
157{
9f95a23c 158 dout(7) << *dir << dendl;
11fdf7f2 159 ceph_assert(dir->is_subtree_root());
7c673cae
FG
160
161 if (dir->inode->is_auth()) {
162 dout(7) << " inode is auth" << dendl;
163 return;
164 }
165 if (!dir->is_auth()) {
166 dout(7) << " not auth" << dendl;
167 return;
168 }
169 if (dir->is_freezing() || dir->is_frozen()) {
170 dout(7) << " freezing or frozen" << dendl;
171 return;
172 }
173 if (dir->get_num_head_items() > 0) {
174 dout(7) << " not actually empty" << dendl;
175 return;
176 }
177 if (dir->inode->is_root()) {
178 dout(7) << " root" << dendl;
179 return;
180 }
181
182 mds_rank_t dest = dir->inode->authority().first;
183 //if (mds->is_shutting_down()) dest = 0; // this is more efficient.
184
185 dout(7) << " really empty, exporting to " << dest << dendl;
186 assert (dest != mds->get_nodeid());
187
188 dout(7) << "exporting to mds." << dest
189 << " empty import " << *dir << dendl;
190 export_dir( dir, dest );
191}
192
193void Migrator::find_stale_export_freeze()
194{
195 utime_t now = ceph_clock_now();
196 utime_t cutoff = now;
11fdf7f2 197 cutoff -= g_conf()->mds_freeze_tree_timeout;
7c673cae
FG
198
199
200 /*
201 * We could have situations like:
202 *
203 * - mds.0 authpins an item in subtree A
204 * - mds.0 sends request to mds.1 to authpin an item in subtree B
205 * - mds.0 freezes subtree A
206 * - mds.1 authpins an item in subtree B
207 * - mds.1 sends request to mds.0 to authpin an item in subtree A
208 * - mds.1 freezes subtree B
209 * - mds.1 receives the remote authpin request from mds.0
210 * (wait because subtree B is freezing)
211 * - mds.0 receives the remote authpin request from mds.1
212 * (wait because subtree A is freezing)
213 *
214 *
215 * - client request authpins items in subtree B
216 * - freeze subtree B
217 * - import subtree A which is parent of subtree B
218 * (authpins parent inode of subtree B, see CDir::set_dir_auth())
219 * - freeze subtree A
220 * - client request tries authpinning items in subtree A
221 * (wait because subtree A is freezing)
222 */
223 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
224 p != export_state.end(); ) {
225 CDir* dir = p->first;
226 export_state_t& stat = p->second;
227 ++p;
228 if (stat.state != EXPORT_DISCOVERING && stat.state != EXPORT_FREEZING)
229 continue;
11fdf7f2
TL
230 ceph_assert(dir->freeze_tree_state);
231 if (stat.last_cum_auth_pins != dir->freeze_tree_state->auth_pins) {
232 stat.last_cum_auth_pins = dir->freeze_tree_state->auth_pins;
7c673cae
FG
233 stat.last_cum_auth_pins_change = now;
234 continue;
235 }
236 if (stat.last_cum_auth_pins_change >= cutoff)
237 continue;
238 if (stat.num_remote_waiters > 0 ||
239 (!dir->inode->is_root() && dir->get_parent_dir()->is_freezing())) {
240 export_try_cancel(dir);
241 }
242 }
243}
244
245void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
246{
9f95a23c 247 dout(10) << *dir << dendl;
7c673cae
FG
248
249 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
11fdf7f2 250 ceph_assert(it != export_state.end());
7c673cae
FG
251
252 int state = it->second.state;
253 switch (state) {
254 case EXPORT_LOCKING:
255 dout(10) << "export state=locking : dropping locks and removing auth_pin" << dendl;
91327a77 256 num_locking_exports--;
7c673cae
FG
257 it->second.state = EXPORT_CANCELLED;
258 dir->auth_unpin(this);
259 break;
260 case EXPORT_DISCOVERING:
261 dout(10) << "export state=discovering : canceling freeze and removing auth_pin" << dendl;
262 it->second.state = EXPORT_CANCELLED;
263 dir->unfreeze_tree(); // cancel the freeze
264 dir->auth_unpin(this);
265 if (notify_peer &&
266 (!mds->is_cluster_degraded() ||
267 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
268 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
269 it->second.tid),
270 it->second.peer);
7c673cae
FG
271 break;
272
273 case EXPORT_FREEZING:
274 dout(10) << "export state=freezing : canceling freeze" << dendl;
275 it->second.state = EXPORT_CANCELLED;
276 dir->unfreeze_tree(); // cancel the freeze
224ce89b 277 if (dir->is_subtree_root())
f67539c2 278 mdcache->try_subtree_merge(dir);
7c673cae
FG
279 if (notify_peer &&
280 (!mds->is_cluster_degraded() ||
281 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
282 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
283 it->second.tid),
284 it->second.peer);
7c673cae
FG
285 break;
286
287 // NOTE: state order reversal, warning comes after prepping
288 case EXPORT_WARNING:
289 dout(10) << "export state=warning : unpinning bounds, unfreezing, notifying" << dendl;
290 it->second.state = EXPORT_CANCELLING;
291 // fall-thru
292
293 case EXPORT_PREPPING:
294 if (state != EXPORT_WARNING) {
295 dout(10) << "export state=prepping : unpinning bounds, unfreezing" << dendl;
296 it->second.state = EXPORT_CANCELLED;
297 }
298
299 {
300 // unpin bounds
301 set<CDir*> bounds;
f67539c2 302 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
303 for (set<CDir*>::iterator q = bounds.begin();
304 q != bounds.end();
305 ++q) {
306 CDir *bd = *q;
307 bd->put(CDir::PIN_EXPORTBOUND);
308 bd->state_clear(CDir::STATE_EXPORTBOUND);
309 }
310 if (state == EXPORT_WARNING) {
311 // notify bystanders
b32b8144 312 export_notify_abort(dir, it->second, bounds);
7c673cae 313 // process delayed expires
f67539c2 314 mdcache->process_delayed_expire(dir);
7c673cae
FG
315 }
316 }
317 dir->unfreeze_tree();
f67539c2 318 mdcache->try_subtree_merge(dir);
7c673cae
FG
319 if (notify_peer &&
320 (!mds->is_cluster_degraded() ||
321 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
9f95a23c
TL
322 mds->send_message_mds(make_message<MExportDirCancel>(dir->dirfrag(),
323 it->second.tid),
324 it->second.peer);
7c673cae
FG
325 break;
326
327 case EXPORT_EXPORTING:
328 dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
329 it->second.state = EXPORT_CANCELLING;
b32b8144 330 export_reverse(dir, it->second);
7c673cae
FG
331 break;
332
333 case EXPORT_LOGGINGFINISH:
334 case EXPORT_NOTIFYING:
335 dout(10) << "export state=loggingfinish|notifying : ignoring dest failure, we were successful." << dendl;
336 // leave export_state, don't clean up now.
337 break;
338 case EXPORT_CANCELLING:
339 break;
340
341 default:
342 ceph_abort();
343 }
344
345 // finish clean-up?
346 if (it->second.state == EXPORT_CANCELLING ||
347 it->second.state == EXPORT_CANCELLED) {
348 MutationRef mut;
349 mut.swap(it->second.mut);
350
351 if (it->second.state == EXPORT_CANCELLED) {
91327a77 352 export_cancel_finish(it);
7c673cae
FG
353 }
354
355 // drop locks
356 if (state == EXPORT_LOCKING || state == EXPORT_DISCOVERING) {
357 MDRequestRef mdr = static_cast<MDRequestImpl*>(mut.get());
11fdf7f2 358 ceph_assert(mdr);
f67539c2 359 mdcache->request_kill(mdr);
7c673cae
FG
360 } else if (mut) {
361 mds->locker->drop_locks(mut.get());
362 mut->cleanup();
363 }
364
f67539c2 365 mdcache->show_subtrees();
7c673cae
FG
366
367 maybe_do_queued_export();
368 }
369}
370
91327a77 371void Migrator::export_cancel_finish(export_state_iterator& it)
7c673cae 372{
91327a77
AA
373 CDir *dir = it->first;
374 bool unpin = (it->second.state == EXPORT_CANCELLING);
375 auto parent = std::move(it->second.parent);
376
377 total_exporting_size -= it->second.approx_size;
378 export_state.erase(it);
379
11fdf7f2 380 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
1adf2230 381 dir->clear_exporting();
7c673cae 382
91327a77
AA
383 if (unpin) {
384 // pinned by Migrator::export_notify_abort()
385 dir->auth_unpin(this);
386 }
7c673cae 387 // send pending import_maps? (these need to go out when all exports have finished.)
f67539c2 388 mdcache->maybe_send_pending_resolves();
91327a77
AA
389
390 if (parent)
391 child_export_finish(parent, false);
7c673cae
FG
392}
393
394// ==========================================================
395// mds failure handling
396
397void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
398{
9f95a23c 399 dout(5) << who << dendl;
7c673cae
FG
400
401 // check my exports
402
403 // first add an extra auth_pin on any freezes, so that canceling a
404 // nested freeze doesn't complete one further up the hierarchy and
405 // confuse the shit out of us. we'll remove it after canceling the
406 // freeze. this way no freeze completions run before we want them
407 // to.
9f95a23c 408 std::vector<CDir*> pinned_dirs;
7c673cae
FG
409 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
410 p != export_state.end();
411 ++p) {
412 if (p->second.state == EXPORT_FREEZING) {
413 CDir *dir = p->first;
414 dout(10) << "adding temp auth_pin on freezing " << *dir << dendl;
415 dir->auth_pin(this);
416 pinned_dirs.push_back(dir);
417 }
418 }
419
420 map<CDir*,export_state_t>::iterator p = export_state.begin();
421 while (p != export_state.end()) {
422 map<CDir*,export_state_t>::iterator next = p;
423 ++next;
424 CDir *dir = p->first;
425
426 // abort exports:
427 // - that are going to the failed node
428 // - that aren't frozen yet (to avoid auth_pin deadlock)
429 // - they havne't prepped yet (they may need to discover bounds to do that)
430 if ((p->second.peer == who &&
431 p->second.state != EXPORT_CANCELLING) ||
432 p->second.state == EXPORT_LOCKING ||
433 p->second.state == EXPORT_DISCOVERING ||
434 p->second.state == EXPORT_FREEZING ||
435 p->second.state == EXPORT_PREPPING) {
436 // the guy i'm exporting to failed, or we're just freezing.
437 dout(10) << "cleaning up export state (" << p->second.state << ")"
438 << get_export_statename(p->second.state) << " of " << *dir << dendl;
439 export_try_cancel(dir);
440 } else if (p->second.peer != who) {
441 // bystander failed.
442 if (p->second.warning_ack_waiting.erase(who)) {
443 if (p->second.state == EXPORT_WARNING) {
444 p->second.notify_ack_waiting.erase(who); // they won't get a notify either.
445 // exporter waiting for warning acks, let's fake theirs.
446 dout(10) << "faking export_warning_ack from mds." << who
447 << " on " << *dir << " to mds." << p->second.peer
448 << dendl;
449 if (p->second.warning_ack_waiting.empty())
450 export_go(dir);
451 }
452 }
453 if (p->second.notify_ack_waiting.erase(who)) {
454 // exporter is waiting for notify acks, fake it
455 dout(10) << "faking export_notify_ack from mds." << who
456 << " on " << *dir << " to mds." << p->second.peer
457 << dendl;
458 if (p->second.state == EXPORT_NOTIFYING) {
459 if (p->second.notify_ack_waiting.empty())
460 export_finish(dir);
461 } else if (p->second.state == EXPORT_CANCELLING) {
462 if (p->second.notify_ack_waiting.empty()) {
91327a77 463 export_cancel_finish(p);
7c673cae
FG
464 }
465 }
466 }
467 }
468
469 // next!
470 p = next;
471 }
472
473
474 // check my imports
475 map<dirfrag_t,import_state_t>::iterator q = import_state.begin();
476 while (q != import_state.end()) {
477 map<dirfrag_t,import_state_t>::iterator next = q;
478 ++next;
479 dirfrag_t df = q->first;
f67539c2
TL
480 CInode *diri = mdcache->get_inode(df.ino);
481 CDir *dir = mdcache->get_dirfrag(df);
7c673cae
FG
482
483 if (q->second.peer == who) {
484 if (dir)
485 dout(10) << "cleaning up import state (" << q->second.state << ")"
486 << get_import_statename(q->second.state) << " of " << *dir << dendl;
487 else
488 dout(10) << "cleaning up import state (" << q->second.state << ")"
489 << get_import_statename(q->second.state) << " of " << df << dendl;
490
491 switch (q->second.state) {
492 case IMPORT_DISCOVERING:
493 dout(10) << "import state=discovering : clearing state" << dendl;
494 import_reverse_discovering(df);
495 break;
496
497 case IMPORT_DISCOVERED:
11fdf7f2 498 ceph_assert(diri);
7c673cae
FG
499 dout(10) << "import state=discovered : unpinning inode " << *diri << dendl;
500 import_reverse_discovered(df, diri);
501 break;
502
503 case IMPORT_PREPPING:
11fdf7f2 504 ceph_assert(dir);
7c673cae 505 dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
b32b8144 506 import_reverse_prepping(dir, q->second);
7c673cae
FG
507 break;
508
509 case IMPORT_PREPPED:
11fdf7f2 510 ceph_assert(dir);
7c673cae
FG
511 dout(10) << "import state=prepped : unpinning base+bounds, unfreezing " << *dir << dendl;
512 {
513 set<CDir*> bounds;
f67539c2 514 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
515 import_remove_pins(dir, bounds);
516
517 // adjust auth back to the exporter
f67539c2 518 mdcache->adjust_subtree_auth(dir, q->second.peer);
7c673cae
FG
519
520 // notify bystanders ; wait in aborting state
b32b8144 521 q->second.state = IMPORT_ABORTING;
7c673cae 522 import_notify_abort(dir, bounds);
11fdf7f2 523 ceph_assert(g_conf()->mds_kill_import_at != 10);
7c673cae
FG
524 }
525 break;
526
527 case IMPORT_LOGGINGSTART:
11fdf7f2 528 ceph_assert(dir);
7c673cae
FG
529 dout(10) << "import state=loggingstart : reversing import on " << *dir << dendl;
530 import_reverse(dir);
531 break;
532
533 case IMPORT_ACKING:
11fdf7f2 534 ceph_assert(dir);
7c673cae
FG
535 // hrm. make this an ambiguous import, and wait for exporter recovery to disambiguate
536 dout(10) << "import state=acking : noting ambiguous import " << *dir << dendl;
537 {
538 set<CDir*> bounds;
f67539c2
TL
539 mdcache->get_subtree_bounds(dir, bounds);
540 mdcache->add_ambiguous_import(dir, bounds);
7c673cae
FG
541 }
542 break;
543
544 case IMPORT_FINISHING:
11fdf7f2 545 ceph_assert(dir);
7c673cae
FG
546 dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
547 import_finish(dir, true);
548 break;
549
550 case IMPORT_ABORTING:
11fdf7f2 551 ceph_assert(dir);
7c673cae
FG
552 dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
553 break;
554 }
555 } else {
556 auto bystanders_entry = q->second.bystanders.find(who);
557 if (bystanders_entry != q->second.bystanders.end()) {
558 q->second.bystanders.erase(bystanders_entry);
559 if (q->second.state == IMPORT_ABORTING) {
11fdf7f2 560 ceph_assert(dir);
7c673cae
FG
561 dout(10) << "faking export_notify_ack from mds." << who
562 << " on aborting import " << *dir << " from mds." << q->second.peer
563 << dendl;
224ce89b 564 if (q->second.bystanders.empty())
7c673cae 565 import_reverse_unfreeze(dir);
7c673cae
FG
566 }
567 }
568 }
569
570 // next!
571 q = next;
572 }
573
9f95a23c 574 for (const auto& dir : pinned_dirs) {
7c673cae
FG
575 dout(10) << "removing temp auth_pin on " << *dir << dendl;
576 dir->auth_unpin(this);
7c673cae
FG
577 }
578}
579
580
581
582void Migrator::show_importing()
583{
9f95a23c 584 dout(10) << dendl;
7c673cae
FG
585 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
586 p != import_state.end();
587 ++p) {
f67539c2 588 CDir *dir = mdcache->get_dirfrag(p->first);
7c673cae
FG
589 if (dir) {
590 dout(10) << " importing from " << p->second.peer
591 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
592 << " " << p->first << " " << *dir << dendl;
593 } else {
594 dout(10) << " importing from " << p->second.peer
595 << ": (" << p->second.state << ") " << get_import_statename(p->second.state)
596 << " " << p->first << dendl;
597 }
598 }
599}
600
601void Migrator::show_exporting()
602{
9f95a23c
TL
603 dout(10) << dendl;
604 for (const auto& [dir, state] : export_state) {
605 dout(10) << " exporting to " << state.peer
606 << ": (" << state.state << ") " << get_export_statename(state.state)
607 << " " << dir->dirfrag() << " " << *dir << dendl;
608 }
7c673cae
FG
609}
610
611
612
613void Migrator::audit()
614{
11fdf7f2 615 if (!g_conf()->subsys.should_gather<ceph_subsys_mds, 5>())
7c673cae
FG
616 return; // hrm.
617
618 // import_state
619 show_importing();
620 for (map<dirfrag_t,import_state_t>::iterator p = import_state.begin();
621 p != import_state.end();
622 ++p) {
623 if (p->second.state == IMPORT_DISCOVERING)
624 continue;
625 if (p->second.state == IMPORT_DISCOVERED) {
f67539c2 626 CInode *in = mdcache->get_inode(p->first.ino);
11fdf7f2 627 ceph_assert(in);
7c673cae
FG
628 continue;
629 }
f67539c2 630 CDir *dir = mdcache->get_dirfrag(p->first);
11fdf7f2 631 ceph_assert(dir);
7c673cae
FG
632 if (p->second.state == IMPORT_PREPPING)
633 continue;
634 if (p->second.state == IMPORT_ABORTING) {
11fdf7f2
TL
635 ceph_assert(!dir->is_ambiguous_dir_auth());
636 ceph_assert(dir->get_dir_auth().first != mds->get_nodeid());
7c673cae
FG
637 continue;
638 }
11fdf7f2
TL
639 ceph_assert(dir->is_ambiguous_dir_auth());
640 ceph_assert(dir->authority().first == mds->get_nodeid() ||
7c673cae
FG
641 dir->authority().second == mds->get_nodeid());
642 }
643
644 // export_state
645 show_exporting();
646 for (map<CDir*,export_state_t>::iterator p = export_state.begin();
647 p != export_state.end();
648 ++p) {
649 CDir *dir = p->first;
650 if (p->second.state == EXPORT_LOCKING ||
651 p->second.state == EXPORT_DISCOVERING ||
652 p->second.state == EXPORT_FREEZING ||
653 p->second.state == EXPORT_CANCELLING)
654 continue;
11fdf7f2
TL
655 ceph_assert(dir->is_ambiguous_dir_auth());
656 ceph_assert(dir->authority().first == mds->get_nodeid() ||
7c673cae
FG
657 dir->authority().second == mds->get_nodeid());
658 }
659
660 // ambiguous+me subtrees should be importing|exporting
661
662 // write me
663}
664
665
666
667
668
669// ==========================================================
670// EXPORT
671
672void Migrator::export_dir_nicely(CDir *dir, mds_rank_t dest)
673{
674 // enqueue
9f95a23c 675 dout(7) << *dir << " to " << dest << dendl;
7c673cae
FG
676 export_queue.push_back(pair<dirfrag_t,mds_rank_t>(dir->dirfrag(), dest));
677
678 maybe_do_queued_export();
679}
680
681void Migrator::maybe_do_queued_export()
682{
683 static bool running;
684 if (running)
685 return;
686 running = true;
91327a77
AA
687
688 uint64_t max_total_size = max_export_size * 2;
689
7c673cae 690 while (!export_queue.empty() &&
91327a77
AA
691 max_total_size > total_exporting_size &&
692 max_total_size - total_exporting_size >=
693 max_export_size * (num_locking_exports + 1)) {
694
7c673cae
FG
695 dirfrag_t df = export_queue.front().first;
696 mds_rank_t dest = export_queue.front().second;
697 export_queue.pop_front();
698
f67539c2 699 CDir *dir = mdcache->get_dirfrag(df);
7c673cae
FG
700 if (!dir) continue;
701 if (!dir->is_auth()) continue;
702
9f95a23c 703 dout(7) << "nicely exporting to mds." << dest << " " << *dir << dendl;
7c673cae
FG
704
705 export_dir(dir, dest);
706 }
91327a77 707
7c673cae
FG
708 running = false;
709}
710
711
712
713
714class C_MDC_ExportFreeze : public MigratorContext {
9f95a23c 715 CDir *dir; // dir i'm exporting
7c673cae
FG
716 uint64_t tid;
717public:
718 C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
9f95a23c
TL
719 MigratorContext(m), dir(e), tid(t) {
720 dir->get(CDir::PIN_PTRWAITER);
721 }
7c673cae
FG
722 void finish(int r) override {
723 if (r >= 0)
9f95a23c
TL
724 mig->export_frozen(dir, tid);
725 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
726 }
727};
728
729
9f95a23c 730bool Migrator::export_try_grab_locks(CDir *dir, MutationRef& mut)
7c673cae 731{
9f95a23c 732 CInode *diri = dir->get_inode();
11fdf7f2 733
9f95a23c
TL
734 if (!diri->filelock.can_wrlock(diri->get_loner()) ||
735 !diri->nestlock.can_wrlock(diri->get_loner()))
736 return false;
11fdf7f2 737
9f95a23c 738 MutationImpl::LockOpVec lov;
11fdf7f2 739
9f95a23c
TL
740 set<CDir*> wouldbe_bounds;
741 set<CInode*> bound_inodes;
f67539c2 742 mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
9f95a23c
TL
743 for (auto& bound : wouldbe_bounds)
744 bound_inodes.insert(bound->get_inode());
745 for (auto& in : bound_inodes)
746 lov.add_rdlock(&in->dirfragtreelock);
747
748 lov.add_rdlock(&diri->dirfragtreelock);
749
750 CInode* in = diri;
751 while (true) {
752 lov.add_rdlock(&in->snaplock);
753 CDentry* pdn = in->get_projected_parent_dn();
754 if (!pdn)
755 break;
756 in = pdn->get_dir()->get_inode();
757 }
7c673cae 758
9f95a23c
TL
759 if (!mds->locker->rdlock_try_set(lov, mut))
760 return false;
7c673cae 761
9f95a23c
TL
762 mds->locker->wrlock_force(&diri->filelock, mut);
763 mds->locker->wrlock_force(&diri->nestlock, mut);
11fdf7f2 764
9f95a23c 765 return true;
7c673cae
FG
766}
767
768
7c673cae
FG
769/** export_dir(dir, dest)
770 * public method to initiate an export.
771 * will fail if the directory is freezing, frozen, unpinnable, or root.
772 */
773void Migrator::export_dir(CDir *dir, mds_rank_t dest)
774{
11fdf7f2
TL
775 ceph_assert(dir->is_auth());
776 ceph_assert(dest != mds->get_nodeid());
7c673cae 777
9f95a23c 778 CDir* parent = dir->inode->get_projected_parent_dir();
f67539c2 779 if (!mds->is_stopping() && !dir->is_exportable(dest) && dir->get_num_head_items() > 0) {
9f95a23c 780 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": dir is export pinned" << dendl;
81eedcae 781 return;
9f95a23c
TL
782 } else if (!(mds->is_active() || mds->is_stopping())) {
783 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": not active" << dendl;
181888fb 784 return;
f67539c2 785 } else if (mdcache->is_readonly()) {
9f95a23c 786 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": read-only FS, no exports for now" << dendl;
7c673cae 787 return;
9f95a23c
TL
788 } else if (!mds->mdsmap->is_active(dest)) {
789 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": destination not active" << dendl;
31f18b77 790 return;
9f95a23c
TL
791 } else if (mds->is_cluster_degraded()) {
792 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": cluster degraded" << dendl;
7c673cae 793 return;
9f95a23c
TL
794 } else if (dir->inode->is_system()) {
795 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is a system directory" << dendl;
7c673cae 796 return;
9f95a23c
TL
797 } else if (dir->is_frozen() || dir->is_freezing()) {
798 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": is frozen" << dendl;
7c673cae 799 return;
9f95a23c
TL
800 } else if (dir->state_test(CDir::STATE_EXPORTING)) {
801 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": already exporting" << dendl;
802 return;
803 } else if (parent && parent->inode->is_stray()
804 && parent->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
805 dout(7) << "Cannot export to mds." << dest << " " << *dir << ": in stray directory" << dendl;
7c673cae
FG
806 return;
807 }
808
9f95a23c 809 if (unlikely(g_conf()->mds_thrash_exports)) {
7c673cae 810 // create random subtree bound (which will not be exported)
9f95a23c 811 std::vector<CDir*> ls;
7c673cae
FG
812 for (auto p = dir->begin(); p != dir->end(); ++p) {
813 auto dn = p->second;
814 CDentry::linkage_t *dnl= dn->get_linkage();
815 if (dnl->is_primary()) {
816 CInode *in = dnl->get_inode();
9f95a23c
TL
817 if (in->is_dir()) {
818 auto&& dirs = in->get_nested_dirfrags();
819 ls.insert(std::end(ls), std::begin(dirs), std::end(dirs));
820 }
7c673cae
FG
821 }
822 }
823 if (ls.size() > 0) {
824 int n = rand() % ls.size();
825 auto p = ls.begin();
826 while (n--) ++p;
827 CDir *bd = *p;
828 if (!(bd->is_frozen() || bd->is_freezing())) {
11fdf7f2 829 ceph_assert(bd->is_auth());
7c673cae 830 dir->state_set(CDir::STATE_AUXSUBTREE);
f67539c2 831 mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
9f95a23c 832 dout(7) << "create aux subtree " << *bd << " under " << *dir << dendl;
7c673cae
FG
833 }
834 }
835 }
836
9f95a23c
TL
837 dout(4) << "Starting export to mds." << dest << " " << *dir << dendl;
838
11fdf7f2 839 mds->hit_export_target(dest, -1);
7c673cae
FG
840
841 dir->auth_pin(this);
1adf2230 842 dir->mark_exporting();
7c673cae 843
f67539c2 844 MDRequestRef mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
7c673cae 845 mdr->more()->export_dir = dir;
9f95a23c 846 mdr->pin(dir);
7c673cae 847
11fdf7f2 848 ceph_assert(export_state.count(dir) == 0);
7c673cae 849 export_state_t& stat = export_state[dir];
91327a77 850 num_locking_exports++;
7c673cae
FG
851 stat.state = EXPORT_LOCKING;
852 stat.peer = dest;
853 stat.tid = mdr->reqid.tid;
854 stat.mut = mdr;
855
f67539c2 856 mdcache->dispatch_request(mdr);
7c673cae
FG
857}
858
91327a77
AA
859/*
860 * check if directory is too large to be export in whole. If it is,
861 * choose some subdirs, whose total size is suitable.
862 */
863void Migrator::maybe_split_export(CDir* dir, uint64_t max_size, bool null_okay,
864 vector<pair<CDir*, size_t> >& results)
7c673cae 865{
91327a77
AA
866 static const unsigned frag_size = 800;
867 static const unsigned inode_size = 1000;
868 static const unsigned cap_size = 80;
869 static const unsigned remote_size = 10;
870 static const unsigned null_size = 1;
871
872 // state for depth-first search
873 struct LevelData {
874 CDir *dir;
875 CDir::dentry_key_map::iterator iter;
876 size_t dirfrag_size = frag_size;
877 size_t subdirs_size = 0;
878 bool complete = true;
879 vector<CDir*> siblings;
880 vector<pair<CDir*, size_t> > subdirs;
881 LevelData(const LevelData&) = default;
882 LevelData(CDir *d) :
883 dir(d), iter(d->begin()) {}
884 };
885
886 vector<LevelData> stack;
887 stack.emplace_back(dir);
888
889 size_t found_size = 0;
890 size_t skipped_size = 0;
891
892 for (;;) {
893 auto& data = stack.back();
894 CDir *cur = data.dir;
895 auto& it = data.iter;
896 auto& dirfrag_size = data.dirfrag_size;
897
898 while(it != cur->end()) {
899 CDentry *dn = it->second;
900 ++it;
901
902 dirfrag_size += dn->name.size();
903 if (dn->get_linkage()->is_null()) {
904 dirfrag_size += null_size;
905 continue;
906 }
907 if (dn->get_linkage()->is_remote()) {
908 dirfrag_size += remote_size;
909 continue;
910 }
911
912 CInode *in = dn->get_linkage()->get_inode();
913 dirfrag_size += inode_size;
914 dirfrag_size += in->get_client_caps().size() * cap_size;
915
916 if (in->is_dir()) {
9f95a23c 917 auto ls = in->get_nested_dirfrags();
91327a77
AA
918 std::reverse(ls.begin(), ls.end());
919
920 bool complete = true;
921 for (auto p = ls.begin(); p != ls.end(); ) {
922 if ((*p)->state_test(CDir::STATE_EXPORTING) ||
923 (*p)->is_freezing_dir() || (*p)->is_frozen_dir()) {
924 complete = false;
925 p = ls.erase(p);
926 } else {
927 ++p;
928 }
929 }
930 if (!complete) {
931 // skip exporting dir's ancestors. because they can't get
932 // frozen (exporting dir's parent inode is auth pinned).
933 for (auto p = stack.rbegin(); p < stack.rend(); ++p) {
934 if (!p->complete)
935 break;
936 p->complete = false;
937 }
938 }
939 if (!ls.empty()) {
940 stack.emplace_back(ls.back());
941 ls.pop_back();
942 stack.back().siblings.swap(ls);
943 break;
944 }
945 }
946 }
947 // did above loop push new dirfrag into the stack?
948 if (stack.back().dir != cur)
949 continue;
950
951 if (data.complete) {
952 auto cur_size = data.subdirs_size + dirfrag_size;
953 // we can do nothing with large dirfrag
954 if (cur_size >= max_size && found_size * 2 > max_size)
955 break;
7c673cae 956
91327a77
AA
957 found_size += dirfrag_size;
958
959 if (stack.size() > 1) {
960 auto& parent = stack[stack.size() - 2];
961 parent.subdirs.emplace_back(cur, cur_size);
962 parent.subdirs_size += cur_size;
963 }
964 } else {
965 // can't merge current dirfrag to its parent if there is skipped subdir
966 results.insert(results.end(), data.subdirs.begin(), data.subdirs.end());
967 skipped_size += dirfrag_size;
968 }
969
970 vector<CDir*> ls;
971 ls.swap(data.siblings);
972
973 stack.pop_back();
974 if (stack.empty())
975 break;
976
977 if (found_size >= max_size)
978 break;
979
980 // next dirfrag
981 if (!ls.empty()) {
982 stack.emplace_back(ls.back());
983 ls.pop_back();
984 stack.back().siblings.swap(ls);
985 }
986 }
987
988 for (auto& p : stack)
989 results.insert(results.end(), p.subdirs.begin(), p.subdirs.end());
990
991 if (results.empty() && (!skipped_size || !null_okay))
992 results.emplace_back(dir, found_size + skipped_size);
993}
994
995class C_M_ExportDirWait : public MigratorContext {
996 MDRequestRef mdr;
997 int count;
998public:
999 C_M_ExportDirWait(Migrator *m, MDRequestRef mdr, int count)
1000 : MigratorContext(m), mdr(mdr), count(count) {}
1001 void finish(int r) override {
1002 mig->dispatch_export_dir(mdr, count);
1003 }
1004};
1005
/**
 * Advance an export that is still in the EXPORT_LOCKING state.
 *
 * Steps, in order:
 *  - bail out if the export_state entry is gone or belongs to another tid;
 *  - cancel if the request saw a peer error or the dir is frozen/freezing;
 *  - wait for a new MDSMap while the destination is not yet an export target
 *    (cancelling once count exceeds 3);
 *  - wait for the dir's inode to get a parent dentry;
 *  - acquire the locks needed for the export;
 *  - ask maybe_split_export() how to export the subtree: either export it as
 *    a whole (send MExportDirDiscover and start freezing), or start one
 *    internal EXPORTDIR request per returned sub-dirfrag and cancel this one.
 *
 * @param mdr   internal request driving this export (mdr->more()->export_dir
 *              is the dirfrag being exported)
 * @param count retry counter; passed on as count+1 each time we wait for a
 *              new MDSMap
 */
void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
{
  CDir *dir = mdr->more()->export_dir;
  dout(7) << *mdr << " " << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != mdr->reqid.tid) {
    // export must have aborted.
    dout(7) << "export must have aborted " << *mdr << dendl;
    ceph_assert(mdr->killed || mdr->aborted);
    if (mdr->aborted) {
      mdr->aborted = false;
      mdcache->request_kill(mdr);
    }
    return;
  }
  ceph_assert(it->second.state == EXPORT_LOCKING);

  if (mdr->more()->peer_error || dir->is_frozen() || dir->is_freezing()) {
    dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
    export_try_cancel(dir);
    return;
  }

  mds_rank_t dest = it->second.peer;
  if (!mds->is_export_target(dest)) {
    dout(7) << "dest is not yet an export target" << dendl;
    if (count > 3) {
      dout(7) << "dest has not been added as export target after three MDSMap epochs, canceling export" << dendl;
      export_try_cancel(dir);
      return;
    }

    // release everything held by the request before going to sleep
    mds->locker->drop_locks(mdr.get());
    mdr->drop_local_auth_pins();

    // retry with an incremented counter once the MDSMap wait completes
    mds->wait_for_mdsmap(mds->mdsmap->get_epoch(), new C_M_ExportDirWait(this, mdr, count+1));
    return;
  }

  if (!dir->inode->get_parent_dn()) {
    dout(7) << "waiting for dir to become stable before export: " << *dir << dendl;
    // note: the retry counter is set to 1 here rather than carried over
    dir->add_waiter(CDir::WAIT_CREATED, new C_M_ExportDirWait(this, mdr, 1));
    return;
  }

  // locks?
  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
    MutationImpl::LockOpVec lov;
    // Consider the case where the auth MDS of the subtree root inode is
    // neither the exporter MDS nor the importer MDS, and it gathers the
    // subtree root's fragstat/neststat while the subtree is exporting. It's
    // possible that the exporter MDS and the importer MDS both are auth MDS
    // of the subtree root or both are not auth MDS of the subtree root at the
    // time they receive the lock messages. So the auth MDS of the subtree
    // root inode may get no or duplicated fragstat/neststat for the subtree
    // root dirfrag.
    lov.lock_scatter_gather(&dir->get_inode()->filelock);
    lov.lock_scatter_gather(&dir->get_inode()->nestlock);
    if (dir->get_inode()->is_auth()) {
      dir->get_inode()->filelock.set_scatter_wanted();
      dir->get_inode()->nestlock.set_scatter_wanted();
    }
    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);

    if (!mds->locker->acquire_locks(mdr, lov, nullptr, true)) {
      // only cancel when the request was aborted; otherwise just return
      // (presumably the locker re-dispatches the request later -- confirm)
      if (mdr->aborted)
        export_try_cancel(dir);
      return;
    }

    lov.clear();
    // bound dftlocks:
    // NOTE: We need to take an rdlock on bounding dirfrags during
    //  migration for a rather irritating reason: when we export the
    //  bound inode, we need to send scatterlock state for the dirfrags
    //  as well, so that the new auth also gets the correct info.  If we
    //  race with a refragment, this info is useless, as we can't
    //  redivvy it up.  And it's needed for the scatterlocks to work
    //  properly: when the auth is in a sync/lock state it keeps each
    //  dirfrag's portion in the local (auth OR replica) dirfrag.
    set<CDir*> wouldbe_bounds;
    set<CInode*> bound_inodes;
    mdcache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
    // collapse bounds to their inodes so each dirfragtreelock is added once
    for (auto& bound : wouldbe_bounds)
      bound_inodes.insert(bound->get_inode());
    for (auto& in : bound_inodes)
      lov.add_rdlock(&in->dirfragtreelock);

    if (!mds->locker->rdlock_try_set(lov, mdr))
      return;

    if (!mds->locker->try_rdlock_snap_layout(dir->get_inode(), mdr))
      return;

    mdr->locking_state |= MutationImpl::ALL_LOCKED;
  }

  // config-driven assertion (presumably a fault-injection point -- confirm)
  ceph_assert(g_conf()->mds_kill_export_at != 1);

  auto parent = it->second.parent;

  // results: (dirfrag, approximate size) pairs chosen by maybe_split_export()
  vector<pair<CDir*, size_t> > results;
  maybe_split_export(dir, max_export_size, (bool)parent, results);

  if (results.size() == 1 && results.front().first == dir) {
    // the subtree is exported as a whole
    num_locking_exports--;
    it->second.state = EXPORT_DISCOVERING;
    // send ExportDirDiscover (ask target)
    filepath path;
    dir->inode->make_path(path);
    auto discover = make_message<MExportDirDiscover>(dir->dirfrag(), path,
                                                     mds->get_nodeid(),
                                                     it->second.tid);
    mds->send_message_mds(discover, dest);
    ceph_assert(g_conf()->mds_kill_export_at != 2);

    it->second.last_cum_auth_pins_change = ceph_clock_now();
    it->second.approx_size = results.front().second;
    total_exporting_size += it->second.approx_size;

    // start the freeze, but hold it up with an auth_pin.
    dir->freeze_tree();
    ceph_assert(dir->is_freezing_tree());
    dir->add_waiter(CDir::WAIT_FROZEN, new C_MDC_ExportFreeze(this, dir, it->second.tid));
    return;
  }

  // Split case: account the new children on the shared parent record,
  // creating that record if this is the first split of the subtree.
  if (parent) {
    parent->pending_children += results.size();
  } else {
    parent = std::make_shared<export_base_t>(dir->dirfrag(), dest,
                                             results.size(), export_queue_gen);
  }

  if (results.empty()) {
    dout(7) << "subtree's children all are under exporting, retry rest parts of parent export "
            << parent->dirfrag << dendl;
    parent->restart = true;
  } else {
    dout(7) << "subtree is too large, splitting it into: " << dendl;
  }

  // start an independent EXPORT_LOCKING export for every sub-dirfrag
  for (auto& p : results) {
    CDir *sub = p.first;
    ceph_assert(sub != dir);
    dout(7) << " sub " << *sub << dendl;

    sub->auth_pin(this);
    sub->mark_exporting();

    MDRequestRef _mdr = mdcache->request_start_internal(CEPH_MDS_OP_EXPORTDIR);
    _mdr->more()->export_dir = sub;
    _mdr->pin(sub);

    ceph_assert(export_state.count(sub) == 0);
    auto& stat = export_state[sub];
    num_locking_exports++;
    stat.state = EXPORT_LOCKING;
    stat.peer = dest;
    stat.tid = _mdr->reqid.tid;
    stat.mut = _mdr;
    stat.parent = parent;
    mdcache->dispatch_request(_mdr);
  }

  // cancel the original one
  export_try_cancel(dir);
}
1174
91327a77
AA
1175void Migrator::child_export_finish(std::shared_ptr<export_base_t>& parent, bool success)
1176{
1177 if (success)
1178 parent->restart = true;
1179 if (--parent->pending_children == 0) {
1180 if (parent->restart &&
1181 parent->export_queue_gen == export_queue_gen) {
f67539c2 1182 CDir *origin = mdcache->get_dirfrag(parent->dirfrag);
91327a77
AA
1183 if (origin && origin->is_auth()) {
1184 dout(7) << "child_export_finish requeue " << *origin << dendl;
1185 export_queue.emplace_front(origin->dirfrag(), parent->dest);
1186 }
1187 }
1188 }
7c673cae
FG
1189}
1190
1191/*
1192 * called on receipt of MExportDirDiscoverAck
1193 * the importer now has the directory's _inode_ in memory, and pinned.
7c673cae 1194 */
9f95a23c 1195void Migrator::handle_export_discover_ack(const cref_t<MExportDirDiscoverAck> &m)
7c673cae 1196{
f67539c2 1197 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1198 mds_rank_t dest(m->get_source().num());
11fdf7f2 1199 ceph_assert(dir);
7c673cae 1200
9f95a23c 1201 dout(7) << "from " << m->get_source()
7c673cae
FG
1202 << " on " << *dir << dendl;
1203
11fdf7f2 1204 mds->hit_export_target(dest, -1);
7c673cae
FG
1205
1206 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1207 if (it == export_state.end() ||
1208 it->second.tid != m->get_tid() ||
1209 it->second.peer != dest) {
1210 dout(7) << "must have aborted" << dendl;
1211 } else {
11fdf7f2 1212 ceph_assert(it->second.state == EXPORT_DISCOVERING);
c07f9fc5
FG
1213
1214 if (m->is_success()) {
1215 // release locks to avoid deadlock
1216 MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
11fdf7f2 1217 ceph_assert(mdr);
f67539c2 1218 mdcache->request_finish(mdr);
c07f9fc5
FG
1219 it->second.mut.reset();
1220 // freeze the subtree
1221 it->second.state = EXPORT_FREEZING;
1222 dir->auth_unpin(this);
11fdf7f2 1223 ceph_assert(g_conf()->mds_kill_export_at != 3);
c07f9fc5
FG
1224
1225 } else {
1226 dout(7) << "peer failed to discover (not active?), canceling" << dendl;
1227 export_try_cancel(dir, false);
1228 }
7c673cae 1229 }
7c673cae
FG
1230}
1231
1232class C_M_ExportSessionsFlushed : public MigratorContext {
1233 CDir *dir;
1234 uint64_t tid;
1235public:
9f95a23c
TL
1236 C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
1237 MigratorContext(m), dir(d), tid(t) {
1238 dir->get(CDir::PIN_PTRWAITER);
7c673cae
FG
1239 }
1240 void finish(int r) override {
1241 mig->export_sessions_flushed(dir, tid);
9f95a23c 1242 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
1243 }
1244};
1245
1246void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
1247{
9f95a23c 1248 dout(7) << *dir << dendl;
7c673cae
FG
1249
1250 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1251 if (it == export_state.end() ||
1252 it->second.state == EXPORT_CANCELLING ||
1253 it->second.tid != tid) {
1254 // export must have aborted.
1255 dout(7) << "export must have aborted on " << dir << dendl;
1256 return;
1257 }
1258
11fdf7f2
TL
1259 ceph_assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
1260 ceph_assert(it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0);
7c673cae
FG
1261 it->second.warning_ack_waiting.erase(MDS_RANK_NONE);
1262 if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
1263 export_go(dir); // start export.
1264}
1265
9f95a23c
TL
/**
 * Encode the replica trace leading to one export bound into final_bl.
 *
 * Starting at the bound and walking up towards the export root, each step
 * prepends replica encodings (dentry + inode, then the containing dirfrag) to
 * the trace, so the finished trace reads top-down. The walk stops as soon as
 * it reaches an inode or dirfrag that an earlier trace already covered, or
 * the export root itself.
 *
 * Layout written to final_bl: dirfrag_t of the stopping point, a start
 * marker ('-', 'd' or 'f'), then the trace bytes.
 *
 * @param final_bl        output buffer
 * @param bound           bounding dirfrag to trace from
 * @param dir             root dirfrag of the export
 * @param es              export state; es.peer is the replica target
 * @param inodes_added    inodes already encoded by this or earlier traces
 * @param dirfrags_added  dirfrags already encoded by this or earlier traces
 */
void Migrator::encode_export_prep_trace(bufferlist &final_bl, CDir *bound,
                                        CDir *dir, export_state_t &es,
                                        set<inodeno_t> &inodes_added,
                                        set<dirfrag_t> &dirfrags_added)
{
  ENCODE_START(1, 1, final_bl);

  dout(7) << " started to encode dir " << *bound << dendl;
  CDir *cur = bound;
  bufferlist tracebl;
  char start = '-';   // stays '-' if the very first inode was already added

  while (1) {
    // don't repeat inodes
    if (inodes_added.count(cur->inode->ino()))
      break;
    inodes_added.insert(cur->inode->ino());

    // prepend dentry + inode
    ceph_assert(cur->inode->is_auth());
    bufferlist bl;
    mdcache->encode_replica_dentry(cur->inode->parent, es.peer, bl);
    dout(7) << " added " << *cur->inode->parent << dendl;
    mdcache->encode_replica_inode(cur->inode, es.peer, bl, mds->mdsmap->get_up_features());
    dout(7) << " added " << *cur->inode << dendl;
    // new data goes in front: append the old trace to bl, then swap back
    bl.claim_append(tracebl);
    tracebl = std::move(bl);

    cur = cur->get_parent_dir();
    // don't repeat dirfrags
    if (dirfrags_added.count(cur->dirfrag()) || cur == dir) {
      start = 'd';  // start with dentry
      break;
    }
    dirfrags_added.insert(cur->dirfrag());

    // prepend dir
    // NOTE(review): bl is reused here after being moved from above; this
    // relies on a moved-from bufferlist being empty -- confirm.
    mdcache->encode_replica_dir(cur, es.peer, bl);
    dout(7) << " added " << *cur << dendl;
    bl.claim_append(tracebl);
    tracebl = std::move(bl);
    start = 'f';  // start with dirfrag
  }
  // cur is now the dirfrag at which the trace starts
  dirfrag_t df = cur->dirfrag();
  encode(df, final_bl);
  encode(start, final_bl);
  final_bl.claim_append(tracebl);

  ENCODE_FINISH(final_bl);
}
1316
7c673cae
FG
/**
 * Continue an export once its subtree is frozen (EXPORT_FREEZING).
 *
 * Grabs the locks needed for the export (cancelling on failure), builds an
 * MExportDirPrep message containing the bystander list, the base dirfrag and
 * one trace per subtree bound, sends it to the peer, moves the export to
 * EXPORT_PREPPING, and starts flushing the sessions of all clients holding
 * caps in the subtree.
 *
 * @param dir root dirfrag of the export
 * @param tid transaction id the freeze was started for; a mismatch means the
 *            export was aborted in the meantime
 */
void Migrator::export_frozen(CDir *dir, uint64_t tid)
{
  dout(7) << *dir << dendl;

  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
  if (it == export_state.end() || it->second.tid != tid) {
    dout(7) << "export must have aborted" << dendl;
    return;
  }

  ceph_assert(it->second.state == EXPORT_FREEZING);
  ceph_assert(dir->is_frozen_tree_root());

  it->second.mut = new MutationImpl();

  // ok, try to grab all my locks.
  CInode *diri = dir->get_inode();
  if ((diri->is_auth() && diri->is_frozen()) ||
      !export_try_grab_locks(dir, it->second.mut)) {
    dout(7) << "export_dir couldn't acquire all needed locks, failing. "
	    << *dir << dendl;
    export_try_cancel(dir);
    return;
  }

  if (diri->is_auth())
    it->second.mut->auth_pin(diri);

  mdcache->show_subtrees();

  // CDir::_freeze_tree() should have forced it into subtree.
  ceph_assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
  // note the bounds.
  set<CDir*> bounds;
  mdcache->get_subtree_bounds(dir, bounds);

  // generate prep message, log entry.
  auto prep = make_message<MExportDirPrep>(dir->dirfrag(), it->second.tid);

  // include list of bystanders
  for (const auto &p : dir->get_replicas()) {
    if (p.first != it->second.peer) {
      dout(10) << "bystander mds." << p.first << dendl;
      prep->add_bystander(p.first);
    }
  }

  // include base dirfrag
  mdcache->encode_replica_dir(dir, it->second.peer, prep->basedir);

  /*
   * include spanning tree for all nested exports.
   * these need to be on the destination _before_ the final export so that
   * dir_auth updates on any nested exports are properly absorbed.
   * this includes inodes and dirfrags included in the subtree, but
   * only the inodes at the bounds.
   *
   * each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
   */
  // shared across all traces so nothing is encoded twice
  set<inodeno_t> inodes_added;
  set<dirfrag_t> dirfrags_added;

  // check bounds
  for (auto &bound : bounds){
    // pin it.
    bound->get(CDir::PIN_EXPORTBOUND);
    bound->state_set(CDir::STATE_EXPORTBOUND);

    dout(7) << "  export bound " << *bound << dendl;
    prep->add_bound( bound->dirfrag() );

    bufferlist final_bl;
    encode_export_prep_trace(final_bl, bound, dir, it->second, inodes_added, dirfrags_added);
    prep->add_trace(final_bl);
  }

  // send.
  it->second.state = EXPORT_PREPPING;
  mds->send_message_mds(prep, it->second.peer);
  ceph_assert(g_conf()->mds_kill_export_at != 4);

  // make sure any new instantiations of caps are flushed out
  ceph_assert(it->second.warning_ack_waiting.empty());

  set<client_t> export_client_set;
  get_export_client_set(dir, export_client_set);

  MDSGatherBuilder gather(g_ceph_context);
  mds->server->flush_client_sessions(export_client_set, gather);
  if (gather.has_subs()) {
    // MDS_RANK_NONE is a placeholder for the pending session flush; it is
    // erased again in export_sessions_flushed()
    it->second.warning_ack_waiting.insert(MDS_RANK_NONE);
    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
    gather.activate();
  }
}
1412
91327a77 1413void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
7c673cae 1414{
91327a77 1415 deque<CDir*> dfs;
7c673cae
FG
1416 dfs.push_back(dir);
1417 while (!dfs.empty()) {
1418 CDir *dir = dfs.front();
1419 dfs.pop_front();
91327a77 1420 for (auto& p : *dir) {
94b18763 1421 CDentry *dn = p.second;
91327a77 1422 if (!dn->get_linkage()->is_primary())
7c673cae
FG
1423 continue;
1424 CInode *in = dn->get_linkage()->get_inode();
1425 if (in->is_dir()) {
1426 // directory?
9f95a23c 1427 auto&& ls = in->get_dirfrags();
91327a77
AA
1428 for (auto& q : ls) {
1429 if (!q->state_test(CDir::STATE_EXPORTBOUND)) {
7c673cae 1430 // include nested dirfrag
11fdf7f2 1431 ceph_assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
91327a77 1432 dfs.push_back(q); // it's ours, recurse (later)
7c673cae
FG
1433 }
1434 }
1435 }
91327a77
AA
1436 for (auto& q : in->get_client_caps()) {
1437 client_set.insert(q.first);
b32b8144 1438 }
7c673cae
FG
1439 }
1440 }
1441}
1442
1443void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
1444{
11fdf7f2
TL
1445 for (const auto &p : in->get_client_caps()) {
1446 client_set.insert(p.first);
1447 }
7c673cae
FG
1448}
1449
9f95a23c 1450void Migrator::handle_export_prep_ack(const cref_t<MExportDirPrepAck> &m)
7c673cae 1451{
f67539c2 1452 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1453 mds_rank_t dest(m->get_source().num());
11fdf7f2 1454 ceph_assert(dir);
7c673cae 1455
9f95a23c 1456 dout(7) << *dir << dendl;
7c673cae 1457
11fdf7f2 1458 mds->hit_export_target(dest, -1);
7c673cae
FG
1459
1460 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1461 if (it == export_state.end() ||
1462 it->second.tid != m->get_tid() ||
1463 it->second.peer != mds_rank_t(m->get_source().num())) {
1464 // export must have aborted.
1465 dout(7) << "export must have aborted" << dendl;
7c673cae
FG
1466 return;
1467 }
11fdf7f2 1468 ceph_assert(it->second.state == EXPORT_PREPPING);
7c673cae
FG
1469
1470 if (!m->is_success()) {
c07f9fc5 1471 dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
7c673cae 1472 export_try_cancel(dir, false);
7c673cae
FG
1473 return;
1474 }
1475
20effc67 1476 ceph_assert(g_conf()->mds_kill_export_at != 5);
7c673cae
FG
1477 // send warnings
1478 set<CDir*> bounds;
f67539c2 1479 mdcache->get_subtree_bounds(dir, bounds);
7c673cae 1480
11fdf7f2 1481 ceph_assert(it->second.warning_ack_waiting.empty() ||
7c673cae
FG
1482 (it->second.warning_ack_waiting.size() == 1 &&
1483 it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
11fdf7f2 1484 ceph_assert(it->second.notify_ack_waiting.empty());
7c673cae 1485
181888fb
FG
1486 for (const auto &p : dir->get_replicas()) {
1487 if (p.first == it->second.peer) continue;
7c673cae 1488 if (mds->is_cluster_degraded() &&
181888fb 1489 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
7c673cae 1490 continue; // only if active
181888fb
FG
1491 it->second.warning_ack_waiting.insert(p.first);
1492 it->second.notify_ack_waiting.insert(p.first); // we'll eventually get a notifyack, too!
7c673cae 1493
9f95a23c 1494 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), it->second.tid, true,
11fdf7f2
TL
1495 mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
1496 mds_authority_t(mds->get_nodeid(),it->second.peer));
1497 for (auto &cdir : bounds) {
1498 notify->get_bounds().push_back(cdir->dirfrag());
1499 }
181888fb 1500 mds->send_message_mds(notify, p.first);
7c673cae
FG
1501
1502 }
1503
1504 it->second.state = EXPORT_WARNING;
1505
11fdf7f2 1506 ceph_assert(g_conf()->mds_kill_export_at != 6);
7c673cae
FG
1507 // nobody to warn?
1508 if (it->second.warning_ack_waiting.empty())
1509 export_go(dir); // start export.
7c673cae
FG
1510}
1511
1512
1513class C_M_ExportGo : public MigratorContext {
1514 CDir *dir;
1515 uint64_t tid;
1516public:
1517 C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
1518 MigratorContext(m), dir(d), tid(t) {
9f95a23c
TL
1519 dir->get(CDir::PIN_PTRWAITER);
1520 }
7c673cae
FG
1521 void finish(int r) override {
1522 mig->export_go_synced(dir, tid);
9f95a23c 1523 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
1524 }
1525};
1526
1527void Migrator::export_go(CDir *dir)
1528{
b32b8144 1529 auto it = export_state.find(dir);
11fdf7f2 1530 ceph_assert(it != export_state.end());
9f95a23c 1531 dout(7) << *dir << " to " << it->second.peer << dendl;
7c673cae
FG
1532
1533 // first sync log to flush out e.g. any cap imports
b32b8144 1534 mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
7c673cae
FG
1535 mds->mdlog->flush();
1536}
1537
1538void Migrator::export_go_synced(CDir *dir, uint64_t tid)
1539{
1540 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
1541 if (it == export_state.end() ||
1542 it->second.state == EXPORT_CANCELLING ||
1543 it->second.tid != tid) {
1544 // export must have aborted.
1545 dout(7) << "export must have aborted on " << dir << dendl;
1546 return;
1547 }
11fdf7f2 1548 ceph_assert(it->second.state == EXPORT_WARNING);
7c673cae
FG
1549 mds_rank_t dest = it->second.peer;
1550
9f95a23c 1551 dout(7) << *dir << " to " << dest << dendl;
7c673cae 1552
f67539c2 1553 mdcache->show_subtrees();
7c673cae
FG
1554
1555 it->second.state = EXPORT_EXPORTING;
11fdf7f2 1556 ceph_assert(g_conf()->mds_kill_export_at != 7);
7c673cae 1557
11fdf7f2 1558 ceph_assert(dir->is_frozen_tree_root());
7c673cae
FG
1559
1560 // set ambiguous auth
f67539c2 1561 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), dest);
7c673cae
FG
1562
1563 // take away the popularity we're sending.
11fdf7f2 1564 mds->balancer->subtract_export(dir);
7c673cae
FG
1565
1566 // fill export message with cache data
9f95a23c 1567 auto req = make_message<MExportDir>(dir->dirfrag(), it->second.tid);
7c673cae 1568 map<client_t,entity_inst_t> exported_client_map;
11fdf7f2 1569 map<client_t,client_metadata_t> exported_client_metadata_map;
9f95a23c
TL
1570 uint64_t num_exported_inodes = 0;
1571 encode_export_dir(req->export_data, dir, // recur start point
1572 exported_client_map, exported_client_metadata_map,
1573 num_exported_inodes);
11fdf7f2
TL
1574 encode(exported_client_map, req->client_map, mds->mdsmap->get_up_features());
1575 encode(exported_client_metadata_map, req->client_map);
7c673cae
FG
1576
1577 // add bounds to message
1578 set<CDir*> bounds;
f67539c2 1579 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
1580 for (set<CDir*>::iterator p = bounds.begin();
1581 p != bounds.end();
1582 ++p)
1583 req->add_export((*p)->dirfrag());
1584
1585 // send
1586 mds->send_message_mds(req, dest);
11fdf7f2 1587 ceph_assert(g_conf()->mds_kill_export_at != 8);
7c673cae 1588
11fdf7f2 1589 mds->hit_export_target(dest, num_exported_inodes+1);
7c673cae
FG
1590
1591 // stats
1592 if (mds->logger) mds->logger->inc(l_mds_exported);
1593 if (mds->logger) mds->logger->inc(l_mds_exported_inodes, num_exported_inodes);
1594
f67539c2 1595 mdcache->show_subtrees();
7c673cae
FG
1596}
1597
1598
/** encode_export_inode
 * update our local state for this inode to export.
 * encode relevant state to be sent over the wire.
 * used by: encode_export_dir, file_rename (if foreign)
 *
 * Wire order inside the versioned envelope: inode number, `last`, the
 * inode's own export encoding, then its client caps.
 *
 * @param in                            inode to export; must not be
 *                                      replicated on this MDS itself
 * @param enc_state                     output buffer
 * @param exported_client_map           filled with the instance of every
 *                                      client holding a cap on the inode
 * @param exported_client_metadata_map  filled with those clients' metadata
 *
 * FIXME: the separation between CInode.encode_export and these methods
 * is pretty arbitrary and dumb.
 */
void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
				   map<client_t,entity_inst_t>& exported_client_map,
				   map<client_t,client_metadata_t>& exported_client_metadata_map)
{
  ENCODE_START(1, 1, enc_state);
  dout(7) << *in << dendl;
  ceph_assert(!in->is_replica(mds->get_nodeid()));

  encode(in->ino(), enc_state);
  encode(in->last, enc_state);
  in->encode_export(enc_state);

  // caps
  // auth_cap=true: the inode's authority is moving together with the caps
  encode_export_inode_caps(in, true, enc_state, exported_client_map, exported_client_metadata_map);
  ENCODE_FINISH(enc_state);
}
1623
1624void Migrator::encode_export_inode_caps(CInode *in, bool auth_cap, bufferlist& bl,
11fdf7f2
TL
1625 map<client_t,entity_inst_t>& exported_client_map,
1626 map<client_t,client_metadata_t>& exported_client_metadata_map)
7c673cae 1627{
9f95a23c
TL
1628 ENCODE_START(1, 1, bl);
1629 dout(20) << *in << dendl;
7c673cae
FG
1630 // encode caps
1631 map<client_t,Capability::Export> cap_map;
1632 in->export_client_caps(cap_map);
11fdf7f2 1633 encode(cap_map, bl);
7c673cae 1634 if (auth_cap) {
11fdf7f2 1635 encode(in->get_mds_caps_wanted(), bl);
7c673cae
FG
1636
1637 in->state_set(CInode::STATE_EXPORTINGCAPS);
1638 in->get(CInode::PIN_EXPORTINGCAPS);
1639 }
1640
1641 // make note of clients named by exported capabilities
11fdf7f2
TL
1642 for (const auto &p : in->get_client_caps()) {
1643 if (exported_client_map.count(p.first))
1644 continue;
1645 Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p.first.v));
1646 exported_client_map[p.first] = session->info.inst;
1647 exported_client_metadata_map[p.first] = session->info.client_metadata;
1648 }
9f95a23c 1649 ENCODE_FINISH(bl);
7c673cae
FG
1650}
1651
1652void Migrator::finish_export_inode_caps(CInode *in, mds_rank_t peer,
1653 map<client_t,Capability::Import>& peer_imported)
1654{
9f95a23c 1655 dout(20) << *in << dendl;
7c673cae
FG
1656
1657 in->state_clear(CInode::STATE_EXPORTINGCAPS);
1658 in->put(CInode::PIN_EXPORTINGCAPS);
1659
1660 // tell (all) clients about migrating caps..
11fdf7f2
TL
1661 for (const auto &p : in->get_client_caps()) {
1662 const Capability *cap = &p.second;
9f95a23c 1663 dout(7) << p.first
7c673cae 1664 << " exported caps on " << *in << dendl;
9f95a23c
TL
1665 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
1666 cap->get_cap_id(), cap->get_mseq(),
1667 mds->get_osd_epoch_barrier());
11fdf7f2
TL
1668 map<client_t,Capability::Import>::iterator q = peer_imported.find(p.first);
1669 ceph_assert(q != peer_imported.end());
28e407b8
AA
1670 m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
1671 (q->second.cap_id > 0 ? peer : -1), 0);
11fdf7f2 1672 mds->send_message_client_counted(m, p.first);
7c673cae
FG
1673 }
1674 in->clear_client_caps_after_export();
1675 mds->locker->eval(in, CEPH_CAP_LOCKS);
1676}
1677
/**
 * Turn an exported inode from auth into a replica on this MDS.
 *
 * Marks the inode clean, drops its replica map, twiddles every lock for the
 * auth -> replica transition, clears STATE_AUTH and the dirty/bookkeeping
 * state that only an auth holds, collects all waiters, and finally hands the
 * client caps over via finish_export_inode_caps().
 *
 * @param in             inode that was exported; must currently be auth
 * @param peer           MDS rank that imported the inode
 * @param peer_imported  per-client cap import records from the peer
 * @param finished       output; receives every waiter taken off the inode
 */
void Migrator::finish_export_inode(CInode *in, mds_rank_t peer,
				   map<client_t,Capability::Import>& peer_imported,
				   MDSContext::vec& finished)
{
  dout(12) << *in << dendl;

  // clean
  if (in->is_dirty())
    in->mark_clean();

  // clear/unpin cached_by (we're no longer the authority)
  in->clear_replica_map();

  // twiddle lock states for auth -> replica transition
  in->authlock.export_twiddle();
  in->linklock.export_twiddle();
  in->dirfragtreelock.export_twiddle();
  in->filelock.export_twiddle();
  in->nestlock.export_twiddle();
  in->xattrlock.export_twiddle();
  in->snaplock.export_twiddle();
  in->flocklock.export_twiddle();
  in->policylock.export_twiddle();

  // mark auth
  ceph_assert(in->is_auth());
  in->state_clear(CInode::STATE_AUTH);
  in->replica_nonce = CInode::EXPORT_NONCE;

  in->clear_dirty_rstat();

  // no more auth subtree? clear scatter dirty
  if (!in->has_subtree_root_dirfrag(mds->get_nodeid()))
    in->clear_scatter_dirty();

  in->clear_dirty_parent();

  in->clear_clientwriteable();

  in->clear_file_locks();

  // waiters
  in->take_waiting(CInode::WAIT_ANY_MASK, finished);

  in->finish_export();

  // caps go last, after the inode itself has been demoted
  finish_export_inode_caps(in, peer, peer_imported);
}
1726
/**
 * Encode a dirfrag and, recursively, every nested dirfrag that is not an
 * export bound.
 *
 * Each dirfrag gets its own versioned envelope holding: its dirfrag_t, the
 * dir's export state, the dentry count, and then for every dentry its name,
 * `last`, export state and linkage ("N" for null, the remote encoding, or
 * "i" followed by a nested envelope with the inode and alternate_name).
 * Nested dirfrags are appended after the envelope is closed.
 *
 * @param exportbl                      output buffer
 * @param dir                           dirfrag to encode
 * @param exported_client_map           filled with instances of clients
 *                                      holding caps on exported inodes
 * @param exported_client_metadata_map  filled with those clients' metadata
 * @param num_exported                  incremented once per dentry encoded
 */
void Migrator::encode_export_dir(bufferlist& exportbl,
				CDir *dir,
				map<client_t,entity_inst_t>& exported_client_map,
				map<client_t,client_metadata_t>& exported_client_metadata_map,
				uint64_t &num_exported)
{
  // This has to be declared before ENCODE_START as it will need to be referenced after ENCODE_FINISH.
  std::vector<CDir*> subdirs;

  ENCODE_START(1, 1, exportbl);
  dout(7) << *dir << " " << dir->get_num_head_items() << " head items" << dendl;

  ceph_assert(dir->get_projected_version() == dir->get_version());

#ifdef MDS_VERIFY_FRAGSTAT
  if (dir->is_complete())
    dir->verify_fragstat();
#endif

  // dir
  dirfrag_t df = dir->dirfrag();
  encode(df, exportbl);
  dir->encode_export(exportbl);

  __u32 nden = dir->items.size();
  encode(nden, exportbl);

  // dentries
  for (auto &p : *dir) {
    CDentry *dn = p.second;
    CInode *in = dn->get_linkage()->get_inode();

    num_exported++;

    // -- dentry
    dout(7) << " exporting " << *dn << dendl;

    // dn name
    encode(dn->get_name(), exportbl);
    encode(dn->last, exportbl);

    // state
    dn->encode_export(exportbl);

    // points to...

    // null dentry?
    if (dn->get_linkage()->is_null()) {
      exportbl.append("N", 1);  // null dentry
      continue;
    }

    if (dn->get_linkage()->is_remote()) {
      inodeno_t ino = dn->get_linkage()->get_remote_ino();
      unsigned char d_type = dn->get_linkage()->get_remote_d_type();
      auto& alternate_name = dn->alternate_name;
      // remote link
      CDentry::encode_remote(ino, d_type, alternate_name, exportbl);
      continue;
    }

    // primary link
    // -- inode
    exportbl.append("i", 1);    // inode dentry

    // nested envelope (version 2, compat 1) around inode + alternate_name
    ENCODE_START(2, 1, exportbl);
    encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map);  // encode, and (update state for) export
    encode(dn->alternate_name, exportbl);
    ENCODE_FINISH(exportbl);

    // directory?
    auto&& dfs = in->get_dirfrags();
    for (const auto& t : dfs) {
      if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
	// include nested dirfrag
	ceph_assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
	subdirs.push_back(t);  // it's ours, recurse (later)
      }
    }
  }

  ENCODE_FINISH(exportbl);
  // subdirs
  // note: the loop variable shadows the `dir` parameter
  for (const auto &dir : subdirs) {
    encode_export_dir(exportbl, dir, exported_client_map, exported_client_metadata_map, num_exported);
  }
}
1814
11fdf7f2 1815void Migrator::finish_export_dir(CDir *dir, mds_rank_t peer,
7c673cae 1816 map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
11fdf7f2 1817 MDSContext::vec& finished, int *num_dentries)
7c673cae 1818{
9f95a23c 1819 dout(10) << *dir << dendl;
7c673cae
FG
1820
1821 // release open_by
1822 dir->clear_replica_map();
1823
1824 // mark
11fdf7f2 1825 ceph_assert(dir->is_auth());
7c673cae
FG
1826 dir->state_clear(CDir::STATE_AUTH);
1827 dir->remove_bloom();
1828 dir->replica_nonce = CDir::EXPORT_NONCE;
1829
1830 if (dir->is_dirty())
1831 dir->mark_clean();
1832
1833 // suck up all waiters
1834 dir->take_waiting(CDir::WAIT_ANY_MASK, finished); // all dir waiters
1835
1836 // pop
11fdf7f2 1837 dir->finish_export();
7c673cae
FG
1838
1839 // dentries
9f95a23c 1840 std::vector<CDir*> subdirs;
94b18763
FG
1841 for (auto &p : *dir) {
1842 CDentry *dn = p.second;
7c673cae
FG
1843 CInode *in = dn->get_linkage()->get_inode();
1844
1845 // dentry
1846 dn->finish_export();
1847
1848 // inode?
1849 if (dn->get_linkage()->is_primary()) {
11fdf7f2 1850 finish_export_inode(in, peer, peer_imported[in->ino()], finished);
7c673cae
FG
1851
1852 // subdirs?
9f95a23c
TL
1853 auto&& dirs = in->get_nested_dirfrags();
1854 subdirs.insert(std::end(subdirs), std::begin(dirs), std::end(dirs));
7c673cae
FG
1855 }
1856
f67539c2 1857 mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
7c673cae
FG
1858 ++(*num_dentries);
1859 }
1860
1861 // subdirs
9f95a23c
TL
1862 for (const auto& dir : subdirs) {
1863 finish_export_dir(dir, peer, peer_imported, finished, num_dentries);
1864 }
7c673cae
FG
1865}
1866
1867class C_MDS_ExportFinishLogged : public MigratorLogContext {
1868 CDir *dir;
1869public:
1870 C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorLogContext(m), dir(d) {}
1871 void finish(int r) override {
1872 mig->export_logged_finish(dir);
1873 }
1874};
1875
1876
1877/*
1878 * i should get an export_ack from the export target.
7c673cae 1879 */
9f95a23c 1880void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
7c673cae 1881{
f67539c2 1882 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 1883 mds_rank_t dest(m->get_source().num());
11fdf7f2
TL
1884 ceph_assert(dir);
1885 ceph_assert(dir->is_frozen_tree_root()); // i'm exporting!
7c673cae
FG
1886
1887 // yay!
9f95a23c 1888 dout(7) << *dir << dendl;
7c673cae 1889
11fdf7f2 1890 mds->hit_export_target(dest, -1);
7c673cae
FG
1891
1892 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
11fdf7f2
TL
1893 ceph_assert(it != export_state.end());
1894 ceph_assert(it->second.state == EXPORT_EXPORTING);
1895 ceph_assert(it->second.tid == m->get_tid());
7c673cae 1896
11fdf7f2
TL
1897 auto bp = m->imported_caps.cbegin();
1898 decode(it->second.peer_imported, bp);
7c673cae
FG
1899
1900 it->second.state = EXPORT_LOGGINGFINISH;
20effc67 1901 ceph_assert(g_conf()->mds_kill_export_at != 9);
7c673cae 1902 set<CDir*> bounds;
f67539c2 1903 mdcache->get_subtree_bounds(dir, bounds);
7c673cae 1904
7c673cae
FG
1905 // log completion.
1906 // include export bounds, to ensure they're in the journal.
31f18b77 1907 EExport *le = new EExport(mds->mdlog, dir, it->second.peer);;
7c673cae
FG
1908 mds->mdlog->start_entry(le);
1909
1910 le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
31f18b77 1911 le->metablob.add_dir(dir, false);
7c673cae
FG
1912 for (set<CDir*>::iterator p = bounds.begin();
1913 p != bounds.end();
1914 ++p) {
1915 CDir *bound = *p;
1916 le->get_bounds().insert(bound->dirfrag());
1917 le->metablob.add_dir_context(bound);
1918 le->metablob.add_dir(bound, false);
1919 }
1920
31f18b77
FG
1921 // list us second, them first.
1922 // this keeps authority().first in sync with subtree auth state in the journal.
f67539c2 1923 mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
31f18b77 1924
7c673cae
FG
1925 // log export completion, then finish (unfreeze, trigger finish context, etc.)
1926 mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
1927 mds->mdlog->flush();
20effc67 1928 ceph_assert(g_conf()->mds_kill_export_at != 10);
7c673cae
FG
1929}
1930
b32b8144 1931void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
7c673cae 1932{
9f95a23c 1933 dout(7) << *dir << dendl;
7c673cae 1934
11fdf7f2 1935 ceph_assert(stat.state == EXPORT_CANCELLING);
7c673cae
FG
1936
1937 if (stat.notify_ack_waiting.empty()) {
1938 stat.state = EXPORT_CANCELLED;
1939 return;
1940 }
1941
1942 dir->auth_pin(this);
1943
1944 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
1945 p != stat.notify_ack_waiting.end();
1946 ++p) {
9f95a23c 1947 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
1948 pair<int,int>(mds->get_nodeid(), stat.peer),
1949 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
7c673cae
FG
1950 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
1951 notify->get_bounds().push_back((*i)->dirfrag());
1952 mds->send_message_mds(notify, *p);
1953 }
1954}
1955
1956/*
20effc67 1957 * this happens if the dest failes after i send the export data but before it is acked
7c673cae
FG
1958 * that is, we don't know they safely received and logged it, so we reverse our changes
1959 * and go on.
1960 */
b32b8144 1961void Migrator::export_reverse(CDir *dir, export_state_t& stat)
7c673cae 1962{
9f95a23c 1963 dout(7) << *dir << dendl;
7c673cae
FG
1964
1965 set<CInode*> to_eval;
1966
1967 set<CDir*> bounds;
f67539c2 1968 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
1969
1970 // remove exporting pins
9f95a23c 1971 std::deque<CDir*> rq;
7c673cae
FG
1972 rq.push_back(dir);
1973 while (!rq.empty()) {
1974 CDir *t = rq.front();
1975 rq.pop_front();
1976 t->abort_export();
94b18763
FG
1977 for (auto &p : *t) {
1978 CDentry *dn = p.second;
1979 dn->abort_export();
1980 if (!dn->get_linkage()->is_primary())
7c673cae 1981 continue;
94b18763 1982 CInode *in = dn->get_linkage()->get_inode();
7c673cae
FG
1983 in->abort_export();
1984 if (in->state_test(CInode::STATE_EVALSTALECAPS)) {
1985 in->state_clear(CInode::STATE_EVALSTALECAPS);
1986 to_eval.insert(in);
1987 }
9f95a23c
TL
1988 if (in->is_dir()) {
1989 auto&& dirs = in->get_nested_dirfrags();
1990 for (const auto& dir : dirs) {
1991 rq.push_back(dir);
1992 }
1993 }
7c673cae
FG
1994 }
1995 }
1996
1997 // unpin bounds
b32b8144 1998 for (auto bd : bounds) {
7c673cae
FG
1999 bd->put(CDir::PIN_EXPORTBOUND);
2000 bd->state_clear(CDir::STATE_EXPORTBOUND);
2001 }
2002
7c673cae 2003 // notify bystanders
b32b8144 2004 export_notify_abort(dir, stat, bounds);
7c673cae 2005
224ce89b 2006 // unfreeze tree, with possible subtree merge.
f67539c2 2007 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
224ce89b 2008
7c673cae 2009 // process delayed expires
f67539c2 2010 mdcache->process_delayed_expire(dir);
224ce89b 2011
7c673cae 2012 dir->unfreeze_tree();
f67539c2 2013 mdcache->try_subtree_merge(dir);
7c673cae
FG
2014
2015 // revoke/resume stale caps
2016 for (auto in : to_eval) {
2017 bool need_issue = false;
11fdf7f2
TL
2018 for (auto &p : in->client_caps) {
2019 Capability *cap = &p.second;
a8e16298 2020 if (!cap->is_stale()) {
7c673cae 2021 need_issue = true;
a8e16298 2022 break;
7c673cae
FG
2023 }
2024 }
2025 if (need_issue &&
2026 (!in->is_auth() || !mds->locker->eval(in, CEPH_CAP_LOCKS)))
2027 mds->locker->issue_caps(in);
2028 }
2029
f67539c2 2030 mdcache->show_cache();
7c673cae
FG
2031}
2032
2033
2034/*
2035 * once i get the ack, and logged the EExportFinish(true),
2036 * send notifies (if any), otherwise go straight to finish.
2037 *
2038 */
2039void Migrator::export_logged_finish(CDir *dir)
2040{
9f95a23c 2041 dout(7) << *dir << dendl;
7c673cae
FG
2042
2043 export_state_t& stat = export_state[dir];
2044
2045 // send notifies
2046 set<CDir*> bounds;
f67539c2 2047 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2048
2049 for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
2050 p != stat.notify_ack_waiting.end();
2051 ++p) {
9f95a23c 2052 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
2053 pair<int,int>(mds->get_nodeid(), stat.peer),
2054 pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
7c673cae
FG
2055
2056 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2057 notify->get_bounds().push_back((*i)->dirfrag());
2058
2059 mds->send_message_mds(notify, *p);
2060 }
2061
2062 // wait for notifyacks
2063 stat.state = EXPORT_NOTIFYING;
20effc67 2064 ceph_assert(g_conf()->mds_kill_export_at != 11);
7c673cae
FG
2065
2066 // no notifies to wait for?
2067 if (stat.notify_ack_waiting.empty()) {
2068 export_finish(dir); // skip notify/notify_ack stage.
2069 } else {
2070 // notify peer to send cap import messages to clients
2071 if (!mds->is_cluster_degraded() ||
2072 mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
9f95a23c 2073 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), false, stat.tid), stat.peer);
7c673cae
FG
2074 } else {
2075 dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
2076 }
2077 }
2078}
2079
2080/*
2081 * warning:
2082 * i'll get an ack from each bystander.
2083 * when i get them all, do the export.
2084 * notify:
2085 * i'll get an ack from each bystander.
2086 * when i get them all, unfreeze and send the finish.
7c673cae 2087 */
9f95a23c 2088void Migrator::handle_export_notify_ack(const cref_t<MExportDirNotifyAck> &m)
7c673cae 2089{
f67539c2 2090 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae 2091 mds_rank_t dest(m->get_source().num());
11fdf7f2 2092 ceph_assert(dir);
7c673cae
FG
2093 mds_rank_t from = mds_rank_t(m->get_source().num());
2094
11fdf7f2 2095 mds->hit_export_target(dest, -1);
7c673cae
FG
2096
2097 auto export_state_entry = export_state.find(dir);
2098 if (export_state_entry != export_state.end()) {
2099 export_state_t& stat = export_state_entry->second;
2100 if (stat.state == EXPORT_WARNING &&
2101 stat.warning_ack_waiting.erase(from)) {
2102 // exporting. process warning.
9f95a23c 2103 dout(7) << "from " << m->get_source()
7c673cae
FG
2104 << ": exporting, processing warning on " << *dir << dendl;
2105 if (stat.warning_ack_waiting.empty())
2106 export_go(dir); // start export.
2107 } else if (stat.state == EXPORT_NOTIFYING &&
2108 stat.notify_ack_waiting.erase(from)) {
2109 // exporting. process notify.
9f95a23c 2110 dout(7) << "from " << m->get_source()
7c673cae
FG
2111 << ": exporting, processing notify on " << *dir << dendl;
2112 if (stat.notify_ack_waiting.empty())
2113 export_finish(dir);
2114 } else if (stat.state == EXPORT_CANCELLING &&
2115 m->get_new_auth().second == CDIR_AUTH_UNKNOWN && // not warning ack
2116 stat.notify_ack_waiting.erase(from)) {
9f95a23c 2117 dout(7) << "from " << m->get_source()
7c673cae
FG
2118 << ": cancelling export, processing notify on " << *dir << dendl;
2119 if (stat.notify_ack_waiting.empty()) {
91327a77 2120 export_cancel_finish(export_state_entry);
7c673cae
FG
2121 }
2122 }
2123 }
2124 else {
2125 auto import_state_entry = import_state.find(dir->dirfrag());
2126 if (import_state_entry != import_state.end()) {
2127 import_state_t& stat = import_state_entry->second;
2128 if (stat.state == IMPORT_ABORTING) {
2129 // reversing import
9f95a23c 2130 dout(7) << "from " << m->get_source()
7c673cae 2131 << ": aborting import on " << *dir << dendl;
11fdf7f2 2132 ceph_assert(stat.bystanders.count(from));
7c673cae
FG
2133 stat.bystanders.erase(from);
2134 if (stat.bystanders.empty())
2135 import_reverse_unfreeze(dir);
2136 }
2137 }
2138 }
7c673cae
FG
2139}
2140
2141void Migrator::export_finish(CDir *dir)
2142{
9f95a23c 2143 dout(3) << *dir << dendl;
7c673cae 2144
20effc67 2145 ceph_assert(g_conf()->mds_kill_export_at != 12);
7c673cae
FG
2146 map<CDir*,export_state_t>::iterator it = export_state.find(dir);
2147 if (it == export_state.end()) {
2148 dout(7) << "target must have failed, not sending final commit message. export succeeded anyway." << dendl;
2149 return;
2150 }
2151
2152 // send finish/commit to new auth
2153 if (!mds->is_cluster_degraded() ||
2154 mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
9f95a23c 2155 mds->send_message_mds(make_message<MExportDirFinish>(dir->dirfrag(), true, it->second.tid), it->second.peer);
7c673cae
FG
2156 } else {
2157 dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
2158 }
11fdf7f2 2159 ceph_assert(g_conf()->mds_kill_export_at != 13);
7c673cae
FG
2160
2161 // finish export (adjust local cache state)
2162 int num_dentries = 0;
11fdf7f2
TL
2163 MDSContext::vec finished;
2164 finish_export_dir(dir, it->second.peer,
224ce89b
WB
2165 it->second.peer_imported, finished, &num_dentries);
2166
11fdf7f2 2167 ceph_assert(!dir->is_auth());
f67539c2 2168 mdcache->adjust_subtree_auth(dir, it->second.peer);
224ce89b 2169
7c673cae
FG
2170 // unpin bounds
2171 set<CDir*> bounds;
f67539c2 2172 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2173 for (set<CDir*>::iterator p = bounds.begin();
2174 p != bounds.end();
2175 ++p) {
2176 CDir *bd = *p;
2177 bd->put(CDir::PIN_EXPORTBOUND);
2178 bd->state_clear(CDir::STATE_EXPORTBOUND);
2179 }
2180
2181 if (dir->state_test(CDir::STATE_AUXSUBTREE))
2182 dir->state_clear(CDir::STATE_AUXSUBTREE);
2183
224ce89b 2184 // discard delayed expires
f67539c2 2185 mdcache->discard_delayed_expire(dir);
224ce89b 2186
9f95a23c 2187 dout(7) << "unfreezing" << dendl;
224ce89b
WB
2188
2189 // unfreeze tree, with possible subtree merge.
7c673cae 2190 // (we do this _after_ removing EXPORTBOUND pins, to allow merges)
224ce89b 2191 dir->unfreeze_tree();
f67539c2 2192 mdcache->try_subtree_merge(dir);
7c673cae
FG
2193
2194 // no more auth subtree? clear scatter dirty
2195 if (!dir->get_inode()->is_auth() &&
2196 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2197 dir->get_inode()->clear_scatter_dirty();
2198 // wake up scatter_nudge waiters
224ce89b 2199 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, finished);
7c673cae
FG
2200 }
2201
224ce89b
WB
2202 if (!finished.empty())
2203 mds->queue_waiters(finished);
7c673cae 2204
91327a77
AA
2205 MutationRef mut = std::move(it->second.mut);
2206 auto parent = std::move(it->second.parent);
7c673cae 2207 // remove from exporting list, clean up state
91327a77 2208 total_exporting_size -= it->second.approx_size;
7c673cae 2209 export_state.erase(it);
91327a77 2210
11fdf7f2 2211 ceph_assert(dir->state_test(CDir::STATE_EXPORTING));
1adf2230 2212 dir->clear_exporting();
7c673cae 2213
f67539c2 2214 mdcache->show_subtrees();
7c673cae
FG
2215 audit();
2216
f67539c2 2217 mdcache->trim(num_dentries); // try trimming exported dentries
7c673cae
FG
2218
2219 // send pending import_maps?
f67539c2 2220 mdcache->maybe_send_pending_resolves();
7c673cae
FG
2221
2222 // drop locks, unpin path
2223 if (mut) {
2224 mds->locker->drop_locks(mut.get());
2225 mut->cleanup();
2226 }
91327a77
AA
2227
2228 if (parent)
2229 child_export_finish(parent, true);
2230
7c673cae
FG
2231 maybe_do_queued_export();
2232}
2233
2234
2235
11fdf7f2
TL
2236class C_MDS_ExportDiscover : public MigratorContext {
2237public:
9f95a23c 2238 C_MDS_ExportDiscover(Migrator *mig, const cref_t<MExportDirDiscover>& m) : MigratorContext(mig), m(m) {}
11fdf7f2
TL
2239 void finish(int r) override {
2240 mig->handle_export_discover(m, true);
2241 }
2242private:
9f95a23c 2243 cref_t<MExportDirDiscover> m;
11fdf7f2 2244};
7c673cae 2245
11fdf7f2
TL
2246class C_MDS_ExportDiscoverFactory : public MDSContextFactory {
2247public:
9f95a23c 2248 C_MDS_ExportDiscoverFactory(Migrator *mig, cref_t<MExportDirDiscover> m) : mig(mig), m(m) {}
11fdf7f2
TL
2249 MDSContext *build() {
2250 return new C_MDS_ExportDiscover(mig, m);
2251 }
2252private:
2253 Migrator *mig;
9f95a23c 2254 cref_t<MExportDirDiscover> m;
11fdf7f2 2255};
7c673cae
FG
2256
2257// ==========================================================
2258// IMPORT
2259
9f95a23c 2260void Migrator::handle_export_discover(const cref_t<MExportDirDiscover> &m, bool started)
7c673cae
FG
2261{
2262 mds_rank_t from = m->get_source_mds();
11fdf7f2 2263 ceph_assert(from != mds->get_nodeid());
7c673cae 2264
9f95a23c 2265 dout(7) << m->get_path() << dendl;
7c673cae
FG
2266
2267 // note import state
2268 dirfrag_t df = m->get_dirfrag();
c07f9fc5
FG
2269
2270 if (!mds->is_active()) {
2271 dout(7) << " not active, send NACK " << dendl;
9f95a23c 2272 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid(), false), from);
c07f9fc5
FG
2273 return;
2274 }
2275
7c673cae 2276 // only start discovering on this message once.
b32b8144 2277 import_state_t *p_state;
7c673cae 2278 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
11fdf7f2
TL
2279 if (!started) {
2280 ceph_assert(it == import_state.end());
b32b8144
FG
2281 p_state = &import_state[df];
2282 p_state->state = IMPORT_DISCOVERING;
2283 p_state->peer = from;
2284 p_state->tid = m->get_tid();
7c673cae
FG
2285 } else {
2286 // am i retrying after ancient path_traverse results?
2287 if (it == import_state.end() ||
2288 it->second.peer != from ||
2289 it->second.tid != m->get_tid()) {
2290 dout(7) << " dropping obsolete message" << dendl;
7c673cae
FG
2291 return;
2292 }
11fdf7f2 2293 ceph_assert(it->second.state == IMPORT_DISCOVERING);
b32b8144 2294 p_state = &it->second;
7c673cae
FG
2295 }
2296
11fdf7f2 2297 C_MDS_ExportDiscoverFactory cf(this, m);
f67539c2 2298 if (!mdcache->is_open()) {
9f95a23c 2299 dout(10) << " waiting for root" << dendl;
11fdf7f2 2300 mds->mdcache->wait_for_open(cf.build());
7c673cae
FG
2301 return;
2302 }
2303
20effc67 2304 ceph_assert(g_conf()->mds_kill_import_at != 1);
7c673cae
FG
2305
2306 // do we have it?
f67539c2 2307 CInode *in = mdcache->get_inode(m->get_dirfrag().ino);
7c673cae
FG
2308 if (!in) {
2309 // must discover it!
2310 filepath fpath(m->get_path());
2311 vector<CDentry*> trace;
2312 MDRequestRef null_ref;
f67539c2
TL
2313 int r = mdcache->path_traverse(null_ref, cf, fpath,
2314 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_PATH_LOCKED,
2315 &trace);
7c673cae
FG
2316 if (r > 0) return;
2317 if (r < 0) {
9f95a23c 2318 dout(7) << "failed to discover or not dir " << m->get_path() << ", NAK" << dendl;
7c673cae
FG
2319 ceph_abort(); // this shouldn't happen if the auth pins its path properly!!!!
2320 }
2321
2322 ceph_abort(); // this shouldn't happen; the get_inode above would have succeeded.
2323 }
2324
2325 // yay
9f95a23c 2326 dout(7) << "have " << df << " inode " << *in << dendl;
7c673cae 2327
b32b8144 2328 p_state->state = IMPORT_DISCOVERED;
7c673cae
FG
2329
2330 // pin inode in the cache (for now)
11fdf7f2 2331 ceph_assert(in->is_dir());
7c673cae
FG
2332 in->get(CInode::PIN_IMPORTING);
2333
2334 // reply
2335 dout(7) << " sending export_discover_ack on " << *in << dendl;
9f95a23c 2336 mds->send_message_mds(make_message<MExportDirDiscoverAck>(df, m->get_tid()), p_state->peer);
20effc67 2337 ceph_assert(g_conf()->mds_kill_import_at != 2);
7c673cae
FG
2338}
2339
2340void Migrator::import_reverse_discovering(dirfrag_t df)
2341{
2342 import_state.erase(df);
2343}
2344
2345void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
2346{
2347 // unpin base
2348 diri->put(CInode::PIN_IMPORTING);
2349 import_state.erase(df);
2350}
2351
b32b8144 2352void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
7c673cae
FG
2353{
2354 set<CDir*> bounds;
f67539c2 2355 mdcache->map_dirfrag_set(stat.bound_ls, bounds);
7c673cae
FG
2356 import_remove_pins(dir, bounds);
2357 import_reverse_final(dir);
2358}
2359
9f95a23c 2360void Migrator::handle_export_cancel(const cref_t<MExportDirCancel> &m)
7c673cae 2361{
9f95a23c 2362 dout(7) << "on " << m->get_dirfrag() << dendl;
7c673cae
FG
2363 dirfrag_t df = m->get_dirfrag();
2364 map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
2365 if (it == import_state.end()) {
11fdf7f2 2366 ceph_abort_msg("got export_cancel in weird state");
7c673cae
FG
2367 } else if (it->second.state == IMPORT_DISCOVERING) {
2368 import_reverse_discovering(df);
2369 } else if (it->second.state == IMPORT_DISCOVERED) {
f67539c2 2370 CInode *in = mdcache->get_inode(df.ino);
11fdf7f2 2371 ceph_assert(in);
7c673cae
FG
2372 import_reverse_discovered(df, in);
2373 } else if (it->second.state == IMPORT_PREPPING) {
f67539c2 2374 CDir *dir = mdcache->get_dirfrag(df);
11fdf7f2 2375 ceph_assert(dir);
b32b8144 2376 import_reverse_prepping(dir, it->second);
7c673cae 2377 } else if (it->second.state == IMPORT_PREPPED) {
f67539c2 2378 CDir *dir = mdcache->get_dirfrag(df);
11fdf7f2 2379 ceph_assert(dir);
7c673cae 2380 set<CDir*> bounds;
f67539c2 2381 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2382 import_remove_pins(dir, bounds);
2383 // adjust auth back to the exportor
f67539c2 2384 mdcache->adjust_subtree_auth(dir, it->second.peer);
7c673cae
FG
2385 import_reverse_unfreeze(dir);
2386 } else {
11fdf7f2 2387 ceph_abort_msg("got export_cancel in weird state");
7c673cae 2388 }
7c673cae
FG
2389}
2390
11fdf7f2
TL
2391class C_MDS_ExportPrep : public MigratorContext {
2392public:
9f95a23c 2393 C_MDS_ExportPrep(Migrator *mig, const cref_t<MExportDirPrep>& m) : MigratorContext(mig), m(m) {}
11fdf7f2
TL
2394 void finish(int r) override {
2395 mig->handle_export_prep(m, true);
2396 }
2397private:
9f95a23c 2398 cref_t<MExportDirPrep> m;
11fdf7f2
TL
2399};
2400
2401class C_MDS_ExportPrepFactory : public MDSContextFactory {
2402public:
9f95a23c 2403 C_MDS_ExportPrepFactory(Migrator *mig, cref_t<MExportDirPrep> m) : mig(mig), m(m) {}
11fdf7f2
TL
2404 MDSContext *build() {
2405 return new C_MDS_ExportPrep(mig, m);
2406 }
2407private:
2408 Migrator *mig;
9f95a23c 2409 cref_t<MExportDirPrep> m;
11fdf7f2
TL
2410};
2411
9f95a23c
TL
2412void Migrator::decode_export_prep_trace(bufferlist::const_iterator& blp, mds_rank_t oldauth, MDSContext::vec& finished)
2413{
2414 DECODE_START(1, blp);
2415 dirfrag_t df;
2416 decode(df, blp);
2417 char start;
2418 decode(start, blp);
2419 dout(10) << " trace from " << df << " start " << start << dendl;
2420
2421 CDir *cur = nullptr;
2422 if (start == 'd') {
f67539c2 2423 cur = mdcache->get_dirfrag(df);
9f95a23c
TL
2424 ceph_assert(cur);
2425 dout(10) << " had " << *cur << dendl;
2426 } else if (start == 'f') {
f67539c2 2427 CInode *in = mdcache->get_inode(df.ino);
9f95a23c
TL
2428 ceph_assert(in);
2429 dout(10) << " had " << *in << dendl;
f67539c2 2430 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
9f95a23c
TL
2431 dout(10) << " added " << *cur << dendl;
2432 } else if (start == '-') {
2433 // nothing
2434 } else
2435 ceph_abort_msg("unrecognized start char");
2436
2437 while (!blp.end()) {
2438 CDentry *dn = nullptr;
f67539c2 2439 mdcache->decode_replica_dentry(dn, blp, cur, finished);
9f95a23c
TL
2440 dout(10) << " added " << *dn << dendl;
2441 CInode *in = nullptr;
f67539c2 2442 mdcache->decode_replica_inode(in, blp, dn, finished);
9f95a23c
TL
2443 dout(10) << " added " << *in << dendl;
2444 if (blp.end())
2445 break;
f67539c2 2446 mdcache->decode_replica_dir(cur, blp, in, oldauth, finished);
9f95a23c
TL
2447 dout(10) << " added " << *cur << dendl;
2448 }
2449
2450 DECODE_FINISH(blp);
2451}
2452
2453void Migrator::handle_export_prep(const cref_t<MExportDirPrep> &m, bool did_assim)
7c673cae
FG
2454{
2455 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
11fdf7f2 2456 ceph_assert(oldauth != mds->get_nodeid());
7c673cae
FG
2457
2458 CDir *dir;
2459 CInode *diri;
11fdf7f2 2460 MDSContext::vec finished;
7c673cae
FG
2461
2462 // assimilate root dir.
2463 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
11fdf7f2
TL
2464 if (!did_assim) {
2465 ceph_assert(it != import_state.end());
2466 ceph_assert(it->second.state == IMPORT_DISCOVERED);
2467 ceph_assert(it->second.peer == oldauth);
f67539c2 2468 diri = mdcache->get_inode(m->get_dirfrag().ino);
11fdf7f2
TL
2469 ceph_assert(diri);
2470 auto p = m->basedir.cbegin();
f67539c2 2471 mdcache->decode_replica_dir(dir, p, diri, oldauth, finished);
9f95a23c 2472 dout(7) << "on " << *dir << " (first pass)" << dendl;
7c673cae
FG
2473 } else {
2474 if (it == import_state.end() ||
2475 it->second.peer != oldauth ||
2476 it->second.tid != m->get_tid()) {
9f95a23c 2477 dout(7) << "obsolete message, dropping" << dendl;
7c673cae
FG
2478 return;
2479 }
11fdf7f2
TL
2480 ceph_assert(it->second.state == IMPORT_PREPPING);
2481 ceph_assert(it->second.peer == oldauth);
7c673cae 2482
f67539c2 2483 dir = mdcache->get_dirfrag(m->get_dirfrag());
11fdf7f2 2484 ceph_assert(dir);
9f95a23c 2485 dout(7) << "on " << *dir << " (subsequent pass)" << dendl;
7c673cae
FG
2486 diri = dir->get_inode();
2487 }
11fdf7f2 2488 ceph_assert(dir->is_auth() == false);
7c673cae 2489
f67539c2 2490 mdcache->show_subtrees();
7c673cae
FG
2491
2492 // build import bound map
2493 map<inodeno_t, fragset_t> import_bound_fragset;
11fdf7f2
TL
2494 for (const auto &bound : m->get_bounds()) {
2495 dout(10) << " bound " << bound << dendl;
9f95a23c 2496 import_bound_fragset[bound.ino].insert_raw(bound.frag);
7c673cae 2497 }
7c673cae 2498 // assimilate contents?
11fdf7f2 2499 if (!did_assim) {
7c673cae 2500 dout(7) << "doing assim on " << *dir << dendl;
7c673cae
FG
2501
2502 // change import state
2503 it->second.state = IMPORT_PREPPING;
2504 it->second.bound_ls = m->get_bounds();
2505 it->second.bystanders = m->get_bystanders();
11fdf7f2 2506 ceph_assert(g_conf()->mds_kill_import_at != 3);
7c673cae
FG
2507
2508 // bystander list
2509 dout(7) << "bystanders are " << it->second.bystanders << dendl;
2510
2511 // move pin to dir
2512 diri->put(CInode::PIN_IMPORTING);
2513 dir->get(CDir::PIN_IMPORTING);
2514 dir->state_set(CDir::STATE_IMPORTING);
2515
2516 // assimilate traces to exports
2517 // each trace is: df ('-' | ('f' dir | 'd') dentry inode (dir dentry inode)*)
11fdf7f2 2518 for (const auto &bl : m->traces) {
9f95a23c
TL
2519 auto blp = bl.cbegin();
2520 decode_export_prep_trace(blp, oldauth, finished);
7c673cae
FG
2521 }
2522
2523 // make bound sticky
2524 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2525 p != import_bound_fragset.end();
2526 ++p) {
9f95a23c 2527 p->second.simplify();
f67539c2 2528 CInode *in = mdcache->get_inode(p->first);
11fdf7f2 2529 ceph_assert(in);
7c673cae
FG
2530 in->get_stickydirs();
2531 dout(7) << " set stickydirs on bound inode " << *in << dendl;
2532 }
2533
2534 } else {
2535 dout(7) << " not doing assim on " << *dir << dendl;
2536 }
2537
81eedcae 2538 MDSGatherBuilder gather(g_ceph_context);
11fdf7f2 2539
7c673cae
FG
2540 if (!finished.empty())
2541 mds->queue_waiters(finished);
2542
2543
c07f9fc5
FG
2544 bool success = true;
2545 if (mds->is_active()) {
2546 // open all bounds
2547 set<CDir*> import_bounds;
2548 for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
2549 p != import_bound_fragset.end();
2550 ++p) {
f67539c2 2551 CInode *in = mdcache->get_inode(p->first);
11fdf7f2 2552 ceph_assert(in);
7c673cae 2553
c07f9fc5 2554 // map fragset into a frag_t list, based on the inode fragtree
11fdf7f2
TL
2555 frag_vec_t leaves;
2556 for (const auto& frag : p->second) {
2557 in->dirfragtree.get_leaves_under(frag, leaves);
2558 }
2559 dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << leaves << dendl;
c07f9fc5 2560
11fdf7f2 2561 for (const auto& leaf : leaves) {
f67539c2 2562 CDir *bound = mdcache->get_dirfrag(dirfrag_t(p->first, leaf));
c07f9fc5 2563 if (!bound) {
11fdf7f2 2564 dout(7) << " opening bounding dirfrag " << leaf << " on " << *in << dendl;
f67539c2 2565 mdcache->open_remote_dirfrag(in, leaf, gather.new_sub());
81eedcae 2566 continue;
c07f9fc5 2567 }
7c673cae 2568
c07f9fc5
FG
2569 if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
2570 dout(7) << " pinning import bound " << *bound << dendl;
2571 bound->get(CDir::PIN_IMPORTBOUND);
2572 bound->state_set(CDir::STATE_IMPORTBOUND);
2573 } else {
2574 dout(7) << " already pinned import bound " << *bound << dendl;
2575 }
2576 import_bounds.insert(bound);
7c673cae 2577 }
7c673cae 2578 }
7c673cae 2579
81eedcae
TL
2580 if (gather.has_subs()) {
2581 C_MDS_ExportPrepFactory cf(this, m);
2582 gather.set_finisher(cf.build());
2583 gather.activate();
2584 return;
2585 }
2586
c07f9fc5
FG
2587 dout(7) << " all ready, noting auth and freezing import region" << dendl;
2588
f67539c2 2589 if (!mdcache->is_readonly() &&
9f95a23c
TL
2590 // for pinning scatter gather. loner has a higher chance to get wrlock
2591 diri->filelock.can_wrlock(diri->get_loner()) &&
2592 diri->nestlock.can_wrlock(diri->get_loner())) {
c07f9fc5
FG
2593 it->second.mut = new MutationImpl();
2594 // force some locks. hacky.
2595 mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
2596 mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
2597
2598 // note that i am an ambiguous auth for this subtree.
2599 // specify bounds, since the exporter explicitly defines the region.
f67539c2 2600 mdcache->adjust_bounded_subtree_auth(dir, import_bounds,
c07f9fc5 2601 pair<int,int>(oldauth, mds->get_nodeid()));
f67539c2 2602 mdcache->verify_subtree_bounds(dir, import_bounds);
c07f9fc5
FG
2603 // freeze.
2604 dir->_freeze_tree();
2605 // note new state
2606 it->second.state = IMPORT_PREPPED;
2607 } else {
2608 dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
2609 success = false;
2610 }
7c673cae 2611 } else {
c07f9fc5 2612 dout(7) << " not active, failing. " << *dir << dendl;
7c673cae 2613 success = false;
7c673cae
FG
2614 }
2615
c07f9fc5 2616 if (!success)
b32b8144 2617 import_reverse_prepping(dir, it->second);
c07f9fc5 2618
7c673cae
FG
2619 // ok!
2620 dout(7) << " sending export_prep_ack on " << *dir << dendl;
9f95a23c 2621 mds->send_message(make_message<MExportDirPrepAck>(dir->dirfrag(), success, m->get_tid()), m->get_connection());
7c673cae 2622
11fdf7f2 2623 ceph_assert(g_conf()->mds_kill_import_at != 4);
7c673cae
FG
2624}
2625
2626
2627
2628
2629class C_MDS_ImportDirLoggedStart : public MigratorLogContext {
2630 dirfrag_t df;
2631 CDir *dir;
2632 mds_rank_t from;
2633public:
28e407b8 2634 map<client_t,pair<Session*,uint64_t> > imported_session_map;
7c673cae
FG
2635
2636 C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
2637 MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
9f95a23c 2638 dir->get(CDir::PIN_PTRWAITER);
7c673cae
FG
2639 }
2640 void finish(int r) override {
28e407b8 2641 mig->import_logged_start(df, dir, from, imported_session_map);
9f95a23c 2642 dir->put(CDir::PIN_PTRWAITER);
7c673cae
FG
2643 }
2644};
2645
9f95a23c 2646void Migrator::handle_export_dir(const cref_t<MExportDir> &m)
7c673cae 2647{
20effc67 2648 ceph_assert(g_conf()->mds_kill_import_at != 5);
f67539c2 2649 CDir *dir = mdcache->get_dirfrag(m->dirfrag);
11fdf7f2 2650 ceph_assert(dir);
31f18b77
FG
2651
2652 mds_rank_t oldauth = mds_rank_t(m->get_source().num());
9f95a23c 2653 dout(7) << "importing " << *dir << " from " << oldauth << dendl;
31f18b77 2654
11fdf7f2
TL
2655 ceph_assert(!dir->is_auth());
2656 ceph_assert(dir->freeze_tree_state);
7c673cae
FG
2657
2658 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
11fdf7f2
TL
2659 ceph_assert(it != import_state.end());
2660 ceph_assert(it->second.state == IMPORT_PREPPED);
2661 ceph_assert(it->second.tid == m->get_tid());
2662 ceph_assert(it->second.peer == oldauth);
7c673cae
FG
2663
2664 if (!dir->get_inode()->dirfragtree.is_leaf(dir->get_frag()))
2665 dir->get_inode()->dirfragtree.force_to_leaf(g_ceph_context, dir->get_frag());
2666
f67539c2 2667 mdcache->show_subtrees();
7c673cae 2668
31f18b77 2669 C_MDS_ImportDirLoggedStart *onlogged = new C_MDS_ImportDirLoggedStart(this, dir, oldauth);
7c673cae
FG
2670
2671 // start the journal entry
31f18b77 2672 EImportStart *le = new EImportStart(mds->mdlog, dir->dirfrag(), m->bounds, oldauth);
7c673cae
FG
2673 mds->mdlog->start_entry(le);
2674
2675 le->metablob.add_dir_context(dir);
2676
2677 // adjust auth (list us _first_)
f67539c2 2678 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), oldauth);
7c673cae
FG
2679
2680 // new client sessions, open these after we journal
2681 // include imported sessions in EImportStart
11fdf7f2 2682 auto cmp = m->client_map.cbegin();
28e407b8 2683 map<client_t,entity_inst_t> client_map;
11fdf7f2 2684 map<client_t,client_metadata_t> client_metadata_map;
28e407b8 2685 decode(client_map, cmp);
11fdf7f2
TL
2686 decode(client_metadata_map, cmp);
2687 ceph_assert(cmp.end());
2688 le->cmapv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
2689 onlogged->imported_session_map);
28e407b8 2690 encode(client_map, le->client_map, mds->mdsmap->get_up_features());
11fdf7f2 2691 encode(client_metadata_map, le->client_map);
7c673cae 2692
11fdf7f2 2693 auto blp = m->export_data.cbegin();
7c673cae
FG
2694 int num_imported_inodes = 0;
2695 while (!blp.end()) {
9f95a23c
TL
2696 decode_import_dir(blp,
2697 oldauth,
2698 dir, // import root
2699 le,
2700 mds->mdlog->get_current_segment(),
2701 it->second.peer_exports,
2702 it->second.updated_scatterlocks,
2703 num_imported_inodes);
7c673cae
FG
2704 }
2705 dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
2706
2707 // include bounds in EImportStart
2708 set<CDir*> import_bounds;
11fdf7f2 2709 for (const auto &bound : m->bounds) {
f67539c2 2710 CDir *bd = mdcache->get_dirfrag(bound);
11fdf7f2 2711 ceph_assert(bd);
7c673cae
FG
2712 le->metablob.add_dir(bd, false); // note that parent metadata is already in the event
2713 import_bounds.insert(bd);
2714 }
f67539c2 2715 mdcache->verify_subtree_bounds(dir, import_bounds);
7c673cae
FG
2716
2717 // adjust popularity
11fdf7f2 2718 mds->balancer->add_import(dir);
7c673cae 2719
9f95a23c 2720 dout(7) << "did " << *dir << dendl;
7c673cae
FG
2721
2722 // note state
2723 it->second.state = IMPORT_LOGGINGSTART;
20effc67 2724 ceph_assert(g_conf()->mds_kill_import_at != 6);
7c673cae
FG
2725
2726 // log it
2727 mds->mdlog->submit_entry(le, onlogged);
2728 mds->mdlog->flush();
2729
2730 // some stats
2731 if (mds->logger) {
2732 mds->logger->inc(l_mds_imported);
2733 mds->logger->inc(l_mds_imported_inodes, num_imported_inodes);
2734 }
7c673cae
FG
2735}
2736
2737
2738/*
2739 * this is an import helper
2740 * called by import_finish, and import_reverse and friends.
2741 */
2742void Migrator::import_remove_pins(CDir *dir, set<CDir*>& bounds)
2743{
2744 import_state_t& stat = import_state[dir->dirfrag()];
2745 // root
2746 dir->put(CDir::PIN_IMPORTING);
2747 dir->state_clear(CDir::STATE_IMPORTING);
2748
2749 // bounding inodes
2750 set<inodeno_t> did;
2751 for (list<dirfrag_t>::iterator p = stat.bound_ls.begin();
2752 p != stat.bound_ls.end();
2753 ++p) {
2754 if (did.count(p->ino))
2755 continue;
2756 did.insert(p->ino);
f67539c2 2757 CInode *in = mdcache->get_inode(p->ino);
11fdf7f2 2758 ceph_assert(in);
7c673cae
FG
2759 in->put_stickydirs();
2760 }
2761
2762 if (stat.state == IMPORT_PREPPING) {
2763 for (auto bd : bounds) {
2764 if (bd->state_test(CDir::STATE_IMPORTBOUND)) {
2765 bd->put(CDir::PIN_IMPORTBOUND);
2766 bd->state_clear(CDir::STATE_IMPORTBOUND);
2767 }
2768 }
2769 } else if (stat.state >= IMPORT_PREPPED) {
2770 // bounding dirfrags
2771 for (auto bd : bounds) {
11fdf7f2 2772 ceph_assert(bd->state_test(CDir::STATE_IMPORTBOUND));
7c673cae
FG
2773 bd->put(CDir::PIN_IMPORTBOUND);
2774 bd->state_clear(CDir::STATE_IMPORTBOUND);
2775 }
2776 }
2777}
2778
91327a77
AA
2779class C_MDC_QueueContexts : public MigratorContext {
2780public:
11fdf7f2 2781 MDSContext::vec contexts;
91327a77
AA
2782 C_MDC_QueueContexts(Migrator *m) : MigratorContext(m) {}
2783 void finish(int r) override {
2784 // execute contexts immediately after 'this' context
2785 get_mds()->queue_waiters_front(contexts);
2786 }
2787};
7c673cae
FG
2788
2789/*
2790 * note: this does teh full work of reversing and import and cleaning up
2791 * state.
2792 * called by both handle_mds_failure and by handle_resolve (if we are
2793 * a survivor coping with an exporter failure+recovery).
2794 */
2795void Migrator::import_reverse(CDir *dir)
2796{
9f95a23c 2797 dout(7) << *dir << dendl;
7c673cae
FG
2798
2799 import_state_t& stat = import_state[dir->dirfrag()];
2800 stat.state = IMPORT_ABORTING;
2801
2802 set<CDir*> bounds;
f67539c2 2803 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
2804
2805 // remove pins
2806 import_remove_pins(dir, bounds);
2807
2808 // update auth, with possible subtree merge.
11fdf7f2 2809 ceph_assert(dir->is_subtree_root());
7c673cae 2810 if (mds->is_resolve())
f67539c2 2811 mdcache->trim_non_auth_subtree(dir);
7c673cae 2812
f67539c2 2813 mdcache->adjust_subtree_auth(dir, stat.peer);
7c673cae 2814
91327a77 2815 auto fin = new C_MDC_QueueContexts(this);
7c673cae
FG
2816 if (!dir->get_inode()->is_auth() &&
2817 !dir->get_inode()->has_subtree_root_dirfrag(mds->get_nodeid())) {
2818 dir->get_inode()->clear_scatter_dirty();
2819 // wake up scatter_nudge waiters
2820 dir->get_inode()->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2821 }
2822
2823 int num_dentries = 0;
2824 // adjust auth bits.
9f95a23c 2825 std::deque<CDir*> q;
7c673cae
FG
2826 q.push_back(dir);
2827 while (!q.empty()) {
2828 CDir *cur = q.front();
2829 q.pop_front();
2830
2831 // dir
11fdf7f2 2832 cur->abort_import();
7c673cae 2833
94b18763
FG
2834 for (auto &p : *cur) {
2835 CDentry *dn = p.second;
7c673cae
FG
2836
2837 // dentry
1e59de90 2838 dn->clear_auth();
7c673cae
FG
2839 dn->clear_replica_map();
2840 dn->set_replica_nonce(CDentry::EXPORT_NONCE);
2841 if (dn->is_dirty())
2842 dn->mark_clean();
2843
2844 // inode?
2845 if (dn->get_linkage()->is_primary()) {
2846 CInode *in = dn->get_linkage()->get_inode();
1e59de90 2847 in->state_clear(CInode::STATE_AUTH);
7c673cae
FG
2848 in->clear_replica_map();
2849 in->set_replica_nonce(CInode::EXPORT_NONCE);
2850 if (in->is_dirty())
2851 in->mark_clean();
2852 in->clear_dirty_rstat();
2853 if (!in->has_subtree_root_dirfrag(mds->get_nodeid())) {
2854 in->clear_scatter_dirty();
2855 in->take_waiting(CInode::WAIT_ANY_MASK, fin->contexts);
2856 }
2857
2858 in->clear_dirty_parent();
2859
f91f0fd5
TL
2860 in->clear_clientwriteable();
2861 in->state_clear(CInode::STATE_NEEDSRECOVER);
2862
7c673cae
FG
2863 in->authlock.clear_gather();
2864 in->linklock.clear_gather();
2865 in->dirfragtreelock.clear_gather();
2866 in->filelock.clear_gather();
2867
2868 in->clear_file_locks();
2869
2870 // non-bounding dir?
9f95a23c
TL
2871 auto&& dfs = in->get_dirfrags();
2872 for (const auto& dir : dfs) {
2873 if (bounds.count(dir) == 0)
2874 q.push_back(dir);
2875 }
7c673cae
FG
2876 }
2877
f67539c2 2878 mdcache->touch_dentry_bottom(dn); // move dentry to tail of LRU
7c673cae
FG
2879 ++num_dentries;
2880 }
2881 }
2882
2883 dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
2884
2885 if (stat.state == IMPORT_ACKING) {
2886 // remove imported caps
2887 for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
28e407b8
AA
2888 p != stat.peer_exports.end();
2889 ++p) {
7c673cae
FG
2890 CInode *in = p->first;
2891 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
28e407b8
AA
2892 q != p->second.end();
2893 ++q) {
7c673cae 2894 Capability *cap = in->get_client_cap(q->first);
28e407b8 2895 if (!cap) {
11fdf7f2 2896 ceph_assert(!stat.session_map.count(q->first));
28e407b8
AA
2897 continue;
2898 }
7c673cae
FG
2899 if (cap->is_importing())
2900 in->remove_client_cap(q->first);
f91f0fd5
TL
2901 else
2902 cap->clear_clientwriteable();
7c673cae
FG
2903 }
2904 in->put(CInode::PIN_IMPORTINGCAPS);
2905 }
28e407b8
AA
2906 for (auto& p : stat.session_map) {
2907 Session *session = p.second.first;
7c673cae
FG
2908 session->dec_importing();
2909 }
2910 }
2911
2912 // log our failure
2913 mds->mdlog->start_submit_entry(new EImportFinish(dir, false)); // log failure
2914
f67539c2 2915 mdcache->trim(num_dentries); // try trimming dentries
7c673cae
FG
2916
2917 // notify bystanders; wait in aborting state
2918 import_notify_abort(dir, bounds);
2919}
2920
2921void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
2922{
9f95a23c 2923 dout(7) << *dir << dendl;
7c673cae
FG
2924
2925 import_state_t& stat = import_state[dir->dirfrag()];
2926 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2927 p != stat.bystanders.end();
2928 ++p) {
9f95a23c 2929 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, false,
11fdf7f2
TL
2930 pair<int,int>(stat.peer, mds->get_nodeid()),
2931 pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
7c673cae
FG
2932 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2933 notify->get_bounds().push_back((*i)->dirfrag());
2934 mds->send_message_mds(notify, *p);
2935 }
2936}
2937
2938void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
2939{
9f95a23c 2940 dout(7) << *dir << dendl;
7c673cae
FG
2941
2942 import_state_t& stat = import_state[dir->dirfrag()];
2943 for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
2944 p != stat.bystanders.end(); ) {
2945 if (mds->is_cluster_degraded() &&
2946 !mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
2947 // this can happen if both exporter and bystander fail in the same mdsmap epoch
2948 stat.bystanders.erase(p++);
2949 continue;
2950 }
9f95a23c 2951 auto notify = make_message<MExportDirNotify>(dir->dirfrag(), stat.tid, true,
11fdf7f2
TL
2952 mds_authority_t(stat.peer, mds->get_nodeid()),
2953 mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
7c673cae
FG
2954 for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
2955 notify->get_bounds().push_back((*i)->dirfrag());
2956 mds->send_message_mds(notify, *p);
2957 ++p;
2958 }
2959 if (stat.bystanders.empty()) {
2960 dout(7) << "no bystanders, finishing reverse now" << dendl;
2961 import_reverse_unfreeze(dir);
2962 } else {
20effc67 2963 ceph_assert(g_conf()->mds_kill_import_at != 10);
7c673cae
FG
2964 }
2965}
2966
2967void Migrator::import_reverse_unfreeze(CDir *dir)
2968{
9f95a23c 2969 dout(7) << *dir << dendl;
11fdf7f2 2970 ceph_assert(!dir->is_auth());
f67539c2 2971 mdcache->discard_delayed_expire(dir);
224ce89b
WB
2972 dir->unfreeze_tree();
2973 if (dir->is_subtree_root())
f67539c2 2974 mdcache->try_subtree_merge(dir);
7c673cae
FG
2975 import_reverse_final(dir);
2976}
2977
2978void Migrator::import_reverse_final(CDir *dir)
2979{
9f95a23c 2980 dout(7) << *dir << dendl;
7c673cae
FG
2981
2982 // clean up
2983 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
11fdf7f2 2984 ceph_assert(it != import_state.end());
7c673cae
FG
2985
2986 MutationRef mut = it->second.mut;
2987 import_state.erase(it);
2988
2989 // send pending import_maps?
f67539c2 2990 mdcache->maybe_send_pending_resolves();
7c673cae
FG
2991
2992 if (mut) {
2993 mds->locker->drop_locks(mut.get());
2994 mut->cleanup();
2995 }
2996
f67539c2 2997 mdcache->show_subtrees();
7c673cae
FG
2998 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
2999}
3000
3001
3002
3003
3004void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
28e407b8 3005 map<client_t,pair<Session*,uint64_t> >& imported_session_map)
7c673cae 3006{
9f95a23c
TL
3007 dout(7) << *dir << dendl;
3008
7c673cae
FG
3009 map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
3010 if (it == import_state.end() ||
3011 it->second.state != IMPORT_LOGGINGSTART) {
3012 dout(7) << "import " << df << " must have aborted" << dendl;
28e407b8 3013 mds->server->finish_force_open_sessions(imported_session_map);
7c673cae
FG
3014 return;
3015 }
3016
7c673cae
FG
3017 // note state
3018 it->second.state = IMPORT_ACKING;
3019
20effc67 3020 ceph_assert(g_conf()->mds_kill_import_at != 7);
7c673cae
FG
3021
3022 // force open client sessions and finish cap import
28e407b8 3023 mds->server->finish_force_open_sessions(imported_session_map, false);
7c673cae
FG
3024
3025 map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
3026 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3027 p != it->second.peer_exports.end();
3028 ++p) {
3029 // parameter 'peer' is NONE, delay sending cap import messages to client
28e407b8
AA
3030 finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
3031 p->second, imported_caps[p->first->ino()]);
7c673cae 3032 }
28e407b8
AA
3033
3034 it->second.session_map.swap(imported_session_map);
7c673cae
FG
3035
3036 // send notify's etc.
3037 dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
3038
3039 // test surviving observer of a failed migration that did not complete
3040 //assert(dir->replica_map.size() < 2 || mds->get_nodeid() != 0);
3041
9f95a23c 3042 auto ack = make_message<MExportDirAck>(dir->dirfrag(), it->second.tid);
11fdf7f2 3043 encode(imported_caps, ack->imported_caps);
7c673cae
FG
3044
3045 mds->send_message_mds(ack, from);
20effc67 3046 ceph_assert(g_conf()->mds_kill_import_at != 8);
7c673cae 3047
f67539c2 3048 mdcache->show_subtrees();
7c673cae
FG
3049}
3050
9f95a23c 3051void Migrator::handle_export_finish(const cref_t<MExportDirFinish> &m)
7c673cae 3052{
f67539c2 3053 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
11fdf7f2 3054 ceph_assert(dir);
9f95a23c 3055 dout(7) << *dir << (m->is_last() ? " last" : "") << dendl;
7c673cae
FG
3056
3057 map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
11fdf7f2
TL
3058 ceph_assert(it != import_state.end());
3059 ceph_assert(it->second.tid == m->get_tid());
7c673cae
FG
3060
3061 import_finish(dir, false, m->is_last());
7c673cae
FG
3062}
3063
3064void Migrator::import_finish(CDir *dir, bool notify, bool last)
3065{
9f95a23c 3066 dout(7) << *dir << dendl;
7c673cae
FG
3067
3068 map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
11fdf7f2
TL
3069 ceph_assert(it != import_state.end());
3070 ceph_assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);
7c673cae 3071
224ce89b 3072 if (it->second.state == IMPORT_ACKING) {
11fdf7f2 3073 ceph_assert(dir->is_auth());
f67539c2 3074 mdcache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
224ce89b
WB
3075 }
3076
7c673cae 3077 // log finish
11fdf7f2 3078 ceph_assert(g_conf()->mds_kill_import_at != 9);
7c673cae
FG
3079
3080 if (it->second.state == IMPORT_ACKING) {
3081 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
3082 p != it->second.peer_exports.end();
3083 ++p) {
3084 CInode *in = p->first;
11fdf7f2 3085 ceph_assert(in->is_auth());
7c673cae
FG
3086 for (map<client_t,Capability::Export>::iterator q = p->second.begin();
3087 q != p->second.end();
3088 ++q) {
28e407b8
AA
3089 auto r = it->second.session_map.find(q->first);
3090 if (r == it->second.session_map.end())
3091 continue;
3092
3093 Session *session = r->second.first;
7c673cae 3094 Capability *cap = in->get_client_cap(q->first);
11fdf7f2 3095 ceph_assert(cap);
7c673cae
FG
3096 cap->merge(q->second, true);
3097 cap->clear_importing();
f67539c2 3098 mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
7c673cae
FG
3099 q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
3100 }
3101 p->second.clear();
3102 in->replica_caps_wanted = 0;
3103 }
28e407b8
AA
3104 for (auto& p : it->second.session_map) {
3105 Session *session = p.second.first;
7c673cae
FG
3106 session->dec_importing();
3107 }
3108 }
3109
3110 if (!last) {
11fdf7f2 3111 ceph_assert(it->second.state == IMPORT_ACKING);
7c673cae
FG
3112 it->second.state = IMPORT_FINISHING;
3113 return;
3114 }
3115
3116 // remove pins
3117 set<CDir*> bounds;
f67539c2 3118 mdcache->get_subtree_bounds(dir, bounds);
7c673cae
FG
3119
3120 if (notify)
3121 import_notify_finish(dir, bounds);
3122
3123 import_remove_pins(dir, bounds);
3124
3125 map<CInode*, map<client_t,Capability::Export> > peer_exports;
3126 it->second.peer_exports.swap(peer_exports);
3127
3128 // clear import state (we're done!)
3129 MutationRef mut = it->second.mut;
3130 import_state.erase(it);
3131
7c673cae
FG
3132 mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
3133
7c673cae 3134 // process delayed expires
f67539c2 3135 mdcache->process_delayed_expire(dir);
7c673cae 3136
224ce89b 3137 // unfreeze tree, with possible subtree merge.
7c673cae 3138 dir->unfreeze_tree();
f67539c2 3139 mdcache->try_subtree_merge(dir);
224ce89b 3140
f67539c2 3141 mdcache->show_subtrees();
7c673cae
FG
3142 //audit(); // this fails, bc we munge up the subtree map during handle_import_map (resolve phase)
3143
3144 if (mut) {
3145 mds->locker->drop_locks(mut.get());
3146 mut->cleanup();
3147 }
3148
3149 // re-eval imported caps
3150 for (map<CInode*, map<client_t,Capability::Export> >::iterator p = peer_exports.begin();
3151 p != peer_exports.end();
3152 ++p) {
3153 if (p->first->is_auth())
3154 mds->locker->eval(p->first, CEPH_CAP_LOCKS, true);
3155 p->first->put(CInode::PIN_IMPORTINGCAPS);
3156 }
3157
3158 // send pending import_maps?
f67539c2 3159 mdcache->maybe_send_pending_resolves();
7c673cae
FG
3160
3161 // did i just import mydir?
3162 if (dir->ino() == MDS_INO_MDSDIR(mds->get_nodeid()))
f67539c2 3163 mdcache->populate_mydir();
7c673cae
FG
3164
3165 // is it empty?
3166 if (dir->get_num_head_items() == 0 &&
3167 !dir->inode->is_auth()) {
3168 // reexport!
3169 export_empty_import(dir);
3170 }
3171}
3172
11fdf7f2 3173void Migrator::decode_import_inode(CDentry *dn, bufferlist::const_iterator& blp,
7c673cae
FG
3174 mds_rank_t oldauth, LogSegment *ls,
3175 map<CInode*, map<client_t,Capability::Export> >& peer_exports,
3176 list<ScatterLock*>& updated_scatterlocks)
9f95a23c
TL
3177{
3178 CInode *in;
3179 bool added = false;
3180 DECODE_START(1, blp);
3181 dout(15) << " on " << *dn << dendl;
7c673cae
FG
3182
3183 inodeno_t ino;
3184 snapid_t last;
11fdf7f2
TL
3185 decode(ino, blp);
3186 decode(last, blp);
7c673cae 3187
f67539c2 3188 in = mdcache->get_inode(ino, last);
7c673cae 3189 if (!in) {
f67539c2 3190 in = new CInode(mds->mdcache, true, 2, last);
7c673cae
FG
3191 added = true;
3192 }
3193
3194 // state after link -- or not! -sage
3195 in->decode_import(blp, ls); // cap imports are noted for later action
3196
3197 // caps
3198 decode_import_inode_caps(in, true, blp, peer_exports);
3199
9f95a23c
TL
3200 DECODE_FINISH(blp);
3201
7c673cae
FG
3202 // link before state -- or not! -sage
3203 if (dn->get_linkage()->get_inode() != in) {
11fdf7f2 3204 ceph_assert(!dn->get_linkage()->get_inode());
7c673cae
FG
3205 dn->dir->link_primary_inode(dn, in);
3206 }
28e407b8
AA
3207
3208 if (in->is_dir())
3209 dn->dir->pop_lru_subdirs.push_back(&in->item_pop_lru);
7c673cae
FG
3210
3211 // add inode?
3212 if (added) {
f67539c2 3213 mdcache->add_inode(in);
7c673cae
FG
3214 dout(10) << "added " << *in << dendl;
3215 } else {
3216 dout(10) << " had " << *in << dendl;
3217 }
3218
f67539c2 3219 if (in->get_inode()->is_dirty_rstat())
7c673cae 3220 in->mark_dirty_rstat();
f91f0fd5 3221
f67539c2 3222 if (!in->get_inode()->client_ranges.empty())
f91f0fd5 3223 in->mark_clientwriteable();
7c673cae
FG
3224
3225 // clear if dirtyscattered, since we're going to journal this
3226 // but not until we _actually_ finish the import...
3227 if (in->filelock.is_dirty()) {
3228 updated_scatterlocks.push_back(&in->filelock);
3229 mds->locker->mark_updated_scatterlock(&in->filelock);
3230 }
3231
3232 if (in->dirfragtreelock.is_dirty()) {
3233 updated_scatterlocks.push_back(&in->dirfragtreelock);
3234 mds->locker->mark_updated_scatterlock(&in->dirfragtreelock);
3235 }
3236
3237 // adjust replica list
3238 //assert(!in->is_replica(oldauth)); // not true on failed export
3239 in->add_replica(oldauth, CInode::EXPORT_NONCE);
3240 if (in->is_replica(mds->get_nodeid()))
3241 in->remove_replica(mds->get_nodeid());
11fdf7f2
TL
3242
3243 if (in->snaplock.is_stable() &&
3244 in->snaplock.get_state() != LOCK_SYNC)
3245 mds->locker->try_eval(&in->snaplock, NULL);
9f95a23c
TL
3246
3247 if (in->policylock.is_stable() &&
3248 in->policylock.get_state() != LOCK_SYNC)
3249 mds->locker->try_eval(&in->policylock, NULL);
7c673cae
FG
3250}
3251
3252void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
11fdf7f2 3253 bufferlist::const_iterator &blp,
7c673cae
FG
3254 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
3255{
9f95a23c 3256 DECODE_START(1, blp);
7c673cae 3257 map<client_t,Capability::Export> cap_map;
11fdf7f2
TL
3258 decode(cap_map, blp);
3259 if (auth_cap) {
3260 mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
3261 decode(mds_wanted, blp);
3262 mds_wanted.erase(mds->get_nodeid());
3263 in->set_mds_caps_wanted(mds_wanted);
3264 }
7c673cae 3265 if (!cap_map.empty() ||
b32b8144 3266 (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
7c673cae
FG
3267 peer_exports[in].swap(cap_map);
3268 in->get(CInode::PIN_IMPORTINGCAPS);
3269 }
9f95a23c 3270 DECODE_FINISH(blp);
7c673cae
FG
3271}
3272
3273void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
28e407b8
AA
3274 const map<client_t,pair<Session*,uint64_t> >& session_map,
3275 const map<client_t,Capability::Export> &export_map,
7c673cae
FG
3276 map<client_t,Capability::Import> &import_map)
3277{
f91f0fd5
TL
3278 const auto& client_ranges = in->get_projected_inode()->client_ranges;
3279 auto r = client_ranges.cbegin();
3280 bool needs_recover = false;
3281
28e407b8 3282 for (auto& it : export_map) {
9f95a23c 3283 dout(10) << "for client." << it.first << " on " << *in << dendl;
28e407b8
AA
3284
3285 auto p = session_map.find(it.first);
3286 if (p == session_map.end()) {
3287 dout(10) << " no session for client." << it.first << dendl;
3288 (void)import_map[it.first];
3289 continue;
3290 }
7c673cae 3291
28e407b8
AA
3292 Session *session = p->second.first;
3293
3294 Capability *cap = in->get_client_cap(it.first);
7c673cae 3295 if (!cap) {
28e407b8 3296 cap = in->add_client_cap(it.first, session);
7c673cae
FG
3297 if (peer < 0)
3298 cap->mark_importing();
3299 }
3300
f91f0fd5
TL
3301 if (auth_cap) {
3302 while (r != client_ranges.cend() && r->first < it.first) {
3303 needs_recover = true;
3304 ++r;
3305 }
3306 if (r != client_ranges.cend() && r->first == it.first) {
3307 cap->mark_clientwriteable();
3308 ++r;
3309 }
3310 }
3311
1adf2230
AA
3312 // Always ask exporter mds to send cap export messages for auth caps.
3313 // For non-auth caps, ask exporter mds to send cap export messages to
3314 // clients who haven't opened sessions. The cap export messages will
3315 // make clients open sessions.
11fdf7f2 3316 if (auth_cap || !session->get_connection()) {
1adf2230
AA
3317 Capability::Import& im = import_map[it.first];
3318 im.cap_id = cap->get_cap_id();
3319 im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
3320 im.issue_seq = cap->get_last_seq() + 1;
3321 }
7c673cae
FG
3322
3323 if (peer >= 0) {
28e407b8 3324 cap->merge(it.second, auth_cap);
f67539c2 3325 mdcache->do_cap_import(session, in, cap, it.second.cap_id,
28e407b8 3326 it.second.seq, it.second.mseq - 1, peer,
7c673cae
FG
3327 auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
3328 }
3329 }
3330
f91f0fd5
TL
3331 if (auth_cap) {
3332 if (r != client_ranges.cend())
3333 needs_recover = true;
3334 if (needs_recover)
3335 in->state_set(CInode::STATE_NEEDSRECOVER);
3336 }
3337
7c673cae
FG
3338 if (peer >= 0) {
3339 in->replica_caps_wanted = 0;
3340 in->put(CInode::PIN_IMPORTINGCAPS);
3341 }
3342}
3343
9f95a23c 3344void Migrator::decode_import_dir(bufferlist::const_iterator& blp,
7c673cae
FG
3345 mds_rank_t oldauth,
3346 CDir *import_root,
3347 EImportStart *le,
3348 LogSegment *ls,
3349 map<CInode*,map<client_t,Capability::Export> >& peer_exports,
9f95a23c 3350 list<ScatterLock*>& updated_scatterlocks, int &num_imported)
7c673cae 3351{
9f95a23c 3352 DECODE_START(1, blp);
7c673cae
FG
3353 // set up dir
3354 dirfrag_t df;
11fdf7f2 3355 decode(df, blp);
7c673cae 3356
f67539c2 3357 CInode *diri = mdcache->get_inode(df.ino);
11fdf7f2 3358 ceph_assert(diri);
7c673cae 3359 CDir *dir = diri->get_or_open_dirfrag(mds->mdcache, df.frag);
11fdf7f2 3360 ceph_assert(dir);
7c673cae 3361
9f95a23c 3362 dout(7) << *dir << dendl;
7c673cae 3363
11fdf7f2
TL
3364 if (!dir->freeze_tree_state) {
3365 ceph_assert(dir->get_version() == 0);
3366 dir->freeze_tree_state = import_root->freeze_tree_state;
3367 }
3368
7c673cae 3369 // assimilate state
11fdf7f2 3370 dir->decode_import(blp, ls);
7c673cae
FG
3371
3372 // adjust replica list
3373 //assert(!dir->is_replica(oldauth)); // not true on failed export
3374 dir->add_replica(oldauth, CDir::EXPORT_NONCE);
3375 if (dir->is_replica(mds->get_nodeid()))
3376 dir->remove_replica(mds->get_nodeid());
3377
3378 // add to journal entry
3379 if (le)
3380 le->metablob.add_import_dir(dir);
3381
3382 int num_imported = 0;
3383
3384 // take all waiters on this dir
3385 // NOTE: a pass of imported data is guaranteed to get all of my waiters because
3386 // a replica's presense in my cache implies/forces it's presense in authority's.
11fdf7f2 3387 MDSContext::vec waiters;
7c673cae 3388 dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
11fdf7f2
TL
3389 for (auto c : waiters)
3390 dir->add_waiter(CDir::WAIT_UNFREEZE, c); // UNFREEZE will get kicked both on success or failure
7c673cae
FG
3391
3392 dout(15) << "doing contents" << dendl;
3393
3394 // contents
3395 __u32 nden;
11fdf7f2 3396 decode(nden, blp);
7c673cae
FG
3397
3398 for (; nden>0; nden--) {
3399 num_imported++;
3400
3401 // dentry
3402 string dname;
3403 snapid_t last;
11fdf7f2
TL
3404 decode(dname, blp);
3405 decode(last, blp);
7c673cae
FG
3406
3407 CDentry *dn = dir->lookup_exact_snap(dname, last);
3408 if (!dn)
3409 dn = dir->add_null_dentry(dname, 1, last);
3410
3411 dn->decode_import(blp, ls);
3412
3413 dn->add_replica(oldauth, CDentry::EXPORT_NONCE);
3414 if (dn->is_replica(mds->get_nodeid()))
3415 dn->remove_replica(mds->get_nodeid());
3416
3417 // dentry lock in unreadable state can block path traverse
3418 if (dn->lock.get_state() != LOCK_SYNC)
3419 mds->locker->try_eval(&dn->lock, NULL);
3420
9f95a23c 3421 dout(15) << " got " << *dn << dendl;
7c673cae
FG
3422
3423 // points to...
3424 char icode;
11fdf7f2 3425 decode(icode, blp);
7c673cae
FG
3426
3427 if (icode == 'N') {
3428 // null dentry
11fdf7f2 3429 ceph_assert(dn->get_linkage()->is_null());
7c673cae
FG
3430
3431 // fall thru
3432 }
f67539c2 3433 else if (icode == 'L' || icode == 'l') {
7c673cae
FG
3434 // remote link
3435 inodeno_t ino;
3436 unsigned char d_type;
f67539c2
TL
3437 mempool::mds_co::string alternate_name;
3438
3439 CDentry::decode_remote(icode, ino, d_type, alternate_name, blp);
3440
7c673cae 3441 if (dn->get_linkage()->is_remote()) {
11fdf7f2 3442 ceph_assert(dn->get_linkage()->get_remote_ino() == ino);
f67539c2 3443 ceph_assert(dn->get_alternate_name() == alternate_name);
7c673cae
FG
3444 } else {
3445 dir->link_remote_inode(dn, ino, d_type);
f67539c2 3446 dn->set_alternate_name(std::move(alternate_name));
7c673cae
FG
3447 }
3448 }
f67539c2 3449 else if (icode == 'I' || icode == 'i') {
7c673cae 3450 // inode
11fdf7f2 3451 ceph_assert(le);
f67539c2
TL
3452 if (icode == 'i') {
3453 DECODE_START(2, blp);
3454 decode_import_inode(dn, blp, oldauth, ls,
3455 peer_exports, updated_scatterlocks);
3456 ceph_assert(!dn->is_projected());
3457 decode(dn->alternate_name, blp);
3458 DECODE_FINISH(blp);
3459 } else {
3460 decode_import_inode(dn, blp, oldauth, ls,
3461 peer_exports, updated_scatterlocks);
3462 }
7c673cae
FG
3463 }
3464
3465 // add dentry to journal entry
3466 if (le)
3467 le->metablob.add_import_dentry(dn);
3468 }
3469
3470#ifdef MDS_VERIFY_FRAGSTAT
3471 if (dir->is_complete())
3472 dir->verify_fragstat();
3473#endif
3474
3475 dir->inode->maybe_export_pin();
3476
9f95a23c
TL
3477 dout(7) << " done " << *dir << dendl;
3478 DECODE_FINISH(blp);
7c673cae
FG
3479}
3480
3481
3482
3483
3484
3485// authority bystander
3486
9f95a23c 3487void Migrator::handle_export_notify(const cref_t<MExportDirNotify> &m)
7c673cae
FG
3488{
3489 if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
7c673cae
FG
3490 return;
3491 }
3492
f67539c2 3493 CDir *dir = mdcache->get_dirfrag(m->get_dirfrag());
7c673cae
FG
3494
3495 mds_rank_t from = mds_rank_t(m->get_source().num());
3496 mds_authority_t old_auth = m->get_old_auth();
3497 mds_authority_t new_auth = m->get_new_auth();
3498
3499 if (!dir) {
9f95a23c 3500 dout(7) << old_auth << " -> " << new_auth
7c673cae
FG
3501 << " on missing dir " << m->get_dirfrag() << dendl;
3502 } else if (dir->authority() != old_auth) {
9f95a23c 3503 dout(7) << "old_auth was " << dir->authority()
7c673cae
FG
3504 << " != " << old_auth << " -> " << new_auth
3505 << " on " << *dir << dendl;
3506 } else {
9f95a23c 3507 dout(7) << old_auth << " -> " << new_auth
7c673cae
FG
3508 << " on " << *dir << dendl;
3509 // adjust auth
3510 set<CDir*> have;
f67539c2
TL
3511 mdcache->map_dirfrag_set(m->get_bounds(), have);
3512 mdcache->adjust_bounded_subtree_auth(dir, have, new_auth);
7c673cae
FG
3513
3514 // induce a merge?
f67539c2 3515 mdcache->try_subtree_merge(dir);
7c673cae
FG
3516 }
3517
3518 // send ack
3519 if (m->wants_ack()) {
9f95a23c 3520 mds->send_message_mds(make_message<MExportDirNotifyAck>(m->get_dirfrag(), m->get_tid(), m->get_new_auth()), from);
7c673cae
FG
3521 } else {
3522 // aborted. no ack.
9f95a23c 3523 dout(7) << "no ack requested" << dendl;
7c673cae 3524 }
7c673cae
FG
3525}
3526
3527/** cap exports **/
3528void Migrator::export_caps(CInode *in)
3529{
3530 mds_rank_t dest = in->authority().first;
9f95a23c 3531 dout(7) << "to mds." << dest << " " << *in << dendl;
7c673cae 3532
11fdf7f2
TL
3533 ceph_assert(in->is_any_caps());
3534 ceph_assert(!in->is_auth());
3535 ceph_assert(!in->is_ambiguous_auth());
3536 ceph_assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
7c673cae 3537
9f95a23c 3538 auto ex = make_message<MExportCaps>();
7c673cae
FG
3539 ex->ino = in->ino();
3540
11fdf7f2 3541 encode_export_inode_caps(in, false, ex->cap_bl, ex->client_map, ex->client_metadata_map);
7c673cae
FG
3542
3543 mds->send_message_mds(ex, dest);
3544}
3545
9f95a23c 3546void Migrator::handle_export_caps_ack(const cref_t<MExportCapsAck> &ack)
1adf2230
AA
3547{
3548 mds_rank_t from = ack->get_source().num();
f67539c2 3549 CInode *in = mdcache->get_inode(ack->ino);
1adf2230 3550 if (in) {
11fdf7f2 3551 ceph_assert(!in->is_auth());
1adf2230 3552
9f95a23c 3553 dout(10) << *ack << " from "
1adf2230
AA
3554 << ack->get_source() << " on " << *in << dendl;
3555
3556 map<client_t,Capability::Import> imported_caps;
3557 map<client_t,uint64_t> caps_ids;
11fdf7f2
TL
3558 auto blp = ack->cap_bl.cbegin();
3559 decode(imported_caps, blp);
3560 decode(caps_ids, blp);
1adf2230
AA
3561
3562 for (auto& it : imported_caps) {
3563 Capability *cap = in->get_client_cap(it.first);
3564 if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
3565 continue;
3566
9f95a23c 3567 dout(7) << " telling client." << it.first
1adf2230 3568 << " exported caps on " << *in << dendl;
9f95a23c 3569 auto m = make_message<MClientCaps>(CEPH_CAP_OP_EXPORT, in->ino(), 0,
1adf2230
AA
3570 cap->get_cap_id(), cap->get_mseq(),
3571 mds->get_osd_epoch_barrier());
3572 m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
3573 mds->send_message_client_counted(m, it.first);
3574
3575 in->remove_client_cap(it.first);
3576 }
3577
3578 mds->locker->request_inode_file_caps(in);
3579 mds->locker->try_eval(in, CEPH_CAP_LOCKS);
3580 }
1adf2230
AA
3581}
3582
9f95a23c 3583void Migrator::handle_gather_caps(const cref_t<MGatherCaps> &m)
7c673cae 3584{
f67539c2 3585 CInode *in = mdcache->get_inode(m->ino);
7c673cae 3586 if (!in)
11fdf7f2 3587 return;
7c673cae 3588
9f95a23c 3589 dout(10) << *m << " from " << m->get_source()
1adf2230
AA
3590 << " on " << *in << dendl;
3591
7c673cae
FG
3592 if (in->is_any_caps() &&
3593 !in->is_auth() &&
3594 !in->is_ambiguous_auth() &&
3595 !in->state_test(CInode::STATE_EXPORTINGCAPS))
3596 export_caps(in);
7c673cae
FG
3597}
3598
3599class C_M_LoggedImportCaps : public MigratorLogContext {
3600 CInode *in;
3601 mds_rank_t from;
3602public:
28e407b8 3603 map<client_t,pair<Session*,uint64_t> > imported_session_map;
7c673cae 3604 map<CInode*, map<client_t,Capability::Export> > peer_exports;
7c673cae
FG
3605
3606 C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
3607 void finish(int r) override {
28e407b8 3608 mig->logged_import_caps(in, from, imported_session_map, peer_exports);
7c673cae
FG
3609 }
3610};
3611
9f95a23c 3612void Migrator::handle_export_caps(const cref_t<MExportCaps> &ex)
7c673cae 3613{
9f95a23c 3614 dout(10) << *ex << " from " << ex->get_source() << dendl;
f67539c2 3615 CInode *in = mdcache->get_inode(ex->ino);
7c673cae 3616
11fdf7f2
TL
3617 ceph_assert(in);
3618 ceph_assert(in->is_auth());
7c673cae
FG
3619
3620 // FIXME
28e407b8 3621 if (!in->can_auth_pin()) {
7c673cae 3622 return;
28e407b8
AA
3623 }
3624
181888fb 3625 in->auth_pin(this);
7c673cae 3626
11fdf7f2
TL
3627 map<client_t,entity_inst_t> client_map{ex->client_map};
3628 map<client_t,client_metadata_t> client_metadata_map{ex->client_metadata_map};
28e407b8 3629
7c673cae
FG
3630 C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
3631 this, in, mds_rank_t(ex->get_source().num()));
7c673cae 3632
11fdf7f2 3633 version_t pv = mds->server->prepare_force_open_sessions(client_map, client_metadata_map,
28e407b8 3634 finish->imported_session_map);
7c673cae 3635 // decode new caps
11fdf7f2 3636 auto blp = ex->cap_bl.cbegin();
7c673cae 3637 decode_import_inode_caps(in, false, blp, finish->peer_exports);
11fdf7f2 3638 ceph_assert(!finish->peer_exports.empty()); // thus, inode is pinned.
7c673cae
FG
3639
3640 // journal open client sessions
11fdf7f2
TL
3641 ESessions *le = new ESessions(pv, std::move(client_map),
3642 std::move(client_metadata_map));
7c673cae
FG
3643 mds->mdlog->start_submit_entry(le, finish);
3644 mds->mdlog->flush();
7c673cae
FG
3645}
3646
3647
3648void Migrator::logged_import_caps(CInode *in,
3649 mds_rank_t from,
28e407b8
AA
3650 map<client_t,pair<Session*,uint64_t> >& imported_session_map,
3651 map<CInode*, map<client_t,Capability::Export> >& peer_exports)
7c673cae 3652{
9f95a23c 3653 dout(10) << *in << dendl;
7c673cae 3654 // see export_go() vs export_go_synced()
11fdf7f2 3655 ceph_assert(in->is_auth());
7c673cae
FG
3656
3657 // force open client sessions and finish cap import
28e407b8 3658 mds->server->finish_force_open_sessions(imported_session_map);
7c673cae 3659
28e407b8 3660 auto it = peer_exports.find(in);
11fdf7f2 3661 ceph_assert(it != peer_exports.end());
28e407b8 3662
7c673cae 3663 // clients will release caps from the exporter when they receive the cap import message.
1adf2230 3664 map<client_t,Capability::Import> imported_caps;
28e407b8 3665 finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
7c673cae 3666 mds->locker->eval(in, CEPH_CAP_LOCKS, true);
1adf2230
AA
3667
3668 if (!imported_caps.empty()) {
9f95a23c 3669 auto ack = make_message<MExportCapsAck>(in->ino());
1adf2230
AA
3670 map<client_t,uint64_t> peer_caps_ids;
3671 for (auto &p : imported_caps )
3672 peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
3673
11fdf7f2
TL
3674 encode(imported_caps, ack->cap_bl);
3675 encode(peer_caps_ids, ack->cap_bl);
1adf2230
AA
3676 mds->send_message_mds(ack, from);
3677 }
3678
181888fb 3679 in->auth_unpin(this);
7c673cae 3680}
28e407b8 3681
f67539c2 3682Migrator::Migrator(MDSRank *m, MDCache *c) : mds(m), mdcache(c) {
11fdf7f2
TL
3683 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
3684 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
91327a77
AA
3685}
3686
92f5a8d4 3687void Migrator::handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map)
28e407b8 3688{
91327a77 3689 if (changed.count("mds_max_export_size"))
11fdf7f2 3690 max_export_size = g_conf().get_val<Option::size_t>("mds_max_export_size");
28e407b8 3691 if (changed.count("mds_inject_migrator_session_race")) {
92f5a8d4 3692 inject_session_race = g_conf().get_val<bool>("mds_inject_migrator_session_race");
28e407b8
AA
3693 dout(0) << "mds_inject_migrator_session_race is " << inject_session_race << dendl;
3694 }
3695}