]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/StrayManager.cc
aa4c95e79e1c30d969646b9b3965b76ddf461b54
[ceph.git] / ceph / src / mds / StrayManager.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "common/perf_counters.h"
17
18 #include "mds/MDSRank.h"
19 #include "mds/MDCache.h"
20 #include "mds/MDLog.h"
21 #include "mds/CDir.h"
22 #include "mds/CDentry.h"
23 #include "events/EUpdate.h"
24 #include "messages/MClientRequest.h"
25
26 #include "StrayManager.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, mds)
32
33 using namespace std;
34
35 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
36 return *_dout << "mds." << mds->get_nodeid() << ".cache.strays ";
37 }
38
39 class StrayManagerIOContext : public virtual MDSIOContextBase {
40 protected:
41 StrayManager *sm;
42 MDSRank *get_mds() override
43 {
44 return sm->mds;
45 }
46 public:
47 explicit StrayManagerIOContext(StrayManager *sm_) : sm(sm_) {}
48 };
49
50 class StrayManagerLogContext : public virtual MDSLogContextBase {
51 protected:
52 StrayManager *sm;
53 MDSRank *get_mds() override
54 {
55 return sm->mds;
56 }
57 public:
58 explicit StrayManagerLogContext(StrayManager *sm_) : sm(sm_) {}
59 };
60
61 class StrayManagerContext : public virtual MDSContext {
62 protected:
63 StrayManager *sm;
64 MDSRank *get_mds() override
65 {
66 return sm->mds;
67 }
68 public:
69 explicit StrayManagerContext(StrayManager *sm_) : sm(sm_) {}
70 };
71
72
73 /**
74 * Context wrapper for _purge_stray_purged completion
75 */
76 class C_IO_PurgeStrayPurged : public StrayManagerIOContext {
77 CDentry *dn;
78 bool only_head;
79 public:
80 C_IO_PurgeStrayPurged(StrayManager *sm_, CDentry *d, bool oh) :
81 StrayManagerIOContext(sm_), dn(d), only_head(oh) { }
82 void finish(int r) override {
83 ceph_assert(r == 0 || r == -CEPHFS_ENOENT);
84 sm->_purge_stray_purged(dn, only_head);
85 }
86 void print(ostream& out) const override {
87 CInode *in = dn->get_projected_linkage()->get_inode();
88 out << "purge_stray(" << in->ino() << ")";
89 }
90 };
91
92
93 void StrayManager::purge(CDentry *dn)
94 {
95 CDentry::linkage_t *dnl = dn->get_projected_linkage();
96 CInode *in = dnl->get_inode();
97 dout(10) << __func__ << " " << *dn << " " << *in << dendl;
98 ceph_assert(!dn->is_replicated());
99
100 // CHEAT. there's no real need to journal our intent to purge, since
101 // that is implicit in the dentry's presence and non-use in the stray
102 // dir. on recovery, we'll need to re-eval all strays anyway.
103
104 SnapContext nullsnapc;
105
106 PurgeItem item;
107 item.ino = in->ino();
108 item.stamp = ceph_clock_now();
109 if (in->is_dir()) {
110 item.action = PurgeItem::PURGE_DIR;
111 item.fragtree = in->dirfragtree;
112 } else {
113 item.action = PurgeItem::PURGE_FILE;
114
115 const SnapContext *snapc;
116 SnapRealm *realm = in->find_snaprealm();
117 if (realm) {
118 dout(10) << " realm " << *realm << dendl;
119 snapc = &realm->get_snap_context();
120 } else {
121 dout(10) << " NO realm, using null context" << dendl;
122 snapc = &nullsnapc;
123 ceph_assert(in->last == CEPH_NOSNAP);
124 }
125
126 const auto& pi = in->get_projected_inode();
127
128 uint64_t to = 0;
129 if (in->is_file()) {
130 to = std::max(pi->size, pi->get_max_size());
131 // when truncating a file, the filer does not delete stripe objects that are
132 // truncated to zero. so we need to purge stripe objects up to the max size
133 // the file has ever been.
134 to = std::max(pi->max_size_ever, to);
135 }
136
137 item.size = to;
138 item.layout = pi->layout;
139 item.old_pools.reserve(pi->old_pools.size());
140 for (const auto &p : pi->old_pools) {
141 if (p != pi->layout.pool_id)
142 item.old_pools.push_back(p);
143 }
144 item.snapc = *snapc;
145 }
146
147 purge_queue.push(item, new C_IO_PurgeStrayPurged(
148 this, dn, false));
149 }
150
151 class C_PurgeStrayLogged : public StrayManagerLogContext {
152 CDentry *dn;
153 version_t pdv;
154 MutationRef mut;
155 public:
156 C_PurgeStrayLogged(StrayManager *sm_, CDentry *d, version_t v, MutationRef& m) :
157 StrayManagerLogContext(sm_), dn(d), pdv(v), mut(m) { }
158 void finish(int r) override {
159 sm->_purge_stray_logged(dn, pdv, mut);
160 }
161 };
162
163 class C_TruncateStrayLogged : public StrayManagerLogContext {
164 CDentry *dn;
165 MutationRef mut;
166 public:
167 C_TruncateStrayLogged(StrayManager *sm, CDentry *d, MutationRef& m) :
168 StrayManagerLogContext(sm), dn(d), mut(m) {}
169 void finish(int r) override {
170 sm->_truncate_stray_logged(dn, mut);
171 }
172 };
173
174 void StrayManager::_purge_stray_purged(
175 CDentry *dn, bool only_head)
176 {
177 CInode *in = dn->get_projected_linkage()->get_inode();
178 dout(10) << "_purge_stray_purged " << *dn << " " << *in << dendl;
179
180 logger->inc(l_mdc_strays_enqueued);
181 num_strays_enqueuing--;
182 logger->set(l_mdc_num_strays_enqueuing, num_strays_enqueuing);
183
184 if (only_head) {
185 /* This was a ::truncate */
186 MutationRef mut(new MutationImpl());
187 mut->ls = mds->mdlog->get_current_segment();
188
189 auto pi = in->project_inode(mut);
190 pi.inode->size = 0;
191 pi.inode->max_size_ever = 0;
192 pi.inode->client_ranges.clear();
193 pi.inode->truncate_size = 0;
194 pi.inode->truncate_from = 0;
195 pi.inode->version = in->pre_dirty();
196 pi.inode->client_ranges.clear();
197 in->clear_clientwriteable();
198
199 CDir *dir = dn->get_dir();
200 auto pf = dir->project_fnode(mut);
201 pf->version = dir->pre_dirty();
202
203 EUpdate *le = new EUpdate(mds->mdlog, "purge_stray truncate");
204 mds->mdlog->start_entry(le);
205
206 le->metablob.add_dir_context(dir);
207 auto& dl = le->metablob.add_dir(dn->dir, true);
208 le->metablob.add_primary_dentry(dl, dn, in, EMetaBlob::fullbit::STATE_DIRTY);
209
210 mds->mdlog->submit_entry(le, new C_TruncateStrayLogged(this, dn, mut));
211 } else {
212 if (in->get_num_ref() != (int)in->is_dirty() ||
213 dn->get_num_ref() !=
214 (int)dn->is_dirty() +
215 !!dn->state_test(CDentry::STATE_FRAGMENTING) +
216 !!in->get_num_ref() + 1 /* PIN_PURGING */) {
217 // Nobody should be taking new references to an inode when it
218 // is being purged (aside from it were
219
220 derr << "Rogue reference after purge to " << *dn << dendl;
221 ceph_abort_msg("rogue reference to purging inode");
222 }
223
224 MutationRef mut(new MutationImpl());
225 mut->ls = mds->mdlog->get_current_segment();
226
227 // kill dentry.
228 version_t pdv = dn->pre_dirty();
229 dn->push_projected_linkage(); // NULL
230
231 EUpdate *le = new EUpdate(mds->mdlog, "purge_stray");
232 mds->mdlog->start_entry(le);
233
234 // update dirfrag fragstat, rstat
235 CDir *dir = dn->get_dir();
236 auto pf = dir->project_fnode(mut);
237 pf->version = dir->pre_dirty();
238 if (in->is_dir())
239 pf->fragstat.nsubdirs--;
240 else
241 pf->fragstat.nfiles--;
242 pf->rstat.sub(in->get_inode()->accounted_rstat);
243
244 le->metablob.add_dir_context(dn->dir);
245 auto& dl = le->metablob.add_dir(dn->dir, true);
246 le->metablob.add_null_dentry(dl, dn, true);
247 le->metablob.add_destroyed_inode(in->ino());
248
249 mds->mdlog->submit_entry(le, new C_PurgeStrayLogged(this, dn, pdv, mut));
250 }
251 }
252
253 void StrayManager::_purge_stray_logged(CDentry *dn, version_t pdv, MutationRef& mut)
254 {
255 CInode *in = dn->get_linkage()->get_inode();
256 CDir *dir = dn->get_dir();
257 dout(10) << "_purge_stray_logged " << *dn << " " << *in << dendl;
258
259 ceph_assert(!in->state_test(CInode::STATE_RECOVERING));
260 ceph_assert(!dir->is_frozen_dir());
261
262 bool new_dn = dn->is_new();
263
264 // unlink
265 ceph_assert(dn->get_projected_linkage()->is_null());
266 dir->unlink_inode(dn, !new_dn);
267 dn->pop_projected_linkage();
268 dn->mark_dirty(pdv, mut->ls);
269
270 mut->apply();
271
272 in->state_clear(CInode::STATE_ORPHAN);
273 dn->state_clear(CDentry::STATE_PURGING | CDentry::STATE_PURGINGPINNED);
274 dn->put(CDentry::PIN_PURGING);
275
276
277 // drop dentry?
278 if (new_dn) {
279 dout(20) << " dn is new, removing" << dendl;
280 dn->mark_clean();
281 dir->remove_dentry(dn);
282 }
283
284 // Once we are here normally the waiter list are mostly empty
285 // but in corner case that the clients pass a invalidate ino,
286 // which maybe under unlinking, the link caller will add the
287 // request to the waiter list. We need try to wake them up
288 // anyway.
289 MDSContext::vec finished;
290 in->take_waiting(CInode::WAIT_UNLINK, finished);
291 if (!finished.empty()) {
292 mds->queue_waiters(finished);
293 }
294
295 // drop inode
296 inodeno_t ino = in->ino();
297 if (in->is_dirty())
298 in->mark_clean();
299 mds->mdcache->remove_inode(in);
300
301 dir->auth_unpin(this);
302
303 if (mds->is_stopping())
304 mds->mdcache->shutdown_export_stray_finish(ino);
305 }
306
307 void StrayManager::enqueue(CDentry *dn, bool trunc)
308 {
309 CDentry::linkage_t *dnl = dn->get_projected_linkage();
310 ceph_assert(dnl);
311 CInode *in = dnl->get_inode();
312 ceph_assert(in);
313
314 /* We consider a stray to be purging as soon as it is enqueued, to avoid
315 * enqueing it twice */
316 dn->state_set(CDentry::STATE_PURGING);
317 in->state_set(CInode::STATE_PURGING);
318
319 /* We must clear this as soon as enqueuing it, to prevent the journal
320 * expiry code from seeing a dirty parent and trying to write a backtrace */
321 if (!trunc) {
322 if (in->is_dirty_parent()) {
323 in->clear_dirty_parent();
324 }
325 }
326
327 dout(20) << __func__ << ": purging dn: " << *dn << dendl;
328
329 if (!dn->state_test(CDentry::STATE_PURGINGPINNED)) {
330 dn->get(CDentry::PIN_PURGING);
331 dn->state_set(CDentry::STATE_PURGINGPINNED);
332 }
333
334 ++num_strays_enqueuing;
335 logger->set(l_mdc_num_strays_enqueuing, num_strays_enqueuing);
336
337 // Resources are available, acquire them and execute the purge
338 _enqueue(dn, trunc);
339
340 dout(10) << __func__ << ": purging this dentry immediately: "
341 << *dn << dendl;
342 }
343
344 class C_RetryEnqueue : public StrayManagerContext {
345 CDentry *dn;
346 bool trunc;
347 public:
348 C_RetryEnqueue(StrayManager *sm_, CDentry *dn_, bool t) :
349 StrayManagerContext(sm_), dn(dn_), trunc(t) { }
350 void finish(int r) override {
351 sm->_enqueue(dn, trunc);
352 }
353 };
354
355 void StrayManager::_enqueue(CDentry *dn, bool trunc)
356 {
357 ceph_assert(started);
358
359 CDir *dir = dn->get_dir();
360 if (!dir->can_auth_pin()) {
361 dout(10) << " can't auth_pin (freezing?) " << *dir << ", waiting" << dendl;
362 dir->add_waiter(CDir::WAIT_UNFREEZE, new C_RetryEnqueue(this, dn, trunc));
363 return;
364 }
365
366 dn->get_dir()->auth_pin(this);
367 if (trunc) {
368 truncate(dn);
369 } else {
370 purge(dn);
371 }
372 }
373
374 void StrayManager::queue_delayed(CDentry *dn)
375 {
376 if (!started)
377 return;
378
379 if (dn->state_test(CDentry::STATE_EVALUATINGSTRAY))
380 return;
381
382 if (!dn->item_stray.is_on_list()) {
383 delayed_eval_stray.push_back(&dn->item_stray);
384 num_strays_delayed++;
385 logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
386 }
387 }
388
389 void StrayManager::advance_delayed()
390 {
391 if (!started)
392 return;
393
394 while (!delayed_eval_stray.empty()) {
395 CDentry *dn = delayed_eval_stray.front();
396 dn->item_stray.remove_myself();
397 num_strays_delayed--;
398
399 if (dn->get_projected_linkage()->is_null()) {
400 /* A special case: a stray dentry can go null if its inode is being
401 * re-linked into another MDS's stray dir during a shutdown migration. */
402 dout(4) << __func__ << ": delayed dentry is now null: " << *dn << dendl;
403 continue;
404 }
405
406 eval_stray(dn);
407 }
408 logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
409 }
410
411 void StrayManager::set_num_strays(uint64_t num)
412 {
413 ceph_assert(!started);
414 num_strays = num;
415 logger->set(l_mdc_num_strays, num_strays);
416 }
417
418 void StrayManager::notify_stray_created()
419 {
420 num_strays++;
421 logger->set(l_mdc_num_strays, num_strays);
422 logger->inc(l_mdc_strays_created);
423 }
424
425 void StrayManager::notify_stray_removed()
426 {
427 num_strays--;
428 logger->set(l_mdc_num_strays, num_strays);
429 }
430
431 struct C_EvalStray : public StrayManagerContext {
432 CDentry *dn;
433 C_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
434 void finish(int r) override {
435 sm->eval_stray(dn);
436 }
437 };
438
439 struct C_MDC_EvalStray : public StrayManagerContext {
440 CDentry *dn;
441 C_MDC_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
442 void finish(int r) override {
443 sm->eval_stray(dn);
444 }
445 };
446
447 bool StrayManager::_eval_stray(CDentry *dn)
448 {
449 dout(10) << "eval_stray " << *dn << dendl;
450 CDentry::linkage_t *dnl = dn->get_projected_linkage();
451 ceph_assert(dnl->is_primary());
452 dout(10) << " inode is " << *dnl->get_inode() << dendl;
453 CInode *in = dnl->get_inode();
454 ceph_assert(in);
455 ceph_assert(!in->state_test(CInode::STATE_REJOINUNDEF));
456
457 // The only dentries elegible for purging are those
458 // in the stray directories
459 ceph_assert(dn->get_dir()->get_inode()->is_stray());
460
461 // Inode may not pass through this function if it
462 // was already identified for purging (i.e. cannot
463 // call eval_stray() after purge()
464 ceph_assert(!dn->state_test(CDentry::STATE_PURGING));
465
466 if (!dn->is_auth())
467 return false;
468
469 if (!started)
470 return false;
471
472 if (dn->item_stray.is_on_list()) {
473 dn->item_stray.remove_myself();
474 num_strays_delayed--;
475 logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
476 }
477
478 // purge?
479 if (in->get_inode()->nlink == 0) {
480 // past snaprealm parents imply snapped dentry remote links.
481 // only important for directories. normal file data snaps are handled
482 // by the object store.
483 if (in->snaprealm) {
484 in->snaprealm->prune_past_parent_snaps();
485 in->purge_stale_snap_data(in->snaprealm->get_snaps());
486 }
487 if (in->is_dir()) {
488 if (in->snaprealm && in->snaprealm->has_past_parent_snaps()) {
489 dout(20) << " directory has past parents "
490 << in->snaprealm << dendl;
491 if (in->state_test(CInode::STATE_MISSINGOBJS)) {
492 mds->clog->error() << "previous attempt at committing dirfrag of ino "
493 << in->ino() << " has failed, missing object";
494 mds->handle_write_error(-CEPHFS_ENOENT);
495 }
496 return false; // not until some snaps are deleted.
497 }
498
499 mds->mdcache->clear_dirty_bits_for_stray(in);
500
501 if (!in->remote_parents.empty()) {
502 // unlink any stale remote snap dentry.
503 for (auto it = in->remote_parents.begin(); it != in->remote_parents.end(); ) {
504 CDentry *remote_dn = *it;
505 ++it;
506 ceph_assert(remote_dn->last != CEPH_NOSNAP);
507 remote_dn->unlink_remote(remote_dn->get_linkage());
508 }
509 }
510 }
511 if (dn->is_replicated()) {
512 dout(20) << " replicated" << dendl;
513 return false;
514 }
515 if (dn->is_any_leases() || in->is_any_caps()) {
516 dout(20) << " caps | leases" << dendl;
517 return false; // wait
518 }
519 if (in->state_test(CInode::STATE_NEEDSRECOVER) ||
520 in->state_test(CInode::STATE_RECOVERING)) {
521 dout(20) << " pending recovery" << dendl;
522 return false; // don't mess with file size probing
523 }
524 if (in->get_num_ref() > (int)in->is_dirty() + (int)in->is_dirty_parent()) {
525 dout(20) << " too many inode refs" << dendl;
526 return false;
527 }
528 if (dn->get_num_ref() > (int)dn->is_dirty() + !!in->get_num_ref()) {
529 dout(20) << " too many dn refs" << dendl;
530 return false;
531 }
532 // don't purge multiversion inode with snap data
533 if (in->snaprealm && in->snaprealm->has_past_parent_snaps() &&
534 in->is_any_old_inodes()) {
535 // A file with snapshots: we will truncate the HEAD revision
536 // but leave the metadata intact.
537 ceph_assert(!in->is_dir());
538 dout(20) << " file has past parents "
539 << in->snaprealm << dendl;
540 if (in->is_file() && in->get_projected_inode()->size > 0) {
541 enqueue(dn, true); // truncate head objects
542 }
543 } else {
544 // A straightforward file, ready to be purged. Enqueue it.
545 if (in->is_dir()) {
546 in->close_dirfrags();
547 }
548
549 enqueue(dn, false);
550 }
551
552 return true;
553 } else {
554 /*
555 * Where a stray has some links, they should be remotes, check
556 * if we can do anything with them if we happen to have them in
557 * cache.
558 */
559 _eval_stray_remote(dn, NULL);
560 return false;
561 }
562 }
563
564 void StrayManager::activate()
565 {
566 dout(10) << __func__ << dendl;
567 started = true;
568 purge_queue.activate();
569 }
570
571 bool StrayManager::eval_stray(CDentry *dn)
572 {
573 // avoid nested eval_stray
574 if (dn->state_test(CDentry::STATE_EVALUATINGSTRAY))
575 return false;
576
577 dn->state_set(CDentry::STATE_EVALUATINGSTRAY);
578 bool ret = _eval_stray(dn);
579 dn->state_clear(CDentry::STATE_EVALUATINGSTRAY);
580 return ret;
581 }
582
583 void StrayManager::eval_remote(CDentry *remote_dn)
584 {
585 dout(10) << __func__ << " " << *remote_dn << dendl;
586
587 CDentry::linkage_t *dnl = remote_dn->get_projected_linkage();
588 ceph_assert(dnl->is_remote());
589 CInode *in = dnl->get_inode();
590
591 if (!in) {
592 dout(20) << __func__ << ": no inode, cannot evaluate" << dendl;
593 return;
594 }
595
596 if (remote_dn->last != CEPH_NOSNAP) {
597 dout(20) << __func__ << ": snap dentry, cannot evaluate" << dendl;
598 return;
599 }
600
601 // refers to stray?
602 CDentry *primary_dn = in->get_projected_parent_dn();
603 ceph_assert(primary_dn != NULL);
604 if (primary_dn->get_dir()->get_inode()->is_stray()) {
605 _eval_stray_remote(primary_dn, remote_dn);
606 } else {
607 dout(20) << __func__ << ": inode's primary dn not stray" << dendl;
608 }
609 }
610
611 class C_RetryEvalRemote : public StrayManagerContext {
612 CDentry *dn;
613 public:
614 C_RetryEvalRemote(StrayManager *sm_, CDentry *dn_) :
615 StrayManagerContext(sm_), dn(dn_) {
616 dn->get(CDentry::PIN_PTRWAITER);
617 }
618 void finish(int r) override {
619 if (dn->get_projected_linkage()->is_remote())
620 sm->eval_remote(dn);
621 dn->put(CDentry::PIN_PTRWAITER);
622 }
623 };
624
625 void StrayManager::_eval_stray_remote(CDentry *stray_dn, CDentry *remote_dn)
626 {
627 dout(20) << __func__ << " " << *stray_dn << dendl;
628 ceph_assert(stray_dn != NULL);
629 ceph_assert(stray_dn->get_dir()->get_inode()->is_stray());
630 CDentry::linkage_t *stray_dnl = stray_dn->get_projected_linkage();
631 ceph_assert(stray_dnl->is_primary());
632 CInode *stray_in = stray_dnl->get_inode();
633 ceph_assert(stray_in->get_inode()->nlink >= 1);
634 ceph_assert(stray_in->last == CEPH_NOSNAP);
635
636 /* If no remote_dn hinted, pick one arbitrarily */
637 if (remote_dn == NULL) {
638 if (!stray_in->remote_parents.empty()) {
639 for (const auto &dn : stray_in->remote_parents) {
640 if (dn->last == CEPH_NOSNAP && !dn->is_projected()) {
641 if (dn->is_auth()) {
642 remote_dn = dn;
643 if (remote_dn->dir->can_auth_pin())
644 break;
645 } else if (!remote_dn) {
646 remote_dn = dn;
647 }
648 }
649 }
650 }
651 if (!remote_dn) {
652 dout(20) << __func__ << ": not reintegrating (no remote parents in cache)" << dendl;
653 return;
654 }
655 }
656 ceph_assert(remote_dn->last == CEPH_NOSNAP);
657 // NOTE: we repeat this check in _rename(), since our submission path is racey.
658 if (!remote_dn->is_projected()) {
659 if (remote_dn->is_auth()) {
660 if (remote_dn->dir->can_auth_pin()) {
661 reintegrate_stray(stray_dn, remote_dn);
662 } else {
663 remote_dn->dir->add_waiter(CDir::WAIT_UNFREEZE, new C_RetryEvalRemote(this, remote_dn));
664 dout(20) << __func__ << ": not reintegrating (can't authpin remote parent)" << dendl;
665 }
666
667 } else if (stray_dn->is_auth()) {
668 migrate_stray(stray_dn, remote_dn->authority().first);
669 } else {
670 dout(20) << __func__ << ": not reintegrating" << dendl;
671 }
672 } else {
673 // don't do anything if the remote parent is projected, or we may
674 // break user-visible semantics!
675 dout(20) << __func__ << ": not reintegrating (projected)" << dendl;
676 }
677 }
678
679 void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
680 {
681 dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;
682
683 logger->inc(l_mdc_strays_reintegrated);
684
685 // rename it to remote linkage .
686 filepath src(straydn->get_name(), straydn->get_dir()->ino());
687 filepath dst(rdn->get_name(), rdn->get_dir()->ino());
688
689 ceph_tid_t tid = mds->issue_tid();
690
691 auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
692 req->set_filepath(dst);
693 req->set_filepath2(src);
694 req->set_tid(tid);
695
696 rdn->state_set(CDentry::STATE_REINTEGRATING);
697 mds->internal_client_requests.emplace(std::piecewise_construct,
698 std::make_tuple(tid),
699 std::make_tuple(CEPH_MDS_OP_RENAME,
700 rdn, tid));
701
702 mds->send_message_mds(req, rdn->authority().first);
703 }
704
705 void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
706 {
707 dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;
708
709 logger->inc(l_mdc_strays_migrated);
710
711 // rename it to another mds.
712 inodeno_t dirino = dn->get_dir()->ino();
713 ceph_assert(MDS_INO_IS_STRAY(dirino));
714
715 filepath src(dn->get_name(), dirino);
716 filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));
717
718 ceph_tid_t tid = mds->issue_tid();
719
720 auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
721 req->set_filepath(dst);
722 req->set_filepath2(src);
723 req->set_tid(tid);
724
725 mds->internal_client_requests.emplace(std::piecewise_construct,
726 std::make_tuple(tid),
727 std::make_tuple(CEPH_MDS_OP_RENAME,
728 nullptr, tid));
729
730 mds->send_message_mds(req, to);
731 }
732
733 StrayManager::StrayManager(MDSRank *mds, PurgeQueue &purge_queue_)
734 : delayed_eval_stray(member_offset(CDentry, item_stray)),
735 mds(mds), purge_queue(purge_queue_)
736 {
737 ceph_assert(mds != NULL);
738 }
739
740 void StrayManager::truncate(CDentry *dn)
741 {
742 const CDentry::linkage_t *dnl = dn->get_projected_linkage();
743 const CInode *in = dnl->get_inode();
744 ceph_assert(in);
745 dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
746 ceph_assert(!dn->is_replicated());
747
748 const SnapRealm *realm = in->find_snaprealm();
749 ceph_assert(realm);
750 dout(10) << " realm " << *realm << dendl;
751 const SnapContext *snapc = &realm->get_snap_context();
752
753 uint64_t to = std::max(in->get_inode()->size, in->get_inode()->get_max_size());
754 // when truncating a file, the filer does not delete stripe objects that are
755 // truncated to zero. so we need to purge stripe objects up to the max size
756 // the file has ever been.
757 to = std::max(in->get_inode()->max_size_ever, to);
758
759 ceph_assert(to > 0);
760
761 PurgeItem item;
762 item.action = PurgeItem::TRUNCATE_FILE;
763 item.ino = in->ino();
764 item.layout = in->get_inode()->layout;
765 item.snapc = *snapc;
766 item.size = to;
767 item.stamp = ceph_clock_now();
768
769 purge_queue.push(item, new C_IO_PurgeStrayPurged(
770 this, dn, true));
771 }
772
773 void StrayManager::_truncate_stray_logged(CDentry *dn, MutationRef& mut)
774 {
775 CInode *in = dn->get_projected_linkage()->get_inode();
776
777 dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
778
779 mut->apply();
780
781 in->state_clear(CInode::STATE_PURGING);
782 dn->state_clear(CDentry::STATE_PURGING | CDentry::STATE_PURGINGPINNED);
783 dn->put(CDentry::PIN_PURGING);
784
785 dn->get_dir()->auth_unpin(this);
786
787 eval_stray(dn);
788
789 if (!dn->state_test(CDentry::STATE_PURGING) && mds->is_stopping())
790 mds->mdcache->shutdown_export_stray_finish(in->ino());
791 }
792