1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <string_view>
18 #include "include/types.h"
30 #include "LogSegment.h"
31 #include "MDBalancer.h"
32 #include "SnapClient.h"
34 #include "common/bloom_filter.hpp"
35 #include "common/likely.h"
36 #include "include/Context.h"
37 #include "common/Clock.h"
39 #include "osdc/Objecter.h"
41 #include "common/config.h"
42 #include "include/ceph_assert.h"
43 #include "include/compat.h"
45 #define dout_context g_ceph_context
46 #define dout_subsys ceph_subsys_mds
48 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
52 int CDir::num_frozen_trees
= 0;
53 int CDir::num_freezing_trees
= 0;
55 CDir::fnode_const_ptr
CDir::empty_fnode
= CDir::allocate_fnode();
57 class CDirContext
: public MDSContext
61 MDSRank
* get_mds() override
{return dir
->mdcache
->mds
;}
64 explicit CDirContext(CDir
*d
) : dir(d
) {
65 ceph_assert(dir
!= NULL
);
70 class CDirIOContext
: public MDSIOContextBase
74 MDSRank
* get_mds() override
{return dir
->mdcache
->mds
;}
77 explicit CDirIOContext(CDir
*d
) : dir(d
) {
78 ceph_assert(dir
!= NULL
);
84 //int cdir_pins[CDIR_NUM_PINS] = { 0,0,0,0,0,0,0,0,0,0,0,0,0,0 };
87 ostream
& operator<<(ostream
& out
, const CDir
& dir
)
89 out
<< "[dir " << dir
.dirfrag() << " " << dir
.get_path() << "/"
90 << " [" << dir
.first
<< ",head]";
93 if (dir
.is_replicated())
94 out
<< dir
.get_replicas();
96 if (dir
.is_projected())
97 out
<< " pv=" << dir
.get_projected_version();
98 out
<< " v=" << dir
.get_version();
99 out
<< " cv=" << dir
.get_committing_version();
100 out
<< "/" << dir
.get_committed_version();
102 mds_authority_t a
= dir
.authority();
103 out
<< " rep@" << a
.first
;
104 if (a
.second
!= CDIR_AUTH_UNKNOWN
)
105 out
<< "," << a
.second
;
106 out
<< "." << dir
.get_replica_nonce();
109 if (dir
.is_rep()) out
<< " REP";
111 if (dir
.get_dir_auth() != CDIR_AUTH_DEFAULT
) {
112 if (dir
.get_dir_auth().second
== CDIR_AUTH_UNKNOWN
)
113 out
<< " dir_auth=" << dir
.get_dir_auth().first
;
115 out
<< " dir_auth=" << dir
.get_dir_auth();
118 if (dir
.get_auth_pins() || dir
.get_dir_auth_pins()) {
119 out
<< " ap=" << dir
.get_auth_pins()
120 << "+" << dir
.get_dir_auth_pins();
121 #ifdef MDS_AUTHPIN_SET
122 dir
.print_authpin_set(out
);
126 out
<< " state=" << dir
.get_state();
127 if (dir
.state_test(CDir::STATE_COMPLETE
)) out
<< "|complete";
128 if (dir
.state_test(CDir::STATE_FREEZINGTREE
)) out
<< "|freezingtree";
129 if (dir
.state_test(CDir::STATE_FROZENTREE
)) out
<< "|frozentree";
130 if (dir
.state_test(CDir::STATE_AUXSUBTREE
)) out
<< "|auxsubtree";
131 if (dir
.state_test(CDir::STATE_FROZENDIR
)) out
<< "|frozendir";
132 if (dir
.state_test(CDir::STATE_FREEZINGDIR
)) out
<< "|freezingdir";
133 if (dir
.state_test(CDir::STATE_EXPORTBOUND
)) out
<< "|exportbound";
134 if (dir
.state_test(CDir::STATE_IMPORTBOUND
)) out
<< "|importbound";
135 if (dir
.state_test(CDir::STATE_BADFRAG
)) out
<< "|badfrag";
136 if (dir
.state_test(CDir::STATE_FRAGMENTING
)) out
<< "|fragmenting";
137 if (dir
.state_test(CDir::STATE_CREATING
)) out
<< "|creating";
138 if (dir
.state_test(CDir::STATE_COMMITTING
)) out
<< "|committing";
139 if (dir
.state_test(CDir::STATE_FETCHING
)) out
<< "|fetching";
140 if (dir
.state_test(CDir::STATE_EXPORTING
)) out
<< "|exporting";
141 if (dir
.state_test(CDir::STATE_IMPORTING
)) out
<< "|importing";
142 if (dir
.state_test(CDir::STATE_STICKY
)) out
<< "|sticky";
143 if (dir
.state_test(CDir::STATE_DNPINNEDFRAG
)) out
<< "|dnpinnedfrag";
144 if (dir
.state_test(CDir::STATE_ASSIMRSTAT
)) out
<< "|assimrstat";
147 out
<< " " << dir
.get_fnode()->fragstat
;
148 if (!(dir
.get_fnode()->fragstat
== dir
.get_fnode()->accounted_fragstat
))
149 out
<< "/" << dir
.get_fnode()->accounted_fragstat
;
150 if (g_conf()->mds_debug_scatterstat
&& dir
.is_projected()) {
151 const auto& pf
= dir
.get_projected_fnode();
152 out
<< "->" << pf
->fragstat
;
153 if (!(pf
->fragstat
== pf
->accounted_fragstat
))
154 out
<< "/" << pf
->accounted_fragstat
;
158 out
<< " " << dir
.get_fnode()->rstat
;
159 if (!(dir
.get_fnode()->rstat
== dir
.get_fnode()->accounted_rstat
))
160 out
<< "/" << dir
.get_fnode()->accounted_rstat
;
161 if (g_conf()->mds_debug_scatterstat
&& dir
.is_projected()) {
162 const auto& pf
= dir
.get_projected_fnode();
163 out
<< "->" << pf
->rstat
;
164 if (!(pf
->rstat
== pf
->accounted_rstat
))
165 out
<< "/" << pf
->accounted_rstat
;
168 out
<< " hs=" << dir
.get_num_head_items() << "+" << dir
.get_num_head_null();
169 out
<< ",ss=" << dir
.get_num_snap_items() << "+" << dir
.get_num_snap_null();
170 if (dir
.get_num_dirty())
171 out
<< " dirty=" << dir
.get_num_dirty();
173 if (dir
.get_num_ref()) {
175 dir
.print_pin_set(out
);
183 void CDir::print(ostream
& out
)
191 ostream
& CDir::print_db_line_prefix(ostream
& out
)
193 return out
<< ceph_clock_now() << " mds." << mdcache
->mds
->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
198 // -------------------------------------------------------------------
201 CDir::CDir(CInode
*in
, frag_t fg
, MDCache
*mdc
, bool auth
) :
202 mdcache(mdc
), inode(in
), frag(fg
),
203 dirty_rstat_inodes(member_offset(CInode
, dirty_rstat_item
)),
204 dirty_dentries(member_offset(CDentry
, item_dir_dirty
)),
205 item_dirty(this), item_new(this),
206 lock_caches_with_auth_pins(member_offset(MDLockCache::DirItem
, item_dir
)),
207 freezing_inodes(member_offset(CInode
, item_freezing_inode
)),
209 pop_me(mdc
->decayrate
),
210 pop_nested(mdc
->decayrate
),
211 pop_auth_subtree(mdc
->decayrate
),
212 pop_auth_subtree_nested(mdc
->decayrate
),
213 pop_lru_subdirs(member_offset(CInode
, item_pop_lru
)),
214 dir_auth(CDIR_AUTH_DEFAULT
)
217 ceph_assert(in
->is_dir());
219 state_set(STATE_AUTH
);
223 * Check the recursive statistics on size for consistency.
224 * If mds_debug_scatterstat is enabled, assert for correctness,
225 * otherwise just print out the mismatch and continue.
227 bool CDir::check_rstats(bool scrub
)
229 if (!g_conf()->mds_debug_scatterstat
&& !scrub
)
232 dout(25) << "check_rstats on " << this << dendl
;
233 if (!is_complete() || !is_auth() || is_frozen()) {
234 dout(3) << "check_rstats " << (scrub
? "(scrub) " : "")
235 << "bailing out -- incomplete or non-auth or frozen dir on "
240 frag_info_t frag_info
;
241 nest_info_t nest_info
;
242 for (auto i
= items
.begin(); i
!= items
.end(); ++i
) {
243 if (i
->second
->last
!= CEPH_NOSNAP
)
245 CDentry::linkage_t
*dnl
= i
->second
->get_linkage();
246 if (dnl
->is_primary()) {
247 CInode
*in
= dnl
->get_inode();
248 nest_info
.add(in
->get_inode()->accounted_rstat
);
250 frag_info
.nsubdirs
++;
253 } else if (dnl
->is_remote())
259 if(!frag_info
.same_sums(fnode
->fragstat
)) {
260 dout(1) << "mismatch between head items and fnode.fragstat! printing dentries" << dendl
;
261 dout(1) << "get_num_head_items() = " << get_num_head_items()
262 << "; fnode.fragstat.nfiles=" << fnode
->fragstat
.nfiles
263 << " fnode.fragstat.nsubdirs=" << fnode
->fragstat
.nsubdirs
<< dendl
;
266 dout(20) << "get_num_head_items() = " << get_num_head_items()
267 << "; fnode.fragstat.nfiles=" << fnode
->fragstat
.nfiles
268 << " fnode.fragstat.nsubdirs=" << fnode
->fragstat
.nsubdirs
<< dendl
;
272 if (!nest_info
.same_sums(fnode
->rstat
)) {
273 dout(1) << "mismatch between child accounted_rstats and my rstats!" << dendl
;
274 dout(1) << "total of child dentries: " << nest_info
<< dendl
;
275 dout(1) << "my rstats: " << fnode
->rstat
<< dendl
;
278 dout(20) << "total of child dentries: " << nest_info
<< dendl
;
279 dout(20) << "my rstats: " << fnode
->rstat
<< dendl
;
284 for (auto i
= items
.begin(); i
!= items
.end(); ++i
) {
285 CDentry
*dn
= i
->second
;
286 if (dn
->get_linkage()->is_primary()) {
287 CInode
*in
= dn
->get_linkage()->inode
;
288 dout(1) << *dn
<< " rstat " << in
->get_inode()->accounted_rstat
<< dendl
;
290 dout(1) << *dn
<< dendl
;
294 ceph_assert(frag_info
.nfiles
== fnode
->fragstat
.nfiles
);
295 ceph_assert(frag_info
.nsubdirs
== fnode
->fragstat
.nsubdirs
);
296 ceph_assert(nest_info
.rbytes
== fnode
->rstat
.rbytes
);
297 ceph_assert(nest_info
.rfiles
== fnode
->rstat
.rfiles
);
298 ceph_assert(nest_info
.rsubdirs
== fnode
->rstat
.rsubdirs
);
301 dout(10) << "check_rstats complete on " << this << dendl
;
305 void CDir::adjust_num_inodes_with_caps(int d
)
307 // FIXME: smarter way to decide if adding 'this' to open file table
308 if (num_inodes_with_caps
== 0 && d
> 0)
309 mdcache
->open_file_table
.add_dirfrag(this);
310 else if (num_inodes_with_caps
> 0 && num_inodes_with_caps
== -d
)
311 mdcache
->open_file_table
.remove_dirfrag(this);
313 num_inodes_with_caps
+= d
;
314 ceph_assert(num_inodes_with_caps
>= 0);
317 CDentry
*CDir::lookup(std::string_view name
, snapid_t snap
)
319 dout(20) << "lookup (" << name
<< ", '" << snap
<< "')" << dendl
;
320 auto iter
= items
.lower_bound(dentry_key_t(snap
, name
, inode
->hash_dentry_name(name
)));
321 if (iter
== items
.end())
323 if (iter
->second
->get_name() == name
&&
324 iter
->second
->first
<= snap
&&
325 iter
->second
->last
>= snap
) {
326 dout(20) << " hit -> " << iter
->first
<< dendl
;
329 dout(20) << " miss -> " << iter
->first
<< dendl
;
333 CDentry
*CDir::lookup_exact_snap(std::string_view name
, snapid_t last
) {
334 dout(20) << __func__
<< " (" << last
<< ", '" << name
<< "')" << dendl
;
335 auto p
= items
.find(dentry_key_t(last
, name
, inode
->hash_dentry_name(name
)));
336 if (p
== items
.end())
341 void CDir::adjust_dentry_lru(CDentry
*dn
)
344 if (dn
->get_linkage()->is_primary()) {
345 bottom_lru
= !is_auth() && inode
->is_stray();
346 } else if (dn
->get_linkage()->is_remote()) {
349 bottom_lru
= !is_auth();
352 if (!dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
353 mdcache
->lru
.lru_remove(dn
);
354 mdcache
->bottom_lru
.lru_insert_mid(dn
);
355 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
358 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
359 mdcache
->bottom_lru
.lru_remove(dn
);
360 mdcache
->lru
.lru_insert_mid(dn
);
361 dn
->state_clear(CDentry::STATE_BOTTOMLRU
);
370 CDentry
* CDir::add_null_dentry(std::string_view dname
,
371 snapid_t first
, snapid_t last
)
374 ceph_assert(lookup_exact_snap(dname
, last
) == 0);
377 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), "", first
, last
);
379 dn
->version
= get_projected_version();
380 dn
->check_corruption(true);
382 dn
->state_set(CDentry::STATE_AUTH
);
383 mdcache
->lru
.lru_insert_mid(dn
);
385 mdcache
->bottom_lru
.lru_insert_mid(dn
);
386 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
390 ceph_assert(items
.count(dn
->key()) == 0);
391 //assert(null_items.count(dn->get_name()) == 0);
393 items
[dn
->key()] = dn
;
394 if (last
== CEPH_NOSNAP
)
399 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
400 dn
->get(CDentry::PIN_FRAGMENTING
);
401 dn
->state_set(CDentry::STATE_FRAGMENTING
);
404 dout(12) << __func__
<< " " << *dn
<< dendl
;
407 if (get_num_any() == 1)
410 ceph_assert(get_num_any() == items
.size());
415 CDentry
* CDir::add_primary_dentry(std::string_view dname
, CInode
*in
,
416 mempool::mds_co::string alternate_name
,
417 snapid_t first
, snapid_t last
)
420 ceph_assert(lookup_exact_snap(dname
, last
) == 0);
423 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), std::move(alternate_name
), first
, last
);
425 dn
->version
= get_projected_version();
426 dn
->check_corruption(true);
428 dn
->state_set(CDentry::STATE_AUTH
);
429 if (is_auth() || !inode
->is_stray()) {
430 mdcache
->lru
.lru_insert_mid(dn
);
432 mdcache
->bottom_lru
.lru_insert_mid(dn
);
433 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
437 ceph_assert(items
.count(dn
->key()) == 0);
438 //assert(null_items.count(dn->get_name()) == 0);
440 items
[dn
->key()] = dn
;
442 dn
->get_linkage()->inode
= in
;
444 link_inode_work(dn
, in
);
446 if (dn
->last
== CEPH_NOSNAP
)
451 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
452 dn
->get(CDentry::PIN_FRAGMENTING
);
453 dn
->state_set(CDentry::STATE_FRAGMENTING
);
456 dout(12) << __func__
<< " " << *dn
<< dendl
;
459 if (get_num_any() == 1)
461 ceph_assert(get_num_any() == items
.size());
465 CDentry
* CDir::add_remote_dentry(std::string_view dname
, inodeno_t ino
, unsigned char d_type
,
466 mempool::mds_co::string alternate_name
,
467 snapid_t first
, snapid_t last
)
470 ceph_assert(lookup_exact_snap(dname
, last
) == 0);
473 CDentry
* dn
= new CDentry(dname
, inode
->hash_dentry_name(dname
), std::move(alternate_name
), ino
, d_type
, first
, last
);
475 dn
->version
= get_projected_version();
476 dn
->check_corruption(true);
478 dn
->state_set(CDentry::STATE_AUTH
);
479 mdcache
->lru
.lru_insert_mid(dn
);
482 ceph_assert(items
.count(dn
->key()) == 0);
483 //assert(null_items.count(dn->get_name()) == 0);
485 items
[dn
->key()] = dn
;
486 if (last
== CEPH_NOSNAP
)
491 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
492 dn
->get(CDentry::PIN_FRAGMENTING
);
493 dn
->state_set(CDentry::STATE_FRAGMENTING
);
496 dout(12) << __func__
<< " " << *dn
<< dendl
;
499 if (get_num_any() == 1)
502 ceph_assert(get_num_any() == items
.size());
508 void CDir::remove_dentry(CDentry
*dn
)
510 dout(12) << __func__
<< " " << *dn
<< dendl
;
512 // there should be no client leases at this point!
513 ceph_assert(dn
->client_lease_map
.empty());
515 if (state_test(CDir::STATE_DNPINNEDFRAG
)) {
516 dn
->put(CDentry::PIN_FRAGMENTING
);
517 dn
->state_clear(CDentry::STATE_FRAGMENTING
);
520 if (dn
->get_linkage()->is_null()) {
521 if (dn
->last
== CEPH_NOSNAP
)
526 if (dn
->last
== CEPH_NOSNAP
)
532 if (!dn
->get_linkage()->is_null())
533 // detach inode and dentry
534 unlink_inode_work(dn
);
537 ceph_assert(items
.count(dn
->key()) == 1);
538 items
.erase(dn
->key());
544 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
))
545 mdcache
->bottom_lru
.lru_remove(dn
);
547 mdcache
->lru
.lru_remove(dn
);
551 if (get_num_any() == 0)
553 ceph_assert(get_num_any() == items
.size());
556 void CDir::link_remote_inode(CDentry
*dn
, CInode
*in
)
558 link_remote_inode(dn
, in
->ino(), IFTODT(in
->get_projected_inode()->mode
));
561 void CDir::link_remote_inode(CDentry
*dn
, inodeno_t ino
, unsigned char d_type
)
563 dout(12) << __func__
<< " " << *dn
<< " remote " << ino
<< dendl
;
564 ceph_assert(dn
->get_linkage()->is_null());
566 dn
->get_linkage()->set_remote(ino
, d_type
);
568 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
569 mdcache
->bottom_lru
.lru_remove(dn
);
570 mdcache
->lru
.lru_insert_mid(dn
);
571 dn
->state_clear(CDentry::STATE_BOTTOMLRU
);
574 if (dn
->last
== CEPH_NOSNAP
) {
581 ceph_assert(get_num_any() == items
.size());
584 void CDir::link_primary_inode(CDentry
*dn
, CInode
*in
)
586 dout(12) << __func__
<< " " << *dn
<< " " << *in
<< dendl
;
587 ceph_assert(dn
->get_linkage()->is_null());
589 dn
->get_linkage()->inode
= in
;
591 link_inode_work(dn
, in
);
593 if (dn
->state_test(CDentry::STATE_BOTTOMLRU
) &&
594 (is_auth() || !inode
->is_stray())) {
595 mdcache
->bottom_lru
.lru_remove(dn
);
596 mdcache
->lru
.lru_insert_mid(dn
);
597 dn
->state_clear(CDentry::STATE_BOTTOMLRU
);
600 if (dn
->last
== CEPH_NOSNAP
) {
608 ceph_assert(get_num_any() == items
.size());
611 void CDir::link_inode_work( CDentry
*dn
, CInode
*in
)
613 ceph_assert(dn
->get_linkage()->get_inode() == in
);
614 in
->set_primary_parent(dn
);
617 //in->inode.version = dn->get_version();
620 if (in
->get_num_ref())
621 dn
->get(CDentry::PIN_INODEPIN
);
623 if (in
->state_test(CInode::STATE_TRACKEDBYOFT
))
624 mdcache
->open_file_table
.notify_link(in
);
625 if (in
->is_any_caps())
626 adjust_num_inodes_with_caps(1);
628 // adjust auth pin count
630 dn
->adjust_nested_auth_pins(in
->auth_pins
, NULL
);
632 if (in
->is_freezing_inode())
633 freezing_inodes
.push_back(&in
->item_freezing_inode
);
634 else if (in
->is_frozen_inode() || in
->is_frozen_auth_pin())
637 // verify open snaprealm parent
639 in
->snaprealm
->adjust_parent();
640 else if (in
->is_any_caps())
641 in
->move_to_realm(inode
->find_snaprealm());
644 void CDir::unlink_inode(CDentry
*dn
, bool adjust_lru
)
646 if (dn
->get_linkage()->is_primary()) {
647 dout(12) << __func__
<< " " << *dn
<< " " << *dn
->get_linkage()->get_inode() << dendl
;
649 dout(12) << __func__
<< " " << *dn
<< dendl
;
652 unlink_inode_work(dn
);
654 if (adjust_lru
&& !is_auth() &&
655 !dn
->state_test(CDentry::STATE_BOTTOMLRU
)) {
656 mdcache
->lru
.lru_remove(dn
);
657 mdcache
->bottom_lru
.lru_insert_mid(dn
);
658 dn
->state_set(CDentry::STATE_BOTTOMLRU
);
661 if (dn
->last
== CEPH_NOSNAP
) {
668 ceph_assert(get_num_any() == items
.size());
671 void CDir::try_remove_unlinked_dn(CDentry
*dn
)
673 ceph_assert(dn
->dir
== this);
674 ceph_assert(dn
->get_linkage()->is_null());
676 // no pins (besides dirty)?
677 if (dn
->get_num_ref() != dn
->is_dirty())
682 dout(10) << __func__
<< " " << *dn
<< " in " << *this << dendl
;
687 // NOTE: we may not have any more dirty dentries, but the fnode
688 // still changed, so the directory must remain dirty.
693 void CDir::unlink_inode_work(CDentry
*dn
)
695 CInode
*in
= dn
->get_linkage()->get_inode();
697 if (dn
->get_linkage()->is_remote()) {
700 dn
->unlink_remote(dn
->get_linkage());
702 dn
->get_linkage()->set_remote(0, 0);
703 } else if (dn
->get_linkage()->is_primary()) {
706 if (in
->get_num_ref())
707 dn
->put(CDentry::PIN_INODEPIN
);
709 if (in
->state_test(CInode::STATE_TRACKEDBYOFT
))
710 mdcache
->open_file_table
.notify_unlink(in
);
711 if (in
->is_any_caps())
712 adjust_num_inodes_with_caps(-1);
714 // unlink auth_pin count
716 dn
->adjust_nested_auth_pins(-in
->auth_pins
, nullptr);
718 if (in
->is_freezing_inode())
719 in
->item_freezing_inode
.remove_myself();
720 else if (in
->is_frozen_inode() || in
->is_frozen_auth_pin())
724 in
->remove_primary_parent(dn
);
726 in
->item_pop_lru
.remove_myself();
727 dn
->get_linkage()->inode
= 0;
729 ceph_assert(!dn
->get_linkage()->is_null());
733 void CDir::add_to_bloom(CDentry
*dn
)
735 ceph_assert(dn
->last
== CEPH_NOSNAP
);
737 /* do not create a bloom filter for an incomplete dir that was added by log replay */
741 /* don't maintain bloom filters in standby replay (saves cycles, and also
742 * avoids need to implement clearing it in EExport for #16924) */
743 if (mdcache
->mds
->is_standby_replay()) {
747 unsigned size
= get_num_head_items() + get_num_snap_items();
748 if (size
< 100) size
= 100;
749 bloom
.reset(new bloom_filter(size
, 1.0 / size
, 0));
751 /* This size and false positive probability are completely arbitrary. */
752 bloom
->insert(dn
->get_name().data(), dn
->get_name().size());
755 bool CDir::is_in_bloom(std::string_view name
)
759 return bloom
->contains(name
.data(), name
.size());
762 void CDir::remove_null_dentries() {
763 dout(12) << __func__
<< " " << *this << dendl
;
765 auto p
= items
.begin();
766 while (p
!= items
.end()) {
767 CDentry
*dn
= p
->second
;
769 if (dn
->get_linkage()->is_null() && !dn
->is_projected())
773 ceph_assert(num_snap_null
== 0);
774 ceph_assert(num_head_null
== 0);
775 ceph_assert(get_num_any() == items
.size());
778 /** remove dirty null dentries for deleted directory. the dirfrag will be
779 * deleted soon, so it's safe to not commit dirty dentries.
781 * This is called when a directory is being deleted, a prerequisite
782 * of which is that its children have been unlinked: we expect to only see
783 * null, unprojected dentries here.
785 void CDir::try_remove_dentries_for_stray()
787 dout(10) << __func__
<< dendl
;
788 ceph_assert(get_parent_dir()->inode
->is_stray());
790 // clear dirty only when the directory was not snapshotted
791 bool clear_dirty
= !inode
->snaprealm
;
793 auto p
= items
.begin();
794 while (p
!= items
.end()) {
795 CDentry
*dn
= p
->second
;
797 if (dn
->last
== CEPH_NOSNAP
) {
798 ceph_assert(!dn
->is_projected());
799 ceph_assert(dn
->get_linkage()->is_null());
800 if (clear_dirty
&& dn
->is_dirty())
802 // It's OK to remove lease prematurely because we will never link
803 // the dentry to inode again.
804 if (dn
->is_any_leases())
805 dn
->remove_client_leases(mdcache
->mds
->locker
);
806 if (dn
->get_num_ref() == 0)
809 ceph_assert(!dn
->is_projected());
810 CDentry::linkage_t
*dnl
= dn
->get_linkage();
812 if (dnl
->is_primary()) {
813 in
= dnl
->get_inode();
814 if (clear_dirty
&& in
->is_dirty())
817 if (clear_dirty
&& dn
->is_dirty())
819 if (dn
->get_num_ref() == 0) {
822 mdcache
->remove_inode(in
);
827 if (clear_dirty
&& is_dirty())
831 bool CDir::try_trim_snap_dentry(CDentry
*dn
, const set
<snapid_t
>& snaps
)
833 if (dn
->last
== CEPH_NOSNAP
) {
836 set
<snapid_t
>::const_iterator p
= snaps
.lower_bound(dn
->first
);
837 CDentry::linkage_t
*dnl
= dn
->get_linkage();
839 if (dnl
->is_primary())
840 in
= dnl
->get_inode();
841 if ((p
== snaps
.end() || *p
> dn
->last
) &&
842 (dn
->get_num_ref() == dn
->is_dirty()) &&
843 (!in
|| in
->get_num_ref() == in
->is_dirty())) {
844 dout(10) << " purging snapped " << *dn
<< dendl
;
845 if (in
&& in
->is_dirty())
849 dout(10) << " purging snapped " << *in
<< dendl
;
850 mdcache
->remove_inode(in
);
859 * steal_dentry -- semi-violently move a dentry from one CDir to another
860 * (*) violently, in that nitems, most pins, etc. are not correctly maintained
861 * on the old CDir corpse; must call finish_old_fragment() when finished.
863 void CDir::steal_dentry(CDentry
*dn
)
865 dout(15) << __func__
<< " " << *dn
<< dendl
;
867 items
[dn
->key()] = dn
;
869 dn
->dir
->items
.erase(dn
->key());
870 if (dn
->dir
->items
.empty())
871 dn
->dir
->put(PIN_CHILD
);
873 if (get_num_any() == 0)
875 if (dn
->get_linkage()->is_null()) {
876 if (dn
->last
== CEPH_NOSNAP
)
880 } else if (dn
->last
== CEPH_NOSNAP
) {
883 auto _fnode
= _get_fnode();
885 if (dn
->get_linkage()->is_primary()) {
886 CInode
*in
= dn
->get_linkage()->get_inode();
887 const auto& pi
= in
->get_projected_inode();
889 _fnode
->fragstat
.nsubdirs
++;
890 if (in
->item_pop_lru
.is_on_list())
891 pop_lru_subdirs
.push_back(&in
->item_pop_lru
);
893 _fnode
->fragstat
.nfiles
++;
895 _fnode
->rstat
.rbytes
+= pi
->accounted_rstat
.rbytes
;
896 _fnode
->rstat
.rfiles
+= pi
->accounted_rstat
.rfiles
;
897 _fnode
->rstat
.rsubdirs
+= pi
->accounted_rstat
.rsubdirs
;
898 _fnode
->rstat
.rsnaps
+= pi
->accounted_rstat
.rsnaps
;
899 if (pi
->accounted_rstat
.rctime
> fnode
->rstat
.rctime
)
900 _fnode
->rstat
.rctime
= pi
->accounted_rstat
.rctime
;
902 if (in
->is_any_caps())
903 adjust_num_inodes_with_caps(1);
905 // move dirty inode rstat to new dirfrag
906 if (in
->is_dirty_rstat())
907 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
908 } else if (dn
->get_linkage()->is_remote()) {
909 if (dn
->get_linkage()->get_remote_d_type() == DT_DIR
)
910 _fnode
->fragstat
.nsubdirs
++;
912 _fnode
->fragstat
.nfiles
++;
916 if (dn
->get_linkage()->is_primary()) {
917 CInode
*in
= dn
->get_linkage()->get_inode();
918 if (in
->is_dirty_rstat())
919 dirty_rstat_inodes
.push_back(&in
->dirty_rstat_item
);
924 int dap
= dn
->get_num_dir_auth_pins();
926 adjust_nested_auth_pins(dap
, NULL
);
927 dn
->dir
->adjust_nested_auth_pins(-dap
, NULL
);
931 if (dn
->is_dirty()) {
932 dirty_dentries
.push_back(&dn
->item_dir_dirty
);
939 void CDir::prepare_old_fragment(map
<string_snap_t
, MDSContext::vec
>& dentry_waiters
, bool replay
)
941 // auth_pin old fragment for duration so that any auth_pinning
942 // during the dentry migration doesn't trigger side effects
943 if (!replay
&& is_auth())
946 if (!waiting_on_dentry
.empty()) {
947 for (const auto &p
: waiting_on_dentry
) {
948 std::copy(p
.second
.begin(), p
.second
.end(),
949 std::back_inserter(dentry_waiters
[p
.first
]));
951 waiting_on_dentry
.clear();
956 void CDir::prepare_new_fragment(bool replay
)
958 if (!replay
&& is_auth()) {
962 inode
->add_dirfrag(this);
965 void CDir::finish_old_fragment(MDSContext::vec
& waiters
, bool replay
)
967 // take waiters _before_ unfreeze...
969 take_waiting(WAIT_ANY_MASK
, waiters
);
971 auth_unpin(this); // pinned in prepare_old_fragment
972 ceph_assert(is_frozen_dir());
977 ceph_assert(dir_auth_pins
== 0);
978 ceph_assert(auth_pins
== 0);
980 num_head_items
= num_head_null
= 0;
981 num_snap_items
= num_snap_null
= 0;
982 adjust_num_inodes_with_caps(-num_inodes_with_caps
);
984 // this mirrors init_fragment_pins()
989 if (state_test(STATE_IMPORTBOUND
))
990 put(PIN_IMPORTBOUND
);
991 if (state_test(STATE_EXPORTBOUND
))
992 put(PIN_EXPORTBOUND
);
993 if (is_subtree_root())
999 ceph_assert(get_num_ref() == (state_test(STATE_STICKY
) ? 1:0));
1002 void CDir::init_fragment_pins()
1004 if (is_replicated())
1005 get(PIN_REPLICATED
);
1006 if (state_test(STATE_DIRTY
))
1008 if (state_test(STATE_EXPORTBOUND
))
1009 get(PIN_EXPORTBOUND
);
1010 if (state_test(STATE_IMPORTBOUND
))
1011 get(PIN_IMPORTBOUND
);
1012 if (is_subtree_root())
1016 void CDir::split(int bits
, std::vector
<CDir
*>* subs
, MDSContext::vec
& waiters
, bool replay
)
1018 dout(10) << "split by " << bits
<< " bits on " << *this << dendl
;
1020 ceph_assert(replay
|| is_complete() || !is_auth());
1023 frag
.split(bits
, frags
);
1025 vector
<CDir
*> subfrags(1 << bits
);
1027 double fac
= 1.0 / (double)(1 << bits
); // for scaling load vecs
1029 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
1030 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
1032 nest_info_t rstatdiff
;
1033 frag_info_t fragstatdiff
;
1034 if (fnode
->accounted_rstat
.version
== rstat_version
)
1035 rstatdiff
.add_delta(fnode
->accounted_rstat
, fnode
->rstat
);
1036 if (fnode
->accounted_fragstat
.version
== dirstat_version
)
1037 fragstatdiff
.add_delta(fnode
->accounted_fragstat
, fnode
->fragstat
);
1038 dout(10) << " rstatdiff " << rstatdiff
<< " fragstatdiff " << fragstatdiff
<< dendl
;
1040 map
<string_snap_t
, MDSContext::vec
> dentry_waiters
;
1041 prepare_old_fragment(dentry_waiters
, replay
);
1043 // create subfrag dirs
1045 for (const auto& fg
: frags
) {
1046 CDir
*f
= new CDir(inode
, fg
, mdcache
, is_auth());
1047 f
->state_set(state
& (MASK_STATE_FRAGMENT_KEPT
| STATE_COMPLETE
));
1048 f
->get_replicas() = get_replicas();
1050 f
->pop_me
.scale(fac
);
1052 // FIXME; this is an approximation
1053 f
->pop_nested
= pop_nested
;
1054 f
->pop_nested
.scale(fac
);
1055 f
->pop_auth_subtree
= pop_auth_subtree
;
1056 f
->pop_auth_subtree
.scale(fac
);
1057 f
->pop_auth_subtree_nested
= pop_auth_subtree_nested
;
1058 f
->pop_auth_subtree_nested
.scale(fac
);
1060 dout(10) << " subfrag " << fg
<< " " << *f
<< dendl
;
1064 f
->set_dir_auth(get_dir_auth());
1065 f
->freeze_tree_state
= freeze_tree_state
;
1066 f
->prepare_new_fragment(replay
);
1067 f
->init_fragment_pins();
1070 // repartition dentries
1071 while (!items
.empty()) {
1072 auto p
= items
.begin();
1074 CDentry
*dn
= p
->second
;
1075 frag_t subfrag
= inode
->pick_dirfrag(dn
->get_name());
1076 int n
= (subfrag
.value() & (subfrag
.mask() ^ frag
.mask())) >> subfrag
.mask_shift();
1077 dout(15) << " subfrag " << subfrag
<< " n=" << n
<< " for " << p
->first
<< dendl
;
1078 CDir
*f
= subfrags
[n
];
1079 f
->steal_dentry(dn
);
1082 for (const auto &p
: dentry_waiters
) {
1083 frag_t subfrag
= inode
->pick_dirfrag(p
.first
.name
);
1084 int n
= (subfrag
.value() & (subfrag
.mask() ^ frag
.mask())) >> subfrag
.mask_shift();
1085 CDir
*f
= subfrags
[n
];
1087 if (f
->waiting_on_dentry
.empty())
1088 f
->get(PIN_DNWAITER
);
1089 std::copy(p
.second
.begin(), p
.second
.end(),
1090 std::back_inserter(f
->waiting_on_dentry
[p
.first
]));
1093 // FIXME: handle dirty old rstat
1095 // fix up new frag fragstats
1096 for (int i
= 0; i
< n
; i
++) {
1097 CDir
*f
= subfrags
[i
];
1098 auto _fnode
= f
->_get_fnode();
1099 _fnode
->version
= f
->projected_version
= get_version();
1100 _fnode
->rstat
.version
= rstat_version
;
1101 _fnode
->accounted_rstat
= _fnode
->rstat
;
1102 _fnode
->fragstat
.version
= dirstat_version
;
1103 _fnode
->accounted_fragstat
= _fnode
->fragstat
;
1104 dout(10) << " rstat " << _fnode
->rstat
<< " fragstat " << _fnode
->fragstat
1105 << " on " << *f
<< dendl
;
1108 // give any outstanding frag stat differential to first frag
1109 dout(10) << " giving rstatdiff " << rstatdiff
<< " fragstatdiff" << fragstatdiff
1110 << " to " << *subfrags
[0] << dendl
;
1111 _fnode
->accounted_rstat
.add(rstatdiff
);
1112 _fnode
->accounted_fragstat
.add(fragstatdiff
);
1116 finish_old_fragment(waiters
, replay
);
1119 void CDir::merge(const std::vector
<CDir
*>& subs
, MDSContext::vec
& waiters
, bool replay
)
1121 dout(10) << "merge " << subs
<< dendl
;
1123 ceph_assert(subs
.size() > 0);
1125 set_dir_auth(subs
.front()->get_dir_auth());
1126 freeze_tree_state
= subs
.front()->freeze_tree_state
;
1128 for (const auto& dir
: subs
) {
1129 ceph_assert(get_dir_auth() == dir
->get_dir_auth());
1130 ceph_assert(freeze_tree_state
== dir
->freeze_tree_state
);
1133 prepare_new_fragment(replay
);
1135 auto _fnode
= _get_fnode();
1137 nest_info_t rstatdiff
;
1138 frag_info_t fragstatdiff
;
1139 bool touched_mtime
, touched_chattr
;
1140 version_t rstat_version
= inode
->get_projected_inode()->rstat
.version
;
1141 version_t dirstat_version
= inode
->get_projected_inode()->dirstat
.version
;
1143 map
<string_snap_t
, MDSContext::vec
> dentry_waiters
;
1145 for (const auto& dir
: subs
) {
1146 dout(10) << " subfrag " << dir
->get_frag() << " " << *dir
<< dendl
;
1147 ceph_assert(!dir
->is_auth() || dir
->is_complete() || replay
);
1149 if (dir
->get_fnode()->accounted_rstat
.version
== rstat_version
)
1150 rstatdiff
.add_delta(dir
->get_fnode()->accounted_rstat
, dir
->get_fnode()->rstat
);
1151 if (dir
->get_fnode()->accounted_fragstat
.version
== dirstat_version
)
1152 fragstatdiff
.add_delta(dir
->get_fnode()->accounted_fragstat
, dir
->get_fnode()->fragstat
,
1153 &touched_mtime
, &touched_chattr
);
1155 dir
->prepare_old_fragment(dentry_waiters
, replay
);
1158 while (!dir
->items
.empty())
1159 steal_dentry(dir
->items
.begin()->second
);
1161 // merge replica map
1162 for (const auto &p
: dir
->get_replicas()) {
1163 unsigned cur
= get_replicas()[p
.first
];
1165 get_replicas()[p
.first
] = p
.second
;
1169 if (dir
->get_version() > _fnode
->version
)
1170 _fnode
->version
= projected_version
= dir
->get_version();
1173 state_set(dir
->get_state() & MASK_STATE_FRAGMENT_KEPT
);
1175 dir
->finish_old_fragment(waiters
, replay
);
1176 inode
->close_dirfrag(dir
->get_frag());
1179 if (!dentry_waiters
.empty()) {
1181 for (const auto &p
: dentry_waiters
) {
1182 std::copy(p
.second
.begin(), p
.second
.end(),
1183 std::back_inserter(waiting_on_dentry
[p
.first
]));
1187 if (is_auth() && !replay
)
1190 // FIXME: merge dirty old rstat
1191 _fnode
->rstat
.version
= rstat_version
;
1192 _fnode
->accounted_rstat
= _fnode
->rstat
;
1193 _fnode
->accounted_rstat
.add(rstatdiff
);
1195 _fnode
->fragstat
.version
= dirstat_version
;
1196 _fnode
->accounted_fragstat
= _fnode
->fragstat
;
1197 _fnode
->accounted_fragstat
.add(fragstatdiff
);
1199 init_fragment_pins();
1205 void CDir::resync_accounted_fragstat()
1207 auto pf
= _get_projected_fnode();
1208 const auto& pi
= inode
->get_projected_inode();
1210 if (pf
->accounted_fragstat
.version
!= pi
->dirstat
.version
) {
1211 pf
->fragstat
.version
= pi
->dirstat
.version
;
1212 dout(10) << __func__
<< " " << pf
->accounted_fragstat
<< " -> " << pf
->fragstat
<< dendl
;
1213 pf
->accounted_fragstat
= pf
->fragstat
;
1218 * resync rstat and accounted_rstat with inode
1220 void CDir::resync_accounted_rstat()
1222 auto pf
= _get_projected_fnode();
1223 const auto& pi
= inode
->get_projected_inode();
1225 if (pf
->accounted_rstat
.version
!= pi
->rstat
.version
) {
1226 pf
->rstat
.version
= pi
->rstat
.version
;
1227 dout(10) << __func__
<< " " << pf
->accounted_rstat
<< " -> " << pf
->rstat
<< dendl
;
1228 pf
->accounted_rstat
= pf
->rstat
;
1229 dirty_old_rstat
.clear();
1233 void CDir::assimilate_dirty_rstat_inodes(MutationRef
& mut
)
1235 dout(10) << __func__
<< dendl
;
1236 for (elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1239 ceph_assert(in
->is_auth());
1240 if (in
->is_frozen())
1245 auto pi
= in
->project_inode(mut
);
1246 pi
.inode
->version
= in
->pre_dirty();
1248 mdcache
->project_rstat_inode_to_frag(mut
, in
, this, 0, 0, nullptr);
1250 state_set(STATE_ASSIMRSTAT
);
1251 dout(10) << __func__
<< " done" << dendl
;
1254 void CDir::assimilate_dirty_rstat_inodes_finish(EMetaBlob
*blob
)
1256 if (!state_test(STATE_ASSIMRSTAT
))
1258 state_clear(STATE_ASSIMRSTAT
);
1259 dout(10) << __func__
<< dendl
;
1260 elist
<CInode
*>::iterator p
= dirty_rstat_inodes
.begin_use_current();
1265 if (in
->is_frozen())
1268 CDentry
*dn
= in
->get_projected_parent_dn();
1270 in
->clear_dirty_rstat();
1271 blob
->add_primary_dentry(dn
, in
, true);
1274 if (!dirty_rstat_inodes
.empty())
1275 mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
1281 /****************************************
1285 void CDir::add_dentry_waiter(std::string_view dname
, snapid_t snapid
, MDSContext
*c
)
1287 if (waiting_on_dentry
.empty())
1289 waiting_on_dentry
[string_snap_t(dname
, snapid
)].push_back(c
);
1290 dout(10) << __func__
<< " dentry " << dname
1291 << " snap " << snapid
1292 << " " << c
<< " on " << *this << dendl
;
1295 void CDir::take_dentry_waiting(std::string_view dname
, snapid_t first
, snapid_t last
,
1296 MDSContext::vec
& ls
)
1298 if (waiting_on_dentry
.empty())
1301 string_snap_t
lb(dname
, first
);
1302 string_snap_t
ub(dname
, last
);
1303 auto it
= waiting_on_dentry
.lower_bound(lb
);
1304 while (it
!= waiting_on_dentry
.end() &&
1305 !(ub
< it
->first
)) {
1306 dout(10) << __func__
<< " " << dname
1307 << " [" << first
<< "," << last
<< "] found waiter on snap "
1309 << " on " << *this << dendl
;
1310 std::copy(it
->second
.begin(), it
->second
.end(), std::back_inserter(ls
));
1311 waiting_on_dentry
.erase(it
++);
1314 if (waiting_on_dentry
.empty())
1318 void CDir::add_waiter(uint64_t tag
, MDSContext
*c
)
1323 if (tag
& WAIT_ATSUBTREEROOT
) {
1324 if (!is_subtree_root()) {
1326 dout(10) << "add_waiter " << std::hex
<< tag
<< std::dec
<< " " << c
<< " should be ATSUBTREEROOT, " << *this << " is not root, trying parent" << dendl
;
1327 inode
->parent
->dir
->add_waiter(tag
, c
);
1332 ceph_assert(!(tag
& WAIT_CREATED
) || state_test(STATE_CREATING
));
1334 MDSCacheObject::add_waiter(tag
, c
);
1339 /* NOTE: this checks dentry waiters too */
1340 void CDir::take_waiting(uint64_t mask
, MDSContext::vec
& ls
)
1342 if ((mask
& WAIT_DENTRY
) && !waiting_on_dentry
.empty()) {
1343 // take all dentry waiters
1344 for (const auto &p
: waiting_on_dentry
) {
1345 dout(10) << "take_waiting dentry " << p
.first
.name
1346 << " snap " << p
.first
.snapid
<< " on " << *this << dendl
;
1347 std::copy(p
.second
.begin(), p
.second
.end(), std::back_inserter(ls
));
1349 waiting_on_dentry
.clear();
1354 MDSCacheObject::take_waiting(mask
, ls
);
1358 void CDir::finish_waiting(uint64_t mask
, int result
)
1360 dout(11) << __func__
<< " mask " << hex
<< mask
<< dec
<< " result " << result
<< " on " << *this << dendl
;
1362 MDSContext::vec finished
;
1363 take_waiting(mask
, finished
);
1365 finish_contexts(g_ceph_context
, finished
, result
);
1367 mdcache
->mds
->queue_waiters(finished
);
1374 CDir::fnode_ptr
CDir::project_fnode(const MutationRef
& mut
)
1376 ceph_assert(get_version() != 0);
1378 if (mut
&& mut
->is_projected(this))
1379 return std::const_pointer_cast
<fnode_t
>(projected_fnode
.back());
1381 auto pf
= allocate_fnode(*get_projected_fnode());
1383 if (scrub_infop
&& scrub_infop
->last_scrub_dirty
) {
1384 pf
->localized_scrub_stamp
= scrub_infop
->last_local
.time
;
1385 pf
->localized_scrub_version
= scrub_infop
->last_local
.version
;
1386 pf
->recursive_scrub_stamp
= scrub_infop
->last_recursive
.time
;
1387 pf
->recursive_scrub_version
= scrub_infop
->last_recursive
.version
;
1388 scrub_infop
->last_scrub_dirty
= false;
1389 scrub_maybe_delete_info();
1392 projected_fnode
.emplace_back(pf
);
1394 mut
->add_projected_node(this);
1395 dout(10) << __func__
<< " " << pf
.get() << dendl
;
1399 void CDir::pop_and_dirty_projected_fnode(LogSegment
*ls
, const MutationRef
& mut
)
1401 ceph_assert(!projected_fnode
.empty());
1402 auto pf
= std::move(projected_fnode
.front());
1403 dout(15) << __func__
<< " " << pf
.get() << " v" << pf
->version
<< dendl
;
1405 projected_fnode
.pop_front();
1407 mut
->remove_projected_node(this);
1409 reset_fnode(std::move(pf
));
1413 version_t
CDir::pre_dirty(version_t min
)
1415 if (min
> projected_version
)
1416 projected_version
= min
;
1417 ++projected_version
;
1418 dout(10) << __func__
<< " " << projected_version
<< dendl
;
1419 return projected_version
;
1422 void CDir::mark_dirty(LogSegment
*ls
, version_t pv
)
1424 ceph_assert(is_auth());
1427 ceph_assert(get_version() < pv
);
1428 ceph_assert(pv
<= projected_version
);
1429 ceph_assert(!projected_fnode
.empty() &&
1430 pv
<= projected_fnode
.front()->version
);
1436 void CDir::_mark_dirty(LogSegment
*ls
)
1438 if (!state_test(STATE_DIRTY
)) {
1439 dout(10) << __func__
<< " (was clean) " << *this << " version " << get_version() << dendl
;
1443 dout(10) << __func__
<< " (already dirty) " << *this << " version " << get_version() << dendl
;
1446 ls
->dirty_dirfrags
.push_back(&item_dirty
);
1448 // if i've never committed, i need to be before _any_ mention of me is trimmed from the journal.
1449 if (committed_version
== 0 && !item_new
.is_on_list())
1450 ls
->new_dirfrags
.push_back(&item_new
);
1454 void CDir::mark_new(LogSegment
*ls
)
1456 ls
->new_dirfrags
.push_back(&item_new
);
1457 state_clear(STATE_CREATING
);
1459 MDSContext::vec waiters
;
1460 take_waiting(CDir::WAIT_CREATED
, waiters
);
1461 mdcache
->mds
->queue_waiters(waiters
);
1464 void CDir::set_fresh_fnode(fnode_const_ptr
&& ptr
) {
1465 ceph_assert(inode
->is_auth());
1466 ceph_assert(!is_projected());
1467 ceph_assert(!state_test(STATE_COMMITTING
));
1468 reset_fnode(std::move(ptr
));
1469 projected_version
= committing_version
= committed_version
= get_version();
1471 if (state_test(STATE_REJOINUNDEF
)) {
1472 ceph_assert(mdcache
->mds
->is_rejoin());
1473 state_clear(STATE_REJOINUNDEF
);
1474 mdcache
->opened_undef_dirfrag(this);
1478 void CDir::mark_clean()
1480 dout(10) << __func__
<< " " << *this << " version " << get_version() << dendl
;
1481 if (state_test(STATE_DIRTY
)) {
1482 item_dirty
.remove_myself();
1483 item_new
.remove_myself();
1485 state_clear(STATE_DIRTY
);
1490 // caller should hold auth pin of this
1491 void CDir::log_mark_dirty()
1493 if (is_dirty() || projected_version
> get_version())
1494 return; // noop if it is already dirty or will be dirty
1496 auto _fnode
= allocate_fnode(*get_fnode());
1497 _fnode
->version
= pre_dirty();
1498 reset_fnode(std::move(_fnode
));
1499 mark_dirty(mdcache
->mds
->mdlog
->get_current_segment());
1502 void CDir::mark_complete() {
1503 state_set(STATE_COMPLETE
);
1507 void CDir::first_get()
1509 inode
->get(CInode::PIN_DIRFRAG
);
1512 void CDir::last_put()
1514 inode
->put(CInode::PIN_DIRFRAG
);
1519 /******************************************************************************
1523 // -----------------------
1525 void CDir::fetch(std::string_view dname
, snapid_t last
,
1526 MDSContext
*c
, bool ignore_authpinnability
)
1529 dout(10) << "fetch on " << *this << dendl
;
1531 dout(10) << "fetch key(" << dname
<< ", '" << last
<< "')" << dendl
;
1533 ceph_assert(is_auth());
1534 ceph_assert(!is_complete());
1536 if (!ignore_authpinnability
&& !can_auth_pin()) {
1538 dout(7) << "fetch waiting for authpinnable" << dendl
;
1539 add_waiter(WAIT_UNFREEZE
, c
);
1541 dout(7) << "fetch not authpinnable and no context" << dendl
;
1545 // unlinked directory inode shouldn't have any entry
1546 if (CDir
*pdir
= get_parent_dir();
1547 pdir
&& pdir
->inode
->is_stray() && !inode
->snaprealm
) {
1548 dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl
;
1549 if (get_version() == 0) {
1550 auto _fnode
= allocate_fnode();
1551 _fnode
->version
= 1;
1552 set_fresh_fnode(std::move(_fnode
));
1557 mdcache
->mds
->queue_waiter(c
);
1561 // FIXME: to fetch a snap dentry, we need to get omap key in range
1562 // [(name, last), (name, CEPH_NOSNAP))
1563 if (!dname
.empty() && last
== CEPH_NOSNAP
&& !g_conf().get_val
<bool>("mds_dir_prefetch")) {
1564 dentry_key_t
key(last
, dname
, inode
->hash_dentry_name(dname
));
1565 fetch_keys({key
}, c
);
1570 add_waiter(WAIT_COMPLETE
, c
);
1572 // already fetching?
1573 if (state_test(CDir::STATE_FETCHING
)) {
1574 dout(7) << "already fetching; waiting" << dendl
;
1579 state_set(CDir::STATE_FETCHING
);
1581 _omap_fetch(nullptr, nullptr);
1583 if (mdcache
->mds
->logger
)
1584 mdcache
->mds
->logger
->inc(l_mds_dir_fetch_complete
);
1585 mdcache
->mds
->balancer
->hit_dir(this, META_POP_FETCH
);
1588 void CDir::fetch_keys(const std::vector
<dentry_key_t
>& keys
, MDSContext
*c
)
1590 dout(10) << __func__
<< " " << keys
.size() << " keys on " << *this << dendl
;
1591 ceph_assert(is_auth());
1592 ceph_assert(!is_complete());
1594 if (CDir
*pdir
= get_parent_dir();
1595 pdir
&& pdir
->inode
->is_stray() && !inode
->snaprealm
) {
1600 MDSContext::vec_alloc
<mempool::mds_co::pool_allocator
> *fallback_waiting
= nullptr;
1601 std::set
<std::string
> str_keys
;
1602 for (auto& key
: keys
) {
1603 ceph_assert(key
.snapid
== CEPH_NOSNAP
);
1604 if (waiting_on_dentry
.empty())
1606 auto em
= waiting_on_dentry
.emplace(std::piecewise_construct
,
1607 std::forward_as_tuple(key
.name
, key
.snapid
),
1608 std::forward_as_tuple());
1610 if (!fallback_waiting
)
1611 fallback_waiting
= &em
.first
->second
;
1616 em
.first
->second
.push_back(c
);
1622 str_keys
.emplace(std::move(str
));
1625 if (str_keys
.empty()) {
1626 if (c
&& fallback_waiting
) {
1627 fallback_waiting
->push_back(c
);
1631 if (get_version() > 0) {
1632 dout(7) << "fetch keys, all are already being fetched" << dendl
;
1638 if (state_test(CDir::STATE_FETCHING
)) {
1639 dout(7) << "fetch keys, waiting for full fetch" << dendl
;
1641 add_waiter(WAIT_COMPLETE
, c
);
1646 _omap_fetch(&str_keys
, c
);
1648 if (mdcache
->mds
->logger
)
1649 mdcache
->mds
->logger
->inc(l_mds_dir_fetch_keys
);
1650 mdcache
->mds
->balancer
->hit_dir(this, META_POP_FETCH
);
1653 class C_IO_Dir_OMAP_FetchedMore
: public CDirIOContext
{
1656 const version_t omap_version
;
1659 map
<string
, bufferlist
> omap
; ///< carry-over from before
1660 map
<string
, bufferlist
> omap_more
; ///< new batch
1662 C_IO_Dir_OMAP_FetchedMore(CDir
*d
, version_t v
, MDSContext
*f
) :
1663 CDirIOContext(d
), fin(f
), omap_version(v
), ret(0) { }
1664 void finish(int r
) {
1665 if (omap_version
< dir
->get_committed_version()) {
1667 dir
->_omap_fetch(nullptr, fin
);
1673 omap
.swap(omap_more
);
1675 omap
.insert(omap_more
.begin(), omap_more
.end());
1678 dir
->_omap_fetch_more(omap_version
, hdrbl
, omap
, fin
);
1680 dir
->_omap_fetched(hdrbl
, omap
, true, {}, r
);
1685 void print(ostream
& out
) const override
{
1686 out
<< "dirfrag_fetch_more(" << dir
->dirfrag() << ")";
1690 class C_IO_Dir_OMAP_Fetched
: public CDirIOContext
{
1693 const version_t omap_version
;
1694 bool complete
= true;
1695 std::set
<string
> keys
;
1698 map
<string
, bufferlist
> omap
;
1700 int ret1
, ret2
, ret3
;
1702 C_IO_Dir_OMAP_Fetched(CDir
*d
, MDSContext
*f
) :
1703 CDirIOContext(d
), fin(f
),
1704 omap_version(d
->get_committing_version()),
1705 ret1(0), ret2(0), ret3(0) { }
1706 void finish(int r
) override
{
1707 // check the correctness of backtrace
1708 if (r
>= 0 && ret3
!= -CEPHFS_ECANCELED
)
1709 dir
->inode
->verify_diri_backtrace(btbl
, ret3
);
1710 if (r
>= 0) r
= ret1
;
1711 if (r
>= 0) r
= ret2
;
1714 if (omap_version
< dir
->get_committed_version()) {
1715 dir
->_omap_fetch(nullptr, fin
);
1717 dir
->_omap_fetch_more(omap_version
, hdrbl
, omap
, fin
);
1722 dir
->_omap_fetched(hdrbl
, omap
, complete
, keys
, r
);
1726 void print(ostream
& out
) const override
{
1727 out
<< "dirfrag_fetch(" << dir
->dirfrag() << ")";
1731 void CDir::_omap_fetch(std::set
<string
> *keys
, MDSContext
*c
)
1733 C_IO_Dir_OMAP_Fetched
*fin
= new C_IO_Dir_OMAP_Fetched(this, c
);
1734 object_t oid
= get_ondisk_object();
1735 object_locator_t
oloc(mdcache
->mds
->mdsmap
->get_metadata_pool());
1737 rd
.omap_get_header(&fin
->hdrbl
, &fin
->ret1
);
1739 fin
->complete
= false;
1740 fin
->keys
.swap(*keys
);
1741 rd
.omap_get_vals_by_keys(fin
->keys
, &fin
->omap
, &fin
->ret2
);
1744 rd
.omap_get_vals("", "", g_conf()->mds_dir_keys_per_op
,
1745 &fin
->omap
, &fin
->more
, &fin
->ret2
);
1747 // check the correctness of backtrace
1748 if (g_conf()->mds_verify_backtrace
> 0 && frag
== frag_t()) {
1749 rd
.getxattr("parent", &fin
->btbl
, &fin
->ret3
);
1750 rd
.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1752 fin
->ret3
= -CEPHFS_ECANCELED
;
1755 mdcache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1756 new C_OnFinisher(fin
, mdcache
->mds
->finisher
));
1759 void CDir::_omap_fetch_more(version_t omap_version
, bufferlist
& hdrbl
,
1760 map
<string
, bufferlist
>& omap
, MDSContext
*c
)
1762 // we have more omap keys to fetch!
1763 object_t oid
= get_ondisk_object();
1764 object_locator_t
oloc(mdcache
->mds
->mdsmap
->get_metadata_pool());
1765 auto fin
= new C_IO_Dir_OMAP_FetchedMore(this, omap_version
, c
);
1766 fin
->hdrbl
= std::move(hdrbl
);
1767 fin
->omap
.swap(omap
);
1769 rd
.omap_get_vals(fin
->omap
.rbegin()->first
,
1770 "", /* filter prefix */
1771 g_conf()->mds_dir_keys_per_op
,
1775 mdcache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, NULL
, 0,
1776 new C_OnFinisher(fin
, mdcache
->mds
->finisher
));
1779 CDentry
*CDir::_load_dentry(
1780 std::string_view key
,
1781 std::string_view dname
,
1782 const snapid_t last
,
1785 const std::set
<snapid_t
> *snaps
,
1786 double rand_threshold
,
1789 auto q
= bl
.cbegin();
1798 dout(20) << "_fetched pos " << pos
<< " marker '" << type
<< "' dname '" << dname
1799 << " [" << first
<< "," << last
<< "]"
1803 if (snaps
&& last
!= CEPH_NOSNAP
) {
1804 set
<snapid_t
>::const_iterator p
= snaps
->lower_bound(first
);
1805 if (p
== snaps
->end() || *p
> last
) {
1806 dout(10) << " skipping stale dentry on [" << first
<< "," << last
<< "]" << dendl
;
1812 * look for existing dentry for _last_ snap, because unlink +
1813 * create may leave a "hole" (epochs during which the dentry
1814 * doesn't exist) but for which no explicit negative dentry is in
1819 dn
= lookup_exact_snap(dname
, last
);
1821 dn
= lookup(dname
, last
);
1823 if (type
== 'L' || type
== 'l') {
1826 unsigned char d_type
;
1827 mempool::mds_co::string alternate_name
;
1829 CDentry::decode_remote(type
, ino
, d_type
, alternate_name
, q
);
1833 stale_items
.insert(mempool::mds_co::string(key
));
1834 *force_dirty
= true;
1840 CDentry::linkage_t
*dnl
= dn
->get_linkage();
1841 dout(12) << "_fetched had " << (dnl
->is_null() ? "NEG" : "") << " dentry " << *dn
<< dendl
;
1842 if (committed_version
== 0 &&
1845 ino
== dnl
->get_remote_ino() &&
1846 d_type
== dnl
->get_remote_d_type() &&
1847 alternate_name
== dn
->get_alternate_name()) {
1848 // see comment below
1849 dout(10) << "_fetched had underwater dentry " << *dn
<< ", marking clean" << dendl
;
1854 dn
= add_remote_dentry(dname
, ino
, d_type
, std::move(alternate_name
), first
, last
);
1857 CInode
*in
= mdcache
->get_inode(ino
); // we may or may not have it.
1859 dn
->link_remote(dn
->get_linkage(), in
);
1860 dout(12) << "_fetched got remote link " << ino
<< " which we have " << *in
<< dendl
;
1862 dout(12) << "_fetched got remote link " << ino
<< " (don't have it)" << dendl
;
1866 else if (type
== 'I' || type
== 'i') {
1867 InodeStore inode_data
;
1868 mempool::mds_co::string alternate_name
;
1870 // Load inode data before looking up or constructing CInode
1873 if (struct_v
>= 2) {
1874 decode(alternate_name
, q
);
1876 inode_data
.decode(q
);
1879 inode_data
.decode_bare(q
);
1884 stale_items
.insert(mempool::mds_co::string(key
));
1885 *force_dirty
= true;
1890 bool undef_inode
= false;
1892 CDentry::linkage_t
*dnl
= dn
->get_linkage();
1893 dout(12) << "_fetched had " << (dnl
->is_null() ? "NEG" : "") << " dentry " << *dn
<< dendl
;
1895 if (dnl
->is_primary()) {
1896 CInode
*in
= dnl
->get_inode();
1897 if (in
->state_test(CInode::STATE_REJOINUNDEF
)) {
1899 } else if (committed_version
== 0 &&
1901 inode_data
.inode
->ino
== in
->ino() &&
1902 inode_data
.inode
->version
== in
->get_version()) {
1903 /* clean underwater item?
1904 * Underwater item is something that is dirty in our cache from
1905 * journal replay, but was previously flushed to disk before the
1908 * We only do this if committed_version == 0. That implies either
1909 * - this is a fetch after a clean/empty CDir is created
1910 * (and has no effect, since the dn won't exist); or
1911 * - this is a fetch after _recovery_, which is what we're worried
1912 * about. Items that are marked dirty from the journal should be
1913 * marked clean if they appear on disk.
1915 dout(10) << "_fetched had underwater dentry " << *dn
<< ", marking clean" << dendl
;
1917 dout(10) << "_fetched had underwater inode " << *dnl
->get_inode() << ", marking clean" << dendl
;
1923 if (!dn
|| undef_inode
) {
1925 CInode
*in
= mdcache
->get_inode(inode_data
.inode
->ino
, last
);
1926 if (!in
|| undef_inode
) {
1927 if (undef_inode
&& in
)
1930 in
= new CInode(mdcache
, true, first
, last
);
1932 in
->reset_inode(std::move(inode_data
.inode
));
1933 in
->reset_xattrs(std::move(inode_data
.xattrs
));
1935 if (in
->is_symlink())
1936 in
->symlink
= inode_data
.symlink
;
1938 in
->dirfragtree
.swap(inode_data
.dirfragtree
);
1939 in
->reset_old_inodes(std::move(inode_data
.old_inodes
));
1940 if (in
->is_any_old_inodes()) {
1941 snapid_t min_first
= in
->get_old_inodes()->rbegin()->first
+ 1;
1942 if (min_first
> in
->first
)
1943 in
->first
= min_first
;
1946 in
->oldest_snap
= inode_data
.oldest_snap
;
1947 in
->decode_snap_blob(inode_data
.snap_blob
);
1948 if (snaps
&& !in
->snaprealm
)
1949 in
->purge_stale_snap_data(*snaps
);
1952 mdcache
->add_inode(in
); // add
1953 dn
= add_primary_dentry(dname
, in
, std::move(alternate_name
), first
, last
); // link
1955 dout(12) << "_fetched got " << *dn
<< " " << *in
<< dendl
;
1957 if (in
->get_inode()->is_dirty_rstat())
1958 in
->mark_dirty_rstat();
1960 in
->maybe_ephemeral_rand(rand_threshold
);
1961 //in->hack_accessed = false;
1962 //in->hack_load_stamp = ceph_clock_now();
1963 //num_new_inodes_loaded++;
1964 } else if (g_conf().get_val
<bool>("mds_hack_allow_loading_invalid_metadata")) {
1965 dout(20) << "hack: adding duplicate dentry for " << *in
<< dendl
;
1966 dn
= add_primary_dentry(dname
, in
, std::move(alternate_name
), first
, last
);
1968 dout(0) << "_fetched badness: got (but i already had) " << *in
1969 << " mode " << in
->get_inode()->mode
1970 << " mtime " << in
->get_inode()->mtime
<< dendl
;
1971 string dirpath
, inopath
;
1972 this->inode
->make_path_string(dirpath
);
1973 in
->make_path_string(inopath
);
1974 mdcache
->mds
->clog
->error() << "loaded dup inode " << inode_data
.inode
->ino
1975 << " [" << first
<< "," << last
<< "] v" << inode_data
.inode
->version
1976 << " at " << dirpath
<< "/" << dname
1977 << ", but inode " << in
->vino() << " v" << in
->get_version()
1978 << " already exists at " << inopath
;
1983 CachedStackStringStream css
;
1984 *css
<< "Invalid tag char '" << type
<< "' pos " << pos
;
1985 throw buffer::malformed_input(css
->str());
1991 void CDir::_omap_fetched(bufferlist
& hdrbl
, map
<string
, bufferlist
>& omap
,
1992 bool complete
, const std::set
<string
>& keys
, int r
)
1994 LogChannelRef clog
= mdcache
->mds
->clog
;
1995 dout(10) << "_fetched header " << hdrbl
.length() << " bytes "
1996 << omap
.size() << " keys for " << *this << dendl
;
1998 ceph_assert(r
== 0 || r
== -CEPHFS_ENOENT
|| r
== -CEPHFS_ENODATA
);
1999 ceph_assert(is_auth());
2000 ceph_assert(!is_frozen());
2002 if (hdrbl
.length() == 0) {
2003 dout(0) << "_fetched missing object for " << *this << dendl
;
2005 clog
->error() << "dir " << dirfrag() << " object missing on disk; some "
2006 "files may be lost (" << get_path() << ")";
2014 auto p
= hdrbl
.cbegin();
2016 decode(got_fnode
, p
);
2017 } catch (const buffer::error
&err
) {
2018 derr
<< "Corrupt fnode in dirfrag " << dirfrag()
2019 << ": " << err
.what() << dendl
;
2020 clog
->warn() << "Corrupt fnode header in " << dirfrag() << ": "
2021 << err
.what() << " (" << get_path() << ")";
2026 clog
->warn() << "header buffer of dir " << dirfrag() << " has "
2027 << hdrbl
.length() - p
.get_off() << " extra bytes ("
2028 << get_path() << ")";
2034 dout(10) << "_fetched version " << got_fnode
.version
<< dendl
;
2036 // take the loaded fnode?
2037 // only if we are a fresh CDir* with no prior state.
2038 if (get_version() == 0) {
2039 set_fresh_fnode(allocate_fnode(got_fnode
));
2042 list
<CInode
*> undef_inodes
;
2044 // purge stale snaps?
2045 bool force_dirty
= false;
2046 const set
<snapid_t
> *snaps
= NULL
;
2047 SnapRealm
*realm
= inode
->find_snaprealm();
2048 if (fnode
->snap_purged_thru
< realm
->get_last_destroyed()) {
2049 snaps
= &realm
->get_snaps();
2050 dout(10) << " snap_purged_thru " << fnode
->snap_purged_thru
2051 << " < " << realm
->get_last_destroyed()
2052 << ", snap purge based on " << *snaps
<< dendl
;
2053 if (get_num_snap_items() == 0) {
2054 const_cast<snapid_t
&>(fnode
->snap_purged_thru
) = realm
->get_last_destroyed();
2060 MDSContext::vec finished
;
2061 std::vector
<string_snap_t
> null_keys
;
2063 auto k_it
= keys
.rbegin();
2064 auto w_it
= waiting_on_dentry
.rbegin();
2065 std::string_view last_name
= "";
2067 auto proc_waiters
= [&](const string_snap_t
& key
) {
2069 if (last_name
< key
.name
) {
2070 // string_snap_t and key string are not in the same order
2071 w_it
= decltype(w_it
)(waiting_on_dentry
.upper_bound(key
));
2073 while (w_it
!= waiting_on_dentry
.rend()) {
2074 int cmp
= w_it
->first
.compare(key
);
2079 std::copy(w_it
->second
.begin(), w_it
->second
.end(),
2080 std::back_inserter(finished
));
2081 waiting_on_dentry
.erase(std::next(w_it
).base());
2082 if (waiting_on_dentry
.empty())
2090 auto proc_nulls_and_waiters
= [&](const string
& str_key
, const string_snap_t
& key
) {
2094 while (k_it
!= keys
.rend()) {
2095 int cmp
= k_it
->compare(str_key
);
2104 string_snap_t n_key
;
2105 dentry_key_t::decode_helper(*k_it
, n_key
.name
, n_key
.snapid
);
2106 ceph_assert(n_key
.snapid
== CEPH_NOSNAP
);
2107 proc_waiters(n_key
);
2108 last_name
= std::string_view(k_it
->c_str(), n_key
.name
.length());
2109 null_keys
.emplace_back(std::move(n_key
));
2112 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2113 mdcache
->mds
->heartbeat_reset();
2119 unsigned pos
= omap
.size() - 1;
2120 double rand_threshold
= get_inode()->get_ephemeral_rand();
2121 for (auto p
= omap
.rbegin(); p
!= omap
.rend(); ++p
, --pos
) {
2123 dentry_key_t::decode_helper(p
->first
, key
.name
, key
.snapid
);
2126 if (key
.snapid
== CEPH_NOSNAP
) {
2128 touch
= proc_waiters(key
);
2130 touch
= proc_nulls_and_waiters(p
->first
, key
);
2132 last_name
= std::string_view(p
->first
.c_str(), key
.name
.length());
2135 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2136 mdcache
->mds
->heartbeat_reset();
2138 CDentry
*dn
= nullptr;
2141 p
->first
, key
.name
, key
.snapid
, p
->second
, pos
, snaps
,
2142 rand_threshold
, &force_dirty
);
2143 } catch (const buffer::error
&err
) {
2144 mdcache
->mds
->clog
->warn() << "Corrupt dentry '" << key
.name
<< "' in "
2145 "dir frag " << dirfrag() << ": "
2146 << err
.what() << "(" << get_path() << ")";
2148 // Remember that this dentry is damaged. Subsequent operations
2149 // that try to act directly on it will get their CEPHFS_EIOs, but this
2150 // dirfrag as a whole will continue to look okay (minus the
2151 // mysteriously-missing dentry)
2152 go_bad_dentry(key
.snapid
, key
.name
);
2154 // Anyone who was WAIT_DENTRY for this guy will get kicked
2155 // to RetryRequest, and hit the DamageTable-interrogating path.
2156 // Stats will now be bogus because we will think we're complete,
2157 // but have 1 or more missing dentries.
2165 dout(10) << " touching wanted dn " << *dn
<< dendl
;
2166 mdcache
->touch_dentry(dn
);
2169 CDentry::linkage_t
*dnl
= dn
->get_linkage();
2170 if (dnl
->is_primary() && dnl
->get_inode()->state_test(CInode::STATE_REJOINUNDEF
))
2171 undef_inodes
.push_back(dnl
->get_inode());
2175 if (!waiting_on_dentry
.empty()) {
2176 for (auto &p
: waiting_on_dentry
) {
2177 std::copy(p
.second
.begin(), p
.second
.end(), std::back_inserter(finished
));
2178 if (p
.first
.snapid
== CEPH_NOSNAP
)
2179 null_keys
.emplace_back(p
.first
);
2181 waiting_on_dentry
.clear();
2185 proc_nulls_and_waiters("", string_snap_t());
2188 if (!null_keys
.empty()) {
2189 snapid_t first
= mdcache
->get_global_snaprealm()->get_newest_seq() + 1;
2190 for (auto& key
: null_keys
) {
2191 CDentry
* dn
= lookup(key
.name
, key
.snapid
);
2193 dout(12) << "_fetched got null for key " << key
<< ", have " << *dn
<< dendl
;
2195 dn
= add_null_dentry(key
.name
, first
, key
.snapid
);
2196 dout(12) << "_fetched got null for key " << key
<< ", added " << *dn
<< dendl
;
2198 mdcache
->touch_dentry(dn
);
2200 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace(2)))
2201 mdcache
->mds
->heartbeat_reset();
2205 //cache->mds->logger->inc("newin", num_new_inodes_loaded);
2207 // mark complete, !fetching
2210 state_clear(STATE_FETCHING
);
2211 take_waiting(WAIT_COMPLETE
, finished
);
2214 // open & force frags
2215 while (!undef_inodes
.empty()) {
2216 CInode
*in
= undef_inodes
.front();
2218 undef_inodes
.pop_front();
2219 in
->state_clear(CInode::STATE_REJOINUNDEF
);
2220 mdcache
->opened_undef_inode(in
);
2222 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2223 mdcache
->mds
->heartbeat_reset();
2226 // dirty myself to remove stale snap dentries
2227 if (force_dirty
&& !mdcache
->is_readonly())
2232 if (!finished
.empty())
2233 mdcache
->mds
->queue_waiters(finished
);
2236 void CDir::go_bad_dentry(snapid_t last
, std::string_view dname
)
2238 dout(10) << __func__
<< " " << dname
<< dendl
;
2239 std::string
path(get_path());
2242 const bool fatal
= mdcache
->mds
->damage_table
.notify_dentry(
2243 inode
->ino(), frag
, last
, dname
, path
);
2245 mdcache
->mds
->damaged();
2246 ceph_abort(); // unreachable, damaged() respawns us
2250 void CDir::go_bad(bool complete
)
2252 dout(10) << __func__
<< " " << frag
<< dendl
;
2253 const bool fatal
= mdcache
->mds
->damage_table
.notify_dirfrag(
2254 inode
->ino(), frag
, get_path());
2256 mdcache
->mds
->damaged();
2257 ceph_abort(); // unreachable, damaged() respawns us
2261 if (get_version() == 0) {
2262 auto _fnode
= allocate_fnode();
2263 _fnode
->version
= 1;
2264 reset_fnode(std::move(_fnode
));
2267 state_set(STATE_BADFRAG
);
2271 state_clear(STATE_FETCHING
);
2273 finish_waiting(WAIT_COMPLETE
, -CEPHFS_EIO
);
2276 // -----------------------
2282 * @param want - min version i want committed
2283 * @param c - callback for completion
2285 void CDir::commit(version_t want
, MDSContext
*c
, bool ignore_authpinnability
, int op_prio
)
2287 dout(10) << "commit want " << want
<< " on " << *this << dendl
;
2288 if (want
== 0) want
= get_version();
2291 ceph_assert(want
<= get_version() || get_version() == 0); // can't commit the future
2292 ceph_assert(want
> committed_version
); // the caller is stupid
2293 ceph_assert(is_auth());
2294 ceph_assert(ignore_authpinnability
|| can_auth_pin());
2296 // note: queue up a noop if necessary, so that we always
2299 c
= new C_MDSInternalNoop
;
2301 // auth_pin on first waiter
2302 if (waiting_for_commit
.empty())
2304 waiting_for_commit
[want
].push_back(c
);
2307 _commit(want
, op_prio
);
2310 class C_IO_Dir_Committed
: public CDirIOContext
{
2313 C_IO_Dir_Committed(CDir
*d
, version_t v
) : CDirIOContext(d
), version(v
) { }
2314 void finish(int r
) override
{
2315 dir
->_committed(r
, version
);
2317 void print(ostream
& out
) const override
{
2318 out
<< "dirfrag_committed(" << dir
->dirfrag() << ")";
2322 class C_IO_Dir_Commit_Ops
: public Context
{
2324 C_IO_Dir_Commit_Ops(CDir
*d
, int pr
,
2325 vector
<CDir::dentry_commit_item
> &&s
, bufferlist
&&bl
,
2327 mempool::mds_co::compact_set
<mempool::mds_co::string
> &&stales
) :
2328 dir(d
), op_prio(pr
) {
2329 metapool
= dir
->mdcache
->mds
->get_metadata_pool();
2330 version
= dir
->get_version();
2331 is_new
= dir
->is_new();
2335 stale_items
.swap(stales
);
2338 void finish(int r
) override
{
2339 dir
->_omap_commit_ops(r
, op_prio
, metapool
, version
, is_new
, to_set
, dfts
,
2340 to_remove
, stale_items
);
2349 vector
<CDir::dentry_commit_item
> to_set
;
2351 vector
<string
> to_remove
;
2352 mempool::mds_co::compact_set
<mempool::mds_co::string
> stale_items
;
2355 // This does the same thing as InodeStoreBase::encode()
2356 void CDir::_encode_primary_inode_base(dentry_commit_item
&item
, bufferlist
&dfts
,
2359 ENCODE_START(6, 4, bl
);
2360 encode(*item
.inode
, bl
, item
.features
);
2362 if (!item
.symlink
.empty())
2363 encode(item
.symlink
, bl
);
2366 dfts
.splice(0, item
.dft_len
, &bl
);
2369 encode(*item
.xattrs
, bl
);
2371 encode((__u32
)0, bl
);
2373 if (item
.snaprealm
) {
2374 bufferlist snapr_bl
;
2375 encode(item
.srnode
, snapr_bl
);
2376 encode(snapr_bl
, bl
);
2378 encode(bufferlist(), bl
);
2381 if (item
.old_inodes
)
2382 encode(*item
.old_inodes
, bl
, item
.features
);
2384 encode((__u32
)0, bl
);
2386 encode(item
.oldest_snap
, bl
);
2387 encode(item
.damage_flags
, bl
);
2391 // This is not locked by mds_lock
2392 void CDir::_omap_commit_ops(int r
, int op_prio
, int64_t metapool
, version_t version
, bool _new
,
2393 vector
<dentry_commit_item
> &to_set
, bufferlist
&dfts
,
2394 vector
<string
>& to_remove
,
2395 mempool::mds_co::compact_set
<mempool::mds_co::string
> &stales
)
2397 dout(10) << __func__
<< dendl
;
2400 mdcache
->mds
->handle_write_error_with_lock(r
);
2404 C_GatherBuilder
gather(g_ceph_context
,
2405 new C_OnFinisher(new C_IO_Dir_Committed(this, version
),
2406 mdcache
->mds
->finisher
));
2409 object_t oid
= get_ondisk_object();
2410 object_locator_t
oloc(metapool
);
2412 map
<string
, bufferlist
> _set
;
2415 unsigned max_write_size
= mdcache
->max_dir_commit_size
;
2416 unsigned write_size
= 0;
2418 auto commit_one
= [&](bool header
=false) {
2422 * Shouldn't submit empty op to Rados, which could cause
2423 * the cephfs to become readonly.
2425 ceph_assert(header
|| !_set
.empty() || !_rm
.empty());
2428 // don't create new dirfrag blindly
2430 op
.stat(nullptr, nullptr, nullptr);
2433 * save the header at the last moment.. If we were to send it off before
2434 * other updates, but die before sending them all, we'd think that the
2435 * on-disk state was fully committed even though it wasn't! However, since
2436 * the messages are strictly ordered between the MDS and the OSD, and
2437 * since messages to a given PG are strictly ordered, if we simply send
2438 * the message containing the header off last, we cannot get our header
2439 * into an incorrect state.
2443 encode(*fnode
, header
);
2444 op
.omap_set_header(header
);
2447 op
.priority
= op_prio
;
2451 op
.omap_rm_keys(_rm
);
2452 mdcache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
2453 ceph::real_clock::now(),
2454 0, gather
.new_sub());
2461 for (auto &key
: stales
) {
2462 unsigned size
= key
.length() + sizeof(__u32
);
2463 if (write_size
> 0 && write_size
+ size
> max_write_size
)
2469 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace(2)))
2470 mdcache
->mds
->heartbeat_reset();
2473 for (auto &key
: to_remove
) {
2474 unsigned size
= key
.length() + sizeof(__u32
);
2475 if (write_size
> 0 && write_size
+ size
> max_write_size
)
2479 _rm
.emplace(std::move(key
));
2481 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace(2)))
2482 mdcache
->mds
->heartbeat_reset();
2487 for (auto &item
: to_set
) {
2488 encode(item
.first
, bl
);
2489 if (item
.is_remote
) {
2491 CDentry::encode_remote(item
.ino
, item
.d_type
, item
.alternate_name
, bl
);
2493 // marker, name, inode, [symlink string]
2494 bl
.append('i'); // inode
2496 ENCODE_START(2, 1, bl
);
2497 encode(item
.alternate_name
, bl
);
2498 _encode_primary_inode_base(item
, dfts
, bl
);
2502 unsigned size
= item
.key
.length() + bl
.length() + 2 * sizeof(__u32
);
2503 if (write_size
> 0 && write_size
+ size
> max_write_size
)
2507 _set
[std::move(item
.key
)].swap(bl
);
2509 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2510 mdcache
->mds
->heartbeat_reset();
2518 * Flush out the modified dentries in this dir. Keep the bufferlist
2519 * below max_write_size;
2521 void CDir::_omap_commit(int op_prio
)
2523 dout(10) << __func__
<< dendl
;
2526 op_prio
= CEPH_MSG_PRIO_DEFAULT
;
2529 const set
<snapid_t
> *snaps
= NULL
;
2530 SnapRealm
*realm
= inode
->find_snaprealm();
2531 if (fnode
->snap_purged_thru
< realm
->get_last_destroyed()) {
2532 snaps
= &realm
->get_snaps();
2533 dout(10) << " snap_purged_thru " << fnode
->snap_purged_thru
2534 << " < " << realm
->get_last_destroyed()
2535 << ", snap purge based on " << *snaps
<< dendl
;
2536 // fnode.snap_purged_thru = realm->get_last_destroyed();
2539 size_t items_count
= 0;
2540 if (state_test(CDir::STATE_FRAGMENTING
) && is_new()) {
2541 items_count
= get_num_head_items() + get_num_snap_items();
2543 for (elist
<CDentry
*>::iterator it
= dirty_dentries
.begin(); !it
.end(); ++it
)
2547 vector
<string
> to_remove
;
2548 // reverve enough memories, which maybe larger than the actually needed
2549 to_remove
.reserve(items_count
);
2551 vector
<dentry_commit_item
> to_set
;
2552 // reverve enough memories, which maybe larger than the actually needed
2553 to_set
.reserve(items_count
);
2555 // for dir fragtrees
2556 bufferlist
dfts(CEPH_PAGE_SIZE
);
2558 auto write_one
= [&](CDentry
*dn
) {
2560 dn
->key().encode(key
);
2562 if (!dn
->corrupt_first_loaded
) {
2563 dn
->check_corruption(false);
2566 if (snaps
&& try_trim_snap_dentry(dn
, *snaps
)) {
2567 dout(10) << " rm " << key
<< dendl
;
2568 to_remove
.emplace_back(std::move(key
));
2572 if (dn
->get_linkage()->is_null()) {
2573 dout(10) << " rm " << dn
->get_name() << " " << *dn
<< dendl
;
2574 to_remove
.emplace_back(std::move(key
));
2576 dout(10) << " set " << dn
->get_name() << " " << *dn
<< dendl
;
2578 uint64_t off
= dfts
.length();
2579 // try to reserve new size if there has less
2580 // than 1/8 page space
2581 uint64_t left
= CEPH_PAGE_SIZE
- off
% CEPH_PAGE_SIZE
;
2582 if (left
< CEPH_PAGE_SIZE
/ 8)
2583 dfts
.reserve(left
+ CEPH_PAGE_SIZE
);
2585 auto& item
= to_set
.emplace_back();
2586 item
.key
= std::move(key
);
2587 _parse_dentry(dn
, item
, snaps
, dfts
);
2588 item
.dft_len
= dfts
.length() - off
;
2593 if (state_test(CDir::STATE_FRAGMENTING
) && is_new()) {
2594 ceph_assert(committed_version
== 0);
2595 for (auto p
= items
.begin(); p
!= items
.end(); ) {
2596 CDentry
*dn
= p
->second
;
2598 if (dn
->get_linkage()->is_null())
2602 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2603 mdcache
->mds
->heartbeat_reset();
2606 for (auto p
= dirty_dentries
.begin(); !p
.end(); ) {
2611 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2612 mdcache
->mds
->heartbeat_reset();
2616 auto c
= new C_IO_Dir_Commit_Ops(this, op_prio
, std::move(to_set
), std::move(dfts
),
2617 std::move(to_remove
), std::move(stale_items
));
2618 stale_items
.clear();
2619 mdcache
->mds
->finisher
->queue(c
);
2622 void CDir::_parse_dentry(CDentry
*dn
, dentry_commit_item
&item
,
2623 const set
<snapid_t
> *snaps
, bufferlist
&bl
)
2625 // clear dentry NEW flag, if any. we can no longer silently drop it.
2628 item
.first
= dn
->first
;
2630 // primary or remote?
2631 auto& linkage
= dn
->linkage
;
2632 item
.alternate_name
= dn
->get_alternate_name();
2633 if (linkage
.is_remote()) {
2634 item
.is_remote
= true;
2635 item
.ino
= linkage
.get_remote_ino();
2636 item
.d_type
= linkage
.get_remote_d_type();
2637 dout(14) << " dn '" << dn
->get_name() << "' remote ino " << item
.ino
<< dendl
;
2638 } else if (linkage
.is_primary()) {
2640 CInode
*in
= linkage
.get_inode();
2643 dout(14) << " dn '" << dn
->get_name() << "' inode " << *in
<< dendl
;
2645 if (in
->is_multiversion()) {
2646 if (!in
->snaprealm
) {
2648 in
->purge_stale_snap_data(*snaps
);
2650 in
->purge_stale_snap_data(in
->snaprealm
->get_snaps());
2654 if (in
->snaprealm
) {
2655 item
.snaprealm
= true;
2656 item
.srnode
= in
->snaprealm
->srnode
;
2658 item
.features
= mdcache
->mds
->mdsmap
->get_up_features();
2659 item
.inode
= in
->inode
;
2660 if (in
->inode
->is_symlink())
2661 item
.symlink
= in
->symlink
;
2663 encode(in
->dirfragtree
, bl
);
2664 item
.xattrs
= in
->xattrs
;
2665 item
.old_inodes
= in
->old_inodes
;
2666 item
.oldest_snap
= in
->oldest_snap
;
2667 item
.damage_flags
= in
->damage_flags
;
2669 ceph_assert(!linkage
.is_null());
2673 void CDir::_commit(version_t want
, int op_prio
)
2675 dout(10) << "_commit want " << want
<< " on " << *this << dendl
;
2677 // we can't commit things in the future.
2678 // (even the projected future.)
2679 ceph_assert(want
<= get_version() || get_version() == 0);
2681 // check pre+postconditions.
2682 ceph_assert(is_auth());
2684 // already committed?
2685 if (committed_version
>= want
) {
2686 dout(10) << "already committed " << committed_version
<< " >= " << want
<< dendl
;
2689 // already committing >= want?
2690 if (committing_version
>= want
) {
2691 dout(10) << "already committing " << committing_version
<< " >= " << want
<< dendl
;
2692 ceph_assert(state_test(STATE_COMMITTING
));
2696 // alrady committed an older version?
2697 if (committing_version
> committed_version
) {
2698 dout(10) << "already committing older " << committing_version
<< ", waiting for that to finish" << dendl
;
2703 committing_version
= get_version();
2705 // mark committing (if not already)
2706 if (!state_test(STATE_COMMITTING
)) {
2707 dout(10) << "marking committing" << dendl
;
2708 state_set(STATE_COMMITTING
);
2711 if (mdcache
->mds
->logger
) mdcache
->mds
->logger
->inc(l_mds_dir_commit
);
2713 mdcache
->mds
->balancer
->hit_dir(this, META_POP_STORE
);
2715 _omap_commit(op_prio
);
2722 * @param v version i just committed
2724 void CDir::_committed(int r
, version_t v
)
2727 // the directory could be partly purged during MDS failover
2728 if (r
== -CEPHFS_ENOENT
&& committed_version
== 0 &&
2729 !inode
->is_base() && get_parent_dir()->inode
->is_stray()) {
2731 if (inode
->snaprealm
)
2732 inode
->state_set(CInode::STATE_MISSINGOBJS
);
2735 dout(1) << "commit error " << r
<< " v " << v
<< dendl
;
2736 mdcache
->mds
->clog
->error() << "failed to commit dir " << dirfrag() << " object,"
2738 mdcache
->mds
->handle_write_error(r
);
2743 dout(10) << "_committed v " << v
<< " on " << *this << dendl
;
2744 ceph_assert(is_auth());
2746 bool stray
= inode
->is_stray();
2749 ceph_assert(v
> committed_version
);
2750 ceph_assert(v
<= committing_version
);
2751 committed_version
= v
;
2753 // _all_ commits done?
2754 if (committing_version
== committed_version
)
2755 state_clear(CDir::STATE_COMMITTING
);
2757 // _any_ commit, even if we've been redirtied, means we're no longer new.
2758 item_new
.remove_myself();
2761 if (committed_version
== get_version())
2767 for (auto p
= dirty_dentries
.begin(); !p
.end(); ) {
2772 if (dn
->linkage
.is_primary()) {
2773 CInode
*in
= dn
->linkage
.get_inode();
2775 ceph_assert(in
->is_auth());
2777 if (committed_version
>= in
->get_version()) {
2778 if (in
->is_dirty()) {
2779 dout(15) << " dir " << committed_version
<< " >= inode " << in
->get_version() << " now clean " << *in
<< dendl
;
2783 dout(15) << " dir " << committed_version
<< " < inode " << in
->get_version() << " still dirty " << *in
<< dendl
;
2784 ceph_assert(in
->is_dirty() || in
->last
< CEPH_NOSNAP
); // special case for cow snap items (not predirtied)
2789 if (committed_version
>= dn
->get_version()) {
2790 dout(15) << " dir " << committed_version
<< " >= dn " << dn
->get_version() << " now clean " << *dn
<< dendl
;
2793 // drop clean null stray dentries immediately
2795 dn
->get_num_ref() == 0 &&
2796 !dn
->is_projected() &&
2797 dn
->get_linkage()->is_null())
2800 dout(15) << " dir " << committed_version
<< " < dn " << dn
->get_version() << " still dirty " << *dn
<< dendl
;
2801 ceph_assert(dn
->is_dirty());
2804 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2805 mdcache
->mds
->heartbeat_reset();
2809 bool were_waiters
= !waiting_for_commit
.empty();
2811 auto it
= waiting_for_commit
.begin();
2812 while (it
!= waiting_for_commit
.end()) {
2815 if (it
->first
> committed_version
) {
2816 dout(10) << " there are waiters for " << it
->first
<< ", committing again" << dendl
;
2817 _commit(it
->first
, -1);
2821 for (const auto &waiter
: it
->second
)
2822 t
.push_back(waiter
);
2823 mdcache
->mds
->queue_waiters(t
);
2824 waiting_for_commit
.erase(it
);
2827 if (!(++count
% mdcache
->mds
->heartbeat_reset_grace()))
2828 mdcache
->mds
->heartbeat_reset();
2831 // try drop dentries in this dirfrag if it's about to be purged
2832 if (!inode
->is_base() && get_parent_dir()->inode
->is_stray() &&
2834 mdcache
->maybe_eval_stray(inode
, true);
2836 // unpin if we kicked the last waiter.
2838 waiting_for_commit
.empty())
2847 mds_rank_t
CDir::get_export_pin(bool inherit
) const
2849 mds_rank_t export_pin
= inode
->get_export_pin(inherit
);
2850 if (export_pin
== MDS_RANK_EPHEMERAL_DIST
)
2851 export_pin
= mdcache
->hash_into_rank_bucket(ino(), get_frag());
2852 else if (export_pin
== MDS_RANK_EPHEMERAL_RAND
)
2853 export_pin
= mdcache
->hash_into_rank_bucket(ino());
2857 bool CDir::is_exportable(mds_rank_t dest
) const
2859 mds_rank_t export_pin
= get_export_pin();
2860 if (export_pin
== dest
)
2862 if (export_pin
>= 0)
2867 void CDir::encode_export(bufferlist
& bl
)
2869 ENCODE_START(1, 1, bl
);
2870 ceph_assert(!is_projected());
2873 encode(dirty_old_rstat
, bl
);
2874 encode(committed_version
, bl
);
2877 encode(dir_rep
, bl
);
2880 encode(pop_auth_subtree
, bl
);
2882 encode(dir_rep_by
, bl
);
2883 encode(get_replicas(), bl
);
2885 get(PIN_TEMPEXPORTING
);
2889 void CDir::finish_export()
2891 state
&= MASK_STATE_EXPORT_KEPT
;
2892 pop_nested
.sub(pop_auth_subtree
);
2893 pop_auth_subtree_nested
.sub(pop_auth_subtree
);
2895 pop_auth_subtree
.zero();
2896 put(PIN_TEMPEXPORTING
);
2897 dirty_old_rstat
.clear();
2900 void CDir::decode_import(bufferlist::const_iterator
& blp
, LogSegment
*ls
)
2902 DECODE_START(1, blp
);
2905 auto _fnode
= allocate_fnode();
2906 decode(*_fnode
, blp
);
2907 reset_fnode(std::move(_fnode
));
2909 update_projected_version();
2911 decode(dirty_old_rstat
, blp
);
2912 decode(committed_version
, blp
);
2913 committing_version
= committed_version
;
2917 state
&= MASK_STATE_IMPORT_KEPT
;
2918 state_set(STATE_AUTH
| (s
& MASK_STATE_EXPORTED
));
2925 decode(dir_rep
, blp
);
2927 decode(pop_me
, blp
);
2928 decode(pop_auth_subtree
, blp
);
2929 pop_nested
.add(pop_auth_subtree
);
2930 pop_auth_subtree_nested
.add(pop_auth_subtree
);
2932 decode(dir_rep_by
, blp
);
2933 decode(get_replicas(), blp
);
2934 if (is_replicated()) get(PIN_REPLICATED
);
2936 replica_nonce
= 0; // no longer defined
2938 // did we import some dirty scatterlock data?
2939 if (dirty_old_rstat
.size() ||
2940 !(fnode
->rstat
== fnode
->accounted_rstat
)) {
2941 mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->nestlock
);
2942 ls
->dirty_dirfrag_nest
.push_back(&inode
->item_dirty_dirfrag_nest
);
2944 if (!(fnode
->fragstat
== fnode
->accounted_fragstat
)) {
2945 mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->filelock
);
2946 ls
->dirty_dirfrag_dir
.push_back(&inode
->item_dirty_dirfrag_dir
);
2948 if (is_dirty_dft()) {
2949 if (inode
->dirfragtreelock
.get_state() != LOCK_MIX
&&
2950 inode
->dirfragtreelock
.is_stable()) {
2951 // clear stale dirtydft
2952 state_clear(STATE_DIRTYDFT
);
2954 mdcache
->mds
->locker
->mark_updated_scatterlock(&inode
->dirfragtreelock
);
2955 ls
->dirty_dirfrag_dirfragtree
.push_back(&inode
->item_dirty_dirfrag_dirfragtree
);
2961 void CDir::abort_import()
2963 ceph_assert(is_auth());
2964 state_clear(CDir::STATE_AUTH
);
2966 clear_replica_map();
2967 set_replica_nonce(CDir::EXPORT_NONCE
);
2971 pop_nested
.sub(pop_auth_subtree
);
2972 pop_auth_subtree_nested
.sub(pop_auth_subtree
);
2974 pop_auth_subtree
.zero();
2977 void CDir::encode_dirstat(bufferlist
& bl
, const session_info_t
& info
, const DirStat
& ds
) {
2978 if (info
.has_feature(CEPHFS_FEATURE_REPLY_ENCODING
)) {
2979 ENCODE_START(1, 1, bl
);
2980 encode(ds
.frag
, bl
);
2981 encode(ds
.auth
, bl
);
2982 encode(ds
.dist
, bl
);
2986 encode(ds
.frag
, bl
);
2987 encode(ds
.auth
, bl
);
2988 encode(ds
.dist
, bl
);
2992 /********************************
2997 * if dir_auth.first == parent, auth is same as inode.
2998 * unless .second != unknown, in which case that sticks.
3000 mds_authority_t
CDir::authority() const
3002 if (is_subtree_root())
3005 return inode
->authority();
3008 /** is_subtree_root()
3009 * true if this is an auth delegation point.
3010 * that is, dir_auth != default (parent,unknown)
3012 * some key observations:
3014 * - any region bound will be an export, or frozen.
3016 * note that this DOES heed dir_auth.pending
3019 bool CDir::is_subtree_root()
3021 if (dir_auth == CDIR_AUTH_DEFAULT) {
3022 //dout(10) << "is_subtree_root false " << dir_auth << " != " << CDIR_AUTH_DEFAULT
3023 //<< " on " << ino() << dendl;
3026 //dout(10) << "is_subtree_root true " << dir_auth << " != " << CDIR_AUTH_DEFAULT
3027 //<< " on " << ino() << dendl;
3034 * true if we are x, or an ancestor of x
3036 bool CDir::contains(CDir
*x
)
3041 x
= x
->get_inode()->get_projected_parent_dir();
3047 bool CDir::can_rep() const
3052 unsigned mds_num
= mdcache
->mds
->get_mds_map()->get_num_mds(MDSMap::STATE_ACTIVE
);
3053 if ((mds_num
- 1) > get_replicas().size())
3062 void CDir::set_dir_auth(const mds_authority_t
&a
)
3064 dout(10) << "setting dir_auth=" << a
3065 << " from " << dir_auth
3066 << " on " << *this << dendl
;
3068 bool was_subtree
= is_subtree_root();
3069 bool was_ambiguous
= dir_auth
.second
>= 0;
3074 // new subtree root?
3075 if (!was_subtree
&& is_subtree_root()) {
3076 dout(10) << " new subtree root, adjusting auth_pins" << dendl
;
3078 if (freeze_tree_state
) {
3079 // only by CDir::_freeze_tree()
3080 ceph_assert(is_freezing_tree_root());
3083 inode
->num_subtree_roots
++;
3085 // unpin parent of frozen dir/tree?
3086 if (inode
->is_auth()) {
3087 ceph_assert(!is_frozen_tree_root());
3088 if (is_frozen_dir())
3089 inode
->auth_unpin(this);
3092 if (was_subtree
&& !is_subtree_root()) {
3093 dout(10) << " old subtree root, adjusting auth_pins" << dendl
;
3095 inode
->num_subtree_roots
--;
3097 // pin parent of frozen dir/tree?
3098 if (inode
->is_auth()) {
3099 ceph_assert(!is_frozen_tree_root());
3100 if (is_frozen_dir())
3101 inode
->auth_pin(this);
3105 // newly single auth?
3106 if (was_ambiguous
&& dir_auth
.second
== CDIR_AUTH_UNKNOWN
) {
3108 take_waiting(WAIT_SINGLEAUTH
, ls
);
3109 mdcache
->mds
->queue_waiters(ls
);
3113 /*****************************************
3114 * AUTH PINS and FREEZING
3116 * the basic plan is that auth_pins only exist in auth regions, and they
3117 * prevent a freeze (and subsequent auth change).
3119 * however, we also need to prevent a parent from freezing if a child is frozen.
3120 * for that reason, the parent inode of a frozen directory is auth_pinned.
3122 * the oddity is when the frozen directory is a subtree root. if that's the case,
3123 * the parent inode isn't frozen. which means that when subtree authority is adjusted
3124 * at the bounds, inodes for any frozen bound directories need to get auth_pins at that
3129 void CDir::auth_pin(void *by
)
3135 #ifdef MDS_AUTHPIN_SET
3136 auth_pin_set
.insert(by
);
3139 dout(10) << "auth_pin by " << by
<< " on " << *this << " count now " << auth_pins
<< dendl
;
3141 if (freeze_tree_state
)
3142 freeze_tree_state
->auth_pins
+= 1;
3145 void CDir::auth_unpin(void *by
)
3149 #ifdef MDS_AUTHPIN_SET
3151 auto it
= auth_pin_set
.find(by
);
3152 ceph_assert(it
!= auth_pin_set
.end());
3153 auth_pin_set
.erase(it
);
3159 dout(10) << "auth_unpin by " << by
<< " on " << *this << " count now " << auth_pins
<< dendl
;
3160 ceph_assert(auth_pins
>= 0);
3162 if (freeze_tree_state
)
3163 freeze_tree_state
->auth_pins
-= 1;
3165 maybe_finish_freeze(); // pending freeze?
3168 void CDir::adjust_nested_auth_pins(int dirinc
, void *by
)
3170 ceph_assert(dirinc
);
3171 dir_auth_pins
+= dirinc
;
3173 dout(15) << __func__
<< " " << dirinc
<< " on " << *this
3174 << " by " << by
<< " count now "
3175 << auth_pins
<< "/" << dir_auth_pins
<< dendl
;
3176 ceph_assert(dir_auth_pins
>= 0);
3178 if (freeze_tree_state
)
3179 freeze_tree_state
->auth_pins
+= dirinc
;
3182 maybe_finish_freeze(); // pending freeze?
3185 #ifdef MDS_VERIFY_FRAGSTAT
3186 void CDir::verify_fragstat()
3188 ceph_assert(is_complete());
3189 if (inode
->is_stray())
3193 memset(&c
, 0, sizeof(c
));
3195 for (auto it
= items
.begin();
3198 CDentry
*dn
= it
->second
;
3202 dout(10) << " " << *dn
<< dendl
;
3203 if (dn
->is_primary())
3204 dout(10) << " " << *dn
->inode
<< dendl
;
3206 if (dn
->is_primary()) {
3207 if (dn
->inode
->is_dir())
3212 if (dn
->is_remote()) {
3213 if (dn
->get_remote_d_type() == DT_DIR
)
3220 if (c
.nsubdirs
!= fnode
->fragstat
.nsubdirs
||
3221 c
.nfiles
!= fnode
->fragstat
.nfiles
) {
3222 dout(0) << "verify_fragstat failed " << fnode
->fragstat
<< " on " << *this << dendl
;
3223 dout(0) << " i count " << c
<< dendl
;
3226 dout(0) << "verify_fragstat ok " << fnode
->fragstat
<< " on " << *this << dendl
;
3231 /*****************************************************************************
3237 void CDir::_walk_tree(std::function
<bool(CDir
*)> callback
)
3240 dfq
.push_back(this);
3242 while (!dfq
.empty()) {
3243 CDir
*dir
= dfq
.front();
3246 for (auto& p
: *dir
) {
3247 CDentry
*dn
= p
.second
;
3248 if (!dn
->get_linkage()->is_primary())
3250 CInode
*in
= dn
->get_linkage()->get_inode();
3254 auto&& dfv
= in
->get_nested_dirfrags();
3255 for (auto& dir
: dfv
) {
3256 auto ret
= callback(dir
);
3264 bool CDir::freeze_tree()
3266 ceph_assert(!is_frozen());
3267 ceph_assert(!is_freezing());
3268 ceph_assert(!freeze_tree_state
);
3272 // Travese the subtree to mark dirfrags as 'freezing' (set freeze_tree_state)
3273 // and to accumulate auth pins and record total count in freeze_tree_state.
3274 // when auth unpin an 'freezing' object, the counter in freeze_tree_state also
3275 // gets decreased. Subtree become 'frozen' when the counter reaches zero.
3276 freeze_tree_state
= std::make_shared
<freeze_tree_state_t
>(this);
3277 freeze_tree_state
->auth_pins
+= get_auth_pins() + get_dir_auth_pins();
3278 if (!lock_caches_with_auth_pins
.empty())
3279 mdcache
->mds
->locker
->invalidate_lock_caches(this);
3281 _walk_tree([this](CDir
*dir
) {
3282 if (dir
->freeze_tree_state
)
3284 dir
->freeze_tree_state
= freeze_tree_state
;
3285 freeze_tree_state
->auth_pins
+= dir
->get_auth_pins() + dir
->get_dir_auth_pins();
3286 if (!dir
->lock_caches_with_auth_pins
.empty())
3287 mdcache
->mds
->locker
->invalidate_lock_caches(dir
);
3292 if (is_freezeable(true)) {
3297 state_set(STATE_FREEZINGTREE
);
3298 ++num_freezing_trees
;
3299 dout(10) << "freeze_tree waiting " << *this << dendl
;
3304 void CDir::_freeze_tree()
3306 dout(10) << __func__
<< " " << *this << dendl
;
3307 ceph_assert(is_freezeable(true));
3309 if (freeze_tree_state
) {
3310 ceph_assert(is_auth());
3312 ceph_assert(!is_auth());
3313 freeze_tree_state
= std::make_shared
<freeze_tree_state_t
>(this);
3315 freeze_tree_state
->frozen
= true;
3318 mds_authority_t auth
;
3319 bool was_subtree
= is_subtree_root();
3321 auth
= get_dir_auth();
3323 // temporarily prevent parent subtree from becoming frozen.
3324 inode
->auth_pin(this);
3325 // create new subtree
3329 _walk_tree([this, &auth
] (CDir
*dir
) {
3330 if (dir
->freeze_tree_state
!= freeze_tree_state
) {
3331 mdcache
->adjust_subtree_auth(dir
, auth
);
3338 ceph_assert(auth
.first
>= 0);
3339 ceph_assert(auth
.second
== CDIR_AUTH_UNKNOWN
);
3340 auth
.second
= auth
.first
;
3341 mdcache
->adjust_subtree_auth(this, auth
);
3343 inode
->auth_unpin(this);
3345 // importing subtree ?
3346 _walk_tree([this] (CDir
*dir
) {
3347 ceph_assert(!dir
->freeze_tree_state
);
3348 dir
->freeze_tree_state
= freeze_tree_state
;
3355 if (state_test(STATE_FREEZINGTREE
)) {
3356 state_clear(STATE_FREEZINGTREE
);
3357 --num_freezing_trees
;
3360 state_set(STATE_FROZENTREE
);
3365 void CDir::unfreeze_tree()
3367 dout(10) << __func__
<< " " << *this << dendl
;
3369 MDSContext::vec unfreeze_waiters
;
3370 take_waiting(WAIT_UNFREEZE
, unfreeze_waiters
);
3372 if (freeze_tree_state
) {
3373 _walk_tree([this, &unfreeze_waiters
](CDir
*dir
) {
3374 if (dir
->freeze_tree_state
!= freeze_tree_state
)
3376 dir
->freeze_tree_state
.reset();
3377 dir
->take_waiting(WAIT_UNFREEZE
, unfreeze_waiters
);
3383 if (state_test(STATE_FROZENTREE
)) {
3384 // frozen. unfreeze.
3385 state_clear(STATE_FROZENTREE
);
3392 ceph_assert(is_subtree_root());
3393 // for debug purpose, caller should ensure 'dir_auth.second == dir_auth.first'
3394 mds_authority_t auth
= get_dir_auth();
3395 ceph_assert(auth
.first
>= 0);
3396 ceph_assert(auth
.second
== auth
.first
);
3397 auth
.second
= CDIR_AUTH_UNKNOWN
;
3398 mdcache
->adjust_subtree_auth(this, auth
);
3400 freeze_tree_state
.reset();
3402 ceph_assert(state_test(STATE_FREEZINGTREE
));
3404 // freezing. stop it.
3405 state_clear(STATE_FREEZINGTREE
);
3406 --num_freezing_trees
;
3407 freeze_tree_state
.reset();
3409 finish_waiting(WAIT_FROZEN
, -1);
3413 mdcache
->mds
->queue_waiters(unfreeze_waiters
);
3416 void CDir::adjust_freeze_after_rename(CDir
*dir
)
3418 if (!freeze_tree_state
|| dir
->freeze_tree_state
!= freeze_tree_state
)
3420 CDir
*newdir
= dir
->get_inode()->get_parent_dir();
3421 if (newdir
== this || newdir
->freeze_tree_state
== freeze_tree_state
)
3424 ceph_assert(!freeze_tree_state
->frozen
);
3425 ceph_assert(get_dir_auth_pins() > 0);
3427 MDSContext::vec unfreeze_waiters
;
3429 auto unfreeze
= [this, &unfreeze_waiters
](CDir
*dir
) {
3430 if (dir
->freeze_tree_state
!= freeze_tree_state
)
3432 int dec
= dir
->get_auth_pins() + dir
->get_dir_auth_pins();
3433 // shouldn't become zero because srcdn of rename was auth pinned
3434 ceph_assert(freeze_tree_state
->auth_pins
> dec
);
3435 freeze_tree_state
->auth_pins
-= dec
;
3436 dir
->freeze_tree_state
.reset();
3437 dir
->take_waiting(WAIT_UNFREEZE
, unfreeze_waiters
);
3442 dir
->_walk_tree(unfreeze
);
3444 mdcache
->mds
->queue_waiters(unfreeze_waiters
);
3447 bool CDir::can_auth_pin(int *err_ret
) const
3452 } else if (is_freezing_dir() || is_frozen_dir()) {
3453 err
= ERR_FRAGMENTING_DIR
;
3455 auto p
= is_freezing_or_frozen_tree();
3456 if (p
.first
|| p
.second
) {
3457 err
= ERR_EXPORTING_TREE
;
3467 class C_Dir_AuthUnpin
: public CDirContext
{
3469 explicit C_Dir_AuthUnpin(CDir
*d
) : CDirContext(d
) {}
3470 void finish(int r
) override
{
3471 dir
->auth_unpin(dir
->get_inode());
3475 void CDir::maybe_finish_freeze()
3477 if (dir_auth_pins
!= 0)
3480 // we can freeze the _dir_ even with nested pins...
3481 if (state_test(STATE_FREEZINGDIR
)) {
3482 if (auth_pins
== 1) {
3485 finish_waiting(WAIT_FROZEN
);
3489 if (freeze_tree_state
) {
3490 if (freeze_tree_state
->frozen
||
3491 freeze_tree_state
->auth_pins
!= 1)
3494 if (freeze_tree_state
->dir
!= this) {
3495 freeze_tree_state
->dir
->maybe_finish_freeze();
3499 ceph_assert(state_test(STATE_FREEZINGTREE
));
3501 if (!is_subtree_root() && inode
->is_frozen()) {
3502 dout(10) << __func__
<< " !subtree root and frozen inode, waiting for unfreeze on " << inode
<< dendl
;
3503 // retake an auth_pin...
3505 // and release it when the parent inode unfreezes
3506 inode
->add_waiter(WAIT_UNFREEZE
, new C_Dir_AuthUnpin(this));
3512 finish_waiting(WAIT_FROZEN
);
3520 bool CDir::freeze_dir()
3522 ceph_assert(!is_frozen());
3523 ceph_assert(!is_freezing());
3526 if (is_freezeable_dir(true)) {
3531 state_set(STATE_FREEZINGDIR
);
3532 if (!lock_caches_with_auth_pins
.empty())
3533 mdcache
->mds
->locker
->invalidate_lock_caches(this);
3534 dout(10) << "freeze_dir + wait " << *this << dendl
;
3539 void CDir::_freeze_dir()
3541 dout(10) << __func__
<< " " << *this << dendl
;
3542 //assert(is_freezeable_dir(true));
3543 // not always true during split because the original fragment may have frozen a while
3544 // ago and we're just now getting around to breaking it up.
3546 state_clear(STATE_FREEZINGDIR
);
3547 state_set(STATE_FROZENDIR
);
3550 if (is_auth() && !is_subtree_root())
3551 inode
->auth_pin(this); // auth_pin for duration of freeze
3555 void CDir::unfreeze_dir()
3557 dout(10) << __func__
<< " " << *this << dendl
;
3559 if (state_test(STATE_FROZENDIR
)) {
3560 state_clear(STATE_FROZENDIR
);
3563 // unpin (may => FREEZEABLE) FIXME: is this order good?
3564 if (is_auth() && !is_subtree_root())
3565 inode
->auth_unpin(this);
3567 finish_waiting(WAIT_UNFREEZE
);
3569 finish_waiting(WAIT_FROZEN
, -1);
3571 // still freezing. stop.
3572 ceph_assert(state_test(STATE_FREEZINGDIR
));
3573 state_clear(STATE_FREEZINGDIR
);
3576 finish_waiting(WAIT_UNFREEZE
);
3580 void CDir::enable_frozen_inode()
3582 ceph_assert(frozen_inode_suppressed
> 0);
3583 if (--frozen_inode_suppressed
== 0) {
3584 for (auto p
= freezing_inodes
.begin(); !p
.end(); ) {
3587 ceph_assert(in
->is_freezing_inode());
3588 in
->maybe_finish_freeze_inode();
3594 * Slightly less complete than operator<<, because this is intended
3595 * for identifying a directory and its state rather than for dumping
3598 void CDir::dump(Formatter
*f
, int flags
) const
3600 ceph_assert(f
!= NULL
);
3601 if (flags
& DUMP_PATH
) {
3602 f
->dump_stream("path") << get_path();
3604 if (flags
& DUMP_DIRFRAG
) {
3605 f
->dump_stream("dirfrag") << dirfrag();
3607 if (flags
& DUMP_SNAPID_FIRST
) {
3608 f
->dump_int("snapid_first", first
);
3610 if (flags
& DUMP_VERSIONS
) {
3611 f
->dump_stream("projected_version") << get_projected_version();
3612 f
->dump_stream("version") << get_version();
3613 f
->dump_stream("committing_version") << get_committing_version();
3614 f
->dump_stream("committed_version") << get_committed_version();
3616 if (flags
& DUMP_REP
) {
3617 f
->dump_bool("is_rep", is_rep());
3619 if (flags
& DUMP_DIR_AUTH
) {
3620 if (get_dir_auth() != CDIR_AUTH_DEFAULT
) {
3621 if (get_dir_auth().second
== CDIR_AUTH_UNKNOWN
) {
3622 f
->dump_stream("dir_auth") << get_dir_auth().first
;
3624 f
->dump_stream("dir_auth") << get_dir_auth();
3627 f
->dump_string("dir_auth", "");
3630 if (flags
& DUMP_STATES
) {
3631 f
->open_array_section("states");
3632 MDSCacheObject::dump_states(f
);
3633 if (state_test(CDir::STATE_COMPLETE
)) f
->dump_string("state", "complete");
3634 if (state_test(CDir::STATE_FREEZINGTREE
)) f
->dump_string("state", "freezingtree");
3635 if (state_test(CDir::STATE_FROZENTREE
)) f
->dump_string("state", "frozentree");
3636 if (state_test(CDir::STATE_FROZENDIR
)) f
->dump_string("state", "frozendir");
3637 if (state_test(CDir::STATE_FREEZINGDIR
)) f
->dump_string("state", "freezingdir");
3638 if (state_test(CDir::STATE_EXPORTBOUND
)) f
->dump_string("state", "exportbound");
3639 if (state_test(CDir::STATE_IMPORTBOUND
)) f
->dump_string("state", "importbound");
3640 if (state_test(CDir::STATE_BADFRAG
)) f
->dump_string("state", "badfrag");
3643 if (flags
& DUMP_MDS_CACHE_OBJECT
) {
3644 MDSCacheObject::dump(f
);
3646 if (flags
& DUMP_ITEMS
) {
3647 f
->open_array_section("dentries");
3648 for (auto &p
: items
) {
3649 CDentry
*dn
= p
.second
;
3650 f
->open_object_section("dentry");
3658 void CDir::dump_load(Formatter
*f
)
3660 f
->dump_stream("path") << get_path();
3661 f
->dump_stream("dirfrag") << dirfrag();
3663 f
->open_object_section("pop_me");
3667 f
->open_object_section("pop_nested");
3671 f
->open_object_section("pop_auth_subtree");
3672 pop_auth_subtree
.dump(f
);
3675 f
->open_object_section("pop_auth_subtree_nested");
3676 pop_auth_subtree_nested
.dump(f
);
3680 /****** Scrub Stuff *******/
3682 void CDir::scrub_info_create() const
3684 ceph_assert(!scrub_infop
);
3686 // break out of const-land to set up implicit initial state
3687 CDir
*me
= const_cast<CDir
*>(this);
3688 const auto& pf
= me
->get_projected_fnode();
3690 std::unique_ptr
<scrub_info_t
> si(new scrub_info_t());
3692 si
->last_recursive
.version
= pf
->recursive_scrub_version
;
3693 si
->last_recursive
.time
= pf
->recursive_scrub_stamp
;
3695 si
->last_local
.version
= pf
->localized_scrub_version
;
3696 si
->last_local
.time
= pf
->localized_scrub_stamp
;
3698 me
->scrub_infop
.swap(si
);
3701 void CDir::scrub_initialize(const ScrubHeaderRef
& header
)
3703 ceph_assert(header
);
3704 // FIXME: weird implicit construction, is someone else meant
3705 // to be calling scrub_info_create first?
3707 scrub_infop
->directory_scrubbing
= true;
3708 scrub_infop
->header
= header
;
3709 header
->inc_num_pending();
3712 void CDir::scrub_aborted() {
3713 dout(20) << __func__
<< dendl
;
3714 ceph_assert(scrub_is_in_progress());
3716 scrub_infop
->last_scrub_dirty
= false;
3717 scrub_infop
->directory_scrubbing
= false;
3718 scrub_infop
->header
->dec_num_pending();
3719 scrub_infop
.reset();
3722 void CDir::scrub_finished()
3724 dout(20) << __func__
<< dendl
;
3725 ceph_assert(scrub_is_in_progress());
3727 scrub_infop
->last_local
.time
= ceph_clock_now();
3728 scrub_infop
->last_local
.version
= get_version();
3729 if (scrub_infop
->header
->get_recursive())
3730 scrub_infop
->last_recursive
= scrub_infop
->last_local
;
3732 scrub_infop
->last_scrub_dirty
= true;
3734 scrub_infop
->directory_scrubbing
= false;
3735 scrub_infop
->header
->dec_num_pending();
3738 void CDir::scrub_maybe_delete_info()
3741 !scrub_infop
->directory_scrubbing
&&
3742 !scrub_infop
->last_scrub_dirty
)
3743 scrub_infop
.reset();
3746 bool CDir::scrub_local()
3748 ceph_assert(is_complete());
3749 bool good
= check_rstats(true);
3750 if (!good
&& scrub_infop
->header
->get_repair()) {
3751 mdcache
->repair_dirfrag_stats(this);
3752 scrub_infop
->header
->set_repaired();
3758 std::string
CDir::get_path() const
3761 get_inode()->make_path_string(path
, true);
3765 bool CDir::should_split_fast() const
3767 // Max size a fragment can be before trigger fast splitting
3768 int fast_limit
= g_conf()->mds_bal_split_size
* g_conf()->mds_bal_fragment_fast_factor
;
3770 // Fast path: the sum of accounted size and null dentries does not
3771 // exceed threshold: we definitely are not over it.
3772 if (get_frag_size() + get_num_head_null() <= fast_limit
) {
3776 // Fast path: the accounted size of the frag exceeds threshold: we
3777 // definitely are over it
3778 if (get_frag_size() > fast_limit
) {
3782 int64_t effective_size
= 0;
3784 for (const auto &p
: items
) {
3785 const CDentry
*dn
= p
.second
;
3786 if (!dn
->get_projected_linkage()->is_null()) {
3791 return effective_size
> fast_limit
;
3794 bool CDir::should_merge() const
3796 if (get_frag() == frag_t())
3799 if (inode
->is_ephemeral_dist()) {
3800 unsigned min_frag_bits
= mdcache
->get_ephemeral_dist_frag_bits();
3801 if (min_frag_bits
> 0 && get_frag().bits() < min_frag_bits
+ 1)
3805 return ((int)get_frag_size() + (int)get_num_snap_items()) < g_conf()->mds_bal_merge_size
;
3808 MEMPOOL_DEFINE_OBJECT_FACTORY(CDir
, co_dir
, mds_co
);
3809 MEMPOOL_DEFINE_OBJECT_FACTORY(CDir::scrub_info_t
, scrub_info_t
, mds_co
)