1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <boost/utility/string_view.hpp>
17 #include "include/types.h"
29 #include "LogSegment.h"
31 #include "common/bloom_filter.hpp"
32 #include "include/Context.h"
33 #include "common/Clock.h"
35 #include "osdc/Objecter.h"
37 #include "common/config.h"
38 #include "include/assert.h"
39 #include "include/compat.h"
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_mds
44 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
46 int CDir::num_frozen_trees
= 0;
47 int CDir::num_freezing_trees
= 0;
49 class CDirContext
: public MDSInternalContextBase
53 MDSRank
* get_mds() override
{return dir
->cache
->mds
;}
56 explicit CDirContext(CDir
*d
) : dir(d
) {
62 class CDirIOContext
: public MDSIOContextBase
66 MDSRank
* get_mds() override
{return dir
->cache
->mds
;}
69 explicit CDirIOContext(CDir
*d
) : dir(d
) {
76 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
79 ostream
& operator<<(ostream
& out
, const CDir
& dir
)
81 out
<< "[dir " << dir
.dirfrag() << " " << dir
.get_path() << "/"
82 << " [" << dir
.first
<< ",head]";
85 if (dir
.is_replicated())
86 out
<< dir
.get_replicas();
88 if (dir
.is_projected())
89 out
<< " pv=" << dir
.get_projected_version();
90 out
<< " v=" << dir
.get_version();
91 out
<< " cv=" << dir
.get_committing_version();
92 out
<< "/" << dir
.get_committed_version();
94 mds_authority_t a
= dir
.authority();
95 out
<< " rep@" << a
.first
;
96 if (a
.second
!= CDIR_AUTH_UNKNOWN
)
97 out
<< "," << a
.second
;
98 out
<< "." << dir
.get_replica_nonce();
101 if (dir
.is_rep()) out
<< " REP";
103 if (dir
.get_dir_auth() != CDIR_AUTH_DEFAULT
) {
104 if (dir
.get_dir_auth().second
== CDIR_AUTH_UNKNOWN
)
105 out
<< " dir_auth=" << dir
.get_dir_auth().first
;
107 out
<< " dir_auth=" << dir
.get_dir_auth();
110 if (dir
.get_cum_auth_pins())
111 out
<< " ap=" << dir
.get_auth_pins()
112 << "+" << dir
.get_dir_auth_pins()
113 << "+" << dir
.get_nested_auth_pins();
115 out
<< " state=" << dir
.get_state();
116 if (dir
.state_test(CDir::STATE_COMPLETE
)) out
<< "|complete";
117 if (dir
.state_test(CDir::STATE_FREEZINGTREE
)) out
<< "|freezingtree";
118 if (dir
.state_test(CDir::STATE_FROZENTREE
)) out
<< "|frozentree";
119 if (dir
.state_test(CDir::STATE_AUXSUBTREE
)) out
<< "|auxsubtree";
120 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
121 if (dir
.state_test(CDir::STATE_FROZENDIR
)) out
<< "|frozendir";
122 if (dir
.state_test(CDir::STATE_FREEZINGDIR
)) out
<< "|freezingdir";
123 if (dir
.state_test(CDir::STATE_EXPORTBOUND
)) out
<< "|exportbound";
124 if (dir
.state_test(CDir::STATE_IMPORTBOUND
)) out
<< "|importbound";
125 if (dir
.state_test(CDir::STATE_BADFRAG
)) out
<< "|badfrag";
126 if (dir
.state_test(CDir::STATE_FRAGMENTING
)) out
<< "|fragmenting";
129 out
<< " " << dir
.fnode
.fragstat
;
130 if (!(dir
.fnode
.fragstat
== dir
.fnode
.accounted_fragstat
))
131 out
<< "/" << dir
.fnode
.accounted_fragstat
;
132 if (g_conf
->mds_debug_scatterstat
&& dir
.is_projected()) {
133 const fnode_t
*pf
= dir
.get_projected_fnode();
134 out
<< "->" << pf
->fragstat
;
135 if (!(pf
->fragstat
== pf
->accounted_fragstat
))
136 out
<< "/" << pf
->accounted_fragstat
;
140 out
<< " " << dir
.fnode
.rstat
;
141 if (!(dir
.fnode
.rstat
== dir
.fnode
.accounted_rstat
))
142 out
<< "/" << dir
.fnode
.accounted_rstat
;
143 if (g_conf
->mds_debug_scatterstat
&& dir
.is_projected()) {
144 const fnode_t
*pf
= dir
.get_projected_fnode();
145 out
<< "->" << pf
->rstat
;
146 if (!(pf
->rstat
== pf
->accounted_rstat
))
147 out
<< "/" << pf
->accounted_rstat
;
150 out
<< " hs=" << dir
.get_num_head_items() << "+" << dir
.get_num_head_null();
151 out
<< ",ss=" << dir
.get_num_snap_items() << "+" << dir
.get_num_snap_null();
152 if (dir
.get_num_dirty())
153 out
<< " dirty=" << dir
.get_num_dirty();
155 if (dir
.get_num_ref()) {
157 dir
.print_pin_set(out
);
165 void CDir::print(ostream
& out
)
173 ostream
& CDir::print_db_line_prefix(ostream
& out
)
175 return out
<< ceph_clock_now() << " mds." << cache
->mds
->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
180 // -------------------------------------------------------------------
183 CDir::CDir(CInode
*in
, frag_t fg
, MDCache
*mdcache
, bool auth
) :
184 cache(mdcache
), inode(in
), frag(fg
),
186 dirty_rstat_inodes(member_offset(CInode
, dirty_rstat_item
)),
187 projected_version(0),
188 dirty_dentries(member_offset(CDentry
, item_dir_dirty
)),
189 item_dirty(this), item_new(this),
190 num_head_items(0), num_head_null(0),
191 num_snap_items(0), num_snap_null(0),
192 num_dirty(0), committing_version(0), committed_version(0),
193 dir_auth_pins(0), request_pins(0),
195 pop_me(ceph_clock_now()),
196 pop_nested(ceph_clock_now()),
197 pop_auth_subtree(ceph_clock_now()),
198 pop_auth_subtree_nested(ceph_clock_now()),
199 pop_lru_subdirs(member_offset(CInode
, item_pop_lru
)),
200 num_dentries_nested(0), num_dentries_auth_subtree(0),
201 num_dentries_auth_subtree_nested(0),
202 dir_auth(CDIR_AUTH_DEFAULT
)
204 memset(&fnode
, 0, sizeof(fnode
));
207 assert(in
->is_dir());
208 if (auth
) state_set(STATE_AUTH
);
212 * Check the recursive statistics on size for consistency.
213 * If mds_debug_scatterstat is enabled, assert for correctness,
214 * otherwise just print out the mismatch and continue.
216 bool CDir::check_rstats(bool scrub
)
218 if (!g_conf
->mds_debug_scatterstat
&& !scrub
)
221 dout(25) << "check_rstats on " << this << dendl
;
222 if (!is_complete() || !is_auth() || is_frozen()) {
224 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl
;
228 frag_info_t frag_info
;
229 nest_info_t nest_info
;
230 for (auto i
= items
.begin(); i
!= items
.end(); ++i
) {
231 if (i
->second
->last
!= CEPH_NOSNAP
)
233 CDentry::linkage_t
*dnl
= i
->second
->get_linkage();
234 if (dnl
->is_primary()) {
235 CInode
*in
= dnl
->get_inode();
236 nest_info
.add(in
->inode
.accounted_rstat
);
238 frag_info
.nsubdirs
++;
241 } else if (dnl
->is_remote())
247 if(!frag_info
.same_sums(fnode
.fragstat
)) {
248 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl
;
249 dout(1) << "get_num_head_items() = " << get_num_head_items()
250 << "; fnode.fragstat.nfiles=" << fnode
.fragstat
.nfiles
251 << " fnode.fragstat.nsubdirs=" << fnode
.fragstat
.nsubdirs
<< dendl
;
254 dout(20) << "get_num_head_items() = " << get_num_head_items()
255 << "; fnode.fragstat.nfiles=" << fnode
.fragstat
.nfiles
256 << " fnode.fragstat.nsubdirs=" << fnode
.fragstat
.nsubdirs
<< dendl
;
260 if (!nest_info
.same_sums(fnode
.rstat
)) {
261 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl
;
262 dout(1) << "total of child dentrys: " << nest_info
<< dendl
;
263 dout(1) << "my rstats: " << fnode
.rstat
<< dendl
;
266 dout(20) << "total of child dentrys: " << nest_info
<< dendl
;
267 dout(20) << "my rstats: " << fnode
.rstat
<< dendl
;
272 for (auto i
= items
.begin(); i
!= items
.end(); ++i
) {
273 CDentry
*dn
= i
->second
;
274 if (dn
->get_linkage()->is_primary()) {
275 CInode
*in
= dn
->get_linkage()->inode
;
276 dout(1) << *dn
<< " rstat " << in
->inode
.accounted_rstat
<< dendl
;
278 dout(1) << *dn
<< dendl
;
282 assert(frag_info
.nfiles
== fnode
.fragstat
.nfiles
);
283 assert(frag_info
.nsubdirs
== fnode
.fragstat
.nsubdirs
);
284 assert(nest_info
.rbytes
== fnode
.rstat
.rbytes
);
285 assert(nest_info
.rfiles
== fnode
.rstat
.rfiles
);
286 assert(nest_info
.rsubdirs
== fnode
.rstat
.rsubdirs
);
289 dout(10) << "check_rstats complete on " << this << dendl
;
293 CDentry
*CDir::lookup(boost::string_view name
, snapid_t snap
)
295 dout(20) << "lookup (" << snap
<< ", '" << name
<< "')" << dendl
;
296 auto iter
= items
.lower_bound(dentry_key_t(snap
, name
, inode
->hash_dentry_name(name
)));
297 if (iter
== items
.end())
299 if (iter
->second
->get_name() == name
&&
300 iter
->second
->first
<= snap
&&
301 iter
->second
->last
>= snap
) {
302 dout(20) << " hit -> " << iter
->first
<< dendl
;
305 dout(20) << " miss -> " << iter
->first
<< dendl
;
309 CDentry
*CDir::lookup_exact_snap(boost::string_view name
, snapid_t last
) {
310 auto p
= items
.find(dentry_key_t(last
, name
, inode
->hash_dentry_name(name
)));
311 if (p
== items
.end())
320 CDentry
* CDir::add_null_dentry(boost::string_view dname
,
321 snapid_t first
, snapid_t last
)
324 assert(lookup_exact_snap(dname
, last
) == 0);
327 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), first
, last
);
329 dn
->state_set(CDentry::STATE_AUTH
);
331 cache
->bottom_lru
.lru_insert_mid(dn
);
332 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
335 dn
->version
= get_projected_version();
338 assert(items
.count(dn
->key()) == 0);
339 //assert(null_items.count(dn->get_name()) == 0);
341 items
[dn
->key()] = dn
;
342 if (last
== CEPH_NOSNAP
)
347 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
348 dn
->get(CDentry::PIN_FRAGMENTING
);
349 dn
->state_set(CDentry::STATE_FRAGMENTING
);
352 dout(12) << "add_null_dentry " << *dn
<< dendl
;
355 if (get_num_any() == 1)
358 assert(get_num_any() == items
.size());
363 CDentry
* CDir::add_primary_dentry(boost::string_view dname
, CInode
*in
,
364 snapid_t first
, snapid_t last
)
367 assert(lookup_exact_snap(dname
, last
) == 0);
370 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), first
, last
);
372 dn
->state_set(CDentry::STATE_AUTH
);
373 if (is_auth() || !inode
->is_stray()) {
374 cache
->lru
.lru_insert_mid(dn
);
376 cache
->bottom_lru
.lru_insert_mid(dn
);
377 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
381 dn
->version
= get_projected_version();
384 assert(items
.count(dn
->key()) == 0);
385 //assert(null_items.count(dn->get_name()) == 0);
387 items
[dn
->key()] = dn
;
389 dn
->get_linkage()->inode
= in
;
391 link_inode_work(dn
, in
);
393 if (dn
->last
== CEPH_NOSNAP
)
398 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
399 dn
->get(CDentry::PIN_FRAGMENTING
);
400 dn
->state_set(CDentry::STATE_FRAGMENTING
);
403 dout(12) << "add_primary_dentry " << *dn
<< dendl
;
406 if (get_num_any() == 1)
408 assert(get_num_any() == items
.size());
412 CDentry
* CDir::add_remote_dentry(boost::string_view dname
, inodeno_t ino
, unsigned char d_type
,
413 snapid_t first
, snapid_t last
)
416 assert(lookup_exact_snap(dname
, last
) == 0);
419 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), ino
, d_type
, first
, last
);
421 dn
->state_set(CDentry::STATE_AUTH
);
422 cache
->lru
.lru_insert_mid(dn
);
425 dn
->version
= get_projected_version();
428 assert(items
.count(dn
->key()) == 0);
429 //assert(null_items.count(dn->get_name()) == 0);
431 items
[dn
->key()] = dn
;
432 if (last
== CEPH_NOSNAP
)
437 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
438 dn
->get(CDentry::PIN_FRAGMENTING
);
439 dn
->state_set(CDentry::STATE_FRAGMENTING
);
442 dout(12) << "add_remote_dentry " << *dn
<< dendl
;
445 if (get_num_any() == 1)
448 assert(get_num_any() == items
.size());
454 void CDir::remove_dentry(CDentry
*dn
)
456 dout(12) << "remove_dentry " << *dn
<< dendl
;
458 // there should be no client leases at this point!
459 assert(dn
->client_lease_map
.empty());
461 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
462 dn
->put(CDentry::PIN_FRAGMENTING
);
463 dn
->state_clear(CDentry::STATE_FRAGMENTING
);
466 if (dn
->get_linkage()->is_null()) {
467 if (dn
->last
== CEPH_NOSNAP
)
472 if (dn
->last
== CEPH_NOSNAP
)
478 if (!dn
->get_linkage()->is_null())
479 // detach inode and dentry
480 unlink_inode_work(dn
);
483 assert(items
.count(dn
->key()) == 1);
484 items
.erase(dn
->key());
490 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
))
491 cache
->bottom_lru
.lru_remove(dn
);
493 cache
->lru
.lru_remove(dn
);
497 if (get_num_any() == 0)
499 assert(get_num_any() == items
.size());
502 void CDir::link_remote_inode(CDentry
*dn
, CInode
*in
)
504 link_remote_inode(dn
, in
->ino(), IFTODT(in
->get_projected_inode()->mode
));
507 void CDir::link_remote_inode(CDentry
*dn
, inodeno_t ino
, unsigned char d_type
)
509 dout(12) << "link_remote_inode " << *dn
<< " remote " << ino
<< dendl
;
510 assert(dn
->get_linkage()->is_null());
512 dn
->get_linkage()->set_remote(ino
, d_type
);
514 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
515 cache
->bottom_lru
.lru_remove(dn
);
516 cache
->lru
.lru_insert_mid(dn
);
517 dn
->state_clear(CDentry::STATE_BOTTOMLRU
);
520 if (dn
->last
== CEPH_NOSNAP
) {
527 assert(get_num_any() == items
.size());
530 void CDir::link_primary_inode(CDentry
*dn
, CInode
*in
)
532 dout(12) << "link_primary_inode " << *dn
<< " " << *in
<< dendl
;
533 assert(dn
->get_linkage()->is_null());
535 dn
->get_linkage()->inode
= in
;
537 link_inode_work(dn
, in
);
539 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
) &&
540 (is_auth() || !inode
->is_stray())) {
541 cache
->bottom_lru
.lru_remove(dn
);
542 cache
->lru
.lru_insert_mid(dn
);
543 dn
->state_clear(CDentry::STATE_BOTTOMLRU
);
546 if (dn
->last
== CEPH_NOSNAP
) {
554 assert(get_num_any() == items
.size());
557 void CDir::link_inode_work( CDentry
*dn
, CInode
*in
)
559 assert(dn
->get_linkage()->get_inode() == in
);
560 in
->set_primary_parent(dn
);
563 //in->inode.version = dn->get_version();
566 if (in
->get_num_ref())
567 dn
->get(CDentry::PIN_INODEPIN
);
569 // adjust auth pin count
570 if (in
->auth_pins
+ in
->nested_auth_pins
)
571 dn
->adjust_nested_auth_pins(in
->auth_pins
+ in
->nested_auth_pins
, in
->auth_pins
, NULL
);
573 // verify open snaprealm parent
575 in
->snaprealm
->adjust_parent();
576 else if (in
->is_any_caps())
577 in
->move_to_realm(inode
->find_snaprealm());
580 void CDir::unlink_inode(CDentry
*dn
, bool adjust_lru
)
582 if (dn
->get_linkage()->is_primary()) {
583 dout(12) << "unlink_inode " << *dn
<< " " << *dn
->get_linkage()->get_inode() << dendl
;
585 dout(12) << "unlink_inode " << *dn
<< dendl
;
588 unlink_inode_work(dn
);
590 if (adjust_lru
&& !dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
591 cache
->lru
.lru_remove(dn
);
592 cache
->bottom_lru
.lru_insert_mid(dn
);
593 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
596 if (dn
->last
== CEPH_NOSNAP
) {
603 assert(get_num_any() == items
.size());
607 void CDir::try_remove_unlinked_dn(CDentry
*dn
)
609 assert(dn
->dir
== this);
610 assert(dn
->get_linkage()->is_null());
612 // no pins (besides dirty)?
613 if (dn
->get_num_ref() != dn
->is_dirty())
618 dout(10) << "try_remove_unlinked_dn " << *dn
<< " in " << *this << dendl
;
623 // NOTE: we may not have any more dirty dentries, but the fnode
624 // still changed, so the directory must remain dirty.
629 void CDir::unlink_inode_work( CDentry
*dn
)
631 CInode
*in
= dn
->get_linkage()->get_inode();
633 if (dn
->get_linkage()->is_remote()) {
636 dn
->unlink_remote(dn
->get_linkage());
638 dn
->get_linkage()->set_remote(0, 0);
639 } else if (dn
->get_linkage()->is_primary()) {
642 if (in
->get_num_ref())
643 dn
->put(CDentry::PIN_INODEPIN
);
645 // unlink auth_pin count
646 if (in
->auth_pins
+ in
->nested_auth_pins
)
647 dn
->adjust_nested_auth_pins(0 - (in
->auth_pins
+ in
->nested_auth_pins
), 0 - in
->auth_pins
, NULL
);
650 in
->remove_primary_parent(dn
);
652 in
->item_pop_lru
.remove_myself();
653 dn
->get_linkage()->inode
= 0;
655 assert(!dn
->get_linkage()->is_null());
659 void CDir::add_to_bloom(CDentry
*dn
)
661 assert(dn
->last
== CEPH_NOSNAP
);
663 /* not create bloom filter for incomplete dir that was added by log replay */
667 /* don't maintain bloom filters in standby replay (saves cycles, and also
668 * avoids need to implement clearing it in EExport for #16924) */
669 if (cache
->mds
->is_standby_replay()) {
673 unsigned size
= get_num_head_items() + get_num_snap_items();
674 if (size
< 100) size
= 100;
675 bloom
.reset(new bloom_filter(size
, 1.0 / size
, 0));
677 /* This size and false positive probability is completely random.*/
678 bloom
->insert(dn
->get_name().data(), dn
->get_name().size());
681 bool CDir::is_in_bloom(boost::string_view name
)
685 return bloom
->contains(name
.data(), name
.size());
688 void CDir::remove_null_dentries() {
689 dout(12) << "remove_null_dentries " << *this << dendl
;
691 auto p
= items
.begin();
692 while (p
!= items
.end()) {
693 CDentry
*dn
= p
->second
;
695 if (dn
->get_linkage()->is_null() && !dn
->is_projected())
699 assert(num_snap_null
== 0);
700 assert(num_head_null
== 0);
701 assert(get_num_any() == items
.size());
704 /** remove dirty null dentries for deleted directory. the dirfrag will be
705 * deleted soon, so it's safe to not commit dirty dentries.
707 * This is called when a directory is being deleted, a prerequisite
708 * of which is that its children have been unlinked: we expect to only see
709 * null, unprojected dentries here.
711 void CDir::try_remove_dentries_for_stray()
713 dout(10) << __func__
<< dendl
;
714 assert(get_parent_dir()->inode
->is_stray());
716 // clear dirty only when the directory was not snapshotted
717 bool clear_dirty
= !inode
->snaprealm
;
719 auto p
= items
.begin();
720 while (p
!= items
.end()) {
721 CDentry
*dn
= p
->second
;
723 if (dn
->last
== CEPH_NOSNAP
) {
724 assert(!dn
->is_projected());
725 assert(dn
->get_linkage()->is_null());
726 if (clear_dirty
&& dn
->is_dirty())
728 // It's OK to remove lease prematurely because we will never link
729 // the dentry to inode again.
730 if (dn
->is_any_leases())
731 dn
->remove_client_leases(cache
->mds
->locker
);
732 if (dn
->get_num_ref() == 0)
735 assert(!dn
->is_projected());
736 CDentry::linkage_t
*dnl
= dn
->get_linkage();
738 if (dnl
->is_primary()) {
739 in
= dnl
->get_inode();
740 if (clear_dirty
&& in
->is_dirty())
743 if (clear_dirty
&& dn
->is_dirty())
745 if (dn
->get_num_ref() == 0) {
748 cache
->remove_inode(in
);
753 if (clear_dirty
&& is_dirty())
757 bool CDir::try_trim_snap_dentry(CDentry
*dn
, const set
<snapid_t
>& snaps
)
759 assert(dn
->last
!= CEPH_NOSNAP
);
760 set
<snapid_t
>::const_iterator p
= snaps
.lower_bound(dn
->first
);
761 CDentry::linkage_t
*dnl
= dn
->get_linkage();
763 if (dnl
->is_primary())
764 in
= dnl
->get_inode();
765 if ((p
== snaps
.end() || *p
> dn
->last
) &&
766 (dn
->get_num_ref() == dn
->is_dirty()) &&
767 (!in
|| in
->get_num_ref() == in
->is_dirty())) {
768 dout(10) << " purging snapped " << *dn
<< dendl
;
769 if (in
&& in
->is_dirty())
773 dout(10) << " purging snapped " << *in
<< dendl
;
774 cache
->remove_inode(in
);
782 void CDir::purge_stale_snap_data(const set
<snapid_t
>& snaps
)
784 dout(10) << "purge_stale_snap_data " << snaps
<< dendl
;
786 auto p
= items
.begin();
787 while (p
!= items
.end()) {
788 CDentry
*dn
= p
->second
;
791 if (dn
->last
== CEPH_NOSNAP
)
794 try_trim_snap_dentry(dn
, snaps
);
800 * steal_dentry -- semi-violently move a dentry from one CDir to another
801 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
802 * on the old CDir corpse; must call finish_old_fragment() when finished.
804 void CDir::steal_dentry(CDentry
*dn
)
806 dout(15) << "steal_dentry " << *dn
<< dendl
;
808 items
[dn
->key()] = dn
;
810 dn
->dir
->items
.erase(dn
->key());
811 if (dn
->dir
->items
.empty())
812 dn
->dir
->put(PIN_CHILD
);
814 if (get_num_any() == 0)
816 if (dn
->get_linkage()->is_null()) {
817 if (dn
->last
== CEPH_NOSNAP
)
821 } else if (dn
->last
== CEPH_NOSNAP
) {
824 if (dn
->get_linkage()->is_primary()) {
825 CInode
*in
= dn
->get_linkage()->get_inode();
826 auto pi
= in
->get_projected_inode();
828 fnode
.fragstat
.nsubdirs
++;
829 if (in
->item_pop_lru
.is_on_list())
830 pop_lru_subdirs
.push_back(&in
->item_pop_lru
);
832 fnode
.fragstat
.nfiles
++;
834 fnode
.rstat
.rbytes
+= pi
->accounted_rstat
.rbytes
;
835 fnode
.rstat
.rfiles
+= pi
->accounted_rstat
.rfiles
;
836 fnode
.rstat
.rsubdirs
+= pi
->accounted_rstat
.rsubdirs
;
837 fnode
.rstat
.rsnaprealms
+= pi
->accounted_rstat
.rsnaprealms
;
838 if (pi
->accounted_rstat
.rctime
> fnode
.rstat
.rctime
)
839 fnode
.rstat
.rctime
= pi
->accounted_rstat
.rctime
;
841 // move dirty inode rstat to new dirfrag
842 if (in
->is_dirty_rstat())
843 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
844 } else if (dn
->get_linkage()->is_remote()) {
845 if (dn
->get_linkage()->get_remote_d_type() == DT_DIR
)
846 fnode
.fragstat
.nsubdirs
++;
848 fnode
.fragstat
.nfiles
++;
852 if (dn
->get_linkage()->is_primary()) {
853 CInode
*in
= dn
->get_linkage()->get_inode();
854 if (in
->is_dirty_rstat())
855 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
859 if (dn
->auth_pins
|| dn
->nested_auth_pins
) {
860 // use the helpers here to maintain the auth_pin invariants on the dir inode
861 int ap
= dn
->get_num_auth_pins() + dn
->get_num_nested_auth_pins();
862 int dap
= dn
->get_num_dir_auth_pins();
864 adjust_nested_auth_pins(ap
, dap
, NULL
);
865 dn
->dir
->adjust_nested_auth_pins(-ap
, -dap
, NULL
);
868 if (dn
->is_dirty()) {
869 dirty_dentries
.push_back(&dn
->item_dir_dirty
);
876 void CDir::prepare_old_fragment(map
<string_snap_t
, std::list
<MDSInternalContextBase
*> >& dentry_waiters
, bool replay
)
878 // auth_pin old fragment for duration so that any auth_pinning
879 // during the dentry migration doesn't trigger side effects
880 if (!replay
&& is_auth())
883 if (!waiting_on_dentry
.empty()) {
884 for (const auto &p
: waiting_on_dentry
) {
885 auto &e
= dentry_waiters
[p
.first
];
886 for (const auto &waiter
: p
.second
) {
890 waiting_on_dentry
.clear();
895 void CDir::prepare_new_fragment(bool replay
)
897 if (!replay
&& is_auth()) {
901 inode
->add_dirfrag(this);
904 void CDir::finish_old_fragment(list
<MDSInternalContextBase
*>& waiters
, bool replay
)
906 // take waiters _before_ unfreeze...
908 take_waiting(WAIT_ANY_MASK
, waiters
);
910 auth_unpin(this); // pinned in prepare_old_fragment
911 assert(is_frozen_dir());
916 assert(nested_auth_pins
== 0);
917 assert(dir_auth_pins
== 0);
918 assert(auth_pins
== 0);
920 num_head_items
= num_head_null
= 0;
921 num_snap_items
= num_snap_null
= 0;
923 // this mirrors init_fragment_pins()
928 if (state_test(STATE_IMPORTBOUND
))
929 put(PIN_IMPORTBOUND
);
930 if (state_test(STATE_EXPORTBOUND
))
931 put(PIN_EXPORTBOUND
);
932 if (is_subtree_root())
938 assert(get_num_ref() == (state_test(STATE_STICKY
) ? 1:0));
941 void CDir::init_fragment_pins()
945 if (state_test(STATE_DIRTY
))
947 if (state_test(STATE_EXPORTBOUND
))
948 get(PIN_EXPORTBOUND
);
949 if (state_test(STATE_IMPORTBOUND
))
950 get(PIN_IMPORTBOUND
);
951 if (is_subtree_root())
955 void CDir::split(int bits
, list
<CDir
*>& subs
, list
<MDSInternalContextBase
*>& waiters
, bool replay
)
957 dout(10) << "split by " << bits
<< " bits on " << *this << dendl
;
959 assert(replay
|| is_complete() || !is_auth());
962 frag
.split(bits
, frags
);
964 vector
<CDir
*> subfrags(1 << bits
);
966 double fac
= 1.0 / (double)(1 << bits
); // for scaling load vecs
968 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
969 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
971 nest_info_t rstatdiff
;
972 frag_info_t fragstatdiff
;
973 if (fnode
.accounted_rstat
.version
== rstat_version
)
974 rstatdiff
.add_delta(fnode
.accounted_rstat
, fnode
.rstat
);
975 if (fnode
.accounted_fragstat
.version
== dirstat_version
)
976 fragstatdiff
.add_delta(fnode
.accounted_fragstat
, fnode
.fragstat
);
977 dout(10) << " rstatdiff " << rstatdiff
<< " fragstatdiff " << fragstatdiff
<< dendl
;
979 map
<string_snap_t
, std::list
<MDSInternalContextBase
*> > dentry_waiters
;
980 prepare_old_fragment(dentry_waiters
, replay
);
982 // create subfrag dirs
984 for (list
<frag_t
>::iterator p
= frags
.begin(); p
!= frags
.end(); ++p
) {
985 CDir
*f
= new CDir(inode
, *p
, cache
, is_auth());
986 f
->state_set(state
& (MASK_STATE_FRAGMENT_KEPT
| STATE_COMPLETE
));
987 f
->get_replicas() = get_replicas();
988 f
->set_version(get_version());
990 f
->pop_me
.scale(fac
);
992 // FIXME; this is an approximation
993 f
->pop_nested
= pop_nested
;
994 f
->pop_nested
.scale(fac
);
995 f
->pop_auth_subtree
= pop_auth_subtree
;
996 f
->pop_auth_subtree
.scale(fac
);
997 f
->pop_auth_subtree_nested
= pop_auth_subtree_nested
;
998 f
->pop_auth_subtree_nested
.scale(fac
);
1000 dout(10) << " subfrag " << *p
<< " " << *f
<< dendl
;
1004 f
->set_dir_auth(get_dir_auth());
1005 f
->prepare_new_fragment(replay
);
1006 f
->init_fragment_pins();
1009 // repartition dentries
1010 while (!items
.empty()) {
1011 auto p
= items
.begin();
1013 CDentry
*dn
= p
->second
;
1014 frag_t subfrag
= inode
->pick_dirfrag(dn
->get_name());
1015 int n
= (subfrag
.value() & (subfrag
.mask() ^ frag
.mask())) >> subfrag
.mask_shift();
1016 dout(15) << " subfrag " << subfrag
<< " n=" << n
<< " for " << p
->first
<< dendl
;
1017 CDir
*f
= subfrags
[n
];
1018 f
->steal_dentry(dn
);
1021 for (const auto &p
: dentry_waiters
) {
1022 frag_t subfrag
= inode
->pick_dirfrag(p
.first
.name
);
1023 int n
= (subfrag
.value() & (subfrag
.mask() ^ frag
.mask())) >> subfrag
.mask_shift();
1024 CDir
*f
= subfrags
[n
];
1026 if (f
->waiting_on_dentry
.empty())
1027 f
->get(PIN_DNWAITER
);
1028 auto &e
= f
->waiting_on_dentry
[p
.first
];
1029 for (const auto &waiter
: p
.second
) {
1030 e
.push_back(waiter
);
1034 // FIXME: handle dirty old rstat
1036 // fix up new frag fragstats
1037 for (int i
=0; i
<n
; i
++) {
1038 CDir
*f
= subfrags
[i
];
1039 f
->fnode
.rstat
.version
= rstat_version
;
1040 f
->fnode
.accounted_rstat
= f
->fnode
.rstat
;
1041 f
->fnode
.fragstat
.version
= dirstat_version
;
1042 f
->fnode
.accounted_fragstat
= f
->fnode
.fragstat
;
1043 dout(10) << " rstat " << f
->fnode
.rstat
<< " fragstat " << f
->fnode
.fragstat
1044 << " on " << *f
<< dendl
;
1047 // give any outstanding frag stat differential to first frag
1048 dout(10) << " giving rstatdiff " << rstatdiff
<< " fragstatdiff" << fragstatdiff
1049 << " to " << *subfrags
[0] << dendl
;
1050 subfrags
[0]->fnode
.accounted_rstat
.add(rstatdiff
);
1051 subfrags
[0]->fnode
.accounted_fragstat
.add(fragstatdiff
);
1053 finish_old_fragment(waiters
, replay
);
1056 void CDir::merge(list
<CDir
*>& subs
, list
<MDSInternalContextBase
*>& waiters
, bool replay
)
1058 dout(10) << "merge " << subs
<< dendl
;
1060 mds_authority_t new_auth
= CDIR_AUTH_DEFAULT
;
1061 for (auto dir
: subs
) {
1062 if (dir
->get_dir_auth() != CDIR_AUTH_DEFAULT
&&
1063 dir
->get_dir_auth() != new_auth
) {
1064 assert(new_auth
== CDIR_AUTH_DEFAULT
);
1065 new_auth
= dir
->get_dir_auth();
1069 set_dir_auth(new_auth
);
1070 prepare_new_fragment(replay
);
1072 nest_info_t rstatdiff
;
1073 frag_info_t fragstatdiff
;
1074 bool touched_mtime
, touched_chattr
;
1075 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
1076 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
1078 map
<string_snap_t
, std::list
<MDSInternalContextBase
*> > dentry_waiters
;
1080 for (auto dir
: subs
) {
1081 dout(10) << " subfrag " << dir
->get_frag() << " " << *dir
<< dendl
;
1082 assert(!dir
->is_auth() || dir
->is_complete() || replay
);
1084 if (dir
->fnode
.accounted_rstat
.version
== rstat_version
)
1085 rstatdiff
.add_delta(dir
->fnode
.accounted_rstat
, dir
->fnode
.rstat
);
1086 if (dir
->fnode
.accounted_fragstat
.version
== dirstat_version
)
1087 fragstatdiff
.add_delta(dir
->fnode
.accounted_fragstat
, dir
->fnode
.fragstat
,
1088 &touched_mtime
, &touched_chattr
);
1090 dir
->prepare_old_fragment(dentry_waiters
, replay
);
1093 while (!dir
->items
.empty())
1094 steal_dentry(dir
->items
.begin()->second
);
1096 // merge replica map
1097 for (const auto &p
: dir
->get_replicas()) {
1098 unsigned cur
= get_replicas()[p
.first
];
1100 get_replicas()[p
.first
] = p
.second
;
1104 if (dir
->get_version() > get_version())
1105 set_version(dir
->get_version());
1108 state_set(dir
->get_state() & MASK_STATE_FRAGMENT_KEPT
);
1110 dir
->finish_old_fragment(waiters
, replay
);
1111 inode
->close_dirfrag(dir
->get_frag());
1114 if (!dentry_waiters
.empty()) {
1116 for (const auto &p
: dentry_waiters
) {
1117 auto &e
= waiting_on_dentry
[p
.first
];
1118 for (const auto &waiter
: p
.second
) {
1119 e
.push_back(waiter
);
1124 if (is_auth() && !replay
)
1127 // FIXME: merge dirty old rstat
1128 fnode
.rstat
.version
= rstat_version
;
1129 fnode
.accounted_rstat
= fnode
.rstat
;
1130 fnode
.accounted_rstat
.add(rstatdiff
);
1132 fnode
.fragstat
.version
= dirstat_version
;
1133 fnode
.accounted_fragstat
= fnode
.fragstat
;
1134 fnode
.accounted_fragstat
.add(fragstatdiff
);
1136 init_fragment_pins();
1142 void CDir::resync_accounted_fragstat()
1144 fnode_t
*pf
= get_projected_fnode();
1145 auto pi
= inode
->get_projected_inode();
1147 if (pf
->accounted_fragstat
.version
!= pi
->dirstat
.version
) {
1148 pf
->fragstat
.version
= pi
->dirstat
.version
;
1149 dout(10) << "resync_accounted_fragstat " << pf
->accounted_fragstat
<< " -> " << pf
->fragstat
<< dendl
;
1150 pf
->accounted_fragstat
= pf
->fragstat
;
1155 * resync rstat and accounted_rstat with inode
1157 void CDir::resync_accounted_rstat()
1159 fnode_t
*pf
= get_projected_fnode();
1160 auto pi
= inode
->get_projected_inode();
1162 if (pf
->accounted_rstat
.version
!= pi
->rstat
.version
) {
1163 pf
->rstat
.version
= pi
->rstat
.version
;
1164 dout(10) << "resync_accounted_rstat " << pf
->accounted_rstat
<< " -> " << pf
->rstat
<< dendl
;
1165 pf
->accounted_rstat
= pf
->rstat
;
1166 dirty_old_rstat
.clear();
1170 void CDir::assimilate_dirty_rstat_inodes()
1172 dout(10) << "assimilate_dirty_rstat_inodes" << dendl
;
1173 for (elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1176 assert(in
->is_auth());
1177 if (in
->is_frozen())
1180 auto &pi
= in
->project_inode();
1181 pi
.inode
.version
= in
->pre_dirty();
1183 inode
->mdcache
->project_rstat_inode_to_frag(in
, this, 0, 0, NULL
);
1185 state_set(STATE_ASSIMRSTAT
);
1186 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl
;
1189 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef
& mut
, EMetaBlob
*blob
)
1191 if (!state_test(STATE_ASSIMRSTAT
))
1193 state_clear(STATE_ASSIMRSTAT
);
1194 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl
;
1195 elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1200 if (in
->is_frozen())
1203 CDentry
*dn
= in
->get_projected_parent_dn();
1206 mut
->add_projected_inode(in
);
1208 in
->clear_dirty_rstat();
1209 blob
->add_primary_dentry(dn
, in
, true);
1212 if (!dirty_rstat_inodes
.empty())
1213 inode
->mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
1219 /****************************************
1223 void CDir::add_dentry_waiter(boost::string_view dname
, snapid_t snapid
, MDSInternalContextBase
*c
)
1225 if (waiting_on_dentry
.empty())
1227 waiting_on_dentry
[string_snap_t(dname
, snapid
)].push_back(c
);
1228 dout(10) << "add_dentry_waiter dentry " << dname
1229 << " snap " << snapid
1230 << " " << c
<< " on " << *this << dendl
;
1233 void CDir::take_dentry_waiting(boost::string_view dname
, snapid_t first
, snapid_t last
,
1234 list
<MDSInternalContextBase
*>& ls
)
1236 if (waiting_on_dentry
.empty())
1239 string_snap_t
lb(dname
, first
);
1240 string_snap_t
ub(dname
, last
);
1241 auto it
= waiting_on_dentry
.lower_bound(lb
);
1242 while (it
!= waiting_on_dentry
.end() &&
1243 !(ub
< it
->first
)) {
1244 dout(10) << "take_dentry_waiting dentry " << dname
1245 << " [" << first
<< "," << last
<< "] found waiter on snap "
1247 << " on " << *this << dendl
;
1248 for (const auto &waiter
: it
->second
) {
1249 ls
.push_back(waiter
);
1251 waiting_on_dentry
.erase(it
++);
1254 if (waiting_on_dentry
.empty())
1258 void CDir::take_sub_waiting(list
<MDSInternalContextBase
*>& ls
)
1260 dout(10) << "take_sub_waiting" << dendl
;
1261 if (!waiting_on_dentry
.empty()) {
1262 for (const auto &p
: waiting_on_dentry
) {
1263 for (const auto &waiter
: p
.second
) {
1264 ls
.push_back(waiter
);
1267 waiting_on_dentry
.clear();
1274 void CDir::add_waiter(uint64_t tag
, MDSInternalContextBase
*c
)
1279 if (tag
& WAIT_ATFREEZEROOT
) {
1280 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1281 is_freezing_dir() || is_frozen_dir())) {
1283 dout(10) << "add_waiter " << std::hex
<< tag
<< std::dec
<< " " << c
<< " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl
;
1284 inode
->parent
->dir
->add_waiter(tag
, c
);
1290 if (tag
& WAIT_ATSUBTREEROOT
) {
1291 if (!is_subtree_root()) {
1293 dout(10) << "add_waiter " << std::hex
<< tag
<< std::dec
<< " " << c
<< " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl
;
1294 inode
->parent
->dir
->add_waiter(tag
, c
);
1299 assert(!(tag
& WAIT_CREATED
) || state_test(STATE_CREATING
));
1301 MDSCacheObject::add_waiter(tag
, c
);
1306 /* NOTE: this checks dentry waiters too */
1307 void CDir::take_waiting(uint64_t mask
, list
<MDSInternalContextBase
*>& ls
)
1309 if ((mask
& WAIT_DENTRY
) && !waiting_on_dentry
.empty()) {
1310 // take all dentry waiters
1311 for (const auto &p
: waiting_on_dentry
) {
1312 dout(10) << "take_waiting dentry " << p
.first
.name
1313 << " snap " << p
.first
.snapid
<< " on " << *this << dendl
;
1314 for (const auto &waiter
: p
.second
) {
1315 ls
.push_back(waiter
);
1318 waiting_on_dentry
.clear();
1323 MDSCacheObject::take_waiting(mask
, ls
);
1327 void CDir::finish_waiting(uint64_t mask
, int result
)
1329 dout(11) << "finish_waiting mask " << hex
<< mask
<< dec
<< " result " << result
<< " on " << *this << dendl
;
1331 list
<MDSInternalContextBase
*> finished
;
1332 take_waiting(mask
, finished
);
1334 finish_contexts(g_ceph_context
, finished
, result
);
1336 cache
->mds
->queue_waiters(finished
);
1343 fnode_t
*CDir::project_fnode()
1345 assert(get_version() != 0);
1346 projected_fnode
.emplace_back(*get_projected_fnode());
1347 auto &p
= projected_fnode
.back();
1349 if (scrub_infop
&& scrub_infop
->last_scrub_dirty
) {
1350 p
.localized_scrub_stamp
= scrub_infop
->last_local
.time
;
1351 p
.localized_scrub_version
= scrub_infop
->last_local
.version
;
1352 p
.recursive_scrub_stamp
= scrub_infop
->last_recursive
.time
;
1353 p
.recursive_scrub_version
= scrub_infop
->last_recursive
.version
;
1354 scrub_infop
->last_scrub_dirty
= false;
1355 scrub_maybe_delete_info();
1358 dout(10) << __func__
<< " " << &p
<< dendl
;
1362 void CDir::pop_and_dirty_projected_fnode(LogSegment
*ls
)
1364 assert(!projected_fnode
.empty());
1365 auto &front
= projected_fnode
.front();
1366 dout(15) << __func__
<< " " << &front
<< " v" << front
.version
<< dendl
;
1369 projected_fnode
.pop_front();
1373 version_t
CDir::pre_dirty(version_t min
)
1375 if (min
> projected_version
)
1376 projected_version
= min
;
1377 ++projected_version
;
1378 dout(10) << "pre_dirty " << projected_version
<< dendl
;
1379 return projected_version
;
1382 void CDir::mark_dirty(version_t pv
, LogSegment
*ls
)
1384 assert(get_version() < pv
);
1385 assert(pv
<= projected_version
);
1390 void CDir::_mark_dirty(LogSegment
*ls
)
1392 if (!state_test(STATE_DIRTY
)) {
1393 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl
;
1397 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl
;
1400 ls
->dirty_dirfrags
.push_back(&item_dirty
);
1402 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1403 if (committed_version
== 0 && !item_new
.is_on_list())
1404 ls
->new_dirfrags
.push_back(&item_new
);
1408 void CDir::mark_new(LogSegment
*ls
)
1410 ls
->new_dirfrags
.push_back(&item_new
);
1411 state_clear(STATE_CREATING
);
1413 list
<MDSInternalContextBase
*> waiters
;
1414 take_waiting(CDir::WAIT_CREATED
, waiters
);
1415 cache
->mds
->queue_waiters(waiters
);
1418 void CDir::mark_clean()
1420 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl
;
1421 if (state_test(STATE_DIRTY
)) {
1422 item_dirty
.remove_myself();
1423 item_new
.remove_myself();
1425 state_clear(STATE_DIRTY
);
1430 // caller should hold auth pin of this
1431 void CDir::log_mark_dirty()
1433 if (is_dirty() || projected_version
> get_version())
1434 return; // noop if it is already dirty or will be dirty
1436 version_t pv
= pre_dirty();
1437 mark_dirty(pv
, cache
->mds
->mdlog
->get_current_segment());
1440 void CDir::mark_complete() {
1441 state_set(STATE_COMPLETE
);
1445 void CDir::first_get()
1447 inode
->get(CInode::PIN_DIRFRAG
);
1450 void CDir::last_put()
1452 inode
->put(CInode::PIN_DIRFRAG
);
1457 /******************************************************************************
1461 // -----------------------
1463 void CDir::fetch(MDSInternalContextBase
*c
, bool ignore_authpinnability
)
1466 return fetch(c
, want
, ignore_authpinnability
);
1469 void CDir::fetch(MDSInternalContextBase
*c
, boost::string_view want_dn
, bool ignore_authpinnability
)
1471 dout(10) << "fetch on " << *this << dendl
;
1474 assert(!is_complete());
1476 if (!can_auth_pin() && !ignore_authpinnability
) {
1478 dout(7) << "fetch waiting for authpinnable" << dendl
;
1479 add_waiter(WAIT_UNFREEZE
, c
);
1481 dout(7) << "fetch not authpinnable and no context" << dendl
;
1485 // unlinked directory inode shouldn't have any entry
1486 if (!inode
->is_base() && get_parent_dir()->inode
->is_stray() &&
1487 !inode
->snaprealm
) {
1488 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl
;
1489 if (get_version() == 0) {
1490 assert(inode
->is_auth());
1493 if (state_test(STATE_REJOINUNDEF
)) {
1494 assert(cache
->mds
->is_rejoin());
1495 state_clear(STATE_REJOINUNDEF
);
1496 cache
->opened_undef_dirfrag(this);
1502 cache
->mds
->queue_waiter(c
);
1506 if (c
) add_waiter(WAIT_COMPLETE
, c
);
1507 if (!want_dn
.empty()) wanted_items
.insert(mempool::mds_co::string(want_dn
));
1509 // already fetching?
1510 if (state_test(CDir::STATE_FETCHING
)) {
1511 dout(7) << "already fetching; waiting" << dendl
;
1516 state_set(CDir::STATE_FETCHING
);
1518 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_fetch
);
1520 std::set
<dentry_key_t
> empty
;
1521 _omap_fetch(NULL
, empty
);
1524 void CDir::fetch(MDSInternalContextBase
*c
, const std::set
<dentry_key_t
>& keys
)
1526 dout(10) << "fetch " << keys
.size() << " keys on " << *this << dendl
;
1529 assert(!is_complete());
1531 if (!can_auth_pin()) {
1532 dout(7) << "fetch keys waiting for authpinnable" << dendl
;
1533 add_waiter(WAIT_UNFREEZE
, c
);
1536 if (state_test(CDir::STATE_FETCHING
)) {
1537 dout(7) << "fetch keys waiting for full fetch" << dendl
;
1538 add_waiter(WAIT_COMPLETE
, c
);
1543 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_fetch
);
1545 _omap_fetch(c
, keys
);
1548 class C_IO_Dir_OMAP_FetchedMore
: public CDirIOContext
{
1549 MDSInternalContextBase
*fin
;
1553 map
<string
, bufferlist
> omap
; ///< carry-over from before
1554 map
<string
, bufferlist
> omap_more
; ///< new batch
1556 C_IO_Dir_OMAP_FetchedMore(CDir
*d
, MDSInternalContextBase
*f
) :
1557 CDirIOContext(d
), fin(f
), ret(0) { }
1558 void finish(int r
) {
1561 omap
.swap(omap_more
);
1563 omap
.insert(omap_more
.begin(), omap_more
.end());
1566 dir
->_omap_fetch_more(hdrbl
, omap
, fin
);
1568 dir
->_omap_fetched(hdrbl
, omap
, !fin
, r
);
1575 class C_IO_Dir_OMAP_Fetched
: public CDirIOContext
{
1576 MDSInternalContextBase
*fin
;
1580 map
<string
, bufferlist
> omap
;
1582 int ret1
, ret2
, ret3
;
1584 C_IO_Dir_OMAP_Fetched(CDir
*d
, MDSInternalContextBase
*f
) :
1585 CDirIOContext(d
), fin(f
), ret1(0), ret2(0), ret3(0) { }
1586 void finish(int r
) override
{
1587 // check the correctness of backtrace
1588 if (r
>= 0 && ret3
!= -ECANCELED
)
1589 dir
->inode
->verify_diri_backtrace(btbl
, ret3
);
1590 if (r
>= 0) r
= ret1
;
1591 if (r
>= 0) r
= ret2
;
1593 dir
->_omap_fetch_more(hdrbl
, omap
, fin
);
1595 dir
->_omap_fetched(hdrbl
, omap
, !fin
, r
);
1602 void CDir::_omap_fetch(MDSInternalContextBase
*c
, const std::set
<dentry_key_t
>& keys
)
1604 C_IO_Dir_OMAP_Fetched
*fin
= new C_IO_Dir_OMAP_Fetched(this, c
);
1605 object_t oid
= get_ondisk_object();
1606 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
1608 rd
.omap_get_header(&fin
->hdrbl
, &fin
->ret1
);
1611 rd
.omap_get_vals("", "", g_conf
->mds_dir_keys_per_op
,
1612 &fin
->omap
, &fin
->more
, &fin
->ret2
);
1615 std::set
<std::string
> str_keys
;
1616 for (auto p
: keys
) {
1619 str_keys
.insert(str
);
1621 rd
.omap_get_vals_by_keys(str_keys
, &fin
->omap
, &fin
->ret2
);
1623 // check the correctness of backtrace
1624 if (g_conf
->mds_verify_backtrace
> 0 && frag
== frag_t()) {
1625 rd
.getxattr("parent", &fin
->btbl
, &fin
->ret3
);
1626 rd
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1628 fin
->ret3
= -ECANCELED
;
1631 cache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1632 new C_OnFinisher(fin
, cache
->mds
->finisher
));
1635 void CDir::_omap_fetch_more(
1637 map
<string
, bufferlist
>& omap
,
1638 MDSInternalContextBase
*c
)
1640 // we have more omap keys to fetch!
1641 object_t oid
= get_ondisk_object();
1642 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
1643 C_IO_Dir_OMAP_FetchedMore
*fin
= new C_IO_Dir_OMAP_FetchedMore(this, c
);
1644 fin
->hdrbl
.claim(hdrbl
);
1645 fin
->omap
.swap(omap
);
1647 rd
.omap_get_vals(fin
->omap
.rbegin()->first
,
1648 "", /* filter prefix */
1649 g_conf
->mds_dir_keys_per_op
,
1653 cache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1654 new C_OnFinisher(fin
, cache
->mds
->finisher
));
1657 CDentry
*CDir::_load_dentry(
1658 boost::string_view key
,
1659 boost::string_view dname
,
1660 const snapid_t last
,
1663 const std::set
<snapid_t
> *snaps
,
1666 bufferlist::iterator q
= bl
.begin();
1675 dout(20) << "_fetched pos " << pos
<< " marker '" << type
<< "' dname '" << dname
1676 << " [" << first
<< "," << last
<< "]"
1680 if (snaps
&& last
!= CEPH_NOSNAP
) {
1681 set
<snapid_t
>::const_iterator p
= snaps
->lower_bound(first
);
1682 if (p
== snaps
->end() || *p
> last
) {
1683 dout(10) << " skipping stale dentry on [" << first
<< "," << last
<< "]" << dendl
;
1689 * look for existing dentry for _last_ snap, because unlink +
1690 * create may leave a "hole" (epochs during which the dentry
1691 * doesn't exist) but for which no explicit negative dentry is in
1696 dn
= lookup_exact_snap(dname
, last
);
1698 dn
= lookup(dname
, last
);
1703 unsigned char d_type
;
1705 ::decode(d_type
, q
);
1709 stale_items
.insert(mempool::mds_co::string(key
));
1710 *force_dirty
= true;
1716 CDentry::linkage_t
*dnl
= dn
->get_linkage();
1717 dout(12) << "_fetched had " << (dnl
->is_null() ? "NEG" : "") << " dentry " << *dn
<< dendl
;
1718 if (committed_version
== 0 &&
1721 ino
== dnl
->get_remote_ino() &&
1722 d_type
== dnl
->get_remote_d_type()) {
1723 // see comment below
1724 dout(10) << "_fetched had underwater dentry " << *dn
<< ", marking clean" << dendl
;
1729 dn
= add_remote_dentry(dname
, ino
, d_type
, first
, last
);
1732 CInode
*in
= cache
->get_inode(ino
); // we may or may not have it.
1734 dn
->link_remote(dn
->get_linkage(), in
);
1735 dout(12) << "_fetched got remote link " << ino
<< " which we have " << *in
<< dendl
;
1737 dout(12) << "_fetched got remote link " << ino
<< " (dont' have it)" << dendl
;
1741 else if (type
== 'I') {
1744 // Load inode data before looking up or constructing CInode
1745 InodeStore inode_data
;
1746 inode_data
.decode_bare(q
);
1750 stale_items
.insert(mempool::mds_co::string(key
));
1751 *force_dirty
= true;
1756 bool undef_inode
= false;
1758 CDentry::linkage_t
*dnl
= dn
->get_linkage();
1759 dout(12) << "_fetched had " << (dnl
->is_null() ? "NEG" : "") << " dentry " << *dn
<< dendl
;
1761 if (dnl
->is_primary()) {
1762 CInode
*in
= dnl
->get_inode();
1763 if (in
->state_test(CInode::STATE_REJOINUNDEF
)) {
1765 } else if (committed_version
== 0 &&
1767 inode_data
.inode
.ino
== in
->ino() &&
1768 inode_data
.inode
.version
== in
->get_version()) {
1769 /* clean underwater item?
1770 * Underwater item is something that is dirty in our cache from
1771 * journal replay, but was previously flushed to disk before the
1774 * We only do this is committed_version == 0. that implies either
1775 * - this is a fetch after from a clean/empty CDir is created
1776 * (and has no effect, since the dn won't exist); or
1777 * - this is a fetch after _recovery_, which is what we're worried
1778 * about. Items that are marked dirty from the journal should be
1779 * marked clean if they appear on disk.
1781 dout(10) << "_fetched had underwater dentry " << *dn
<< ", marking clean" << dendl
;
1783 dout(10) << "_fetched had underwater inode " << *dnl
->get_inode() << ", marking clean" << dendl
;
1789 if (!dn
|| undef_inode
) {
1791 CInode
*in
= cache
->get_inode(inode_data
.inode
.ino
, last
);
1792 if (!in
|| undef_inode
) {
1793 if (undef_inode
&& in
)
1796 in
= new CInode(cache
, true, first
, last
);
1798 in
->inode
= inode_data
.inode
;
1800 if (in
->is_symlink())
1801 in
->symlink
= inode_data
.symlink
;
1803 in
->dirfragtree
.swap(inode_data
.dirfragtree
);
1804 in
->xattrs
.swap(inode_data
.xattrs
);
1805 in
->old_inodes
.swap(inode_data
.old_inodes
);
1806 if (!in
->old_inodes
.empty()) {
1807 snapid_t min_first
= in
->old_inodes
.rbegin()->first
+ 1;
1808 if (min_first
> in
->first
)
1809 in
->first
= min_first
;
1812 in
->oldest_snap
= inode_data
.oldest_snap
;
1813 in
->decode_snap_blob(inode_data
.snap_blob
);
1814 if (snaps
&& !in
->snaprealm
)
1815 in
->purge_stale_snap_data(*snaps
);
1818 cache
->add_inode(in
); // add
1819 dn
= add_primary_dentry(dname
, in
, first
, last
); // link
1821 dout(12) << "_fetched got " << *dn
<< " " << *in
<< dendl
;
1823 if (in
->inode
.is_dirty_rstat())
1824 in
->mark_dirty_rstat();
1826 //in->hack_accessed = false;
1827 //in->hack_load_stamp = ceph_clock_now();
1828 //num_new_inodes_loaded++;
1829 } else if (g_conf
->get_val
<bool>("mds_hack_allow_loading_invalid_metadata")) {
1830 dout(20) << "hack: adding duplicate dentry for " << *in
<< dendl
;
1831 dn
= add_primary_dentry(dname
, in
, first
, last
);
1833 dout(0) << "_fetched badness: got (but i already had) " << *in
1834 << " mode " << in
->inode
.mode
1835 << " mtime " << in
->inode
.mtime
<< dendl
;
1836 string dirpath
, inopath
;
1837 this->inode
->make_path_string(dirpath
);
1838 in
->make_path_string(inopath
);
1839 cache
->mds
->clog
->error() << "loaded dup inode " << inode_data
.inode
.ino
1840 << " [" << first
<< "," << last
<< "] v" << inode_data
.inode
.version
1841 << " at " << dirpath
<< "/" << dname
1842 << ", but inode " << in
->vino() << " v" << in
->inode
.version
1843 << " already exists at " << inopath
;
1848 std::ostringstream oss
;
1849 oss
<< "Invalid tag char '" << type
<< "' pos " << pos
;
1850 throw buffer::malformed_input(oss
.str());
1856 void CDir::_omap_fetched(bufferlist
& hdrbl
, map
<string
, bufferlist
>& omap
,
1857 bool complete
, int r
)
1859 LogChannelRef clog
= cache
->mds
->clog
;
1860 dout(10) << "_fetched header " << hdrbl
.length() << " bytes "
1861 << omap
.size() << " keys for " << *this << dendl
;
1863 assert(r
== 0 || r
== -ENOENT
|| r
== -ENODATA
);
1865 assert(!is_frozen());
1867 if (hdrbl
.length() == 0) {
1868 dout(0) << "_fetched missing object for " << *this << dendl
;
1870 clog
->error() << "dir " << dirfrag() << " object missing on disk; some "
1871 "files may be lost (" << get_path() << ")";
1879 bufferlist::iterator p
= hdrbl
.begin();
1881 ::decode(got_fnode
, p
);
1882 } catch (const buffer::error
&err
) {
1883 derr
<< "Corrupt fnode in dirfrag " << dirfrag()
1884 << ": " << err
<< dendl
;
1885 clog
->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1886 << err
<< " (" << get_path() << ")";
1891 clog
->warn() << "header buffer of dir " << dirfrag() << " has "
1892 << hdrbl
.length() - p
.get_off() << " extra bytes ("
1893 << get_path() << ")";
1899 dout(10) << "_fetched version " << got_fnode
.version
<< dendl
;
1901 // take the loaded fnode?
1902 // only if we are a fresh CDir* with no prior state.
1903 if (get_version() == 0) {
1904 assert(!is_projected());
1905 assert(!state_test(STATE_COMMITTING
));
1907 projected_version
= committing_version
= committed_version
= got_fnode
.version
;
1909 if (state_test(STATE_REJOINUNDEF
)) {
1910 assert(cache
->mds
->is_rejoin());
1911 state_clear(STATE_REJOINUNDEF
);
1912 cache
->opened_undef_dirfrag(this);
1916 list
<CInode
*> undef_inodes
;
1918 // purge stale snaps?
1919 // only if we have past_parents open!
1920 bool force_dirty
= false;
1921 const set
<snapid_t
> *snaps
= NULL
;
1922 SnapRealm
*realm
= inode
->find_snaprealm();
1923 if (!realm
->have_past_parents_open()) {
1924 dout(10) << " no snap purge, one or more past parents NOT open" << dendl
;
1925 } else if (fnode
.snap_purged_thru
< realm
->get_last_destroyed()) {
1926 snaps
= &realm
->get_snaps();
1927 dout(10) << " snap_purged_thru " << fnode
.snap_purged_thru
1928 << " < " << realm
->get_last_destroyed()
1929 << ", snap purge based on " << *snaps
<< dendl
;
1930 if (get_num_snap_items() == 0) {
1931 fnode
.snap_purged_thru
= realm
->get_last_destroyed();
1936 unsigned pos
= omap
.size() - 1;
1937 for (map
<string
, bufferlist
>::reverse_iterator p
= omap
.rbegin();
1942 dentry_key_t::decode_helper(p
->first
, dname
, last
);
1947 p
->first
, dname
, last
, p
->second
, pos
, snaps
,
1949 } catch (const buffer::error
&err
) {
1950 cache
->mds
->clog
->warn() << "Corrupt dentry '" << dname
<< "' in "
1951 "dir frag " << dirfrag() << ": "
1952 << err
<< "(" << get_path() << ")";
1954 // Remember that this dentry is damaged. Subsequent operations
1955 // that try to act directly on it will get their EIOs, but this
1956 // dirfrag as a whole will continue to look okay (minus the
1957 // mysteriously-missing dentry)
1958 go_bad_dentry(last
, dname
);
1960 // Anyone who was WAIT_DENTRY for this guy will get kicked
1961 // to RetryRequest, and hit the DamageTable-interrogating path.
1962 // Stats will now be bogus because we will think we're complete,
1963 // but have 1 or more missing dentries.
1970 CDentry::linkage_t
*dnl
= dn
->get_linkage();
1971 if (dnl
->is_primary() && dnl
->get_inode()->state_test(CInode::STATE_REJOINUNDEF
))
1972 undef_inodes
.push_back(dnl
->get_inode());
1974 if (!complete
|| wanted_items
.count(mempool::mds_co::string(boost::string_view(dname
))) > 0) {
1975 dout(10) << " touching wanted dn " << *dn
<< dendl
;
1976 inode
->mdcache
->touch_dentry(dn
);
1980 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1982 // mark complete, !fetching
1984 wanted_items
.clear();
1986 state_clear(STATE_FETCHING
);
1988 if (scrub_infop
&& scrub_infop
->need_scrub_local
) {
1989 scrub_infop
->need_scrub_local
= false;
1994 // open & force frags
1995 while (!undef_inodes
.empty()) {
1996 CInode
*in
= undef_inodes
.front();
1997 undef_inodes
.pop_front();
1998 in
->state_clear(CInode::STATE_REJOINUNDEF
);
1999 cache
->opened_undef_inode(in
);
2002 // dirty myself to remove stale snap dentries
2003 if (force_dirty
&& !inode
->mdcache
->is_readonly())
2010 finish_waiting(WAIT_COMPLETE
, 0);
2014 void CDir::_go_bad()
2016 if (get_version() == 0)
2018 state_set(STATE_BADFRAG
);
2019 // mark complete, !fetching
2021 state_clear(STATE_FETCHING
);
2025 finish_waiting(WAIT_COMPLETE
, -EIO
);
2028 void CDir::go_bad_dentry(snapid_t last
, boost::string_view dname
)
2030 dout(10) << __func__
<< " " << dname
<< dendl
;
2031 std::string
path(get_path());
2033 path
+= std::string(dname
);
2034 const bool fatal
= cache
->mds
->damage_table
.notify_dentry(
2035 inode
->ino(), frag
, last
, dname
, path
);
2037 cache
->mds
->damaged();
2038 ceph_abort(); // unreachable, damaged() respawns us
2042 void CDir::go_bad(bool complete
)
2044 dout(10) << "go_bad " << frag
<< dendl
;
2045 const bool fatal
= cache
->mds
->damage_table
.notify_dirfrag(
2046 inode
->ino(), frag
, get_path());
2048 cache
->mds
->damaged();
2049 ceph_abort(); // unreachable, damaged() respawns us
2058 // -----------------------
2064 * @param want - min version i want committed
2065 * @param c - callback for completion
2067 void CDir::commit(version_t want
, MDSInternalContextBase
*c
, bool ignore_authpinnability
, int op_prio
)
2069 dout(10) << "commit want " << want
<< " on " << *this << dendl
;
2070 if (want
== 0) want
= get_version();
2073 assert(want
<= get_version() || get_version() == 0); // can't commit the future
2074 assert(want
> committed_version
); // the caller is stupid
2076 assert(ignore_authpinnability
|| can_auth_pin());
2078 // note: queue up a noop if necessary, so that we always
2081 c
= new C_MDSInternalNoop
;
2083 // auth_pin on first waiter
2084 if (waiting_for_commit
.empty())
2086 waiting_for_commit
[want
].push_back(c
);
2089 _commit(want
, op_prio
);
2092 class C_IO_Dir_Committed
: public CDirIOContext
{
2095 C_IO_Dir_Committed(CDir
*d
, version_t v
) : CDirIOContext(d
), version(v
) { }
2096 void finish(int r
) override
{
2097 dir
->_committed(r
, version
);
2102 * Flush out the modified dentries in this dir. Keep the bufferlist
2103 * below max_write_size;
2105 void CDir::_omap_commit(int op_prio
)
2107 dout(10) << "_omap_commit" << dendl
;
2109 unsigned max_write_size
= cache
->max_dir_commit_size
;
2110 unsigned write_size
= 0;
2113 op_prio
= CEPH_MSG_PRIO_DEFAULT
;
2116 const set
<snapid_t
> *snaps
= NULL
;
2117 SnapRealm
*realm
= inode
->find_snaprealm();
2118 if (!realm
->have_past_parents_open()) {
2119 dout(10) << " no snap purge, one or more past parents NOT open" << dendl
;
2120 } else if (fnode
.snap_purged_thru
< realm
->get_last_destroyed()) {
2121 snaps
= &realm
->get_snaps();
2122 dout(10) << " snap_purged_thru " << fnode
.snap_purged_thru
2123 << " < " << realm
->get_last_destroyed()
2124 << ", snap purge based on " << *snaps
<< dendl
;
2125 // fnode.snap_purged_thru = realm->get_last_destroyed();
2128 set
<string
> to_remove
;
2129 map
<string
, bufferlist
> to_set
;
2131 C_GatherBuilder
gather(g_ceph_context
,
2132 new C_OnFinisher(new C_IO_Dir_Committed(this,
2134 cache
->mds
->finisher
));
2137 object_t oid
= get_ondisk_object();
2138 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
2140 if (!stale_items
.empty()) {
2141 for (const auto &p
: stale_items
) {
2142 to_remove
.insert(std::string(boost::string_view(p
)));
2143 write_size
+= p
.length();
2145 stale_items
.clear();
2148 auto write_one
= [&](CDentry
*dn
) {
2150 dn
->key().encode(key
);
2152 if (dn
->last
!= CEPH_NOSNAP
&&
2153 snaps
&& try_trim_snap_dentry(dn
, *snaps
)) {
2154 dout(10) << " rm " << key
<< dendl
;
2155 write_size
+= key
.length();
2156 to_remove
.insert(key
);
2160 if (dn
->get_linkage()->is_null()) {
2161 dout(10) << " rm " << dn
->get_name() << " " << *dn
<< dendl
;
2162 write_size
+= key
.length();
2163 to_remove
.insert(key
);
2165 dout(10) << " set " << dn
->get_name() << " " << *dn
<< dendl
;
2167 _encode_dentry(dn
, dnbl
, snaps
);
2168 write_size
+= key
.length() + dnbl
.length();
2169 to_set
[key
].swap(dnbl
);
2172 if (write_size
>= max_write_size
) {
2174 op
.priority
= op_prio
;
2176 // don't create new dirfrag blindly
2177 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING
))
2178 op
.stat(NULL
, (ceph::real_time
*) NULL
, NULL
);
2180 if (!to_set
.empty())
2181 op
.omap_set(to_set
);
2182 if (!to_remove
.empty())
2183 op
.omap_rm_keys(to_remove
);
2185 cache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
2186 ceph::real_clock::now(),
2187 0, gather
.new_sub());
2195 if (state_test(CDir::STATE_FRAGMENTING
)) {
2196 for (auto p
= items
.begin(); p
!= items
.end(); ) {
2197 CDentry
*dn
= p
->second
;
2199 if (!dn
->is_dirty() && dn
->get_linkage()->is_null())
2204 for (auto p
= dirty_dentries
.begin(); !p
.end(); ) {
2212 op
.priority
= op_prio
;
2214 // don't create new dirfrag blindly
2215 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING
))
2216 op
.stat(NULL
, (ceph::real_time
*)NULL
, NULL
);
2219 * save the header at the last moment.. If we were to send it off before other
2220 * updates, but die before sending them all, we'd think that the on-disk state
2221 * was fully committed even though it wasn't! However, since the messages are
2222 * strictly ordered between the MDS and the OSD, and since messages to a given
2223 * PG are strictly ordered, if we simply send the message containing the header
2224 * off last, we cannot get our header into an incorrect state.
2227 ::encode(fnode
, header
);
2228 op
.omap_set_header(header
);
2230 if (!to_set
.empty())
2231 op
.omap_set(to_set
);
2232 if (!to_remove
.empty())
2233 op
.omap_rm_keys(to_remove
);
2235 cache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
2236 ceph::real_clock::now(),
2237 0, gather
.new_sub());
2242 void CDir::_encode_dentry(CDentry
*dn
, bufferlist
& bl
,
2243 const set
<snapid_t
> *snaps
)
2245 // clear dentry NEW flag, if any. we can no longer silently drop it.
2248 ::encode(dn
->first
, bl
);
2250 // primary or remote?
2251 if (dn
->linkage
.is_remote()) {
2252 inodeno_t ino
= dn
->linkage
.get_remote_ino();
2253 unsigned char d_type
= dn
->linkage
.get_remote_d_type();
2254 dout(14) << " pos " << bl
.length() << " dn '" << dn
->get_name() << "' remote ino " << ino
<< dendl
;
2256 // marker, name, ino
2257 bl
.append('L'); // remote link
2259 ::encode(d_type
, bl
);
2260 } else if (dn
->linkage
.is_primary()) {
2262 CInode
*in
= dn
->linkage
.get_inode();
2265 dout(14) << " pos " << bl
.length() << " dn '" << dn
->get_name() << "' inode " << *in
<< dendl
;
2267 // marker, name, inode, [symlink string]
2268 bl
.append('I'); // inode
2270 if (in
->is_multiversion()) {
2271 if (!in
->snaprealm
) {
2273 in
->purge_stale_snap_data(*snaps
);
2274 } else if (in
->snaprealm
->have_past_parents_open()) {
2275 in
->purge_stale_snap_data(in
->snaprealm
->get_snaps());
2279 bufferlist snap_blob
;
2280 in
->encode_snap_blob(snap_blob
);
2281 in
->encode_bare(bl
, cache
->mds
->mdsmap
->get_up_features(), &snap_blob
);
2283 assert(!dn
->linkage
.is_null());
2287 void CDir::_commit(version_t want
, int op_prio
)
2289 dout(10) << "_commit want " << want
<< " on " << *this << dendl
;
2291 // we can't commit things in the future.
2292 // (even the projected future.)
2293 assert(want
<= get_version() || get_version() == 0);
2295 // check pre+postconditions.
2298 // already committed?
2299 if (committed_version
>= want
) {
2300 dout(10) << "already committed " << committed_version
<< " >= " << want
<< dendl
;
2303 // already committing >= want?
2304 if (committing_version
>= want
) {
2305 dout(10) << "already committing " << committing_version
<< " >= " << want
<< dendl
;
2306 assert(state_test(STATE_COMMITTING
));
2310 // alrady committed an older version?
2311 if (committing_version
> committed_version
) {
2312 dout(10) << "already committing older " << committing_version
<< ", waiting for that to finish" << dendl
;
2317 committing_version
= get_version();
2319 // mark committing (if not already)
2320 if (!state_test(STATE_COMMITTING
)) {
2321 dout(10) << "marking committing" << dendl
;
2322 state_set(STATE_COMMITTING
);
2325 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_commit
);
2327 _omap_commit(op_prio
);
2334 * @param v version i just committed
2336 void CDir::_committed(int r
, version_t v
)
2339 // the directory could be partly purged during MDS failover
2340 if (r
== -ENOENT
&& committed_version
== 0 &&
2341 !inode
->is_base() && get_parent_dir()->inode
->is_stray()) {
2343 if (inode
->snaprealm
)
2344 inode
->state_set(CInode::STATE_MISSINGOBJS
);
2347 dout(1) << "commit error " << r
<< " v " << v
<< dendl
;
2348 cache
->mds
->clog
->error() << "failed to commit dir " << dirfrag() << " object,"
2350 cache
->mds
->handle_write_error(r
);
2355 dout(10) << "_committed v " << v
<< " on " << *this << dendl
;
2358 bool stray
= inode
->is_stray();
2361 assert(v
> committed_version
);
2362 assert(v
<= committing_version
);
2363 committed_version
= v
;
2365 // _all_ commits done?
2366 if (committing_version
== committed_version
)
2367 state_clear(CDir::STATE_COMMITTING
);
2369 // _any_ commit, even if we've been redirtied, means we're no longer new.
2370 item_new
.remove_myself();
2373 if (committed_version
== get_version())
2377 for (auto p
= dirty_dentries
.begin(); !p
.end(); ) {
2382 if (dn
->linkage
.is_primary()) {
2383 CInode
*in
= dn
->linkage
.get_inode();
2385 assert(in
->is_auth());
2387 if (committed_version
>= in
->get_version()) {
2388 if (in
->is_dirty()) {
2389 dout(15) << " dir " << committed_version
<< " >= inode " << in
->get_version() << " now clean " << *in
<< dendl
;
2393 dout(15) << " dir " << committed_version
<< " < inode " << in
->get_version() << " still dirty " << *in
<< dendl
;
2394 assert(in
->is_dirty() || in
->last
< CEPH_NOSNAP
); // special case for cow snap items (not predirtied)
2399 if (committed_version
>= dn
->get_version()) {
2400 dout(15) << " dir " << committed_version
<< " >= dn " << dn
->get_version() << " now clean " << *dn
<< dendl
;
2403 // drop clean null stray dentries immediately
2405 dn
->get_num_ref() == 0 &&
2406 !dn
->is_projected() &&
2407 dn
->get_linkage()->is_null())
2410 dout(15) << " dir " << committed_version
<< " < dn " << dn
->get_version() << " still dirty " << *dn
<< dendl
;
2411 assert(dn
->is_dirty());
2416 bool were_waiters
= !waiting_for_commit
.empty();
2418 auto it
= waiting_for_commit
.begin();
2419 while (it
!= waiting_for_commit
.end()) {
2422 if (it
->first
> committed_version
) {
2423 dout(10) << " there are waiters for " << it
->first
<< ", committing again" << dendl
;
2424 _commit(it
->first
, -1);
2427 std::list
<MDSInternalContextBase
*> t
;
2428 for (const auto &waiter
: it
->second
)
2429 t
.push_back(waiter
);
2430 cache
->mds
->queue_waiters(t
);
2431 waiting_for_commit
.erase(it
);
2435 // try drop dentries in this dirfrag if it's about to be purged
2436 if (!inode
->is_base() && get_parent_dir()->inode
->is_stray() &&
2438 cache
->maybe_eval_stray(inode
, true);
2440 // unpin if we kicked the last waiter.
2442 waiting_for_commit
.empty())
2451 void CDir::encode_export(bufferlist
& bl
)
2453 assert(!is_projected());
2454 ::encode(first
, bl
);
2455 ::encode(fnode
, bl
);
2456 ::encode(dirty_old_rstat
, bl
);
2457 ::encode(committed_version
, bl
);
2459 ::encode(state
, bl
);
2460 ::encode(dir_rep
, bl
);
2462 ::encode(pop_me
, bl
);
2463 ::encode(pop_auth_subtree
, bl
);
2465 ::encode(dir_rep_by
, bl
);
2466 ::encode(get_replicas(), bl
);
2468 get(PIN_TEMPEXPORTING
);
2471 void CDir::finish_export(utime_t now
)
2473 state
&= MASK_STATE_EXPORT_KEPT
;
2474 pop_auth_subtree_nested
.sub(now
, cache
->decayrate
, pop_auth_subtree
);
2476 pop_auth_subtree
.zero(now
);
2477 put(PIN_TEMPEXPORTING
);
2478 dirty_old_rstat
.clear();
2481 void CDir::decode_import(bufferlist::iterator
& blp
, utime_t now
, LogSegment
*ls
)
2483 ::decode(first
, blp
);
2484 ::decode(fnode
, blp
);
2485 ::decode(dirty_old_rstat
, blp
);
2486 projected_version
= fnode
.version
;
2487 ::decode(committed_version
, blp
);
2488 committing_version
= committed_version
;
2492 state
&= MASK_STATE_IMPORT_KEPT
;
2493 state_set(STATE_AUTH
| (s
& MASK_STATE_EXPORTED
));
2500 ::decode(dir_rep
, blp
);
2502 ::decode(pop_me
, now
, blp
);
2503 ::decode(pop_auth_subtree
, now
, blp
);
2504 pop_auth_subtree_nested
.add(now
, cache
->decayrate
, pop_auth_subtree
);
2506 ::decode(dir_rep_by
, blp
);
2507 ::decode(get_replicas(), blp
);
2508 if (is_replicated()) get(PIN_REPLICATED
);
2510 replica_nonce
= 0; // no longer defined
2512 // did we import some dirty scatterlock data?
2513 if (dirty_old_rstat
.size() ||
2514 !(fnode
.rstat
== fnode
.accounted_rstat
)) {
2515 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
2516 ls
->dirty_dirfrag_nest
.push_back(&inode
->item_dirty_dirfrag_nest
);
2518 if (!(fnode
.fragstat
== fnode
.accounted_fragstat
)) {
2519 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->filelock
);
2520 ls
->dirty_dirfrag_dir
.push_back(&inode
->item_dirty_dirfrag_dir
);
2522 if (is_dirty_dft()) {
2523 if (inode
->dirfragtreelock
.get_state() != LOCK_MIX
&&
2524 inode
->dirfragtreelock
.is_stable()) {
2525 // clear stale dirtydft
2526 state_clear(STATE_DIRTYDFT
);
2528 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->dirfragtreelock
);
2529 ls
->dirty_dirfrag_dirfragtree
.push_back(&inode
->item_dirty_dirfrag_dirfragtree
);
2537 /********************************
2542 * if dir_auth.first == parent, auth is same as inode.
2543 * unless .second != unknown, in which case that sticks.
2545 mds_authority_t
CDir::authority() const
2547 if (is_subtree_root())
2550 return inode
->authority();
2553 /** is_subtree_root()
2554 * true if this is an auth delegation point.
2555 * that is, dir_auth != default (parent,unknown)
2557 * some key observations:
2559 * - any region bound will be an export, or frozen.
2561 * note that this DOES heed dir_auth.pending
2564 bool CDir::is_subtree_root()
2566 if (dir_auth == CDIR_AUTH_DEFAULT) {
2567 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2568 //<< " on " << ino() << dendl;
2571 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2572 //<< " on " << ino() << dendl;
2579 * true if we are x, or an ancestor of x
2581 bool CDir::contains(CDir
*x
)
2586 x
= x
->get_inode()->get_projected_parent_dir();
2596 void CDir::set_dir_auth(mds_authority_t a
)
2598 dout(10) << "setting dir_auth=" << a
2599 << " from " << dir_auth
2600 << " on " << *this << dendl
;
2602 bool was_subtree
= is_subtree_root();
2603 bool was_ambiguous
= dir_auth
.second
>= 0;
2608 // new subtree root?
2609 if (!was_subtree
&& is_subtree_root()) {
2610 dout(10) << " new subtree root, adjusting auth_pins" << dendl
;
2612 inode
->num_subtree_roots
++;
2614 // adjust nested auth pins
2615 if (get_cum_auth_pins())
2616 inode
->adjust_nested_auth_pins(-1, NULL
);
2618 // unpin parent of frozen dir/tree?
2619 if (inode
->is_auth()) {
2620 assert(!is_frozen_tree_root());
2621 if (is_frozen_dir())
2622 inode
->auth_unpin(this);
2625 if (was_subtree
&& !is_subtree_root()) {
2626 dout(10) << " old subtree root, adjusting auth_pins" << dendl
;
2628 inode
->num_subtree_roots
--;
2630 // adjust nested auth pins
2631 if (get_cum_auth_pins())
2632 inode
->adjust_nested_auth_pins(1, NULL
);
2634 // pin parent of frozen dir/tree?
2635 if (inode
->is_auth()) {
2636 assert(!is_frozen_tree_root());
2637 if (is_frozen_dir())
2638 inode
->auth_pin(this);
2642 // newly single auth?
2643 if (was_ambiguous
&& dir_auth
.second
== CDIR_AUTH_UNKNOWN
) {
2644 list
<MDSInternalContextBase
*> ls
;
2645 take_waiting(WAIT_SINGLEAUTH
, ls
);
2646 cache
->mds
->queue_waiters(ls
);
2651 /*****************************************
2652 * AUTH PINS and FREEZING
2654 * the basic plan is that auth_pins only exist in auth regions, and they
2655 * prevent a freeze (and subsequent auth change).
2657 * however, we also need to prevent a parent from freezing if a child is frozen.
2658 * for that reason, the parent inode of a frozen directory is auth_pinned.
2660 * the oddity is when the frozen directory is a subtree root. if that's the case,
2661 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2662 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
2667 void CDir::auth_pin(void *by
)
2673 #ifdef MDS_AUTHPIN_SET
2674 auth_pin_set
.insert(by
);
2677 dout(10) << "auth_pin by " << by
2679 << " count now " << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
2682 if (!is_subtree_root() &&
2683 get_cum_auth_pins() == 1)
2684 inode
->adjust_nested_auth_pins(1, by
);
2687 void CDir::auth_unpin(void *by
)
2691 #ifdef MDS_AUTHPIN_SET
2692 assert(auth_pin_set
.count(by
));
2693 auth_pin_set
.erase(auth_pin_set
.find(by
));
2698 dout(10) << "auth_unpin by " << by
2700 << " count now " << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
2701 assert(auth_pins
>= 0);
2703 int newcum
= get_cum_auth_pins();
2705 maybe_finish_freeze(); // pending freeze?
2708 if (!is_subtree_root() &&
2710 inode
->adjust_nested_auth_pins(-1, by
);
2713 void CDir::adjust_nested_auth_pins(int inc
, int dirinc
, void *by
)
2716 nested_auth_pins
+= inc
;
2717 dir_auth_pins
+= dirinc
;
2719 dout(15) << "adjust_nested_auth_pins " << inc
<< "/" << dirinc
<< " on " << *this
2720 << " by " << by
<< " count now "
2721 << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
2722 assert(nested_auth_pins
>= 0);
2723 assert(dir_auth_pins
>= 0);
2725 int newcum
= get_cum_auth_pins();
2727 maybe_finish_freeze(); // pending freeze?
2730 if (!is_subtree_root()) {
2732 inode
->adjust_nested_auth_pins(-1, by
);
2733 else if (newcum
== inc
)
2734 inode
->adjust_nested_auth_pins(1, by
);
2738 #ifdef MDS_VERIFY_FRAGSTAT
2739 void CDir::verify_fragstat()
2741 assert(is_complete());
2742 if (inode
->is_stray())
2746 memset(&c
, 0, sizeof(c
));
2748 for (auto it
= items
.begin();
2751 CDentry
*dn
= it
->second
;
2755 dout(10) << " " << *dn
<< dendl
;
2756 if (dn
->is_primary())
2757 dout(10) << " " << *dn
->inode
<< dendl
;
2759 if (dn
->is_primary()) {
2760 if (dn
->inode
->is_dir())
2765 if (dn
->is_remote()) {
2766 if (dn
->get_remote_d_type() == DT_DIR
)
2773 if (c
.nsubdirs
!= fnode
.fragstat
.nsubdirs
||
2774 c
.nfiles
!= fnode
.fragstat
.nfiles
) {
2775 dout(0) << "verify_fragstat failed " << fnode
.fragstat
<< " on " << *this << dendl
;
2776 dout(0) << " i count " << c
<< dendl
;
2779 dout(0) << "verify_fragstat ok " << fnode
.fragstat
<< " on " << *this << dendl
;
2784 /*****************************************************************************
2790 bool CDir::freeze_tree()
2792 assert(!is_frozen());
2793 assert(!is_freezing());
2796 if (is_freezeable(true)) {
2801 state_set(STATE_FREEZINGTREE
);
2802 ++num_freezing_trees
;
2803 dout(10) << "freeze_tree waiting " << *this << dendl
;
2808 void CDir::_freeze_tree()
2810 dout(10) << "_freeze_tree " << *this << dendl
;
2811 assert(is_freezeable(true));
2814 if (state_test(STATE_FREEZINGTREE
)) {
2815 state_clear(STATE_FREEZINGTREE
); // actually, this may get set again by next context?
2816 --num_freezing_trees
;
2820 mds_authority_t auth
;
2821 bool was_subtree
= is_subtree_root();
2823 auth
= get_dir_auth();
2825 // temporarily prevent parent subtree from becoming frozen.
2826 inode
->auth_pin(this);
2827 // create new subtree
2831 assert(auth
.first
>= 0);
2832 assert(auth
.second
== CDIR_AUTH_UNKNOWN
);
2833 auth
.second
= auth
.first
;
2834 inode
->mdcache
->adjust_subtree_auth(this, auth
);
2836 inode
->auth_unpin(this);
2839 state_set(STATE_FROZENTREE
);
2844 void CDir::unfreeze_tree()
2846 dout(10) << "unfreeze_tree " << *this << dendl
;
2848 if (state_test(STATE_FROZENTREE
)) {
2849 // frozen. unfreeze.
2850 state_clear(STATE_FROZENTREE
);
2857 assert(is_subtree_root());
2858 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
2859 mds_authority_t auth
= get_dir_auth();
2860 assert(auth
.first
>= 0);
2861 assert(auth
.second
== auth
.first
);
2862 auth
.second
= CDIR_AUTH_UNKNOWN
;
2863 inode
->mdcache
->adjust_subtree_auth(this, auth
);
2867 finish_waiting(WAIT_UNFREEZE
);
2869 finish_waiting(WAIT_FROZEN
, -1);
2871 // freezing. stop it.
2872 assert(state_test(STATE_FREEZINGTREE
));
2873 state_clear(STATE_FREEZINGTREE
);
2874 --num_freezing_trees
;
2877 finish_waiting(WAIT_UNFREEZE
);
2881 bool CDir::is_freezing_tree() const
2883 if (num_freezing_trees
== 0)
2885 const CDir
*dir
= this;
2887 if (dir
->is_freezing_tree_root()) return true;
2888 if (dir
->is_subtree_root()) return false;
2889 if (dir
->inode
->parent
)
2890 dir
= dir
->inode
->parent
->dir
;
2892 return false; // root on replica
2896 bool CDir::is_frozen_tree() const
2898 if (num_frozen_trees
== 0)
2900 const CDir
*dir
= this;
2902 if (dir
->is_frozen_tree_root()) return true;
2903 if (dir
->is_subtree_root()) return false;
2904 if (dir
->inode
->parent
)
2905 dir
= dir
->inode
->parent
->dir
;
2907 return false; // root on replica
2911 CDir
*CDir::get_frozen_tree_root()
2913 assert(is_frozen());
2916 if (dir
->is_frozen_tree_root())
2918 if (dir
->inode
->parent
)
2919 dir
= dir
->inode
->parent
->dir
;
2925 class C_Dir_AuthUnpin
: public CDirContext
{
2927 explicit C_Dir_AuthUnpin(CDir
*d
) : CDirContext(d
) {}
2928 void finish(int r
) override
{
2929 dir
->auth_unpin(dir
->get_inode());
2933 void CDir::maybe_finish_freeze()
2935 if (auth_pins
!= 1 || dir_auth_pins
!= 0)
2938 // we can freeze the _dir_ even with nested pins...
2939 if (state_test(STATE_FREEZINGDIR
)) {
2942 finish_waiting(WAIT_FROZEN
);
2945 if (nested_auth_pins
!= 0)
2948 if (state_test(STATE_FREEZINGTREE
)) {
2949 if (!is_subtree_root() && inode
->is_frozen()) {
2950 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode
<< dendl
;
2951 // retake an auth_pin...
2953 // and release it when the parent inode unfreezes
2954 inode
->add_waiter(WAIT_UNFREEZE
, new C_Dir_AuthUnpin(this));
2960 finish_waiting(WAIT_FROZEN
);
2968 bool CDir::freeze_dir()
2970 assert(!is_frozen());
2971 assert(!is_freezing());
2974 if (is_freezeable_dir(true)) {
2979 state_set(STATE_FREEZINGDIR
);
2980 dout(10) << "freeze_dir + wait " << *this << dendl
;
2985 void CDir::_freeze_dir()
2987 dout(10) << "_freeze_dir " << *this << dendl
;
2988 //assert(is_freezeable_dir(true));
2989 // not always true during split because the original fragment may have frozen a while
2990 // ago and we're just now getting around to breaking it up.
2992 state_clear(STATE_FREEZINGDIR
);
2993 state_set(STATE_FROZENDIR
);
2996 if (is_auth() && !is_subtree_root())
2997 inode
->auth_pin(this); // auth_pin for duration of freeze
3001 void CDir::unfreeze_dir()
3003 dout(10) << "unfreeze_dir " << *this << dendl
;
3005 if (state_test(STATE_FROZENDIR
)) {
3006 state_clear(STATE_FROZENDIR
);
3009 // unpin (may => FREEZEABLE) FIXME: is this order good?
3010 if (is_auth() && !is_subtree_root())
3011 inode
->auth_unpin(this);
3013 finish_waiting(WAIT_UNFREEZE
);
3015 finish_waiting(WAIT_FROZEN
, -1);
3017 // still freezing. stop.
3018 assert(state_test(STATE_FREEZINGDIR
));
3019 state_clear(STATE_FREEZINGDIR
);
3022 finish_waiting(WAIT_UNFREEZE
);
3027 * Slightly less complete than operator<<, because this is intended
3028 * for identifying a directory and its state rather than for dumping
3031 void CDir::dump(Formatter
*f
) const
3035 f
->dump_stream("path") << get_path();
3037 f
->dump_stream("dirfrag") << dirfrag();
3038 f
->dump_int("snapid_first", first
);
3040 f
->dump_stream("projected_version") << get_projected_version();
3041 f
->dump_stream("version") << get_version();
3042 f
->dump_stream("committing_version") << get_committing_version();
3043 f
->dump_stream("committed_version") << get_committed_version();
3045 f
->dump_bool("is_rep", is_rep());
3047 if (get_dir_auth() != CDIR_AUTH_DEFAULT
) {
3048 if (get_dir_auth().second
== CDIR_AUTH_UNKNOWN
) {
3049 f
->dump_stream("dir_auth") << get_dir_auth().first
;
3051 f
->dump_stream("dir_auth") << get_dir_auth();
3054 f
->dump_string("dir_auth", "");
3057 f
->open_array_section("states");
3058 MDSCacheObject::dump_states(f
);
3059 if (state_test(CDir::STATE_COMPLETE
)) f
->dump_string("state", "complete");
3060 if (state_test(CDir::STATE_FREEZINGTREE
)) f
->dump_string("state", "freezingtree");
3061 if (state_test(CDir::STATE_FROZENTREE
)) f
->dump_string("state", "frozentree");
3062 if (state_test(CDir::STATE_FROZENDIR
)) f
->dump_string("state", "frozendir");
3063 if (state_test(CDir::STATE_FREEZINGDIR
)) f
->dump_string("state", "freezingdir");
3064 if (state_test(CDir::STATE_EXPORTBOUND
)) f
->dump_string("state", "exportbound");
3065 if (state_test(CDir::STATE_IMPORTBOUND
)) f
->dump_string("state", "importbound");
3066 if (state_test(CDir::STATE_BADFRAG
)) f
->dump_string("state", "badfrag");
3069 MDSCacheObject::dump(f
);
3072 void CDir::dump_load(Formatter
*f
, utime_t now
, const DecayRate
& rate
)
3074 f
->dump_stream("path") << get_path();
3075 f
->dump_stream("dirfrag") << dirfrag();
3077 f
->open_object_section("pop_me");
3078 pop_me
.dump(f
, now
, rate
);
3081 f
->open_object_section("pop_nested");
3082 pop_nested
.dump(f
, now
, rate
);
3085 f
->open_object_section("pop_auth_subtree");
3086 pop_auth_subtree
.dump(f
, now
, rate
);
3089 f
->open_object_section("pop_auth_subtree_nested");
3090 pop_auth_subtree_nested
.dump(f
, now
, rate
);
3094 /****** Scrub Stuff *******/
3096 void CDir::scrub_info_create() const
3098 assert(!scrub_infop
);
3100 // break out of const-land to set up implicit initial state
3101 CDir
*me
= const_cast<CDir
*>(this);
3102 fnode_t
*fn
= me
->get_projected_fnode();
3104 std::unique_ptr
<scrub_info_t
> si(new scrub_info_t());
3106 si
->last_recursive
.version
= si
->recursive_start
.version
=
3107 fn
->recursive_scrub_version
;
3108 si
->last_recursive
.time
= si
->recursive_start
.time
=
3109 fn
->recursive_scrub_stamp
;
3111 si
->last_local
.version
= fn
->localized_scrub_version
;
3112 si
->last_local
.time
= fn
->localized_scrub_stamp
;
3114 me
->scrub_infop
.swap(si
);
3117 void CDir::scrub_initialize(const ScrubHeaderRefConst
& header
)
3119 dout(20) << __func__
<< dendl
;
3120 assert(is_complete());
3121 assert(header
!= nullptr);
3123 // FIXME: weird implicit construction, is someone else meant
3124 // to be calling scrub_info_create first?
3126 assert(scrub_infop
&& !scrub_infop
->directory_scrubbing
);
3128 scrub_infop
->recursive_start
.version
= get_projected_version();
3129 scrub_infop
->recursive_start
.time
= ceph_clock_now();
3131 scrub_infop
->directories_to_scrub
.clear();
3132 scrub_infop
->directories_scrubbing
.clear();
3133 scrub_infop
->directories_scrubbed
.clear();
3134 scrub_infop
->others_to_scrub
.clear();
3135 scrub_infop
->others_scrubbing
.clear();
3136 scrub_infop
->others_scrubbed
.clear();
3138 for (auto i
= items
.begin();
3141 // TODO: handle snapshot scrubbing
3142 if (i
->first
.snapid
!= CEPH_NOSNAP
)
3145 CDentry::linkage_t
*dnl
= i
->second
->get_projected_linkage();
3146 if (dnl
->is_primary()) {
3147 if (dnl
->get_inode()->is_dir())
3148 scrub_infop
->directories_to_scrub
.insert(i
->first
);
3150 scrub_infop
->others_to_scrub
.insert(i
->first
);
3151 } else if (dnl
->is_remote()) {
3152 // TODO: check remote linkage
3155 scrub_infop
->directory_scrubbing
= true;
3156 scrub_infop
->header
= header
;
3159 void CDir::scrub_finished()
3161 dout(20) << __func__
<< dendl
;
3162 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3164 assert(scrub_infop
->directories_to_scrub
.empty());
3165 assert(scrub_infop
->directories_scrubbing
.empty());
3166 scrub_infop
->directories_scrubbed
.clear();
3167 assert(scrub_infop
->others_to_scrub
.empty());
3168 assert(scrub_infop
->others_scrubbing
.empty());
3169 scrub_infop
->others_scrubbed
.clear();
3170 scrub_infop
->directory_scrubbing
= false;
3172 scrub_infop
->last_recursive
= scrub_infop
->recursive_start
;
3173 scrub_infop
->last_scrub_dirty
= true;
3176 int CDir::_next_dentry_on_set(dentry_key_set
&dns
, bool missing_okay
,
3177 MDSInternalContext
*cb
, CDentry
**dnout
)
3182 while (!dns
.empty()) {
3183 set
<dentry_key_t
>::iterator front
= dns
.begin();
3185 dn
= lookup(dnkey
.name
);
3187 if (!is_complete() &&
3188 (!has_bloom() || is_in_bloom(dnkey
.name
))) {
3189 // need to re-read this dirfrag
3195 dout(15) << " we no longer have directory dentry "
3196 << dnkey
.name
<< ", assuming it got renamed" << dendl
;
3200 dout(5) << " we lost dentry " << dnkey
.name
3201 << ", bailing out because that's impossible!" << dendl
;
3205 // okay, we got a dentry
3208 if (dn
->get_projected_version() < scrub_infop
->last_recursive
.version
&&
3209 !(scrub_infop
->header
->get_force())) {
3210 dout(15) << " skip dentry " << dnkey
.name
3211 << ", no change since last scrub" << dendl
;
3215 if (!dn
->get_linkage()->is_primary()) {
3216 dout(15) << " skip dentry " << dnkey
.name
3217 << ", no longer primary" << dendl
;
3228 int CDir::scrub_dentry_next(MDSInternalContext
*cb
, CDentry
**dnout
)
3230 dout(20) << __func__
<< dendl
;
3231 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3233 dout(20) << "trying to scrub directories underneath us" << dendl
;
3234 int rval
= _next_dentry_on_set(scrub_infop
->directories_to_scrub
, true,
3237 dout(20) << __func__
<< " inserted to directories scrubbing: "
3239 scrub_infop
->directories_scrubbing
.insert((*dnout
)->key());
3240 } else if (rval
== EAGAIN
) {
3241 // we don't need to do anything else
3242 } else { // we emptied out the directory scrub set
3243 assert(rval
== ENOENT
);
3244 dout(20) << "no directories left, moving on to other kinds of dentries"
3247 rval
= _next_dentry_on_set(scrub_infop
->others_to_scrub
, false, cb
, dnout
);
3249 dout(20) << __func__
<< " inserted to others scrubbing: "
3251 scrub_infop
->others_scrubbing
.insert((*dnout
)->key());
3254 dout(20) << " returning " << rval
<< " with dn=" << *dnout
<< dendl
;
3258 void CDir::scrub_dentries_scrubbing(list
<CDentry
*> *out_dentries
)
3260 dout(20) << __func__
<< dendl
;
3261 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3263 for (set
<dentry_key_t
>::iterator i
=
3264 scrub_infop
->directories_scrubbing
.begin();
3265 i
!= scrub_infop
->directories_scrubbing
.end();
3267 CDentry
*d
= lookup(i
->name
, i
->snapid
);
3269 out_dentries
->push_back(d
);
3271 for (set
<dentry_key_t
>::iterator i
= scrub_infop
->others_scrubbing
.begin();
3272 i
!= scrub_infop
->others_scrubbing
.end();
3274 CDentry
*d
= lookup(i
->name
, i
->snapid
);
3276 out_dentries
->push_back(d
);
3280 void CDir::scrub_dentry_finished(CDentry
*dn
)
3282 dout(20) << __func__
<< " on dn " << *dn
<< dendl
;
3283 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3284 dentry_key_t dn_key
= dn
->key();
3285 if (scrub_infop
->directories_scrubbing
.erase(dn_key
)) {
3286 scrub_infop
->directories_scrubbed
.insert(dn_key
);
3288 assert(scrub_infop
->others_scrubbing
.count(dn_key
));
3289 scrub_infop
->others_scrubbing
.erase(dn_key
);
3290 scrub_infop
->others_scrubbed
.insert(dn_key
);
3294 void CDir::scrub_maybe_delete_info()
3297 !scrub_infop
->directory_scrubbing
&&
3298 !scrub_infop
->need_scrub_local
&&
3299 !scrub_infop
->last_scrub_dirty
&&
3300 !scrub_infop
->pending_scrub_error
&&
3301 scrub_infop
->dirty_scrub_stamps
.empty()) {
3302 scrub_infop
.reset();
3306 bool CDir::scrub_local()
3308 assert(is_complete());
3309 bool rval
= check_rstats(true);
3313 scrub_infop
->last_local
.time
= ceph_clock_now();
3314 scrub_infop
->last_local
.version
= get_projected_version();
3315 scrub_infop
->pending_scrub_error
= false;
3316 scrub_infop
->last_scrub_dirty
= true;
3318 scrub_infop
->pending_scrub_error
= true;
3319 if (scrub_infop
->header
->get_repair())
3320 cache
->repair_dirfrag_stats(this);
3325 std::string
CDir::get_path() const
3328 get_inode()->make_path_string(path
, true);
3332 bool CDir::should_split_fast() const
3334 // Max size a fragment can be before trigger fast splitting
3335 int fast_limit
= g_conf
->mds_bal_split_size
* g_conf
->mds_bal_fragment_fast_factor
;
3337 // Fast path: the sum of accounted size and null dentries does not
3338 // exceed threshold: we definitely are not over it.
3339 if (get_frag_size() + get_num_head_null() <= fast_limit
) {
3343 // Fast path: the accounted size of the frag exceeds threshold: we
3344 // definitely are over it
3345 if (get_frag_size() > fast_limit
) {
3349 int64_t effective_size
= 0;
3351 for (const auto &p
: items
) {
3352 const CDentry
*dn
= p
.second
;
3353 if (!dn
->get_projected_linkage()->is_null()) {
3358 return effective_size
> fast_limit
;
3361 MEMPOOL_DEFINE_OBJECT_FACTORY(CDir
, co_dir
, mds_co
);