1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "include/types.h"
28 #include "LogSegment.h"
30 #include "common/bloom_filter.hpp"
31 #include "include/Context.h"
32 #include "common/Clock.h"
34 #include "osdc/Objecter.h"
36 #include "common/config.h"
37 #include "include/assert.h"
38 #include "include/compat.h"
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mds
43 #define dout_prefix *_dout << "mds." << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
// Definitions of CDir's static counters, zero-initialized at startup.
// NOTE(review): names suggest they count frozen/freezing subtree roots
// MDS-wide — confirm against the freeze/unfreeze tree code (not visible here).
45 int CDir::num_frozen_trees
= 0;
46 int CDir::num_freezing_trees
= 0;
// Internal-context helper bound to a CDir; get_mds() resolves the owning
// MDSRank via dir->cache->mds so the base class can run the callback on
// the right rank. (View is incomplete: member declarations and closing
// brace are elided from this chunk.)
48 class CDirContext
: public MDSInternalContextBase
52 MDSRank
* get_mds() override
{return dir
->cache
->mds
;}
// Constructor simply captures the CDir the callback belongs to.
55 explicit CDirContext(CDir
*d
) : dir(d
) {
// IO-context counterpart of CDirContext (derives from MDSIOContextBase),
// used for completions of disk I/O on behalf of a CDir. Same
// get_mds() resolution through dir->cache->mds. (Incomplete in this view.)
61 class CDirIOContext
: public MDSIOContextBase
65 MDSRank
* get_mds() override
{return dir
->cache
->mds
;}
// Constructor captures the CDir the I/O belongs to.
68 explicit CDirIOContext(CDir
*d
) : dir(d
) {
75 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
// Debug pretty-printer for a CDir: emits dirfrag id, path, snapid range,
// replication/projection/version info, auth, state flags, fragstat/rstat
// (with projected values when mds_debug_scatterstat is set), item counts,
// and pins. Used by dout() logging throughout the MDS.
78 ostream
& operator<<(ostream
& out
, const CDir
& dir
)
80 out
<< "[dir " << dir
.dirfrag() << " " << dir
.get_path() << "/"
81 << " [" << dir
.first
<< ",head]";
84 if (dir
.is_replicated())
85 out
<< dir
.get_replicas();
// Projected vs. committed version bookkeeping.
87 if (dir
.is_projected())
88 out
<< " pv=" << dir
.get_projected_version();
89 out
<< " v=" << dir
.get_version();
90 out
<< " cv=" << dir
.get_committing_version();
91 out
<< "/" << dir
.get_committed_version();
// Authority pair: first is the auth rank, second is the second auth
// (CDIR_AUTH_UNKNOWN when there is none, e.g. not mid-migration).
93 mds_authority_t a
= dir
.authority();
94 out
<< " rep@" << a
.first
;
95 if (a
.second
!= CDIR_AUTH_UNKNOWN
)
96 out
<< "," << a
.second
;
97 out
<< "." << dir
.get_replica_nonce();
100 if (dir
.is_rep()) out
<< " REP";
102 if (dir
.get_dir_auth() != CDIR_AUTH_DEFAULT
) {
103 if (dir
.get_dir_auth().second
== CDIR_AUTH_UNKNOWN
)
104 out
<< " dir_auth=" << dir
.get_dir_auth().first
;
106 out
<< " dir_auth=" << dir
.get_dir_auth();
// Auth pin summary: cumulative = local + dir + nested.
109 if (dir
.get_cum_auth_pins())
110 out
<< " ap=" << dir
.get_auth_pins()
111 << "+" << dir
.get_dir_auth_pins()
112 << "+" << dir
.get_nested_auth_pins();
// State bits, decoded to human-readable flag names.
114 out
<< " state=" << dir
.get_state();
115 if (dir
.state_test(CDir::STATE_COMPLETE
)) out
<< "|complete";
116 if (dir
.state_test(CDir::STATE_FREEZINGTREE
)) out
<< "|freezingtree";
117 if (dir
.state_test(CDir::STATE_FROZENTREE
)) out
<< "|frozentree";
118 if (dir
.state_test(CDir::STATE_AUXSUBTREE
)) out
<< "|auxsubtree";
119 //if (dir.state_test(CDir::STATE_FROZENTREELEAF)) out << "|frozentreeleaf";
120 if (dir
.state_test(CDir::STATE_FROZENDIR
)) out
<< "|frozendir";
121 if (dir
.state_test(CDir::STATE_FREEZINGDIR
)) out
<< "|freezingdir";
122 if (dir
.state_test(CDir::STATE_EXPORTBOUND
)) out
<< "|exportbound";
123 if (dir
.state_test(CDir::STATE_IMPORTBOUND
)) out
<< "|importbound";
124 if (dir
.state_test(CDir::STATE_BADFRAG
)) out
<< "|badfrag";
125 if (dir
.state_test(CDir::STATE_FRAGMENTING
)) out
<< "|fragmenting";
// fragstat; when it differs from accounted_fragstat print both as "a/b".
128 out
<< " " << dir
.fnode
.fragstat
;
129 if (!(dir
.fnode
.fragstat
== dir
.fnode
.accounted_fragstat
))
130 out
<< "/" << dir
.fnode
.accounted_fragstat
;
// With mds_debug_scatterstat, also show the projected fnode's stats.
131 if (g_conf
->mds_debug_scatterstat
&& dir
.is_projected()) {
132 const fnode_t
*pf
= dir
.get_projected_fnode();
133 out
<< "->" << pf
->fragstat
;
134 if (!(pf
->fragstat
== pf
->accounted_fragstat
))
135 out
<< "/" << pf
->accounted_fragstat
;
// Same pattern for the recursive stats (rstat).
139 out
<< " " << dir
.fnode
.rstat
;
140 if (!(dir
.fnode
.rstat
== dir
.fnode
.accounted_rstat
))
141 out
<< "/" << dir
.fnode
.accounted_rstat
;
142 if (g_conf
->mds_debug_scatterstat
&& dir
.is_projected()) {
143 const fnode_t
*pf
= dir
.get_projected_fnode();
144 out
<< "->" << pf
->rstat
;
145 if (!(pf
->rstat
== pf
->accounted_rstat
))
146 out
<< "/" << pf
->accounted_rstat
;
// hs = head items + head nulls; ss = snap items + snap nulls.
149 out
<< " hs=" << dir
.get_num_head_items() << "+" << dir
.get_num_head_null();
150 out
<< ",ss=" << dir
.get_num_snap_items() << "+" << dir
.get_num_snap_null();
151 if (dir
.get_num_dirty())
152 out
<< " dirty=" << dir
.get_num_dirty();
154 if (dir
.get_num_ref()) {
156 dir
.print_pin_set(out
);
// Print this CDir to the given stream (body elided in this view;
// presumably delegates to operator<< — confirm in full source).
164 void CDir::print(ostream
& out
)
// Emit the standard debug-line prefix (timestamp, mds rank, dirfrag id)
// used by the dout machinery; returns the stream for chaining.
172 ostream
& CDir::print_db_line_prefix(ostream
& out
)
174 return out
<< ceph_clock_now() << " mds." << cache
->mds
->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
179 // -------------------------------------------------------------------
// Construct a dirfrag for inode `in`, fragment `fg`, owned by `mdcache`.
// All counters/versions start at zero, popularity vectors are stamped with
// the current time, dir_auth defaults to CDIR_AUTH_DEFAULT, state starts
// as STATE_INITIAL and the on-disk fnode is zeroed. The inode must be a
// directory. NOTE(review): the `auth` parameter's use is elided from this
// view — confirm it initializes the auth state in the full source.
182 CDir::CDir(CInode
*in
, frag_t fg
, MDCache
*mdcache
, bool auth
) :
183 cache(mdcache
), inode(in
), frag(fg
),
185 dirty_rstat_inodes(member_offset(CInode
, dirty_rstat_item
)),
186 projected_version(0), item_dirty(this), item_new(this),
187 num_head_items(0), num_head_null(0),
188 num_snap_items(0), num_snap_null(0),
189 num_dirty(0), committing_version(0), committed_version(0),
190 dir_auth_pins(0), request_pins(0),
192 pop_me(ceph_clock_now()),
193 pop_nested(ceph_clock_now()),
194 pop_auth_subtree(ceph_clock_now()),
195 pop_auth_subtree_nested(ceph_clock_now()),
196 num_dentries_nested(0), num_dentries_auth_subtree(0),
197 num_dentries_auth_subtree_nested(0),
198 dir_auth(CDIR_AUTH_DEFAULT
)
200 state
= STATE_INITIAL
;
// fnode is a POD-style struct on this branch; zero it wholesale.
202 memset(&fnode
, 0, sizeof(fnode
));
205 assert(in
->is_dir());
211 * Check the recursive statistics on size for consistency.
212 * If mds_debug_scatterstat is enabled, assert for correctness,
213 * otherwise just print out the mismatch and continue.
// Verify fragstat/rstat consistency by re-summing over head dentries.
// No-op unless mds_debug_scatterstat is set or `scrub` is true; bails on
// incomplete/non-auth/frozen dirs. On mismatch, logs at level 1 and dumps
// dentries; finally asserts the recomputed sums match fnode's stats.
215 bool CDir::check_rstats(bool scrub
)
217 if (!g_conf
->mds_debug_scatterstat
&& !scrub
)
220 dout(25) << "check_rstats on " << this << dendl
;
221 if (!is_complete() || !is_auth() || is_frozen()) {
223 dout(10) << "check_rstats bailing out -- incomplete or non-auth or frozen dir!" << dendl
;
// Accumulate stats over head (CEPH_NOSNAP) dentries only.
227 frag_info_t frag_info
;
228 nest_info_t nest_info
;
229 for (map_t::iterator i
= items
.begin(); i
!= items
.end(); ++i
) {
230 if (i
->second
->last
!= CEPH_NOSNAP
)
232 CDentry::linkage_t
*dnl
= i
->second
->get_linkage();
233 if (dnl
->is_primary()) {
234 CInode
*in
= dnl
->get_inode();
235 nest_info
.add(in
->inode
.accounted_rstat
);
237 frag_info
.nsubdirs
++;
240 } else if (dnl
->is_remote())
// Compare computed fragstat against the stored one; log loudly on
// mismatch, quietly (level 20) when they agree.
246 if(!frag_info
.same_sums(fnode
.fragstat
)) {
247 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl
;
248 dout(1) << "get_num_head_items() = " << get_num_head_items()
249 << "; fnode.fragstat.nfiles=" << fnode
.fragstat
.nfiles
250 << " fnode.fragstat.nsubdirs=" << fnode
.fragstat
.nsubdirs
<< dendl
;
253 dout(20) << "get_num_head_items() = " << get_num_head_items()
254 << "; fnode.fragstat.nfiles=" << fnode
.fragstat
.nfiles
255 << " fnode.fragstat.nsubdirs=" << fnode
.fragstat
.nsubdirs
<< dendl
;
// Same comparison for recursive stats.
259 if (!nest_info
.same_sums(fnode
.rstat
)) {
260 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl
;
261 dout(1) << "total of child dentrys: " << nest_info
<< dendl
;
262 dout(1) << "my rstats: " << fnode
.rstat
<< dendl
;
265 dout(20) << "total of child dentrys: " << nest_info
<< dendl
;
266 dout(20) << "my rstats: " << fnode
.rstat
<< dendl
;
// Dump every dentry (with accounted_rstat for primaries) to aid debugging.
271 for (map_t::iterator i
= items
.begin(); i
!= items
.end(); ++i
) {
272 CDentry
*dn
= i
->second
;
273 if (dn
->get_linkage()->is_primary()) {
274 CInode
*in
= dn
->get_linkage()->inode
;
275 dout(1) << *dn
<< " rstat " << in
->inode
.accounted_rstat
<< dendl
;
277 dout(1) << *dn
<< dendl
;
// Hard assertions: any divergence here indicates metadata corruption.
281 assert(frag_info
.nfiles
== fnode
.fragstat
.nfiles
);
282 assert(frag_info
.nsubdirs
== fnode
.fragstat
.nsubdirs
);
283 assert(nest_info
.rbytes
== fnode
.rstat
.rbytes
);
284 assert(nest_info
.rfiles
== fnode
.rstat
.rfiles
);
285 assert(nest_info
.rsubdirs
== fnode
.rstat
.rsubdirs
);
288 dout(10) << "check_rstats complete on " << this << dendl
;
// Look up the dentry covering snapshot `snap` for `name`. Uses
// lower_bound on (snap, name, hash) and then checks the candidate's
// [first,last] range contains `snap`; logs hit/miss. Returns the dentry
// on hit (return statements elided from this view).
292 CDentry
*CDir::lookup(const string
& name
, snapid_t snap
)
294 dout(20) << "lookup (" << snap
<< ", '" << name
<< "')" << dendl
;
295 map_t::iterator iter
= items
.lower_bound(dentry_key_t(snap
, name
.c_str(),
296 inode
->hash_dentry_name(name
)));
297 if (iter
== items
.end())
// Hit only if the name matches and snap falls within [first,last].
299 if (iter
->second
->name
== name
&&
300 iter
->second
->first
<= snap
&&
301 iter
->second
->last
>= snap
) {
302 dout(20) << " hit -> " << iter
->first
<< dendl
;
305 dout(20) << " miss -> " << iter
->first
<< dendl
;
// Exact-key lookup: find the dentry whose `last` snapid equals `last`
// exactly (no range matching, unlike lookup()).
309 CDentry
*CDir::lookup_exact_snap(const string
& name
, snapid_t last
) {
310 map_t::iterator p
= items
.find(dentry_key_t(last
, name
.c_str(),
311 inode
->hash_dentry_name(name
)));
312 if (p
== items
.end())
// Create and insert a NULL (unlinked) dentry for [first,last]. The name
// must not already exist at `last`. The new dentry is marked auth, placed
// mid-LRU, stamped with the projected version, and pinned/flagged for
// fragmenting when the dir is in STATE_DNPINNEDFRAG.
321 CDentry
* CDir::add_null_dentry(const string
& dname
,
322 snapid_t first
, snapid_t last
)
325 assert(lookup_exact_snap(dname
, last
) == 0);
328 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), first
, last
);
330 dn
->state_set(CDentry::STATE_AUTH
);
331 cache
->lru
.lru_insert_mid(dn
);
334 dn
->version
= get_projected_version();
337 assert(items
.count(dn
->key()) == 0);
338 //assert(null_items.count(dn->name) == 0);
340 items
[dn
->key()] = dn
;
// Head vs. snapshotted null counters (increments elided in this view).
341 if (last
== CEPH_NOSNAP
)
// While fragmenting, new dentries are pinned so they can't be trimmed.
346 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
347 dn
->get(CDentry::PIN_FRAGMENTING
);
348 dn
->state_set(CDentry::STATE_FRAGMENTING
);
351 dout(12) << "add_null_dentry " << *dn
<< dendl
;
354 if (get_num_any() == 1)
357 assert(get_num_any() == items
.size());
// Create a dentry primarily linked to inode `in` over [first,last].
// Mirrors add_null_dentry's setup (auth flag, LRU, version, key
// uniqueness), then attaches the inode as primary parent and runs
// link_inode_work() for pin/snaprealm bookkeeping.
362 CDentry
* CDir::add_primary_dentry(const string
& dname
, CInode
*in
,
363 snapid_t first
, snapid_t last
)
366 assert(lookup_exact_snap(dname
, last
) == 0);
369 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), first
, last
);
371 dn
->state_set(CDentry::STATE_AUTH
);
372 cache
->lru
.lru_insert_mid(dn
);
375 dn
->version
= get_projected_version();
378 assert(items
.count(dn
->key()) == 0);
379 //assert(null_items.count(dn->name) == 0);
381 items
[dn
->key()] = dn
;
// Wire up the primary linkage in both directions, then do shared
// link bookkeeping (pins, auth pins, snaprealm).
383 dn
->get_linkage()->inode
= in
;
384 in
->set_primary_parent(dn
);
386 link_inode_work(dn
, in
);
388 if (dn
->last
== CEPH_NOSNAP
)
393 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
394 dn
->get(CDentry::PIN_FRAGMENTING
);
395 dn
->state_set(CDentry::STATE_FRAGMENTING
);
398 dout(12) << "add_primary_dentry " << *dn
<< dendl
;
401 if (get_num_any() == 1)
403 assert(get_num_any() == items
.size());
// Create a dentry remotely linked to inode number `ino` (with dirent
// type `d_type`) over [first,last]. Same auth/LRU/version/uniqueness
// setup as the other add_*_dentry variants; no inode attach here since
// the link is by ino only.
407 CDentry
* CDir::add_remote_dentry(const string
& dname
, inodeno_t ino
, unsigned char d_type
,
408 snapid_t first
, snapid_t last
)
411 assert(lookup_exact_snap(dname
, last
) == 0);
414 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), ino
, d_type
, first
, last
);
416 dn
->state_set(CDentry::STATE_AUTH
);
417 cache
->lru
.lru_insert_mid(dn
);
420 dn
->version
= get_projected_version();
423 assert(items
.count(dn
->key()) == 0);
424 //assert(null_items.count(dn->name) == 0);
426 items
[dn
->key()] = dn
;
427 if (last
== CEPH_NOSNAP
)
432 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
433 dn
->get(CDentry::PIN_FRAGMENTING
);
434 dn
->state_set(CDentry::STATE_FRAGMENTING
);
437 dout(12) << "add_remote_dentry " << *dn
<< dendl
;
440 if (get_num_any() == 1)
443 assert(get_num_any() == items
.size());
// Remove a dentry from this dirfrag: drop fragmenting pin/flag if set,
// detach a linked inode via unlink_inode_work(), erase from items, and
// remove from the LRU. Caller must have already dropped client leases.
449 void CDir::remove_dentry(CDentry
*dn
)
451 dout(12) << "remove_dentry " << *dn
<< dendl
;
453 // there should be no client leases at this point!
454 assert(dn
->client_lease_map
.empty());
456 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
457 dn
->put(CDentry::PIN_FRAGMENTING
);
458 dn
->state_clear(CDentry::STATE_FRAGMENTING
);
// Counter adjustments by null/head status (decrements elided in view).
461 if (dn
->get_linkage()->is_null()) {
462 if (dn
->last
== CEPH_NOSNAP
)
467 if (dn
->last
== CEPH_NOSNAP
)
473 if (!dn
->get_linkage()->is_null())
474 // detach inode and dentry
475 unlink_inode_work(dn
);
// Erase from the sorted dentry map; key must be present exactly once.
478 assert(items
.count(dn
->key()) == 1);
479 items
.erase(dn
->key());
485 cache
->lru
.lru_remove(dn
);
489 if (get_num_any() == 0)
491 assert(get_num_any() == items
.size());
// Convenience overload: derive ino and dirent type from the CInode's
// projected mode, then delegate to the (ino, d_type) overload.
494 void CDir::link_remote_inode(CDentry
*dn
, CInode
*in
)
496 link_remote_inode(dn
, in
->ino(), IFTODT(in
->get_projected_inode()->mode
));
// Convert a NULL dentry into a remote link to `ino` / `d_type`.
// Counter updates for head dentries are elided from this view.
499 void CDir::link_remote_inode(CDentry
*dn
, inodeno_t ino
, unsigned char d_type
)
501 dout(12) << "link_remote_inode " << *dn
<< " remote " << ino
<< dendl
;
502 assert(dn
->get_linkage()->is_null());
504 dn
->get_linkage()->set_remote(ino
, d_type
);
506 if (dn
->last
== CEPH_NOSNAP
) {
513 assert(get_num_any() == items
.size());
// Convert a NULL dentry into the primary link for `in`: set the linkage
// inode, set the inode's primary parent, then run link_inode_work() for
// pin and snaprealm bookkeeping.
516 void CDir::link_primary_inode(CDentry
*dn
, CInode
*in
)
518 dout(12) << "link_primary_inode " << *dn
<< " " << *in
<< dendl
;
519 assert(dn
->get_linkage()->is_null());
521 dn
->get_linkage()->inode
= in
;
522 in
->set_primary_parent(dn
);
524 link_inode_work(dn
, in
);
526 if (dn
->last
== CEPH_NOSNAP
) {
534 assert(get_num_any() == items
.size());
// Shared bookkeeping after a primary link is established: pin the dentry
// if the inode is referenced, propagate the inode's auth pins onto the
// dentry, and fix up the snaprealm parent (or move a caps-holding inode
// into the containing realm).
537 void CDir::link_inode_work( CDentry
*dn
, CInode
*in
)
539 assert(dn
->get_linkage()->get_inode() == in
);
540 assert(in
->get_parent_dn() == dn
);
543 //in->inode.version = dn->get_version();
// A referenced inode keeps its dentry pinned (PIN_INODEPIN).
546 if (in
->get_num_ref())
547 dn
->get(CDentry::PIN_INODEPIN
);
549 // adjust auth pin count
550 if (in
->auth_pins
+ in
->nested_auth_pins
)
551 dn
->adjust_nested_auth_pins(in
->auth_pins
+ in
->nested_auth_pins
, in
->auth_pins
, NULL
);
553 // verify open snaprealm parent
555 in
->snaprealm
->adjust_parent();
556 else if (in
->is_any_caps())
557 in
->move_to_realm(inode
->find_snaprealm());
// Detach the inode (primary or remote) from this dentry, logging the
// inode for primary links, then delegate to unlink_inode_work().
560 void CDir::unlink_inode(CDentry
*dn
)
562 if (dn
->get_linkage()->is_primary()) {
563 dout(12) << "unlink_inode " << *dn
<< " " << *dn
->get_linkage()->get_inode() << dendl
;
565 dout(12) << "unlink_inode " << *dn
<< dendl
;
568 unlink_inode_work(dn
);
570 if (dn
->last
== CEPH_NOSNAP
) {
577 assert(get_num_any() == items
.size());
// Opportunistically drop a NULL dentry that nobody references (its only
// allowed ref is the dirty pin). Bails if any other refs exist.
581 void CDir::try_remove_unlinked_dn(CDentry
*dn
)
583 assert(dn
->dir
== this);
584 assert(dn
->get_linkage()->is_null());
586 // no pins (besides dirty)?
587 if (dn
->get_num_ref() != dn
->is_dirty())
592 dout(10) << "try_remove_unlinked_dn " << *dn
<< " in " << *this << dendl
;
597 // NOTE: we may not have any more dirty dentries, but the fnode
598 // still changed, so the directory must remain dirty.
// Core unlink logic: for remote links, clear the remote linkage; for
// primary links, undo link_inode_work()'s pins (inode pin and nested
// auth pins) and sever the parent/child relationship. The linkage must
// end up null (asserted).
603 void CDir::unlink_inode_work( CDentry
*dn
)
605 CInode
*in
= dn
->get_linkage()->get_inode();
607 if (dn
->get_linkage()->is_remote()) {
610 dn
->unlink_remote(dn
->get_linkage());
612 dn
->get_linkage()->set_remote(0, 0);
613 } else if (dn
->get_linkage()->is_primary()) {
// Inverse of the PIN_INODEPIN taken in link_inode_work().
616 if (in
->get_num_ref())
617 dn
->put(CDentry::PIN_INODEPIN
);
619 // unlink auth_pin count
620 if (in
->auth_pins
+ in
->nested_auth_pins
)
621 dn
->adjust_nested_auth_pins(0 - (in
->auth_pins
+ in
->nested_auth_pins
), 0 - in
->auth_pins
, NULL
);
624 in
->remove_primary_parent(dn
);
625 dn
->get_linkage()->inode
= 0;
627 assert(!dn
->get_linkage()->is_null());
// Record a head dentry's name in this dirfrag's bloom filter (used to
// short-circuit negative lookups). Skipped in standby replay; the filter
// is lazily created, sized to the current item count (min 100) with a
// 1/size false-positive rate.
631 void CDir::add_to_bloom(CDentry
*dn
)
633 assert(dn
->last
== CEPH_NOSNAP
);
635 /* not create bloom filter for incomplete dir that was added by log replay */
639 /* don't maintain bloom filters in standby replay (saves cycles, and also
640 * avoids need to implement clearing it in EExport for #16924) */
641 if (cache
->mds
->is_standby_replay()) {
645 unsigned size
= get_num_head_items() + get_num_snap_items();
646 if (size
< 100) size
= 100;
647 bloom
.reset(new bloom_filter(size
, 1.0 / size
, 0));
649 /* This size and false positive probability is completely random.*/
650 bloom
->insert(dn
->name
.c_str(), dn
->name
.size());
// Probe the bloom filter for `name`; may return false positives by
// nature of bloom filters, never false negatives.
653 bool CDir::is_in_bloom(const string
& name
)
657 return bloom
->contains(name
.c_str(), name
.size());
// Sweep the dirfrag and drop unprojected NULL dentries; afterwards no
// null dentries (head or snap) may remain (asserted).
660 void CDir::remove_null_dentries() {
661 dout(12) << "remove_null_dentries " << *this << dendl
;
663 CDir::map_t::iterator p
= items
.begin();
664 while (p
!= items
.end()) {
665 CDentry
*dn
= p
->second
;
667 if (dn
->get_linkage()->is_null() && !dn
->is_projected())
671 assert(num_snap_null
== 0);
672 assert(num_head_null
== 0);
673 assert(get_num_any() == items
.size());
676 /** remove dirty null dentries for deleted directory. the dirfrag will be
677 * deleted soon, so it's safe to not commit dirty dentries.
679 * This is called when a directory is being deleted, a prerequisite
680 * of which is that its children have been unlinked: we expect to only see
681 * null, unprojected dentries here.
// For a stray directory being deleted (inode nlink == 0): drop dirty
// flags and remove dentries/inodes that are otherwise unreferenced.
// Dirty state is only cleared when the dir was never snapshotted, since
// snapshots may still need the on-disk state.
683 void CDir::try_remove_dentries_for_stray()
685 dout(10) << __func__
<< dendl
;
686 assert(inode
->inode
.nlink
== 0);
688 // clear dirty only when the directory was not snapshotted
689 bool clear_dirty
= !inode
->snaprealm
;
691 CDir::map_t::iterator p
= items
.begin();
692 while (p
!= items
.end()) {
693 CDentry
*dn
= p
->second
;
// Head dentries must already be null and unprojected here.
695 if (dn
->last
== CEPH_NOSNAP
) {
696 assert(!dn
->is_projected());
697 assert(dn
->get_linkage()->is_null());
698 if (clear_dirty
&& dn
->is_dirty())
700 // It's OK to remove lease prematurely because we will never link
701 // the dentry to inode again.
702 if (dn
->is_any_leases())
703 dn
->remove_client_leases(cache
->mds
->locker
);
704 if (dn
->get_num_ref() == 0)
// Snapshotted dentries: clean their primary inode too, then remove
// both when refcounts allow.
707 assert(!dn
->is_projected());
708 CDentry::linkage_t
*dnl
= dn
->get_linkage();
710 if (dnl
->is_primary()) {
711 in
= dnl
->get_inode();
712 if (clear_dirty
&& in
->is_dirty())
715 if (clear_dirty
&& dn
->is_dirty())
717 if (dn
->get_num_ref() == 0) {
720 cache
->remove_inode(in
);
725 if (clear_dirty
&& is_dirty())
// Push every dentry of this dirfrag to the bottom of the cache LRU,
// making them the first candidates for trimming.
729 void CDir::touch_dentries_bottom() {
730 dout(12) << "touch_dentries_bottom " << *this << dendl
;
732 for (CDir::map_t::iterator p
= items
.begin();
735 inode
->mdcache
->touch_dentry_bottom(p
->second
);
// Try to purge a snapshotted (non-head) dentry whose snap range is no
// longer covered by any live snapshot in `snaps`, provided neither the
// dentry nor its primary inode holds refs beyond the dirty pin. Removes
// the inode from the cache when purged.
738 bool CDir::try_trim_snap_dentry(CDentry
*dn
, const set
<snapid_t
>& snaps
)
740 assert(dn
->last
!= CEPH_NOSNAP
);
741 set
<snapid_t
>::const_iterator p
= snaps
.lower_bound(dn
->first
);
742 CDentry::linkage_t
*dnl
= dn
->get_linkage();
744 if (dnl
->is_primary())
745 in
= dnl
->get_inode();
// Purgeable when no snapid in `snaps` intersects [first,last] and the
// only refs on dentry/inode are their dirty pins.
746 if ((p
== snaps
.end() || *p
> dn
->last
) &&
747 (dn
->get_num_ref() == dn
->is_dirty()) &&
748 (!in
|| in
->get_num_ref() == in
->is_dirty())) {
749 dout(10) << " purging snapped " << *dn
<< dendl
;
750 if (in
&& in
->is_dirty())
754 dout(10) << " purging snapped " << *in
<< dendl
;
755 cache
->remove_inode(in
);
// Walk all dentries and attempt try_trim_snap_dentry() on every
// snapshotted (non-head) one, given the set of currently live snapids.
763 void CDir::purge_stale_snap_data(const set
<snapid_t
>& snaps
)
765 dout(10) << "purge_stale_snap_data " << snaps
<< dendl
;
767 CDir::map_t::iterator p
= items
.begin();
768 while (p
!= items
.end()) {
769 CDentry
*dn
= p
->second
;
772 if (dn
->last
== CEPH_NOSNAP
)
775 try_trim_snap_dentry(dn
, snaps
);
781 * steal_dentry -- semi-violently move a dentry from one CDir to another
782 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
783 * on the old CDir corpse; must call finish_old_fragment() when finished.
// Move `dn` from its current dirfrag into this one during split/merge.
// "Semi-violently": the old dir's counters/pins are left stale and must
// be cleaned up by finish_old_fragment(). Updates this dir's fragstat
// and rstat for head dentries, migrates the dirty-rstat list membership,
// and transfers the dentry's auth pins between the two dirs.
785 void CDir::steal_dentry(CDentry
*dn
)
787 dout(15) << "steal_dentry " << *dn
<< dendl
;
789 items
[dn
->key()] = dn
;
// Unhook from the old dir's map; drop its child pin if now empty.
791 dn
->dir
->items
.erase(dn
->key());
792 if (dn
->dir
->items
.empty())
793 dn
->dir
->put(PIN_CHILD
);
795 if (get_num_any() == 0)
797 if (dn
->get_linkage()->is_null()) {
798 if (dn
->last
== CEPH_NOSNAP
)
802 } else if (dn
->last
== CEPH_NOSNAP
) {
// Head primary link: fold the inode's accounted stats into this
// dir's fragstat/rstat (rctime takes the max).
805 if (dn
->get_linkage()->is_primary()) {
806 CInode
*in
= dn
->get_linkage()->get_inode();
807 inode_t
*pi
= in
->get_projected_inode();
808 if (dn
->get_linkage()->get_inode()->is_dir())
809 fnode
.fragstat
.nsubdirs
++;
811 fnode
.fragstat
.nfiles
++;
812 fnode
.rstat
.rbytes
+= pi
->accounted_rstat
.rbytes
;
813 fnode
.rstat
.rfiles
+= pi
->accounted_rstat
.rfiles
;
814 fnode
.rstat
.rsubdirs
+= pi
->accounted_rstat
.rsubdirs
;
815 fnode
.rstat
.rsnaprealms
+= pi
->accounted_rstat
.rsnaprealms
;
816 if (pi
->accounted_rstat
.rctime
> fnode
.rstat
.rctime
)
817 fnode
.rstat
.rctime
= pi
->accounted_rstat
.rctime
;
819 // move dirty inode rstat to new dirfrag
820 if (in
->is_dirty_rstat())
821 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
822 } else if (dn
->get_linkage()->is_remote()) {
823 if (dn
->get_linkage()->get_remote_d_type() == DT_DIR
)
824 fnode
.fragstat
.nsubdirs
++;
826 fnode
.fragstat
.nfiles
++;
// Non-head primaries still need their dirty-rstat item migrated.
830 if (dn
->get_linkage()->is_primary()) {
831 CInode
*in
= dn
->get_linkage()->get_inode();
832 if (in
->is_dirty_rstat())
833 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
837 if (dn
->auth_pins
|| dn
->nested_auth_pins
) {
838 // use the helpers here to maintain the auth_pin invariants on the dir inode
839 int ap
= dn
->get_num_auth_pins() + dn
->get_num_nested_auth_pins();
840 int dap
= dn
->get_num_dir_auth_pins();
842 adjust_nested_auth_pins(ap
, dap
, NULL
);
843 dn
->dir
->adjust_nested_auth_pins(-ap
, -dap
, NULL
);
// Pre-split/merge hook for the fragment being dismantled: auth_pin it
// (when auth and not replaying) so concurrent auth_pinning during
// dentry migration can't trigger freeze side effects.
852 void CDir::prepare_old_fragment(bool replay
)
854 // auth_pin old fragment for duration so that any auth_pinning
855 // during the dentry migration doesn't trigger side effects
856 if (!replay
&& is_auth())
// Pre-split/merge hook for a newly created fragment; only acts when
// this MDS is auth and not replaying (body continuation elided in view).
860 void CDir::prepare_new_fragment(bool replay
)
862 if (!replay
&& is_auth()) {
// Tear down an old fragment after its dentries were stolen: collect all
// waiters (before any unfreeze), undo prepare_old_fragment's auth_pin,
// zero the item counters, and drop the pins mirrored from
// init_fragment_pins(). Ends with only the sticky pin (if any) held.
868 void CDir::finish_old_fragment(list
<MDSInternalContextBase
*>& waiters
, bool replay
)
870 // take waiters _before_ unfreeze...
872 take_waiting(WAIT_ANY_MASK
, waiters
);
874 auth_unpin(this); // pinned in prepare_old_fragment
875 assert(is_frozen_dir());
// All auth pins must have migrated away with the dentries.
880 assert(nested_auth_pins
== 0);
881 assert(dir_auth_pins
== 0);
882 assert(auth_pins
== 0);
884 num_head_items
= num_head_null
= 0;
885 num_snap_items
= num_snap_null
= 0;
887 // this mirrors init_fragment_pins()
892 if (state_test(STATE_IMPORTBOUND
))
893 put(PIN_IMPORTBOUND
);
894 if (state_test(STATE_EXPORTBOUND
))
895 put(PIN_EXPORTBOUND
);
896 if (is_subtree_root())
902 assert(get_num_ref() == (state_test(STATE_STICKY
) ? 1:0));
// Take the reference pins implied by a fragment's current state
// (replicated / dirty / export bound / import bound / subtree root).
// finish_old_fragment() releases the same set.
905 void CDir::init_fragment_pins()
907 if (!replica_map
.empty())
909 if (state_test(STATE_DIRTY
))
911 if (state_test(STATE_EXPORTBOUND
))
912 get(PIN_EXPORTBOUND
);
913 if (state_test(STATE_IMPORTBOUND
))
914 get(PIN_IMPORTBOUND
);
915 if (is_subtree_root())
// Split this dirfrag into 2^bits subfragments. Computes the outstanding
// fragstat/rstat differential, creates the new CDirs (inheriting state,
// replicas, auth, version, and a 1/2^bits share of the popularity
// vectors), repartitions dentries by hashed name via steal_dentry(),
// stamps stat versions on the subfrags, credits the differential to
// subfrag[0], and finally dismantles this (old) fragment.
919 void CDir::split(int bits
, list
<CDir
*>& subs
, list
<MDSInternalContextBase
*>& waiters
, bool replay
)
921 dout(10) << "split by " << bits
<< " bits on " << *this << dendl
;
923 assert(replay
|| is_complete() || !is_auth());
926 frag
.split(bits
, frags
);
928 vector
<CDir
*> subfrags(1 << bits
);
930 double fac
= 1.0 / (double)(1 << bits
); // for scaling load vecs
// Unaccounted stat deltas must not be lost across the split.
932 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
933 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
935 nest_info_t rstatdiff
;
936 frag_info_t fragstatdiff
;
937 if (fnode
.accounted_rstat
.version
== rstat_version
)
938 rstatdiff
.add_delta(fnode
.accounted_rstat
, fnode
.rstat
);
939 if (fnode
.accounted_fragstat
.version
== dirstat_version
)
940 fragstatdiff
.add_delta(fnode
.accounted_fragstat
, fnode
.fragstat
);
941 dout(10) << " rstatdiff " << rstatdiff
<< " fragstatdiff " << fragstatdiff
<< dendl
;
943 prepare_old_fragment(replay
);
945 // create subfrag dirs
947 for (list
<frag_t
>::iterator p
= frags
.begin(); p
!= frags
.end(); ++p
) {
948 CDir
*f
= new CDir(inode
, *p
, cache
, is_auth());
949 f
->state_set(state
& (MASK_STATE_FRAGMENT_KEPT
| STATE_COMPLETE
));
950 f
->replica_map
= replica_map
;
951 f
->dir_auth
= dir_auth
;
952 f
->init_fragment_pins();
953 f
->set_version(get_version());
956 f
->pop_me
.scale(fac
);
958 // FIXME; this is an approximation
959 f
->pop_nested
= pop_nested
;
960 f
->pop_nested
.scale(fac
);
961 f
->pop_auth_subtree
= pop_auth_subtree
;
962 f
->pop_auth_subtree
.scale(fac
);
963 f
->pop_auth_subtree_nested
= pop_auth_subtree_nested
;
964 f
->pop_auth_subtree_nested
.scale(fac
);
966 dout(10) << " subfrag " << *p
<< " " << *f
<< dendl
;
969 inode
->add_dirfrag(f
);
971 f
->set_dir_auth(get_dir_auth());
972 f
->prepare_new_fragment(replay
);
975 // repartition dentries
976 while (!items
.empty()) {
977 CDir::map_t::iterator p
= items
.begin();
979 CDentry
*dn
= p
->second
;
// Choose the target subfrag from the dentry-name hash bits that
// distinguish the subfrag from the parent frag.
980 frag_t subfrag
= inode
->pick_dirfrag(dn
->name
);
981 int n
= (subfrag
.value() & (subfrag
.mask() ^ frag
.mask())) >> subfrag
.mask_shift();
982 dout(15) << " subfrag " << subfrag
<< " n=" << n
<< " for " << p
->first
<< dendl
;
983 CDir
*f
= subfrags
[n
];
987 // FIXME: handle dirty old rstat
989 // fix up new frag fragstats
990 for (int i
=0; i
<n
; i
++) {
991 CDir
*f
= subfrags
[i
];
992 f
->fnode
.rstat
.version
= rstat_version
;
993 f
->fnode
.accounted_rstat
= f
->fnode
.rstat
;
994 f
->fnode
.fragstat
.version
= dirstat_version
;
995 f
->fnode
.accounted_fragstat
= f
->fnode
.fragstat
;
996 dout(10) << " rstat " << f
->fnode
.rstat
<< " fragstat " << f
->fnode
.fragstat
997 << " on " << *f
<< dendl
;
1000 // give any outstanding frag stat differential to first frag
1001 dout(10) << " giving rstatdiff " << rstatdiff
<< " fragstatdiff" << fragstatdiff
1002 << " to " << *subfrags
[0] << dendl
;
1003 subfrags
[0]->fnode
.accounted_rstat
.add(rstatdiff
);
1004 subfrags
[0]->fnode
.accounted_fragstat
.add(fragstatdiff
);
1006 finish_old_fragment(waiters
, replay
);
// Merge the given subfragments into this dirfrag (inverse of split()).
// Resolves a single non-default dir_auth among the subs, accumulates
// their unaccounted stat deltas, steals all their dentries, merges
// replica maps (taking max nonce per rank), takes the highest version,
// unions kept state bits, then dismantles each sub and reconciles this
// dir's accounted stats with the accumulated differential.
1009 void CDir::merge(list
<CDir
*>& subs
, list
<MDSInternalContextBase
*>& waiters
, bool replay
)
1011 dout(10) << "merge " << subs
<< dendl
;
// At most one sub may carry an explicit (non-default) auth.
1013 mds_authority_t new_auth
= CDIR_AUTH_DEFAULT
;
1014 for (auto dir
: subs
) {
1015 if (dir
->get_dir_auth() != CDIR_AUTH_DEFAULT
&&
1016 dir
->get_dir_auth() != new_auth
) {
1017 assert(new_auth
== CDIR_AUTH_DEFAULT
);
1018 new_auth
= dir
->get_dir_auth();
1022 set_dir_auth(new_auth
);
1023 prepare_new_fragment(replay
);
1025 nest_info_t rstatdiff
;
1026 frag_info_t fragstatdiff
;
1027 bool touched_mtime
, touched_chattr
;
1028 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
1029 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
1031 for (auto dir
: subs
) {
1032 dout(10) << " subfrag " << dir
->get_frag() << " " << *dir
<< dendl
;
1033 assert(!dir
->is_auth() || dir
->is_complete() || replay
);
// Collect each sub's unaccounted delta before its stats are discarded.
1035 if (dir
->fnode
.accounted_rstat
.version
== rstat_version
)
1036 rstatdiff
.add_delta(dir
->fnode
.accounted_rstat
, dir
->fnode
.rstat
);
1037 if (dir
->fnode
.accounted_fragstat
.version
== dirstat_version
)
1038 fragstatdiff
.add_delta(dir
->fnode
.accounted_fragstat
, dir
->fnode
.fragstat
,
1039 &touched_mtime
, &touched_chattr
);
1041 dir
->prepare_old_fragment(replay
);
1044 while (!dir
->items
.empty())
1045 steal_dentry(dir
->items
.begin()->second
);
1047 // merge replica map
1048 for (compact_map
<mds_rank_t
,unsigned>::iterator p
= dir
->replicas_begin();
1049 p
!= dir
->replicas_end();
1051 unsigned cur
= replica_map
[p
->first
];
1052 if (p
->second
> cur
)
1053 replica_map
[p
->first
] = p
->second
;
// Merged frag advances to the newest sub version.
1057 if (dir
->get_version() > get_version())
1058 set_version(dir
->get_version());
1061 state_set(dir
->get_state() & MASK_STATE_FRAGMENT_KEPT
);
1062 dir_auth
= dir
->dir_auth
;
1064 dir
->finish_old_fragment(waiters
, replay
);
1065 inode
->close_dirfrag(dir
->get_frag());
1068 if (is_auth() && !replay
)
1071 // FIXME: merge dirty old rstat
1072 fnode
.rstat
.version
= rstat_version
;
1073 fnode
.accounted_rstat
= fnode
.rstat
;
1074 fnode
.accounted_rstat
.add(rstatdiff
);
1076 fnode
.fragstat
.version
= dirstat_version
;
1077 fnode
.accounted_fragstat
= fnode
.fragstat
;
1078 fnode
.accounted_fragstat
.add(fragstatdiff
);
1080 init_fragment_pins();
// Bring the projected fnode's fragstat version in line with the inode's
// projected dirstat version, marking the current fragstat as fully
// accounted (accounted_fragstat = fragstat).
1086 void CDir::resync_accounted_fragstat()
1088 fnode_t
*pf
= get_projected_fnode();
1089 inode_t
*pi
= inode
->get_projected_inode();
1091 if (pf
->accounted_fragstat
.version
!= pi
->dirstat
.version
) {
1092 pf
->fragstat
.version
= pi
->dirstat
.version
;
1093 dout(10) << "resync_accounted_fragstat " << pf
->accounted_fragstat
<< " -> " << pf
->fragstat
<< dendl
;
1094 pf
->accounted_fragstat
= pf
->fragstat
;
1099 * resync rstat and accounted_rstat with inode
// rstat counterpart of resync_accounted_fragstat(): align with the
// inode's projected rstat version, mark rstat accounted, and drop any
// stale per-snapshot dirty rstat history.
1101 void CDir::resync_accounted_rstat()
1103 fnode_t
*pf
= get_projected_fnode();
1104 inode_t
*pi
= inode
->get_projected_inode();
1106 if (pf
->accounted_rstat
.version
!= pi
->rstat
.version
) {
1107 pf
->rstat
.version
= pi
->rstat
.version
;
1108 dout(10) << "resync_accounted_rstat " << pf
->accounted_rstat
<< " -> " << pf
->rstat
<< dendl
;
1109 pf
->accounted_rstat
= pf
->rstat
;
1110 dirty_old_rstat
.clear();
// Phase one of folding child inodes' dirty rstats into this dirfrag:
// for each (auth, unfrozen) inode on the dirty_rstat list, project a new
// inode version and project its rstat into this frag; sets
// STATE_ASSIMRSTAT so the finish phase knows work is pending.
1114 void CDir::assimilate_dirty_rstat_inodes()
1116 dout(10) << "assimilate_dirty_rstat_inodes" << dendl
;
1117 for (elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1120 assert(in
->is_auth());
1121 if (in
->is_frozen())
1124 inode_t
*pi
= in
->project_inode();
1125 pi
->version
= in
->pre_dirty();
1127 inode
->mdcache
->project_rstat_inode_to_frag(in
, this, 0, 0, NULL
);
1129 state_set(STATE_ASSIMRSTAT
);
1130 dout(10) << "assimilate_dirty_rstat_inodes done" << dendl
;
// Phase two: journal the projected inodes from phase one into `blob`
// (skipping still-frozen ones), clear their dirty-rstat flags, and — if
// frozen inodes remain queued — re-mark the parent nestlock as an
// updated scatterlock so the work is retried later.
1133 void CDir::assimilate_dirty_rstat_inodes_finish(MutationRef
& mut
, EMetaBlob
*blob
)
1135 if (!state_test(STATE_ASSIMRSTAT
))
1137 state_clear(STATE_ASSIMRSTAT
);
1138 dout(10) << "assimilate_dirty_rstat_inodes_finish" << dendl
;
1139 elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1144 if (in
->is_frozen())
1147 CDentry
*dn
= in
->get_projected_parent_dn();
1150 mut
->add_projected_inode(in
);
1152 in
->clear_dirty_rstat();
1153 blob
->add_primary_dentry(dn
, in
, true);
1156 if (!dirty_rstat_inodes
.empty())
1157 inode
->mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
1163 /****************************************
// Queue context `c` to run when the dentry (dname, snapid) becomes
// available; waiters are keyed by (name, snap) in waiting_on_dentry.
1167 void CDir::add_dentry_waiter(const string
& dname
, snapid_t snapid
, MDSInternalContextBase
*c
)
1169 if (waiting_on_dentry
.empty())
1171 waiting_on_dentry
[string_snap_t(dname
, snapid
)].push_back(c
);
1172 dout(10) << "add_dentry_waiter dentry " << dname
1173 << " snap " << snapid
1174 << " " << c
<< " on " << *this << dendl
;
// Collect (splice into `ls`) all waiters registered for `dname` with a
// snapid in [first,last], erasing each satisfied entry from the map.
1177 void CDir::take_dentry_waiting(const string
& dname
, snapid_t first
, snapid_t last
,
1178 list
<MDSInternalContextBase
*>& ls
)
1180 if (waiting_on_dentry
.empty())
// Range scan between the (name,first) and (name,last) keys.
1183 string_snap_t
lb(dname
, first
);
1184 string_snap_t
ub(dname
, last
);
1185 compact_map
<string_snap_t
, list
<MDSInternalContextBase
*> >::iterator p
= waiting_on_dentry
.lower_bound(lb
);
1186 while (p
!= waiting_on_dentry
.end() &&
1188 dout(10) << "take_dentry_waiting dentry " << dname
1189 << " [" << first
<< "," << last
<< "] found waiter on snap "
1191 << " on " << *this << dendl
;
1192 ls
.splice(ls
.end(), p
->second
);
1193 waiting_on_dentry
.erase(p
++);
1196 if (waiting_on_dentry
.empty())
// Drain every dentry waiter on this dirfrag into `ls`, then clear the
// waiting_on_dentry map entirely.
1200 void CDir::take_sub_waiting(list
<MDSInternalContextBase
*>& ls
)
1202 dout(10) << "take_sub_waiting" << dendl
;
1203 if (!waiting_on_dentry
.empty()) {
1204 for (compact_map
<string_snap_t
, list
<MDSInternalContextBase
*> >::iterator p
= waiting_on_dentry
.begin();
1205 p
!= waiting_on_dentry
.end();
1207 ls
.splice(ls
.end(), p
->second
);
1208 waiting_on_dentry
.clear();
// Register a waiter, redirecting freeze-root and subtree-root waits up
// the parent chain until a dir that actually is the relevant root is
// found; otherwise falls through to MDSCacheObject::add_waiter().
1215 void CDir::add_waiter(uint64_t tag
, MDSInternalContextBase
*c
)
// ATFREEZEROOT waits belong on the freezing/frozen root, not here.
1220 if (tag
& WAIT_ATFREEZEROOT
) {
1221 if (!(is_freezing_tree_root() || is_frozen_tree_root() ||
1222 is_freezing_dir() || is_frozen_dir())) {
1224 dout(10) << "add_waiter " << std::hex
<< tag
<< std::dec
<< " " << c
<< " should be ATFREEZEROOT, " << *this << " is not root, trying parent" << dendl
;
1225 inode
->parent
->dir
->add_waiter(tag
, c
);
// Likewise ATSUBTREEROOT waits climb to the subtree root.
1231 if (tag
& WAIT_ATSUBTREEROOT
) {
1232 if (!is_subtree_root()) {
1234 dout(10) << "add_waiter " << std::hex
<< tag
<< std::dec
<< " " << c
<< " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl
;
1235 inode
->parent
->dir
->add_waiter(tag
, c
);
1240 assert(!(tag
& WAIT_CREATED
) || state_test(STATE_CREATING
));
1242 MDSCacheObject::add_waiter(tag
, c
);
1247 /* NOTE: this checks dentry waiters too */
// Collect waiters matching `mask` into `ls`; WAIT_DENTRY additionally
// drains every per-dentry waiter, then defers to the base class for the
// object-level waiters.
1248 void CDir::take_waiting(uint64_t mask
, list
<MDSInternalContextBase
*>& ls
)
1250 if ((mask
& WAIT_DENTRY
) && !waiting_on_dentry
.empty()) {
1251 // take all dentry waiters
1252 while (!waiting_on_dentry
.empty()) {
1253 compact_map
<string_snap_t
, list
<MDSInternalContextBase
*> >::iterator p
= waiting_on_dentry
.begin();
1254 dout(10) << "take_waiting dentry " << p
->first
.name
1255 << " snap " << p
->first
.snapid
<< " on " << *this << dendl
;
1256 ls
.splice(ls
.end(), p
->second
);
1257 waiting_on_dentry
.erase(p
);
1263 MDSCacheObject::take_waiting(mask
, ls
);
// Take all waiters matching `mask`, complete them with `result`, and
// hand them to the MDS waiter queue for execution.
1267 void CDir::finish_waiting(uint64_t mask
, int result
)
1269 dout(11) << "finish_waiting mask " << hex
<< mask
<< dec
<< " result " << result
<< " on " << *this << dendl
;
1271 list
<MDSInternalContextBase
*> finished
;
1272 take_waiting(mask
, finished
);
1274 finish_contexts(g_ceph_context
, finished
, result
);
1276 cache
->mds
->queue_waiters(finished
);
1283 fnode_t
*CDir::project_fnode()
1285 assert(get_version() != 0);
1286 fnode_t
*p
= new fnode_t
;
1287 *p
= *get_projected_fnode();
1288 projected_fnode
.push_back(p
);
1290 if (scrub_infop
&& scrub_infop
->last_scrub_dirty
) {
1291 p
->localized_scrub_stamp
= scrub_infop
->last_local
.time
;
1292 p
->localized_scrub_version
= scrub_infop
->last_local
.version
;
1293 p
->recursive_scrub_stamp
= scrub_infop
->last_recursive
.time
;
1294 p
->recursive_scrub_version
= scrub_infop
->last_recursive
.version
;
1295 scrub_infop
->last_scrub_dirty
= false;
1296 scrub_maybe_delete_info();
1299 dout(10) << "project_fnode " << p
<< dendl
;
1303 void CDir::pop_and_dirty_projected_fnode(LogSegment
*ls
)
1305 assert(!projected_fnode
.empty());
1306 dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode
.front()
1307 << " v" << projected_fnode
.front()->version
<< dendl
;
1308 fnode
= *projected_fnode
.front();
1310 delete projected_fnode
.front();
1311 projected_fnode
.pop_front();
1315 version_t
CDir::pre_dirty(version_t min
)
1317 if (min
> projected_version
)
1318 projected_version
= min
;
1319 ++projected_version
;
1320 dout(10) << "pre_dirty " << projected_version
<< dendl
;
1321 return projected_version
;
1324 void CDir::mark_dirty(version_t pv
, LogSegment
*ls
)
1326 assert(get_version() < pv
);
1327 assert(pv
<= projected_version
);
1332 void CDir::_mark_dirty(LogSegment
*ls
)
1334 if (!state_test(STATE_DIRTY
)) {
1335 dout(10) << "mark_dirty (was clean) " << *this << " version " << get_version() << dendl
;
1339 dout(10) << "mark_dirty (already dirty) " << *this << " version " << get_version() << dendl
;
1342 ls
->dirty_dirfrags
.push_back(&item_dirty
);
1344 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1345 if (committed_version
== 0 && !item_new
.is_on_list())
1346 ls
->new_dirfrags
.push_back(&item_new
);
1350 void CDir::mark_new(LogSegment
*ls
)
1352 ls
->new_dirfrags
.push_back(&item_new
);
1353 state_clear(STATE_CREATING
);
1355 list
<MDSInternalContextBase
*> waiters
;
1356 take_waiting(CDir::WAIT_CREATED
, waiters
);
1357 cache
->mds
->queue_waiters(waiters
);
1360 void CDir::mark_clean()
1362 dout(10) << "mark_clean " << *this << " version " << get_version() << dendl
;
1363 if (state_test(STATE_DIRTY
)) {
1364 item_dirty
.remove_myself();
1365 item_new
.remove_myself();
1367 state_clear(STATE_DIRTY
);
1372 // caller should hold auth pin of this
1373 void CDir::log_mark_dirty()
1375 if (is_dirty() || is_projected())
1376 return; // noop if it is already dirty or will be dirty
1378 version_t pv
= pre_dirty();
1379 mark_dirty(pv
, cache
->mds
->mdlog
->get_current_segment());
1382 void CDir::mark_complete() {
1383 state_set(STATE_COMPLETE
);
1387 void CDir::first_get()
1389 inode
->get(CInode::PIN_DIRFRAG
);
1392 void CDir::last_put()
1394 inode
->put(CInode::PIN_DIRFRAG
);
1399 /******************************************************************************
1403 // -----------------------
1405 void CDir::fetch(MDSInternalContextBase
*c
, bool ignore_authpinnability
)
1408 return fetch(c
, want
, ignore_authpinnability
);
1411 void CDir::fetch(MDSInternalContextBase
*c
, const string
& want_dn
, bool ignore_authpinnability
)
1413 dout(10) << "fetch on " << *this << dendl
;
1416 assert(!is_complete());
1418 if (!can_auth_pin() && !ignore_authpinnability
) {
1420 dout(7) << "fetch waiting for authpinnable" << dendl
;
1421 add_waiter(WAIT_UNFREEZE
, c
);
1423 dout(7) << "fetch not authpinnable and no context" << dendl
;
1427 // unlinked directory inode shouldn't have any entry
1428 if (inode
->inode
.nlink
== 0 && !inode
->snaprealm
) {
1429 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl
;
1430 if (get_version() == 0) {
1433 if (state_test(STATE_REJOINUNDEF
)) {
1434 assert(cache
->mds
->is_rejoin());
1435 state_clear(STATE_REJOINUNDEF
);
1436 cache
->opened_undef_dirfrag(this);
1442 cache
->mds
->queue_waiter(c
);
1446 if (c
) add_waiter(WAIT_COMPLETE
, c
);
1447 if (!want_dn
.empty()) wanted_items
.insert(want_dn
);
1449 // already fetching?
1450 if (state_test(CDir::STATE_FETCHING
)) {
1451 dout(7) << "already fetching; waiting" << dendl
;
1456 state_set(CDir::STATE_FETCHING
);
1458 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_fetch
);
1460 std::set
<dentry_key_t
> empty
;
1461 _omap_fetch(NULL
, empty
);
1464 void CDir::fetch(MDSInternalContextBase
*c
, const std::set
<dentry_key_t
>& keys
)
1466 dout(10) << "fetch " << keys
.size() << " keys on " << *this << dendl
;
1469 assert(!is_complete());
1471 if (!can_auth_pin()) {
1472 dout(7) << "fetch keys waiting for authpinnable" << dendl
;
1473 add_waiter(WAIT_UNFREEZE
, c
);
1476 if (state_test(CDir::STATE_FETCHING
)) {
1477 dout(7) << "fetch keys waiting for full fetch" << dendl
;
1478 add_waiter(WAIT_COMPLETE
, c
);
1483 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_fetch
);
1485 _omap_fetch(c
, keys
);
1488 class C_IO_Dir_OMAP_FetchedMore
: public CDirIOContext
{
1489 MDSInternalContextBase
*fin
;
1493 map
<string
, bufferlist
> omap
; ///< carry-over from before
1494 map
<string
, bufferlist
> omap_more
; ///< new batch
1496 C_IO_Dir_OMAP_FetchedMore(CDir
*d
, MDSInternalContextBase
*f
) :
1497 CDirIOContext(d
), fin(f
), ret(0) { }
1498 void finish(int r
) {
1501 omap
.swap(omap_more
);
1503 omap
.insert(omap_more
.begin(), omap_more
.end());
1506 dir
->_omap_fetch_more(hdrbl
, omap
, fin
);
1508 dir
->_omap_fetched(hdrbl
, omap
, !fin
, r
);
1515 class C_IO_Dir_OMAP_Fetched
: public CDirIOContext
{
1516 MDSInternalContextBase
*fin
;
1520 map
<string
, bufferlist
> omap
;
1522 int ret1
, ret2
, ret3
;
1524 C_IO_Dir_OMAP_Fetched(CDir
*d
, MDSInternalContextBase
*f
) :
1525 CDirIOContext(d
), fin(f
), ret1(0), ret2(0), ret3(0) { }
1526 void finish(int r
) override
{
1527 // check the correctness of backtrace
1528 if (r
>= 0 && ret3
!= -ECANCELED
)
1529 dir
->inode
->verify_diri_backtrace(btbl
, ret3
);
1530 if (r
>= 0) r
= ret1
;
1531 if (r
>= 0) r
= ret2
;
1533 dir
->_omap_fetch_more(hdrbl
, omap
, fin
);
1535 dir
->_omap_fetched(hdrbl
, omap
, !fin
, r
);
1542 void CDir::_omap_fetch(MDSInternalContextBase
*c
, const std::set
<dentry_key_t
>& keys
)
1544 C_IO_Dir_OMAP_Fetched
*fin
= new C_IO_Dir_OMAP_Fetched(this, c
);
1545 object_t oid
= get_ondisk_object();
1546 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
1548 rd
.omap_get_header(&fin
->hdrbl
, &fin
->ret1
);
1551 rd
.omap_get_vals("", "", g_conf
->mds_dir_keys_per_op
,
1552 &fin
->omap
, &fin
->more
, &fin
->ret2
);
1555 std::set
<std::string
> str_keys
;
1556 for (auto p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1559 str_keys
.insert(str
);
1561 rd
.omap_get_vals_by_keys(str_keys
, &fin
->omap
, &fin
->ret2
);
1563 // check the correctness of backtrace
1564 if (g_conf
->mds_verify_backtrace
> 0 && frag
== frag_t()) {
1565 rd
.getxattr("parent", &fin
->btbl
, &fin
->ret3
);
1566 rd
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1568 fin
->ret3
= -ECANCELED
;
1571 cache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1572 new C_OnFinisher(fin
, cache
->mds
->finisher
));
1575 void CDir::_omap_fetch_more(
1577 map
<string
, bufferlist
>& omap
,
1578 MDSInternalContextBase
*c
)
1580 // we have more omap keys to fetch!
1581 object_t oid
= get_ondisk_object();
1582 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
1583 C_IO_Dir_OMAP_FetchedMore
*fin
= new C_IO_Dir_OMAP_FetchedMore(this, c
);
1584 fin
->hdrbl
.claim(hdrbl
);
1585 fin
->omap
.swap(omap
);
1587 rd
.omap_get_vals(fin
->omap
.rbegin()->first
,
1588 "", /* filter prefix */
1589 g_conf
->mds_dir_keys_per_op
,
1593 cache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1594 new C_OnFinisher(fin
, cache
->mds
->finisher
));
1597 CDentry
*CDir::_load_dentry(
1598 const std::string
&key
,
1599 const std::string
&dname
,
1600 const snapid_t last
,
1603 const std::set
<snapid_t
> *snaps
,
1605 list
<CInode
*> *undef_inodes
)
1607 bufferlist::iterator q
= bl
.begin();
1616 dout(20) << "_fetched pos " << pos
<< " marker '" << type
<< "' dname '" << dname
1617 << " [" << first
<< "," << last
<< "]"
1621 if (snaps
&& last
!= CEPH_NOSNAP
) {
1622 set
<snapid_t
>::const_iterator p
= snaps
->lower_bound(first
);
1623 if (p
== snaps
->end() || *p
> last
) {
1624 dout(10) << " skipping stale dentry on [" << first
<< "," << last
<< "]" << dendl
;
1630 * look for existing dentry for _last_ snap, because unlink +
1631 * create may leave a "hole" (epochs during which the dentry
1632 * doesn't exist) but for which no explicit negative dentry is in
1637 dn
= lookup_exact_snap(dname
, last
);
1639 dn
= lookup(dname
, last
);
1644 unsigned char d_type
;
1646 ::decode(d_type
, q
);
1650 stale_items
.insert(key
);
1651 *force_dirty
= true;
1657 if (dn
->get_linkage()->get_inode() == 0) {
1658 dout(12) << "_fetched had NEG dentry " << *dn
<< dendl
;
1660 dout(12) << "_fetched had dentry " << *dn
<< dendl
;
1664 dn
= add_remote_dentry(dname
, ino
, d_type
, first
, last
);
1667 CInode
*in
= cache
->get_inode(ino
); // we may or may not have it.
1669 dn
->link_remote(dn
->get_linkage(), in
);
1670 dout(12) << "_fetched got remote link " << ino
<< " which we have " << *in
<< dendl
;
1672 dout(12) << "_fetched got remote link " << ino
<< " (dont' have it)" << dendl
;
1676 else if (type
== 'I') {
1679 // Load inode data before looking up or constructing CInode
1680 InodeStore inode_data
;
1681 inode_data
.decode_bare(q
);
1685 stale_items
.insert(key
);
1686 *force_dirty
= true;
1691 bool undef_inode
= false;
1693 CInode
*in
= dn
->get_linkage()->get_inode();
1695 dout(12) << "_fetched had dentry " << *dn
<< dendl
;
1696 if (in
->state_test(CInode::STATE_REJOINUNDEF
)) {
1697 undef_inodes
->push_back(in
);
1701 dout(12) << "_fetched had NEG dentry " << *dn
<< dendl
;
1704 if (!dn
|| undef_inode
) {
1706 CInode
*in
= cache
->get_inode(inode_data
.inode
.ino
, last
);
1707 if (!in
|| undef_inode
) {
1708 if (undef_inode
&& in
)
1711 in
= new CInode(cache
, true, first
, last
);
1713 in
->inode
= inode_data
.inode
;
1715 if (in
->is_symlink())
1716 in
->symlink
= inode_data
.symlink
;
1718 in
->dirfragtree
.swap(inode_data
.dirfragtree
);
1719 in
->xattrs
.swap(inode_data
.xattrs
);
1720 in
->old_inodes
.swap(inode_data
.old_inodes
);
1721 if (!in
->old_inodes
.empty()) {
1722 snapid_t min_first
= in
->old_inodes
.rbegin()->first
+ 1;
1723 if (min_first
> in
->first
)
1724 in
->first
= min_first
;
1727 in
->oldest_snap
= inode_data
.oldest_snap
;
1728 in
->decode_snap_blob(inode_data
.snap_blob
);
1729 if (snaps
&& !in
->snaprealm
)
1730 in
->purge_stale_snap_data(*snaps
);
1733 cache
->add_inode(in
); // add
1734 dn
= add_primary_dentry(dname
, in
, first
, last
); // link
1736 dout(12) << "_fetched got " << *dn
<< " " << *in
<< dendl
;
1738 if (in
->inode
.is_dirty_rstat())
1739 in
->mark_dirty_rstat();
1741 //in->hack_accessed = false;
1742 //in->hack_load_stamp = ceph_clock_now();
1743 //num_new_inodes_loaded++;
1745 dout(0) << "_fetched badness: got (but i already had) " << *in
1746 << " mode " << in
->inode
.mode
1747 << " mtime " << in
->inode
.mtime
<< dendl
;
1748 string dirpath
, inopath
;
1749 this->inode
->make_path_string(dirpath
);
1750 in
->make_path_string(inopath
);
1751 cache
->mds
->clog
->error() << "loaded dup inode " << inode_data
.inode
.ino
1752 << " [" << first
<< "," << last
<< "] v" << inode_data
.inode
.version
1753 << " at " << dirpath
<< "/" << dname
1754 << ", but inode " << in
->vino() << " v" << in
->inode
.version
1755 << " already exists at " << inopath
;
1760 std::ostringstream oss
;
1761 oss
<< "Invalid tag char '" << type
<< "' pos " << pos
;
1762 throw buffer::malformed_input(oss
.str());
1768 void CDir::_omap_fetched(bufferlist
& hdrbl
, map
<string
, bufferlist
>& omap
,
1769 bool complete
, int r
)
1771 LogChannelRef clog
= cache
->mds
->clog
;
1772 dout(10) << "_fetched header " << hdrbl
.length() << " bytes "
1773 << omap
.size() << " keys for " << *this << dendl
;
1775 assert(r
== 0 || r
== -ENOENT
|| r
== -ENODATA
);
1777 assert(!is_frozen());
1779 if (hdrbl
.length() == 0) {
1780 dout(0) << "_fetched missing object for " << *this << dendl
;
1782 clog
->error() << "dir " << dirfrag() << " object missing on disk; some "
1783 "files may be lost (" << get_path() << ")";
1791 bufferlist::iterator p
= hdrbl
.begin();
1793 ::decode(got_fnode
, p
);
1794 } catch (const buffer::error
&err
) {
1795 derr
<< "Corrupt fnode in dirfrag " << dirfrag()
1796 << ": " << err
<< dendl
;
1797 clog
->warn() << "Corrupt fnode header in " << dirfrag() << ": "
1798 << err
<< " (" << get_path() << ")";
1803 clog
->warn() << "header buffer of dir " << dirfrag() << " has "
1804 << hdrbl
.length() - p
.get_off() << " extra bytes ("
1805 << get_path() << ")";
1811 dout(10) << "_fetched version " << got_fnode
.version
<< dendl
;
1813 // take the loaded fnode?
1814 // only if we are a fresh CDir* with no prior state.
1815 if (get_version() == 0) {
1816 assert(!is_projected());
1817 assert(!state_test(STATE_COMMITTING
));
1819 projected_version
= committing_version
= committed_version
= got_fnode
.version
;
1821 if (state_test(STATE_REJOINUNDEF
)) {
1822 assert(cache
->mds
->is_rejoin());
1823 state_clear(STATE_REJOINUNDEF
);
1824 cache
->opened_undef_dirfrag(this);
1828 list
<CInode
*> undef_inodes
;
1830 // purge stale snaps?
1831 // only if we have past_parents open!
1832 bool force_dirty
= false;
1833 const set
<snapid_t
> *snaps
= NULL
;
1834 SnapRealm
*realm
= inode
->find_snaprealm();
1835 if (!realm
->have_past_parents_open()) {
1836 dout(10) << " no snap purge, one or more past parents NOT open" << dendl
;
1837 } else if (fnode
.snap_purged_thru
< realm
->get_last_destroyed()) {
1838 snaps
= &realm
->get_snaps();
1839 dout(10) << " snap_purged_thru " << fnode
.snap_purged_thru
1840 << " < " << realm
->get_last_destroyed()
1841 << ", snap purge based on " << *snaps
<< dendl
;
1842 if (get_num_snap_items() == 0) {
1843 fnode
.snap_purged_thru
= realm
->get_last_destroyed();
1848 unsigned pos
= omap
.size() - 1;
1849 for (map
<string
, bufferlist
>::reverse_iterator p
= omap
.rbegin();
1854 dentry_key_t::decode_helper(p
->first
, dname
, last
);
1859 p
->first
, dname
, last
, p
->second
, pos
, snaps
,
1860 &force_dirty
, &undef_inodes
);
1861 } catch (const buffer::error
&err
) {
1862 cache
->mds
->clog
->warn() << "Corrupt dentry '" << dname
<< "' in "
1863 "dir frag " << dirfrag() << ": "
1864 << err
<< "(" << get_path() << ")";
1866 // Remember that this dentry is damaged. Subsequent operations
1867 // that try to act directly on it will get their EIOs, but this
1868 // dirfrag as a whole will continue to look okay (minus the
1869 // mysteriously-missing dentry)
1870 go_bad_dentry(last
, dname
);
1872 // Anyone who was WAIT_DENTRY for this guy will get kicked
1873 // to RetryRequest, and hit the DamageTable-interrogating path.
1874 // Stats will now be bogus because we will think we're complete,
1875 // but have 1 or more missing dentries.
1879 if (dn
&& (wanted_items
.count(dname
) > 0 || !complete
)) {
1880 dout(10) << " touching wanted dn " << *dn
<< dendl
;
1881 inode
->mdcache
->touch_dentry(dn
);
1884 /** clean underwater item?
1885 * Underwater item is something that is dirty in our cache from
1886 * journal replay, but was previously flushed to disk before the
1889 * We only do this is committed_version == 0. that implies either
1890 * - this is a fetch after from a clean/empty CDir is created
1891 * (and has no effect, since the dn won't exist); or
1892 * - this is a fetch after _recovery_, which is what we're worried
1893 * about. Items that are marked dirty from the journal should be
1894 * marked clean if they appear on disk.
1896 if (committed_version
== 0 &&
1898 dn
->get_version() <= got_fnode
.version
&&
1900 dout(10) << "_fetched had underwater dentry " << *dn
<< ", marking clean" << dendl
;
1903 if (dn
->get_linkage()->is_primary()) {
1904 assert(dn
->get_linkage()->get_inode()->get_version() <= got_fnode
.version
);
1905 dout(10) << "_fetched had underwater inode " << *dn
->get_linkage()->get_inode() << ", marking clean" << dendl
;
1906 dn
->get_linkage()->get_inode()->mark_clean();
1911 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
1913 // mark complete, !fetching
1915 wanted_items
.clear();
1917 state_clear(STATE_FETCHING
);
1919 if (scrub_infop
&& scrub_infop
->need_scrub_local
) {
1920 scrub_infop
->need_scrub_local
= false;
1925 // open & force frags
1926 while (!undef_inodes
.empty()) {
1927 CInode
*in
= undef_inodes
.front();
1928 undef_inodes
.pop_front();
1929 in
->state_clear(CInode::STATE_REJOINUNDEF
);
1930 cache
->opened_undef_inode(in
);
1933 // dirty myself to remove stale snap dentries
1934 if (force_dirty
&& !inode
->mdcache
->is_readonly())
1941 finish_waiting(WAIT_COMPLETE
, 0);
1945 void CDir::_go_bad()
1947 if (get_version() == 0)
1949 state_set(STATE_BADFRAG
);
1950 // mark complete, !fetching
1952 state_clear(STATE_FETCHING
);
1956 finish_waiting(WAIT_COMPLETE
, -EIO
);
1959 void CDir::go_bad_dentry(snapid_t last
, const std::string
&dname
)
1961 dout(10) << "go_bad_dentry " << dname
<< dendl
;
1962 const bool fatal
= cache
->mds
->damage_table
.notify_dentry(
1963 inode
->ino(), frag
, last
, dname
, get_path() + "/" + dname
);
1965 cache
->mds
->damaged();
1966 ceph_abort(); // unreachable, damaged() respawns us
1970 void CDir::go_bad(bool complete
)
1972 dout(10) << "go_bad " << frag
<< dendl
;
1973 const bool fatal
= cache
->mds
->damage_table
.notify_dirfrag(
1974 inode
->ino(), frag
, get_path());
1976 cache
->mds
->damaged();
1977 ceph_abort(); // unreachable, damaged() respawns us
1986 // -----------------------
1992 * @param want - min version i want committed
1993 * @param c - callback for completion
1995 void CDir::commit(version_t want
, MDSInternalContextBase
*c
, bool ignore_authpinnability
, int op_prio
)
1997 dout(10) << "commit want " << want
<< " on " << *this << dendl
;
1998 if (want
== 0) want
= get_version();
2001 assert(want
<= get_version() || get_version() == 0); // can't commit the future
2002 assert(want
> committed_version
); // the caller is stupid
2004 assert(ignore_authpinnability
|| can_auth_pin());
2006 if (inode
->inode
.nlink
== 0 && !inode
->snaprealm
) {
2007 dout(7) << "commit dirfrag for unlinked directory, mark clean" << dendl
;
2008 try_remove_dentries_for_stray();
2010 cache
->mds
->queue_waiter(c
);
2014 // note: queue up a noop if necessary, so that we always
2017 c
= new C_MDSInternalNoop
;
2019 // auth_pin on first waiter
2020 if (waiting_for_commit
.empty())
2022 waiting_for_commit
[want
].push_back(c
);
2025 _commit(want
, op_prio
);
2028 class C_IO_Dir_Committed
: public CDirIOContext
{
2031 C_IO_Dir_Committed(CDir
*d
, version_t v
) : CDirIOContext(d
), version(v
) { }
2032 void finish(int r
) override
{
2033 dir
->_committed(r
, version
);
2038 * Flush out the modified dentries in this dir. Keep the bufferlist
2039 * below max_write_size;
2041 void CDir::_omap_commit(int op_prio
)
2043 dout(10) << "_omap_commit" << dendl
;
2045 unsigned max_write_size
= cache
->max_dir_commit_size
;
2046 unsigned write_size
= 0;
2049 op_prio
= CEPH_MSG_PRIO_DEFAULT
;
2052 const set
<snapid_t
> *snaps
= NULL
;
2053 SnapRealm
*realm
= inode
->find_snaprealm();
2054 if (!realm
->have_past_parents_open()) {
2055 dout(10) << " no snap purge, one or more past parents NOT open" << dendl
;
2056 } else if (fnode
.snap_purged_thru
< realm
->get_last_destroyed()) {
2057 snaps
= &realm
->get_snaps();
2058 dout(10) << " snap_purged_thru " << fnode
.snap_purged_thru
2059 << " < " << realm
->get_last_destroyed()
2060 << ", snap purge based on " << *snaps
<< dendl
;
2061 // fnode.snap_purged_thru = realm->get_last_destroyed();
2064 set
<string
> to_remove
;
2065 map
<string
, bufferlist
> to_set
;
2067 C_GatherBuilder
gather(g_ceph_context
,
2068 new C_OnFinisher(new C_IO_Dir_Committed(this,
2070 cache
->mds
->finisher
));
2073 object_t oid
= get_ondisk_object();
2074 object_locator_t
oloc(cache
->mds
->mdsmap
->get_metadata_pool());
2076 if (!stale_items
.empty()) {
2077 for (compact_set
<string
>::iterator p
= stale_items
.begin();
2078 p
!= stale_items
.end();
2080 to_remove
.insert(*p
);
2081 write_size
+= (*p
).length();
2083 stale_items
.clear();
2086 for (map_t::iterator p
= items
.begin();
2087 p
!= items
.end(); ) {
2088 CDentry
*dn
= p
->second
;
2092 dn
->key().encode(key
);
2094 if (dn
->last
!= CEPH_NOSNAP
&&
2095 snaps
&& try_trim_snap_dentry(dn
, *snaps
)) {
2096 dout(10) << " rm " << key
<< dendl
;
2097 write_size
+= key
.length();
2098 to_remove
.insert(key
);
2102 if (!dn
->is_dirty() &&
2103 (!dn
->state_test(CDentry::STATE_FRAGMENTING
) || dn
->get_linkage()->is_null()))
2104 continue; // skip clean dentries
2106 if (dn
->get_linkage()->is_null()) {
2107 dout(10) << " rm " << dn
->name
<< " " << *dn
<< dendl
;
2108 write_size
+= key
.length();
2109 to_remove
.insert(key
);
2111 dout(10) << " set " << dn
->name
<< " " << *dn
<< dendl
;
2113 _encode_dentry(dn
, dnbl
, snaps
);
2114 write_size
+= key
.length() + dnbl
.length();
2115 to_set
[key
].swap(dnbl
);
2118 if (write_size
>= max_write_size
) {
2120 op
.priority
= op_prio
;
2122 // don't create new dirfrag blindly
2123 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING
))
2124 op
.stat(NULL
, (ceph::real_time
*) NULL
, NULL
);
2126 if (!to_set
.empty())
2127 op
.omap_set(to_set
);
2128 if (!to_remove
.empty())
2129 op
.omap_rm_keys(to_remove
);
2131 cache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
2132 ceph::real_clock::now(),
2133 0, gather
.new_sub());
2142 op
.priority
= op_prio
;
2144 // don't create new dirfrag blindly
2145 if (!is_new() && !state_test(CDir::STATE_FRAGMENTING
))
2146 op
.stat(NULL
, (ceph::real_time
*)NULL
, NULL
);
2149 * save the header at the last moment.. If we were to send it off before other
2150 * updates, but die before sending them all, we'd think that the on-disk state
2151 * was fully committed even though it wasn't! However, since the messages are
2152 * strictly ordered between the MDS and the OSD, and since messages to a given
2153 * PG are strictly ordered, if we simply send the message containing the header
2154 * off last, we cannot get our header into an incorrect state.
2157 ::encode(fnode
, header
);
2158 op
.omap_set_header(header
);
2160 if (!to_set
.empty())
2161 op
.omap_set(to_set
);
2162 if (!to_remove
.empty())
2163 op
.omap_rm_keys(to_remove
);
2165 cache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
2166 ceph::real_clock::now(),
2167 0, gather
.new_sub());
2172 void CDir::_encode_dentry(CDentry
*dn
, bufferlist
& bl
,
2173 const set
<snapid_t
> *snaps
)
2175 // clear dentry NEW flag, if any. we can no longer silently drop it.
2178 ::encode(dn
->first
, bl
);
2180 // primary or remote?
2181 if (dn
->linkage
.is_remote()) {
2182 inodeno_t ino
= dn
->linkage
.get_remote_ino();
2183 unsigned char d_type
= dn
->linkage
.get_remote_d_type();
2184 dout(14) << " pos " << bl
.length() << " dn '" << dn
->name
<< "' remote ino " << ino
<< dendl
;
2186 // marker, name, ino
2187 bl
.append('L'); // remote link
2189 ::encode(d_type
, bl
);
2190 } else if (dn
->linkage
.is_primary()) {
2192 CInode
*in
= dn
->linkage
.get_inode();
2195 dout(14) << " pos " << bl
.length() << " dn '" << dn
->name
<< "' inode " << *in
<< dendl
;
2197 // marker, name, inode, [symlink string]
2198 bl
.append('I'); // inode
2200 if (in
->is_multiversion()) {
2201 if (!in
->snaprealm
) {
2203 in
->purge_stale_snap_data(*snaps
);
2204 } else if (in
->snaprealm
->have_past_parents_open()) {
2205 in
->purge_stale_snap_data(in
->snaprealm
->get_snaps());
2209 bufferlist snap_blob
;
2210 in
->encode_snap_blob(snap_blob
);
2211 in
->encode_bare(bl
, cache
->mds
->mdsmap
->get_up_features(), &snap_blob
);
2213 assert(!dn
->linkage
.is_null());
2217 void CDir::_commit(version_t want
, int op_prio
)
2219 dout(10) << "_commit want " << want
<< " on " << *this << dendl
;
2221 // we can't commit things in the future.
2222 // (even the projected future.)
2223 assert(want
<= get_version() || get_version() == 0);
2225 // check pre+postconditions.
2228 // already committed?
2229 if (committed_version
>= want
) {
2230 dout(10) << "already committed " << committed_version
<< " >= " << want
<< dendl
;
2233 // already committing >= want?
2234 if (committing_version
>= want
) {
2235 dout(10) << "already committing " << committing_version
<< " >= " << want
<< dendl
;
2236 assert(state_test(STATE_COMMITTING
));
2240 // alrady committed an older version?
2241 if (committing_version
> committed_version
) {
2242 dout(10) << "already committing older " << committing_version
<< ", waiting for that to finish" << dendl
;
2247 committing_version
= get_version();
2249 // mark committing (if not already)
2250 if (!state_test(STATE_COMMITTING
)) {
2251 dout(10) << "marking committing" << dendl
;
2252 state_set(STATE_COMMITTING
);
2255 if (cache
->mds
->logger
) cache
->mds
->logger
->inc(l_mds_dir_commit
);
2257 _omap_commit(op_prio
);
2264 * @param v version i just committed
2266 void CDir::_committed(int r
, version_t v
)
2269 // the directory could be partly purged during MDS failover
2270 if (r
== -ENOENT
&& committed_version
== 0 &&
2271 inode
->inode
.nlink
== 0 && inode
->snaprealm
) {
2272 inode
->state_set(CInode::STATE_MISSINGOBJS
);
2276 dout(1) << "commit error " << r
<< " v " << v
<< dendl
;
2277 cache
->mds
->clog
->error() << "failed to commit dir " << dirfrag() << " object,"
2279 cache
->mds
->handle_write_error(r
);
2284 dout(10) << "_committed v " << v
<< " on " << *this << dendl
;
2287 bool stray
= inode
->is_stray();
2290 assert(v
> committed_version
);
2291 assert(v
<= committing_version
);
2292 committed_version
= v
;
2294 // _all_ commits done?
2295 if (committing_version
== committed_version
)
2296 state_clear(CDir::STATE_COMMITTING
);
2298 // _any_ commit, even if we've been redirtied, means we're no longer new.
2299 item_new
.remove_myself();
2302 if (committed_version
== get_version())
2306 for (map_t::iterator it
= items
.begin();
2307 it
!= items
.end(); ) {
2308 CDentry
*dn
= it
->second
;
2312 if (dn
->linkage
.is_primary()) {
2313 CInode
*in
= dn
->linkage
.get_inode();
2315 assert(in
->is_auth());
2317 if (committed_version
>= in
->get_version()) {
2318 if (in
->is_dirty()) {
2319 dout(15) << " dir " << committed_version
<< " >= inode " << in
->get_version() << " now clean " << *in
<< dendl
;
2323 dout(15) << " dir " << committed_version
<< " < inode " << in
->get_version() << " still dirty " << *in
<< dendl
;
2324 assert(in
->is_dirty() || in
->last
< CEPH_NOSNAP
); // special case for cow snap items (not predirtied)
2329 if (committed_version
>= dn
->get_version()) {
2330 if (dn
->is_dirty()) {
2331 dout(15) << " dir " << committed_version
<< " >= dn " << dn
->get_version() << " now clean " << *dn
<< dendl
;
2334 // drop clean null stray dentries immediately
2336 dn
->get_num_ref() == 0 &&
2337 !dn
->is_projected() &&
2338 dn
->get_linkage()->is_null())
2342 dout(15) << " dir " << committed_version
<< " < dn " << dn
->get_version() << " still dirty " << *dn
<< dendl
;
2347 bool were_waiters
= !waiting_for_commit
.empty();
2349 compact_map
<version_t
, list
<MDSInternalContextBase
*> >::iterator p
= waiting_for_commit
.begin();
2350 while (p
!= waiting_for_commit
.end()) {
2351 compact_map
<version_t
, list
<MDSInternalContextBase
*> >::iterator n
= p
;
2353 if (p
->first
> committed_version
) {
2354 dout(10) << " there are waiters for " << p
->first
<< ", committing again" << dendl
;
2355 _commit(p
->first
, -1);
2358 cache
->mds
->queue_waiters(p
->second
);
2359 waiting_for_commit
.erase(p
);
2363 // try drop dentries in this dirfrag if it's about to be purged
2364 if (inode
->inode
.nlink
== 0 && inode
->snaprealm
)
2365 cache
->maybe_eval_stray(inode
, true);
2367 // unpin if we kicked the last waiter.
2369 waiting_for_commit
.empty())
2378 void CDir::encode_export(bufferlist
& bl
)
2380 assert(!is_projected());
2381 ::encode(first
, bl
);
2382 ::encode(fnode
, bl
);
2383 ::encode(dirty_old_rstat
, bl
);
2384 ::encode(committed_version
, bl
);
2386 ::encode(state
, bl
);
2387 ::encode(dir_rep
, bl
);
2389 ::encode(pop_me
, bl
);
2390 ::encode(pop_auth_subtree
, bl
);
2392 ::encode(dir_rep_by
, bl
);
2393 ::encode(replica_map
, bl
);
2395 get(PIN_TEMPEXPORTING
);
2398 void CDir::finish_export(utime_t now
)
2400 state
&= MASK_STATE_EXPORT_KEPT
;
2401 pop_auth_subtree_nested
.sub(now
, cache
->decayrate
, pop_auth_subtree
);
2403 pop_auth_subtree
.zero(now
);
2404 put(PIN_TEMPEXPORTING
);
2405 dirty_old_rstat
.clear();
2408 void CDir::decode_import(bufferlist::iterator
& blp
, utime_t now
, LogSegment
*ls
)
2410 ::decode(first
, blp
);
2411 ::decode(fnode
, blp
);
2412 ::decode(dirty_old_rstat
, blp
);
2413 projected_version
= fnode
.version
;
2414 ::decode(committed_version
, blp
);
2415 committing_version
= committed_version
;
2419 state
&= MASK_STATE_IMPORT_KEPT
;
2420 state_set(STATE_AUTH
| (s
& MASK_STATE_EXPORTED
));
2427 ::decode(dir_rep
, blp
);
2429 ::decode(pop_me
, now
, blp
);
2430 ::decode(pop_auth_subtree
, now
, blp
);
2431 pop_auth_subtree_nested
.add(now
, cache
->decayrate
, pop_auth_subtree
);
2433 ::decode(dir_rep_by
, blp
);
2434 ::decode(replica_map
, blp
);
2435 if (!replica_map
.empty()) get(PIN_REPLICATED
);
2437 replica_nonce
= 0; // no longer defined
2439 // did we import some dirty scatterlock data?
2440 if (dirty_old_rstat
.size() ||
2441 !(fnode
.rstat
== fnode
.accounted_rstat
)) {
2442 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
2443 ls
->dirty_dirfrag_nest
.push_back(&inode
->item_dirty_dirfrag_nest
);
2445 if (!(fnode
.fragstat
== fnode
.accounted_fragstat
)) {
2446 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->filelock
);
2447 ls
->dirty_dirfrag_dir
.push_back(&inode
->item_dirty_dirfrag_dir
);
2449 if (is_dirty_dft()) {
2450 if (inode
->dirfragtreelock
.get_state() != LOCK_MIX
&&
2451 inode
->dirfragtreelock
.is_stable()) {
2452 // clear stale dirtydft
2453 state_clear(STATE_DIRTYDFT
);
2455 cache
->mds
->locker
->mark_updated_scatterlock(&inode
->dirfragtreelock
);
2456 ls
->dirty_dirfrag_dirfragtree
.push_back(&inode
->item_dirty_dirfrag_dirfragtree
);
2464 /********************************
2469 * if dir_auth.first == parent, auth is same as inode.
2470 * unless .second != unknown, in which case that sticks.
2472 mds_authority_t
CDir::authority() const
2474 if (is_subtree_root())
2477 return inode
->authority();
2480 /** is_subtree_root()
2481 * true if this is an auth delegation point.
2482 * that is, dir_auth != default (parent,unknown)
2484 * some key observations:
2486 * - any region bound will be an export, or frozen.
2488 * note that this DOES heed dir_auth.pending
2491 bool CDir::is_subtree_root()
2493 if (dir_auth == CDIR_AUTH_DEFAULT) {
2494 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2495 //<< " on " << ino() << dendl;
2498 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
2499 //<< " on " << ino() << dendl;
2506 * true if we are x, or an ancestor of x
2508 bool CDir::contains(CDir
*x
)
2513 x
= x
->get_inode()->get_projected_parent_dir();
// Install a new dir_auth pair and fix up the auth-pin / waiter bookkeeping
// that depends on whether this dirfrag is (still) a subtree root.
// NOTE(review): extraction damage — the actual assignment (presumably
// `dir_auth = a;`) between the snapshot of the old state and the
// "new subtree root?" check is missing from this view.
2523 void CDir::set_dir_auth(mds_authority_t a
)
2525 dout(10) << "setting dir_auth=" << a
2526 << " from " << dir_auth
2527 << " on " << *this << dendl
;
// Snapshot pre-change state so we can detect transitions below.
2529 bool was_subtree
= is_subtree_root();
// "ambiguous" here means a second (pending) auth was set (>= 0).
2530 bool was_ambiguous
= dir_auth
.second
>= 0;
2535 // new subtree root?
2536 if (!was_subtree
&& is_subtree_root()) {
2537 dout(10) << " new subtree root, adjusting auth_pins" << dendl
;
2539 // adjust nested auth pins
2540 if (get_cum_auth_pins())
2541 inode
->adjust_nested_auth_pins(-1, NULL
);
2543 // unpin parent of frozen dir/tree?
2544 if (inode
->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
2545 inode
->auth_unpin(this);
// Mirror case: this dirfrag stopped being a subtree root.
2547 if (was_subtree
&& !is_subtree_root()) {
2548 dout(10) << " old subtree root, adjusting auth_pins" << dendl
;
2550 // adjust nested auth pins
2551 if (get_cum_auth_pins())
2552 inode
->adjust_nested_auth_pins(1, NULL
);
2554 // pin parent of frozen dir/tree?
2555 if (inode
->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
2556 inode
->auth_pin(this);
2559 // newly single auth?
2560 if (was_ambiguous
&& dir_auth
.second
== CDIR_AUTH_UNKNOWN
) {
// Ambiguity resolved: wake anything waiting for single-auth state.
2561 list
<MDSInternalContextBase
*> ls
;
2562 take_waiting(WAIT_SINGLEAUTH
, ls
);
2563 cache
->mds
->queue_waiters(ls
);
2568 /*****************************************
2569 * AUTH PINS and FREEZING
2571 * the basic plan is that auth_pins only exist in auth regions, and they
2572 * prevent a freeze (and subsequent auth change).
2574 * however, we also need to prevent a parent from freezing if a child is frozen.
2575 * for that reason, the parent inode of a frozen directory is auth_pinned.
2577 * the oddity is when the frozen directory is a subtree root. if that's the case,
2578 * the parent inode isn't frozen. which means that when subtree authority is adjusted
2579 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
// Take an auth pin on this dirfrag (prevents freezing / auth change).
// `by` identifies the pinner for debugging (tracked only with MDS_AUTHPIN_SET).
// NOTE(review): extraction damage — the `auth_pins++` increment, the matching
// `#endif`, and the MDSCacheObject pin bookkeeping are missing from this view.
2584 void CDir::auth_pin(void *by
)
2590 #ifdef MDS_AUTHPIN_SET
2591 auth_pin_set
.insert(by
);
2594 dout(10) << "auth_pin by " << by
2596 << " count now " << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
// First cumulative pin on a non-subtree-root propagates one nested pin
// to the parent inode.
2599 if (!is_subtree_root() &&
2600 get_cum_auth_pins() == 1)
2601 inode
->adjust_nested_auth_pins(1, by
);
// Release an auth pin taken by auth_pin(); may complete a pending freeze.
// NOTE(review): extraction damage — the `auth_pins--` decrement, `#endif`,
// and the `newcum == 0` condition on the final if (original line 2626) are
// missing from this view.
2604 void CDir::auth_unpin(void *by
)
2608 #ifdef MDS_AUTHPIN_SET
// Debug-only: the pinner must have registered itself in auth_pin().
2609 assert(auth_pin_set
.count(by
));
2610 auth_pin_set
.erase(auth_pin_set
.find(by
));
2615 dout(10) << "auth_unpin by " << by
2617 << " count now " << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
2618 assert(auth_pins
>= 0);
2620 int newcum
= get_cum_auth_pins();
2622 maybe_finish_freeze(); // pending freeze?
// Last cumulative pin released on a non-subtree-root: drop the nested
// pin previously propagated to the parent inode.
2625 if (!is_subtree_root() &&
2627 inode
->adjust_nested_auth_pins(-1, by
);
// Propagate auth-pin count changes from children: `inc` adjusts the nested
// pin count, `dirinc` the dirfrag-pin count; `by` is the pinner (debug only).
// NOTE(review): extraction damage — the condition guarding the -1 branch
// (presumably `newcum == 0`, original line 2648) and closing braces are
// missing from this view.
2630 void CDir::adjust_nested_auth_pins(int inc
, int dirinc
, void *by
)
2633 nested_auth_pins
+= inc
;
2634 dir_auth_pins
+= dirinc
;
2636 dout(15) << "adjust_nested_auth_pins " << inc
<< "/" << dirinc
<< " on " << *this
2637 << " by " << by
<< " count now "
2638 << auth_pins
<< " + " << nested_auth_pins
<< dendl
;
2639 assert(nested_auth_pins
>= 0);
2640 assert(dir_auth_pins
>= 0);
2642 int newcum
= get_cum_auth_pins();
2644 maybe_finish_freeze(); // pending freeze?
// Keep the parent inode's nested count in sync when we cross zero.
2647 if (!is_subtree_root()) {
2649 inode
->adjust_nested_auth_pins(-1, by
);
2650 else if (newcum
== inc
)
2651 inode
->adjust_nested_auth_pins(1, by
);
2655 #ifdef MDS_VERIFY_FRAGSTAT
// Debug-build check: recount subdirs/files from the in-memory dentries and
// compare against fnode.fragstat, logging a mismatch at level 0.
// NOTE(review): extraction damage — the declaration of the counter `c`, the
// loop bounds/increments, the nsubdirs/nfiles increments, the early return
// for strays, and the trailing `#endif` are missing from this view.
2656 void CDir::verify_fragstat()
2658 assert(is_complete());
2659 if (inode
->is_stray())
// Zero the recount accumulator before scanning items.
2663 memset(&c
, 0, sizeof(c
));
2665 for (map_t::iterator it
= items
.begin();
2668 CDentry
*dn
= it
->second
;
2672 dout(10) << " " << *dn
<< dendl
;
2673 if (dn
->is_primary())
2674 dout(10) << " " << *dn
->inode
<< dendl
;
// Primary links: classify by the linked inode's type.
2676 if (dn
->is_primary()) {
2677 if (dn
->inode
->is_dir())
// Remote links: classify by the recorded d_type.
2682 if (dn
->is_remote()) {
2683 if (dn
->get_remote_d_type() == DT_DIR
)
// Compare the recount against the stored fragstat.
2690 if (c
.nsubdirs
!= fnode
.fragstat
.nsubdirs
||
2691 c
.nfiles
!= fnode
.fragstat
.nfiles
) {
2692 dout(0) << "verify_fragstat failed " << fnode
.fragstat
<< " on " << *this << dendl
;
2693 dout(0) << " i count " << c
<< dendl
;
2696 dout(0) << "verify_fragstat ok " << fnode
.fragstat
<< " on " << *this << dendl
;
2701 /*****************************************************************************
// Begin freezing this dirfrag as a tree root. If freezeable now, freeze
// immediately; otherwise mark FREEZINGTREE and wait for pins to drain.
// NOTE(review): extraction damage — the auth_pin taken for the duration of
// the freeze, the `_freeze_tree()` call, and both return statements are
// missing from this view.
2707 bool CDir::freeze_tree()
2709 assert(!is_frozen());
2710 assert(!is_freezing());
2713 if (is_freezeable(true)) {
// Not freezeable yet: record intent and bump the global freezing counter.
2718 state_set(STATE_FREEZINGTREE
);
2719 ++num_freezing_trees
;
2720 dout(10) << "freeze_tree waiting " << *this << dendl
;
// Actually transition from FREEZINGTREE (or directly) to FROZENTREE.
// NOTE(review): extraction damage — the `++num_frozen_trees` increment and
// closing braces appear to be missing from this view.
2725 void CDir::_freeze_tree()
2727 dout(10) << "_freeze_tree " << *this << dendl
;
2728 assert(is_freezeable(true));
// Clear the transient freezing state if it was set.
2731 if (state_test(STATE_FREEZINGTREE
)) {
2732 state_clear(STATE_FREEZINGTREE
); // actually, this may get set again by next context?
2733 --num_freezing_trees
;
2735 state_set(STATE_FROZENTREE
);
2739 // auth_pin inode for duration of freeze, if we are not a subtree root.
2740 if (is_auth() && !is_subtree_root())
2741 inode
->auth_pin(this);
// Undo a tree freeze: if FROZENTREE, unfreeze and wake waiters; otherwise we
// must still be FREEZINGTREE, so abort the pending freeze (WAIT_FROZEN gets
// completed with -1 to signal cancellation).
// NOTE(review): extraction damage — the `--num_frozen_trees` decrement and
// the else/brace structure separating the two paths are missing from this view.
2744 void CDir::unfreeze_tree()
2746 dout(10) << "unfreeze_tree " << *this << dendl
;
2748 if (state_test(STATE_FROZENTREE
)) {
2749 // frozen. unfreeze.
2750 state_clear(STATE_FROZENTREE
);
2755 // unpin (may => FREEZEABLE) FIXME: is this order good?
2756 if (is_auth() && !is_subtree_root())
2757 inode
->auth_unpin(this);
2760 finish_waiting(WAIT_UNFREEZE
);
// Abort path: tell WAIT_FROZEN waiters the freeze did not happen.
2762 finish_waiting(WAIT_FROZEN
, -1);
2764 // freezing. stop it.
2765 assert(state_test(STATE_FREEZINGTREE
));
2766 state_clear(STATE_FREEZINGTREE
);
2767 --num_freezing_trees
;
2770 finish_waiting(WAIT_UNFREEZE
);
// True if this dirfrag lies inside a tree that is currently freezing.
// Fast-path: if no tree anywhere is freezing, answer is no without walking.
// NOTE(review): extraction damage — the walk's loop construct, the early
// `return false`, and the `else` around the parent step are missing from
// this view; the visible logic walks parent dirfrags until a freezing-tree
// root or subtree root is hit.
2774 bool CDir::is_freezing_tree() const
2776 if (num_freezing_trees
== 0)
2778 const CDir
*dir
= this;
2780 if (dir
->is_freezing_tree_root()) return true;
2781 if (dir
->is_subtree_root()) return false;
2782 if (dir
->inode
->parent
)
2783 dir
= dir
->inode
->parent
->dir
;
2785 return false; // root on replica
// True if this dirfrag lies inside a currently frozen tree. Mirrors
// is_freezing_tree() but checks FROZENTREE roots and num_frozen_trees.
// NOTE(review): extraction damage — the loop construct and early return are
// missing from this view, same as is_freezing_tree() above.
2789 bool CDir::is_frozen_tree() const
2791 if (num_frozen_trees
== 0)
2793 const CDir
*dir
= this;
2795 if (dir
->is_frozen_tree_root()) return true;
2796 if (dir
->is_subtree_root()) return false;
2797 if (dir
->inode
->parent
)
2798 dir
= dir
->inode
->parent
->dir
;
2800 return false; // root on replica
// Walk up parent dirfrags to find the root of the frozen tree containing us.
// NOTE(review): extraction damage — the `CDir *dir = this;` initialization,
// the loop construct, the `return dir;` on success, and the fallback
// (presumably an assert) are missing from this view.
2804 CDir
*CDir::get_frozen_tree_root()
2806 assert(is_frozen());
2809 if (dir
->is_frozen_tree_root())
2811 if (dir
->inode
->parent
)
2812 dir
= dir
->inode
->parent
->dir
;
// Context that releases the auth pin maybe_finish_freeze() retakes on the
// dirfrag's inode, fired when the parent inode unfreezes.
// NOTE(review): extraction damage — the `public:` access specifier and the
// closing braces of finish() and the class are missing from this view.
2818 class C_Dir_AuthUnpin
: public CDirContext
{
2820 explicit C_Dir_AuthUnpin(CDir
*d
) : CDirContext(d
) {}
2821 void finish(int r
) override
{
// Drop the pin held on the dir's inode (pinner recorded as the inode).
2822 dir
->auth_unpin(dir
->get_inode());
// Called when pins drain: if the only remaining auth pin is the freezer's,
// complete a pending FREEZINGDIR or FREEZINGTREE by firing WAIT_FROZEN.
// NOTE(review): extraction damage — several returns, the `_freeze_dir()` /
// `_freeze_tree()` calls before the finish_waiting() lines, and the
// `auth_unpin(this)` in the frozen-parent branch are missing from this view.
2826 void CDir::maybe_finish_freeze()
// Exactly one auth pin (the freezer's) and no dirfrag pins required.
2828 if (auth_pins
!= 1 || dir_auth_pins
!= 0)
2831 // we can freeze the _dir_ even with nested pins...
2832 if (state_test(STATE_FREEZINGDIR
)) {
2835 finish_waiting(WAIT_FROZEN
);
// Tree freeze additionally requires no nested pins.
2838 if (nested_auth_pins
!= 0)
2841 if (state_test(STATE_FREEZINGTREE
)) {
2842 if (!is_subtree_root() && inode
->is_frozen()) {
2843 dout(10) << "maybe_finish_freeze !subtree root and frozen inode, waiting for unfreeze on " << inode
<< dendl
;
2844 // retake an auth_pin...
2846 // and release it when the parent inode unfreezes
2847 inode
->add_waiter(WAIT_UNFREEZE
, new C_Dir_AuthUnpin(this));
2853 finish_waiting(WAIT_FROZEN
);
// Begin freezing just this dirfrag (not the whole tree). Freezes immediately
// if possible, else marks FREEZINGDIR and waits for pins to drain.
// NOTE(review): extraction damage — the auth_pin, the `_freeze_dir()` call,
// and both return statements are missing from this view.
2861 bool CDir::freeze_dir()
2863 assert(!is_frozen());
2864 assert(!is_freezing());
2867 if (is_freezeable_dir(true)) {
// Not freezeable yet: record the pending dir freeze.
2872 state_set(STATE_FREEZINGDIR
);
2873 dout(10) << "freeze_dir + wait " << *this << dendl
;
// Transition from FREEZINGDIR to FROZENDIR. The freezeable assert is
// intentionally disabled (see comment) because fragment splits can arrive
// late.
2878 void CDir::_freeze_dir()
2880 dout(10) << "_freeze_dir " << *this << dendl
;
2881 //assert(is_freezeable_dir(true));
2882 // not always true during split because the original fragment may have frozen a while
2883 // ago and we're just now getting around to breaking it up.
2885 state_clear(STATE_FREEZINGDIR
);
2886 state_set(STATE_FROZENDIR
);
// Pin the parent inode so the enclosing tree cannot freeze underneath us.
2889 if (is_auth() && !is_subtree_root())
2890 inode
->auth_pin(this); // auth_pin for duration of freeze
// Undo a dir freeze: clear FROZENDIR and wake waiters, or — if the freeze is
// still pending — cancel it (WAIT_FROZEN completed with -1).
// NOTE(review): extraction damage — the else/brace structure separating the
// two paths is missing from this view.
2894 void CDir::unfreeze_dir()
2896 dout(10) << "unfreeze_dir " << *this << dendl
;
2898 if (state_test(STATE_FROZENDIR
)) {
2899 state_clear(STATE_FROZENDIR
);
2902 // unpin (may => FREEZEABLE) FIXME: is this order good?
2903 if (is_auth() && !is_subtree_root())
2904 inode
->auth_unpin(this);
2906 finish_waiting(WAIT_UNFREEZE
);
// Abort path: notify WAIT_FROZEN waiters the freeze was cancelled.
2908 finish_waiting(WAIT_FROZEN
, -1);
2910 // still freezing. stop.
2911 assert(state_test(STATE_FREEZINGDIR
));
2912 state_clear(STATE_FREEZINGDIR
);
2915 finish_waiting(WAIT_UNFREEZE
);
2920 * Slightly less complete than operator<<, because this is intended
2921 * for identifying a directory and its state rather than for dumping
// Emit an identifying summary of this dirfrag (path, versions, auth, state
// flags) into the Formatter; less exhaustive than operator<< by design (see
// comment above).
// NOTE(review): extraction damage — the else branches' braces and the
// close_section() for the states array are missing from this view.
2924 void CDir::dump(Formatter
*f
) const
2928 f
->dump_stream("path") << get_path();
2930 f
->dump_stream("dirfrag") << dirfrag();
2931 f
->dump_int("snapid_first", first
);
2933 f
->dump_stream("projected_version") << get_projected_version();
2934 f
->dump_stream("version") << get_version();
2935 f
->dump_stream("committing_version") << get_committing_version();
2936 f
->dump_stream("committed_version") << get_committed_version();
2938 f
->dump_bool("is_rep", is_rep());
// dir_auth: print just the first mds if the second is unknown, the pair if
// ambiguous, or empty string when still at the default.
2940 if (get_dir_auth() != CDIR_AUTH_DEFAULT
) {
2941 if (get_dir_auth().second
== CDIR_AUTH_UNKNOWN
) {
2942 f
->dump_stream("dir_auth") << get_dir_auth().first
;
2944 f
->dump_stream("dir_auth") << get_dir_auth();
2947 f
->dump_string("dir_auth", "");
// One "state" entry per set flag, generic flags first.
2950 f
->open_array_section("states");
2951 MDSCacheObject::dump_states(f
);
2952 if (state_test(CDir::STATE_COMPLETE
)) f
->dump_string("state", "complete");
2953 if (state_test(CDir::STATE_FREEZINGTREE
)) f
->dump_string("state", "freezingtree");
2954 if (state_test(CDir::STATE_FROZENTREE
)) f
->dump_string("state", "frozentree");
2955 if (state_test(CDir::STATE_FROZENDIR
)) f
->dump_string("state", "frozendir");
2956 if (state_test(CDir::STATE_FREEZINGDIR
)) f
->dump_string("state", "freezingdir");
2957 if (state_test(CDir::STATE_EXPORTBOUND
)) f
->dump_string("state", "exportbound");
2958 if (state_test(CDir::STATE_IMPORTBOUND
)) f
->dump_string("state", "importbound");
2959 if (state_test(CDir::STATE_BADFRAG
)) f
->dump_string("state", "badfrag");
2962 MDSCacheObject::dump(f
);
2965 /****** Scrub Stuff *******/
// Lazily allocate scrub_infop, seeding the last-recursive / recursive-start /
// last-local stamps and versions from the projected fnode. const_cast is
// deliberate: this is logically-const lazy initialization.
2967 void CDir::scrub_info_create() const
2969 assert(!scrub_infop
);
2971 // break out of const-land to set up implicit initial state
2972 CDir
*me
= const_cast<CDir
*>(this);
2973 fnode_t
*fn
= me
->get_projected_fnode();
2975 std::unique_ptr
<scrub_info_t
> si(new scrub_info_t());
// Seed both last_recursive and recursive_start from the persisted
// recursive-scrub version/stamp.
2977 si
->last_recursive
.version
= si
->recursive_start
.version
=
2978 fn
->recursive_scrub_version
;
2979 si
->last_recursive
.time
= si
->recursive_start
.time
=
2980 fn
->recursive_scrub_stamp
;
2982 si
->last_local
.version
= fn
->localized_scrub_version
;
2983 si
->last_local
.time
= fn
->localized_scrub_stamp
;
// Hand ownership of the new scrub_info_t to the member pointer.
2985 me
->scrub_infop
.swap(si
);
// Start scrubbing this (complete) dirfrag: reset the to-scrub/scrubbing/
// scrubbed sets, then classify every head dentry into directories_to_scrub
// or others_to_scrub.
// NOTE(review): extraction damage — the scrub_info() call implied by the
// FIXME comment, loop increment/bounds, `continue` for snapshotted keys, and
// the else around line 3021 are missing from this view.
2988 void CDir::scrub_initialize(const ScrubHeaderRefConst
& header
)
2990 dout(20) << __func__
<< dendl
;
2991 assert(is_complete());
2992 assert(header
!= nullptr);
2994 // FIXME: weird implicit construction, is someone else meant
2995 // to be calling scrub_info_create first?
2997 assert(scrub_infop
&& !scrub_infop
->directory_scrubbing
);
// Record where this recursive pass starts.
2999 scrub_infop
->recursive_start
.version
= get_projected_version();
3000 scrub_infop
->recursive_start
.time
= ceph_clock_now();
// Clear any state left over from a previous scrub.
3002 scrub_infop
->directories_to_scrub
.clear();
3003 scrub_infop
->directories_scrubbing
.clear();
3004 scrub_infop
->directories_scrubbed
.clear();
3005 scrub_infop
->others_to_scrub
.clear();
3006 scrub_infop
->others_scrubbing
.clear();
3007 scrub_infop
->others_scrubbed
.clear();
3009 for (map_t::iterator i
= items
.begin();
3012 // TODO: handle snapshot scrubbing
3013 if (i
->first
.snapid
!= CEPH_NOSNAP
)
3016 CDentry::linkage_t
*dnl
= i
->second
->get_projected_linkage();
// Primary links to directories are queued separately so they can be
// recursed into; everything else goes into the "others" set.
3017 if (dnl
->is_primary()) {
3018 if (dnl
->get_inode()->is_dir())
3019 scrub_infop
->directories_to_scrub
.insert(i
->first
);
3021 scrub_infop
->others_to_scrub
.insert(i
->first
);
3022 } else if (dnl
->is_remote()) {
3023 // TODO: check remote linkage
3026 scrub_infop
->directory_scrubbing
= true;
3027 scrub_infop
->header
= header
;
// Complete a directory scrub: all to-scrub/scrubbing sets must be empty,
// then roll recursive_start into last_recursive and flag the stamps dirty
// so they get persisted.
3030 void CDir::scrub_finished()
3032 dout(20) << __func__
<< dendl
;
3033 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3035 assert(scrub_infop
->directories_to_scrub
.empty());
3036 assert(scrub_infop
->directories_scrubbing
.empty());
3037 scrub_infop
->directories_scrubbed
.clear();
3038 assert(scrub_infop
->others_to_scrub
.empty());
3039 assert(scrub_infop
->others_scrubbing
.empty());
3040 scrub_infop
->others_scrubbed
.clear();
3041 scrub_infop
->directory_scrubbing
= false;
// The start stamp of the pass we just finished becomes the last
// completed recursive scrub.
3043 scrub_infop
->last_recursive
= scrub_infop
->recursive_start
;
3044 scrub_infop
->last_scrub_dirty
= true;
// Pop dentry keys off `dns` until one yields a scrubbable dentry (*dnout).
// missing_okay: a vanished directory dentry is assumed renamed and skipped;
// otherwise a missing dentry is fatal. `cb` is queued when the dirfrag must
// be re-fetched first.
// NOTE(review): extraction damage — the dnkey/dn declarations, the erase of
// `front`, the fetch(cb)/EAGAIN path, the error returns, the unchanged-skip
// `continue`, and the final success/ENOENT returns are missing from this view.
3047 int CDir::_next_dentry_on_set(set
<dentry_key_t
>& dns
, bool missing_okay
,
3048 MDSInternalContext
*cb
, CDentry
**dnout
)
3053 while (!dns
.empty()) {
3054 set
<dentry_key_t
>::iterator front
= dns
.begin();
3056 dn
= lookup(dnkey
.name
);
// Dentry not in cache: if the frag is incomplete and the bloom filter
// can't rule the name out, the frag has to be re-read first.
3058 if (!is_complete() &&
3059 (!has_bloom() || is_in_bloom(dnkey
.name
))) {
3060 // need to re-read this dirfrag
3066 dout(15) << " we no longer have directory dentry "
3067 << dnkey
.name
<< ", assuming it got renamed" << dendl
;
3071 dout(5) << " we lost dentry " << dnkey
.name
3072 << ", bailing out because that's impossible!" << dendl
;
3076 // okay, we got a dentry
// Unchanged since the last recursive scrub and not forced: skip it.
3079 if (dn
->get_projected_version() < scrub_infop
->last_recursive
.version
&&
3080 !(scrub_infop
->header
->get_force())) {
3081 dout(15) << " skip dentry " << dnkey
.name
3082 << ", no change since last scrub" << dendl
;
// Hand out the next dentry to scrub: directories first, then other dentries
// once the directory set is exhausted. Moves the chosen key into the
// appropriate *_scrubbing set. Returns 0 on success, EAGAIN if a fetch is
// pending, ENOENT when nothing is left.
// NOTE(review): extraction damage — the `if (rval == 0)` guards around the
// insert calls, the cb/dnout arguments on the first _next_dentry_on_set call,
// and the final `return rval;` are missing from this view.
3093 int CDir::scrub_dentry_next(MDSInternalContext
*cb
, CDentry
**dnout
)
3095 dout(20) << __func__
<< dendl
;
3096 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3098 dout(20) << "trying to scrub directories underneath us" << dendl
;
3099 int rval
= _next_dentry_on_set(scrub_infop
->directories_to_scrub
, true,
3102 dout(20) << __func__
<< " inserted to directories scrubbing: "
3104 scrub_infop
->directories_scrubbing
.insert((*dnout
)->key());
3105 } else if (rval
== EAGAIN
) {
3106 // we don't need to do anything else
3107 } else { // we emptied out the directory scrub set
3108 assert(rval
== ENOENT
);
3109 dout(20) << "no directories left, moving on to other kinds of dentries"
// Fall back to the non-directory dentries.
3112 rval
= _next_dentry_on_set(scrub_infop
->others_to_scrub
, false, cb
, dnout
);
3114 dout(20) << __func__
<< " inserted to others scrubbing: "
3116 scrub_infop
->others_scrubbing
.insert((*dnout
)->key());
3119 dout(20) << " returning " << rval
<< " with dn=" << *dnout
<< dendl
;
// Collect the CDentry* for every key currently in the directories_scrubbing
// and others_scrubbing sets into *out_dentries.
// NOTE(review): extraction damage — the loop increments, the asserts on the
// lookup results, and closing braces are missing from this view.
3123 void CDir::scrub_dentries_scrubbing(list
<CDentry
*> *out_dentries
)
3125 dout(20) << __func__
<< dendl
;
3126 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
// First the in-flight directory dentries...
3128 for (set
<dentry_key_t
>::iterator i
=
3129 scrub_infop
->directories_scrubbing
.begin();
3130 i
!= scrub_infop
->directories_scrubbing
.end();
3132 CDentry
*d
= lookup(i
->name
, i
->snapid
);
3134 out_dentries
->push_back(d
);
// ...then the in-flight non-directory dentries.
3136 for (set
<dentry_key_t
>::iterator i
= scrub_infop
->others_scrubbing
.begin();
3137 i
!= scrub_infop
->others_scrubbing
.end();
3139 CDentry
*d
= lookup(i
->name
, i
->snapid
);
3141 out_dentries
->push_back(d
);
// Mark one dentry's scrub as done: move its key from the appropriate
// *_scrubbing set into the matching *_scrubbed set. erase() returning
// nonzero tells us which set it was in.
3145 void CDir::scrub_dentry_finished(CDentry
*dn
)
3147 dout(20) << __func__
<< " on dn " << *dn
<< dendl
;
3148 assert(scrub_infop
&& scrub_infop
->directory_scrubbing
);
3149 dentry_key_t dn_key
= dn
->key();
3150 if (scrub_infop
->directories_scrubbing
.erase(dn_key
)) {
3151 scrub_infop
->directories_scrubbed
.insert(dn_key
);
// Not a directory key: it must be in the others set.
3153 assert(scrub_infop
->others_scrubbing
.count(dn_key
));
3154 scrub_infop
->others_scrubbing
.erase(dn_key
);
3155 scrub_infop
->others_scrubbed
.insert(dn_key
);
// Free scrub_infop once nothing references it: no active scrub, no pending
// local scrub, no unpersisted stamps, no pending error, and no dirty stamps.
// NOTE(review): extraction damage — the leading `if (scrub_infop &&` of the
// condition (original line 3161) is missing from this view.
3159 void CDir::scrub_maybe_delete_info()
3162 !scrub_infop
->directory_scrubbing
&&
3163 !scrub_infop
->need_scrub_local
&&
3164 !scrub_infop
->last_scrub_dirty
&&
3165 !scrub_infop
->pending_scrub_error
&&
3166 scrub_infop
->dirty_scrub_stamps
.empty()) {
3167 scrub_infop
.reset();
// Run the local (non-recursive) scrub: validate rstats. On success record
// the stamp/version; on failure flag a pending error and, if the scrub
// header asks for repair, kick off dirfrag-stat repair.
// NOTE(review): extraction damage — the scrub_info() setup, the if/else
// around the success and failure branches, and the `return rval;` are
// missing from this view.
3171 bool CDir::scrub_local()
3173 assert(is_complete());
3174 bool rval
= check_rstats(true);
// Success path: stamp this local scrub and mark it for persistence.
3178 scrub_infop
->last_local
.time
= ceph_clock_now();
3179 scrub_infop
->last_local
.version
= get_projected_version();
3180 scrub_infop
->pending_scrub_error
= false;
3181 scrub_infop
->last_scrub_dirty
= true;
// Failure path: remember the error; optionally repair.
3183 scrub_infop
->pending_scrub_error
= true;
3184 if (scrub_infop
->header
->get_repair())
3185 cache
->repair_dirfrag_stats(this);
// Build and return this dirfrag's path as a string via the inode.
// NOTE(review): extraction damage — the local `path` declaration and the
// `return path;` are missing from this view.
3190 std::string
CDir::get_path() const
3193 get_inode()->make_path_string(path
, true);
3197 bool CDir::should_split_fast() const
3199 // Max size a fragment can be before trigger fast splitting
3200 int fast_limit
= g_conf
->mds_bal_split_size
* g_conf
->mds_bal_fragment_fast_factor
;
3202 // Fast path: the sum of accounted size and null dentries does not
3203 // exceed threshold: we definitely are not over it.
3204 if (get_frag_size() + get_num_head_null() <= fast_limit
) {
3208 // Fast path: the accounted size of the frag exceeds threshold: we
3209 // definitely are over it
3210 if (get_frag_size() > fast_limit
) {
3214 int64_t effective_size
= 0;
3216 for (const auto &p
: items
) {
3217 const CDentry
*dn
= p
.second
;
3218 if (!dn
->get_projected_linkage()->is_null()) {
3223 return effective_size
> fast_limit
;