// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include "include/int_types.h"
16 #include "common/errno.h"
31 #include "events/EUpdate.h"
33 #include "osdc/Objecter.h"
37 #include "LogSegment.h"
39 #include "common/Clock.h"
41 #include "messages/MLock.h"
42 #include "messages/MClientCaps.h"
44 #include "common/config.h"
45 #include "global/global_context.h"
46 #include "include/assert.h"
48 #include "mds/MDSContinuation.h"
49 #include "mds/InoTable.h"
51 #define dout_context g_ceph_context
52 #define dout_subsys ceph_subsys_mds
54 #define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << inode.ino << ") "
57 class CInodeIOContext
: public MDSIOContextBase
61 MDSRank
*get_mds() override
{return in
->mdcache
->mds
;}
63 explicit CInodeIOContext(CInode
*in_
) : in(in_
) {
69 LockType
CInode::versionlock_type(CEPH_LOCK_IVERSION
);
70 LockType
CInode::authlock_type(CEPH_LOCK_IAUTH
);
71 LockType
CInode::linklock_type(CEPH_LOCK_ILINK
);
72 LockType
CInode::dirfragtreelock_type(CEPH_LOCK_IDFT
);
73 LockType
CInode::filelock_type(CEPH_LOCK_IFILE
);
74 LockType
CInode::xattrlock_type(CEPH_LOCK_IXATTR
);
75 LockType
CInode::snaplock_type(CEPH_LOCK_ISNAP
);
76 LockType
CInode::nestlock_type(CEPH_LOCK_INEST
);
77 LockType
CInode::flocklock_type(CEPH_LOCK_IFLOCK
);
78 LockType
CInode::policylock_type(CEPH_LOCK_IPOLICY
);
//int cinode_pins[CINODE_NUM_PINS];  // counts
81 ostream
& CInode::print_db_line_prefix(ostream
& out
)
83 return out
<< ceph_clock_now() << " mds." << mdcache
->mds
->get_nodeid() << ".cache.ino(" << inode
.ino
<< ") ";
/*
 * write caps and lock ids
 */
89 struct cinode_lock_info_t cinode_lock_info
[] = {
90 { CEPH_LOCK_IFILE
, CEPH_CAP_ANY_FILE_WR
},
91 { CEPH_LOCK_IAUTH
, CEPH_CAP_AUTH_EXCL
},
92 { CEPH_LOCK_ILINK
, CEPH_CAP_LINK_EXCL
},
93 { CEPH_LOCK_IXATTR
, CEPH_CAP_XATTR_EXCL
},
95 int num_cinode_locks
= sizeof(cinode_lock_info
) / sizeof(cinode_lock_info
[0]);
99 ostream
& operator<<(ostream
& out
, const CInode
& in
)
102 in
.make_path_string(path
, true);
104 out
<< "[inode " << in
.inode
.ino
;
106 << (in
.is_multiversion() ? "...":"")
107 << in
.first
<< "," << in
.last
<< "]";
108 out
<< " " << path
<< (in
.is_dir() ? "/":"");
112 if (in
.is_replicated())
113 out
<< in
.get_replicas();
115 mds_authority_t a
= in
.authority();
116 out
<< " rep@" << a
.first
;
117 if (a
.second
!= CDIR_AUTH_UNKNOWN
)
118 out
<< "," << a
.second
;
119 out
<< "." << in
.get_replica_nonce();
123 out
<< " symlink='" << in
.symlink
<< "'";
124 if (in
.is_dir() && !in
.dirfragtree
.empty())
125 out
<< " " << in
.dirfragtree
;
127 out
<< " v" << in
.get_version();
128 if (in
.get_projected_version() > in
.get_version())
129 out
<< " pv" << in
.get_projected_version();
131 if (in
.is_auth_pinned()) {
132 out
<< " ap=" << in
.get_num_auth_pins() << "+" << in
.get_num_nested_auth_pins();
133 #ifdef MDS_AUTHPIN_SET
134 out
<< "(" << in
.auth_pin_set
<< ")";
139 out
<< " snaprealm=" << in
.snaprealm
;
141 if (in
.state_test(CInode::STATE_AMBIGUOUSAUTH
)) out
<< " AMBIGAUTH";
142 if (in
.state_test(CInode::STATE_NEEDSRECOVER
)) out
<< " needsrecover";
143 if (in
.state_test(CInode::STATE_RECOVERING
)) out
<< " recovering";
144 if (in
.state_test(CInode::STATE_DIRTYPARENT
)) out
<< " dirtyparent";
145 if (in
.state_test(CInode::STATE_MISSINGOBJS
)) out
<< " missingobjs";
146 if (in
.is_freezing_inode()) out
<< " FREEZING=" << in
.auth_pin_freeze_allowance
;
147 if (in
.is_frozen_inode()) out
<< " FROZEN";
148 if (in
.is_frozen_auth_pin()) out
<< " FROZEN_AUTHPIN";
150 const CInode::mempool_inode
*pi
= in
.get_projected_inode();
151 if (pi
->is_truncating())
152 out
<< " truncating(" << pi
->truncate_from
<< " to " << pi
->truncate_size
<< ")";
154 if (in
.inode
.is_dir()) {
155 out
<< " " << in
.inode
.dirstat
;
156 if (g_conf
->mds_debug_scatterstat
&& in
.is_projected()) {
157 const CInode::mempool_inode
*pi
= in
.get_projected_inode();
158 out
<< "->" << pi
->dirstat
;
161 out
<< " s=" << in
.inode
.size
;
162 if (in
.inode
.nlink
!= 1)
163 out
<< " nl=" << in
.inode
.nlink
;
167 out
<< " " << in
.inode
.rstat
;
168 if (!(in
.inode
.rstat
== in
.inode
.accounted_rstat
))
169 out
<< "/" << in
.inode
.accounted_rstat
;
170 if (g_conf
->mds_debug_scatterstat
&& in
.is_projected()) {
171 const CInode::mempool_inode
*pi
= in
.get_projected_inode();
172 out
<< "->" << pi
->rstat
;
173 if (!(pi
->rstat
== pi
->accounted_rstat
))
174 out
<< "/" << pi
->accounted_rstat
;
177 if (!in
.client_need_snapflush
.empty())
178 out
<< " need_snapflush=" << in
.client_need_snapflush
;
182 if (!in
.authlock
.is_sync_and_unlocked())
183 out
<< " " << in
.authlock
;
184 if (!in
.linklock
.is_sync_and_unlocked())
185 out
<< " " << in
.linklock
;
186 if (in
.inode
.is_dir()) {
187 if (!in
.dirfragtreelock
.is_sync_and_unlocked())
188 out
<< " " << in
.dirfragtreelock
;
189 if (!in
.snaplock
.is_sync_and_unlocked())
190 out
<< " " << in
.snaplock
;
191 if (!in
.nestlock
.is_sync_and_unlocked())
192 out
<< " " << in
.nestlock
;
193 if (!in
.policylock
.is_sync_and_unlocked())
194 out
<< " " << in
.policylock
;
196 if (!in
.flocklock
.is_sync_and_unlocked())
197 out
<< " " << in
.flocklock
;
199 if (!in
.filelock
.is_sync_and_unlocked())
200 out
<< " " << in
.filelock
;
201 if (!in
.xattrlock
.is_sync_and_unlocked())
202 out
<< " " << in
.xattrlock
;
203 if (!in
.versionlock
.is_sync_and_unlocked())
204 out
<< " " << in
.versionlock
;
206 // hack: spit out crap on which clients have caps
207 if (in
.inode
.client_ranges
.size())
208 out
<< " cr=" << in
.inode
.client_ranges
;
210 if (!in
.get_client_caps().empty()) {
212 for (map
<client_t
,Capability
*>::const_iterator it
= in
.get_client_caps().begin();
213 it
!= in
.get_client_caps().end();
215 if (it
!= in
.get_client_caps().begin()) out
<< ",";
216 out
<< it
->first
<< "="
217 << ccap_string(it
->second
->pending());
218 if (it
->second
->issued() != it
->second
->pending())
219 out
<< "/" << ccap_string(it
->second
->issued());
220 out
<< "/" << ccap_string(it
->second
->wanted())
221 << "@" << it
->second
->get_last_sent();
224 if (in
.get_loner() >= 0 || in
.get_wanted_loner() >= 0) {
225 out
<< ",l=" << in
.get_loner();
226 if (in
.get_loner() != in
.get_wanted_loner())
227 out
<< "(" << in
.get_wanted_loner() << ")";
230 if (!in
.get_mds_caps_wanted().empty()) {
233 for (const auto &p
: in
.get_mds_caps_wanted()) {
236 out
<< p
.first
<< '=' << ccap_string(p
.second
);
242 if (in
.get_num_ref()) {
244 in
.print_pin_set(out
);
247 if (in
.inode
.export_pin
!= MDS_RANK_NONE
) {
248 out
<< " export_pin=" << in
.inode
.export_pin
;
256 ostream
& operator<<(ostream
& out
, const CInode::scrub_stamp_info_t
& si
)
258 out
<< "{scrub_start_version: " << si
.scrub_start_version
259 << ", scrub_start_stamp: " << si
.scrub_start_stamp
260 << ", last_scrub_version: " << si
.last_scrub_version
261 << ", last_scrub_stamp: " << si
.last_scrub_stamp
;
267 void CInode::print(ostream
& out
)
274 void CInode::add_need_snapflush(CInode
*snapin
, snapid_t snapid
, client_t client
)
276 dout(10) << "add_need_snapflush client." << client
<< " snapid " << snapid
<< " on " << snapin
<< dendl
;
278 if (client_need_snapflush
.empty()) {
279 get(CInode::PIN_NEEDSNAPFLUSH
);
281 // FIXME: this is non-optimal, as we'll block freezes/migrations for potentially
282 // long periods waiting for clients to flush their snaps.
283 auth_pin(this); // pin head inode...
286 auto &clients
= client_need_snapflush
[snapid
];
288 snapin
->auth_pin(this); // ...and pin snapped/old inode!
290 clients
.insert(client
);
293 void CInode::remove_need_snapflush(CInode
*snapin
, snapid_t snapid
, client_t client
)
295 dout(10) << __func__
<< " client." << client
<< " snapid " << snapid
<< " on " << snapin
<< dendl
;
296 auto it
= client_need_snapflush
.find(snapid
);
297 if (it
== client_need_snapflush
.end()) {
298 dout(10) << " snapid not found" << dendl
;
301 size_t n
= it
->second
.erase(client
);
303 dout(10) << " client not found" << dendl
;
306 if (it
->second
.empty()) {
307 client_need_snapflush
.erase(it
);
308 snapin
->auth_unpin(this);
310 if (client_need_snapflush
.empty()) {
311 put(CInode::PIN_NEEDSNAPFLUSH
);
317 bool CInode::split_need_snapflush(CInode
*cowin
, CInode
*in
)
319 dout(10) << "split_need_snapflush [" << cowin
->first
<< "," << cowin
->last
<< "] for " << *cowin
<< dendl
;
320 bool need_flush
= false;
321 for (auto it
= client_need_snapflush
.lower_bound(cowin
->first
);
322 it
!= client_need_snapflush
.end() && it
->first
< in
->first
; ) {
323 assert(!it
->second
.empty());
324 if (cowin
->last
>= it
->first
) {
325 cowin
->auth_pin(this);
329 it
= client_need_snapflush
.erase(it
);
331 in
->auth_unpin(this);
336 void CInode::mark_dirty_rstat()
338 if (!state_test(STATE_DIRTYRSTAT
)) {
339 dout(10) << "mark_dirty_rstat" << dendl
;
340 state_set(STATE_DIRTYRSTAT
);
342 CDentry
*pdn
= get_projected_parent_dn();
343 if (pdn
->is_auth()) {
344 CDir
*pdir
= pdn
->dir
;
345 pdir
->dirty_rstat_inodes
.push_back(&dirty_rstat_item
);
346 mdcache
->mds
->locker
->mark_updated_scatterlock(&pdir
->inode
->nestlock
);
348 // under cross-MDS rename.
349 // DIRTYRSTAT flag will get cleared when rename finishes
350 assert(state_test(STATE_AMBIGUOUSAUTH
));
354 void CInode::clear_dirty_rstat()
356 if (state_test(STATE_DIRTYRSTAT
)) {
357 dout(10) << "clear_dirty_rstat" << dendl
;
358 state_clear(STATE_DIRTYRSTAT
);
360 dirty_rstat_item
.remove_myself();
364 /* Ideally this function would be subsumed by project_inode but it is also
365 * needed by CInode::project_past_snaprealm_parent so we keep it.
367 sr_t
&CInode::project_snaprealm(projected_inode
&pi
)
369 const sr_t
*cur_srnode
= get_projected_srnode();
371 assert(!pi
.snapnode
);
373 pi
.snapnode
.reset(new sr_t(*cur_srnode
));
375 pi
.snapnode
.reset(new sr_t());
376 pi
.snapnode
->created
= 0;
377 pi
.snapnode
->current_parent_since
= get_oldest_snap();
379 ++num_projected_srnodes
;
381 dout(10) << __func__
<< " " << pi
.snapnode
.get() << dendl
;
382 return *pi
.snapnode
.get();
385 CInode::projected_inode
&CInode::project_inode(bool xattr
, bool snap
)
387 if (projected_nodes
.empty()) {
388 projected_nodes
.emplace_back(inode
);
390 projected_nodes
.emplace_back(projected_nodes
.back().inode
);
392 auto &pi
= projected_nodes
.back();
394 if (scrub_infop
&& scrub_infop
->last_scrub_dirty
) {
395 pi
.inode
.last_scrub_stamp
= scrub_infop
->last_scrub_stamp
;
396 pi
.inode
.last_scrub_version
= scrub_infop
->last_scrub_version
;
397 scrub_infop
->last_scrub_dirty
= false;
398 scrub_maybe_delete_info();
402 pi
.xattrs
.reset(new mempool_xattr_map(*get_projected_xattrs()));
403 ++num_projected_xattrs
;
407 project_snaprealm(pi
);
410 dout(15) << __func__
<< " " << pi
.inode
.ino
<< dendl
;
414 void CInode::pop_and_dirty_projected_inode(LogSegment
*ls
)
416 assert(!projected_nodes
.empty());
417 auto &front
= projected_nodes
.front();
418 dout(15) << __func__
<< " " << front
.inode
.ino
419 << " v" << front
.inode
.version
<< dendl
;
420 int64_t old_pool
= inode
.layout
.pool_id
;
422 mark_dirty(front
.inode
.version
, ls
);
425 if (inode
.is_backtrace_updated())
426 mark_dirty_parent(ls
, old_pool
!= inode
.layout
.pool_id
);
429 --num_projected_xattrs
;
430 xattrs
= *front
.xattrs
;
433 auto &snapnode
= front
.snapnode
;
435 pop_projected_snaprealm(snapnode
.get());
436 --num_projected_srnodes
;
439 projected_nodes
.pop_front();
442 /* if newparent != parent, add parent to past_parents
443 if parent DNE, we need to find what the parent actually is and fill that in */
444 void CInode::project_past_snaprealm_parent(SnapRealm
*newparent
)
446 assert(!projected_nodes
.empty());
447 sr_t
&new_snap
= project_snaprealm(projected_nodes
.back());
448 SnapRealm
*oldparent
;
450 oldparent
= find_snaprealm();
451 new_snap
.seq
= oldparent
->get_newest_seq();
454 oldparent
= snaprealm
->parent
;
456 if (newparent
!= oldparent
) {
457 snapid_t oldparentseq
= oldparent
->get_newest_seq();
458 if (oldparentseq
+ 1 > new_snap
.current_parent_since
) {
459 new_snap
.past_parents
[oldparentseq
].ino
= oldparent
->inode
->ino();
460 new_snap
.past_parents
[oldparentseq
].first
= new_snap
.current_parent_since
;
462 new_snap
.current_parent_since
= std::max(oldparentseq
, newparent
->get_last_created()) + 1;
466 void CInode::pop_projected_snaprealm(sr_t
*next_snaprealm
)
468 assert(next_snaprealm
);
469 dout(10) << "pop_projected_snaprealm " << next_snaprealm
470 << " seq" << next_snaprealm
->seq
<< dendl
;
471 bool invalidate_cached_snaps
= false;
474 } else if (next_snaprealm
->past_parents
.size() !=
475 snaprealm
->srnode
.past_parents
.size()) {
476 invalidate_cached_snaps
= true;
477 // re-open past parents
478 snaprealm
->_close_parents();
480 dout(10) << " realm " << *snaprealm
<< " past_parents " << snaprealm
->srnode
.past_parents
481 << " -> " << next_snaprealm
->past_parents
<< dendl
;
483 snaprealm
->srnode
= *next_snaprealm
;
485 // we should be able to open these up (or have them already be open).
486 bool ok
= snaprealm
->_open_parents(NULL
);
489 if (invalidate_cached_snaps
)
490 snaprealm
->invalidate_cached_snaps();
492 if (snaprealm
->parent
)
493 dout(10) << " realm " << *snaprealm
<< " parent " << *snaprealm
->parent
<< dendl
;
497 // ====== CInode =======
501 __u32
InodeStoreBase::hash_dentry_name(boost::string_view dn
)
503 int which
= inode
.dir_layout
.dl_dir_hash
;
505 which
= CEPH_STR_HASH_LINUX
;
506 assert(ceph_str_hash_valid(which
));
507 return ceph_str_hash(which
, dn
.data(), dn
.length());
510 frag_t
InodeStoreBase::pick_dirfrag(boost::string_view dn
)
512 if (dirfragtree
.empty())
513 return frag_t(); // avoid the string hash if we can.
515 __u32 h
= hash_dentry_name(dn
);
516 return dirfragtree
[h
];
519 bool CInode::get_dirfrags_under(frag_t fg
, list
<CDir
*>& ls
)
522 std::list
<frag_t
> fglist
;
523 dirfragtree
.get_leaves_under(fg
, fglist
);
524 for (list
<frag_t
>::iterator p
= fglist
.begin(); p
!= fglist
.end(); ++p
)
525 if (dirfrags
.count(*p
))
526 ls
.push_back(dirfrags
[*p
]);
534 tmpdft
.force_to_leaf(g_ceph_context
, fg
);
535 for (auto &p
: dirfrags
) {
536 tmpdft
.force_to_leaf(g_ceph_context
, p
.first
);
537 if (fg
.contains(p
.first
) && !dirfragtree
.is_leaf(p
.first
))
538 ls
.push_back(p
.second
);
542 tmpdft
.get_leaves_under(fg
, fglist
);
543 for (const auto &p
: fglist
) {
544 if (!dirfrags
.count(p
)) {
553 void CInode::verify_dirfrags()
556 for (const auto &p
: dirfrags
) {
557 if (!dirfragtree
.is_leaf(p
.first
)) {
558 dout(0) << "have open dirfrag " << p
.first
<< " but not leaf in " << dirfragtree
559 << ": " << *p
.second
<< dendl
;
566 void CInode::force_dirfrags()
569 for (auto &p
: dirfrags
) {
570 if (!dirfragtree
.is_leaf(p
.first
)) {
571 dout(0) << "have open dirfrag " << p
.first
<< " but not leaf in " << dirfragtree
572 << ": " << *p
.second
<< dendl
;
579 dirfragtree
.get_leaves(leaves
);
580 for (list
<frag_t
>::iterator p
= leaves
.begin(); p
!= leaves
.end(); ++p
)
581 mdcache
->get_force_dirfrag(dirfrag_t(ino(),*p
), true);
587 CDir
*CInode::get_approx_dirfrag(frag_t fg
)
589 CDir
*dir
= get_dirfrag(fg
);
594 get_dirfrags_under(fg
, ls
);
599 while (fg
.bits() > 0) {
601 dir
= get_dirfrag(fg
);
607 void CInode::get_dirfrags(std::list
<CDir
*>& ls
)
610 for (const auto &p
: dirfrags
) {
611 ls
.push_back(p
.second
);
614 void CInode::get_nested_dirfrags(list
<CDir
*>& ls
)
616 // dirfrags in same subtree
617 for (const auto &p
: dirfrags
) {
618 if (!p
.second
->is_subtree_root())
619 ls
.push_back(p
.second
);
622 void CInode::get_subtree_dirfrags(list
<CDir
*>& ls
)
624 // dirfrags that are roots of new subtrees
625 for (const auto &p
: dirfrags
) {
626 if (p
.second
->is_subtree_root())
627 ls
.push_back(p
.second
);
632 CDir
*CInode::get_or_open_dirfrag(MDCache
*mdcache
, frag_t fg
)
637 CDir
*dir
= get_dirfrag(fg
);
640 assert(is_auth() || mdcache
->mds
->is_any_replay());
641 dir
= new CDir(this, fg
, mdcache
, is_auth());
647 CDir
*CInode::add_dirfrag(CDir
*dir
)
649 assert(dirfrags
.count(dir
->dirfrag().frag
) == 0);
650 dirfrags
[dir
->dirfrag().frag
] = dir
;
652 if (stickydir_ref
> 0) {
653 dir
->state_set(CDir::STATE_STICKY
);
654 dir
->get(CDir::PIN_STICKY
);
662 void CInode::close_dirfrag(frag_t fg
)
664 dout(14) << "close_dirfrag " << fg
<< dendl
;
665 assert(dirfrags
.count(fg
));
667 CDir
*dir
= dirfrags
[fg
];
668 dir
->remove_null_dentries();
674 if (stickydir_ref
> 0) {
675 dir
->state_clear(CDir::STATE_STICKY
);
676 dir
->put(CDir::PIN_STICKY
);
679 // dump any remaining dentries, for debugging purposes
680 for (const auto &p
: dir
->items
)
681 dout(14) << __func__
<< " LEFTOVER dn " << *p
.second
<< dendl
;
683 assert(dir
->get_num_ref() == 0);
688 void CInode::close_dirfrags()
690 while (!dirfrags
.empty())
691 close_dirfrag(dirfrags
.begin()->first
);
694 bool CInode::has_subtree_root_dirfrag(int auth
)
696 for (const auto &p
: dirfrags
) {
697 if (p
.second
->is_subtree_root() &&
698 (auth
== -1 || p
.second
->dir_auth
.first
== auth
))
704 bool CInode::has_subtree_or_exporting_dirfrag()
706 for (const auto &p
: dirfrags
) {
707 if (p
.second
->is_subtree_root() ||
708 p
.second
->state_test(CDir::STATE_EXPORTING
))
714 void CInode::get_stickydirs()
716 if (stickydir_ref
== 0) {
718 for (const auto &p
: dirfrags
) {
719 p
.second
->state_set(CDir::STATE_STICKY
);
720 p
.second
->get(CDir::PIN_STICKY
);
726 void CInode::put_stickydirs()
728 assert(stickydir_ref
> 0);
730 if (stickydir_ref
== 0) {
732 for (const auto &p
: dirfrags
) {
733 p
.second
->state_clear(CDir::STATE_STICKY
);
734 p
.second
->put(CDir::PIN_STICKY
);
745 void CInode::first_get()
749 parent
->get(CDentry::PIN_INODEPIN
);
752 void CInode::last_put()
756 parent
->put(CDentry::PIN_INODEPIN
);
761 if (get_num_ref() == (int)is_dirty() + (int)is_dirty_parent())
762 mdcache
->maybe_eval_stray(this, true);
765 void CInode::add_remote_parent(CDentry
*p
)
767 if (remote_parents
.empty())
768 get(PIN_REMOTEPARENT
);
769 remote_parents
.insert(p
);
771 void CInode::remove_remote_parent(CDentry
*p
)
773 remote_parents
.erase(p
);
774 if (remote_parents
.empty())
775 put(PIN_REMOTEPARENT
);
781 CDir
*CInode::get_parent_dir()
787 CDir
*CInode::get_projected_parent_dir()
789 CDentry
*p
= get_projected_parent_dn();
794 CInode
*CInode::get_parent_inode()
797 return parent
->dir
->inode
;
801 bool CInode::is_projected_ancestor_of(CInode
*other
)
806 if (!other
->get_projected_parent_dn())
808 other
= other
->get_projected_parent_dn()->get_dir()->get_inode();
814 * Because a non-directory inode may have multiple links, the use_parent
815 * argument allows selecting which parent to use for path construction. This
816 * argument is only meaningful for the final component (i.e. the first of the
817 * nested calls) because directories cannot have multiple hard links. If
818 * use_parent is NULL and projected is true, the primary parent's projected
819 * inode is used all the way up the path chain. Otherwise the primary parent
820 * stable inode is used.
822 void CInode::make_path_string(string
& s
, bool projected
, const CDentry
*use_parent
) const
825 use_parent
= projected
? get_projected_parent_dn() : parent
;
829 use_parent
->make_path_string(s
, projected
);
830 } else if (is_root()) {
832 } else if (is_mdsdir()) {
834 uint64_t eino(ino());
835 eino
-= MDS_INO_MDSDIR_OFFSET
;
836 snprintf(t
, sizeof(t
), "~mds%" PRId64
, eino
);
840 uint64_t eino(ino());
841 snprintf(n
, sizeof(n
), "#%" PRIx64
, eino
);
846 void CInode::make_path(filepath
& fp
, bool projected
) const
848 const CDentry
*use_parent
= projected
? get_projected_parent_dn() : parent
;
851 use_parent
->make_path(fp
, projected
);
853 fp
= filepath(ino());
857 void CInode::name_stray_dentry(string
& dname
)
860 snprintf(s
, sizeof(s
), "%llx", (unsigned long long)inode
.ino
.val
);
864 version_t
CInode::pre_dirty()
867 CDentry
* _cdentry
= get_projected_parent_dn();
869 pv
= _cdentry
->pre_dirty(get_projected_version());
870 dout(10) << "pre_dirty " << pv
<< " (current v " << inode
.version
<< ")" << dendl
;
873 pv
= get_projected_version() + 1;
875 // force update backtrace for old format inode (see mempool_inode::decode)
876 if (inode
.backtrace_version
== 0 && !projected_nodes
.empty()) {
877 mempool_inode
&pi
= projected_nodes
.back().inode
;
878 if (pi
.backtrace_version
== 0)
879 pi
.update_backtrace(pv
);
884 void CInode::_mark_dirty(LogSegment
*ls
)
886 if (!state_test(STATE_DIRTY
)) {
887 state_set(STATE_DIRTY
);
892 // move myself to this segment's dirty list
894 ls
->dirty_inodes
.push_back(&item_dirty
);
897 void CInode::mark_dirty(version_t pv
, LogSegment
*ls
) {
899 dout(10) << "mark_dirty " << *this << dendl
;
902 NOTE: I may already be dirty, but this fn _still_ needs to be called so that
903 the directory is (perhaps newly) dirtied, and so that parent_dir_version is
907 // only auth can get dirty. "dirty" async data in replicas is relative to
908 // filelock state, not the dirty flag.
911 // touch my private version
912 assert(inode
.version
< pv
);
918 parent
->mark_dirty(pv
, ls
);
922 void CInode::mark_clean()
924 dout(10) << " mark_clean " << *this << dendl
;
925 if (state_test(STATE_DIRTY
)) {
926 state_clear(STATE_DIRTY
);
929 // remove myself from ls dirty list
930 item_dirty
.remove_myself();
937 // (currently for root inode only)
939 struct C_IO_Inode_Stored
: public CInodeIOContext
{
942 C_IO_Inode_Stored(CInode
*i
, version_t v
, Context
*f
) : CInodeIOContext(i
), version(v
), fin(f
) {}
943 void finish(int r
) override
{
944 in
->_stored(r
, version
, fin
);
948 object_t
InodeStoreBase::get_object_name(inodeno_t ino
, frag_t fg
, const char *suffix
)
951 snprintf(n
, sizeof(n
), "%llx.%08llx%s", (long long unsigned)ino
, (long long unsigned)fg
, suffix
? suffix
: "");
955 void CInode::store(MDSInternalContextBase
*fin
)
957 dout(10) << "store " << get_version() << dendl
;
961 purge_stale_snap_data(snaprealm
->get_snaps());
965 string magic
= CEPH_FS_ONDISK_MAGIC
;
967 encode_store(bl
, mdcache
->mds
->mdsmap
->get_up_features());
974 object_t oid
= CInode::get_object_name(ino(), frag_t(), ".inode");
975 object_locator_t
oloc(mdcache
->mds
->mdsmap
->get_metadata_pool());
978 new C_OnFinisher(new C_IO_Inode_Stored(this, get_version(), fin
),
979 mdcache
->mds
->finisher
);
980 mdcache
->mds
->objecter
->mutate(oid
, oloc
, m
, snapc
,
981 ceph::real_clock::now(), 0,
985 void CInode::_stored(int r
, version_t v
, Context
*fin
)
988 dout(1) << "store error " << r
<< " v " << v
<< " on " << *this << dendl
;
989 mdcache
->mds
->clog
->error() << "failed to store inode " << ino()
990 << " object: " << cpp_strerror(r
);
991 mdcache
->mds
->handle_write_error(r
);
996 dout(10) << "_stored " << v
<< " on " << *this << dendl
;
997 if (v
== get_projected_version())
1003 void CInode::flush(MDSInternalContextBase
*fin
)
1005 dout(10) << "flush " << *this << dendl
;
1006 assert(is_auth() && can_auth_pin());
1008 MDSGatherBuilder
gather(g_ceph_context
);
1010 if (is_dirty_parent()) {
1011 store_backtrace(gather
.new_sub());
1015 store(gather
.new_sub());
1017 parent
->dir
->commit(0, gather
.new_sub());
1021 if (gather
.has_subs()) {
1022 gather
.set_finisher(fin
);
1029 struct C_IO_Inode_Fetched
: public CInodeIOContext
{
1032 C_IO_Inode_Fetched(CInode
*i
, Context
*f
) : CInodeIOContext(i
), fin(f
) {}
1033 void finish(int r
) override
{
1034 // Ignore 'r', because we fetch from two places, so r is usually ENOENT
1035 in
->_fetched(bl
, bl2
, fin
);
1039 void CInode::fetch(MDSInternalContextBase
*fin
)
1041 dout(10) << "fetch" << dendl
;
1043 C_IO_Inode_Fetched
*c
= new C_IO_Inode_Fetched(this, fin
);
1044 C_GatherBuilder
gather(g_ceph_context
, new C_OnFinisher(c
, mdcache
->mds
->finisher
));
1046 object_t oid
= CInode::get_object_name(ino(), frag_t(), "");
1047 object_locator_t
oloc(mdcache
->mds
->mdsmap
->get_metadata_pool());
1049 // Old on-disk format: inode stored in xattr of a dirfrag
1051 rd
.getxattr("inode", &c
->bl
, NULL
);
1052 mdcache
->mds
->objecter
->read(oid
, oloc
, rd
, CEPH_NOSNAP
, (bufferlist
*)NULL
, 0, gather
.new_sub());
1054 // Current on-disk format: inode stored in a .inode object
1055 object_t oid2
= CInode::get_object_name(ino(), frag_t(), ".inode");
1056 mdcache
->mds
->objecter
->read(oid2
, oloc
, 0, 0, CEPH_NOSNAP
, &c
->bl2
, 0, gather
.new_sub());
1061 void CInode::_fetched(bufferlist
& bl
, bufferlist
& bl2
, Context
*fin
)
1063 dout(10) << "_fetched got " << bl
.length() << " and " << bl2
.length() << dendl
;
1064 bufferlist::iterator p
;
1067 } else if (bl
.length()) {
1070 derr
<< "No data while reading inode " << ino() << dendl
;
1071 fin
->complete(-ENOENT
);
1079 dout(10) << " magic is '" << magic
<< "' (expecting '"
1080 << CEPH_FS_ONDISK_MAGIC
<< "')" << dendl
;
1081 if (magic
!= CEPH_FS_ONDISK_MAGIC
) {
1082 dout(0) << "on disk magic '" << magic
<< "' != my magic '" << CEPH_FS_ONDISK_MAGIC
1084 fin
->complete(-EINVAL
);
1087 dout(10) << "_fetched " << *this << dendl
;
1090 } catch (buffer::error
&err
) {
1091 derr
<< "Corrupt inode " << ino() << ": " << err
<< dendl
;
1092 fin
->complete(-EINVAL
);
1097 void CInode::build_backtrace(int64_t pool
, inode_backtrace_t
& bt
)
1100 bt
.ancestors
.clear();
1104 CDentry
*pdn
= get_parent_dn();
1106 CInode
*diri
= pdn
->get_dir()->get_inode();
1107 bt
.ancestors
.push_back(inode_backpointer_t(diri
->ino(), pdn
->get_name(), in
->inode
.version
));
1109 pdn
= in
->get_parent_dn();
1111 for (auto &p
: inode
.old_pools
) {
1112 // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
1114 bt
.old_pools
.insert(p
);
1118 struct C_IO_Inode_StoredBacktrace
: public CInodeIOContext
{
1121 C_IO_Inode_StoredBacktrace(CInode
*i
, version_t v
, Context
*f
) : CInodeIOContext(i
), version(v
), fin(f
) {}
1122 void finish(int r
) override
{
1123 in
->_stored_backtrace(r
, version
, fin
);
1127 void CInode::store_backtrace(MDSInternalContextBase
*fin
, int op_prio
)
1129 dout(10) << "store_backtrace on " << *this << dendl
;
1130 assert(is_dirty_parent());
1133 op_prio
= CEPH_MSG_PRIO_DEFAULT
;
1137 const int64_t pool
= get_backtrace_pool();
1138 inode_backtrace_t bt
;
1139 build_backtrace(pool
, bt
);
1140 bufferlist parent_bl
;
1141 ::encode(bt
, parent_bl
);
1144 op
.priority
= op_prio
;
1146 op
.setxattr("parent", parent_bl
);
1148 bufferlist layout_bl
;
1149 ::encode(inode
.layout
, layout_bl
, mdcache
->mds
->mdsmap
->get_up_features());
1150 op
.setxattr("layout", layout_bl
);
1153 object_t oid
= get_object_name(ino(), frag_t(), "");
1154 object_locator_t
oloc(pool
);
1155 Context
*fin2
= new C_OnFinisher(
1156 new C_IO_Inode_StoredBacktrace(this, inode
.backtrace_version
, fin
),
1157 mdcache
->mds
->finisher
);
1159 if (!state_test(STATE_DIRTYPOOL
) || inode
.old_pools
.empty()) {
1160 dout(20) << __func__
<< ": no dirtypool or no old pools" << dendl
;
1161 mdcache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
1162 ceph::real_clock::now(),
1167 C_GatherBuilder
gather(g_ceph_context
, fin2
);
1168 mdcache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
1169 ceph::real_clock::now(),
1170 0, gather
.new_sub());
1172 // In the case where DIRTYPOOL is set, we update all old pools backtraces
1173 // such that anyone reading them will see the new pool ID in
1174 // inode_backtrace_t::pool and go read everything else from there.
1175 for (const auto &p
: inode
.old_pools
) {
1179 dout(20) << __func__
<< ": updating old pool " << p
<< dendl
;
1182 op
.priority
= op_prio
;
1184 op
.setxattr("parent", parent_bl
);
1186 object_locator_t
oloc(p
);
1187 mdcache
->mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
1188 ceph::real_clock::now(),
1189 0, gather
.new_sub());
1194 void CInode::_stored_backtrace(int r
, version_t v
, Context
*fin
)
1197 const int64_t pool
= get_backtrace_pool();
1198 bool exists
= mdcache
->mds
->objecter
->with_osdmap(
1199 [pool
](const OSDMap
&osd_map
) {
1200 return osd_map
.have_pg_pool(pool
);
1203 // This ENOENT is because the pool doesn't exist (the user deleted it
1204 // out from under us), so the backtrace can never be written, so pretend
1205 // to succeed so that the user can proceed to e.g. delete the file.
1207 dout(4) << "store_backtrace got ENOENT: a data pool was deleted "
1208 "beneath us!" << dendl
;
1214 dout(1) << "store backtrace error " << r
<< " v " << v
<< dendl
;
1215 mdcache
->mds
->clog
->error() << "failed to store backtrace on ino "
1216 << ino() << " object"
1217 << ", pool " << get_backtrace_pool()
1219 mdcache
->mds
->handle_write_error(r
);
1225 dout(10) << "_stored_backtrace v " << v
<< dendl
;
1228 if (v
== inode
.backtrace_version
)
1229 clear_dirty_parent();
1234 void CInode::fetch_backtrace(Context
*fin
, bufferlist
*backtrace
)
1236 mdcache
->fetch_backtrace(inode
.ino
, get_backtrace_pool(), *backtrace
, fin
);
1239 void CInode::mark_dirty_parent(LogSegment
*ls
, bool dirty_pool
)
1241 if (!state_test(STATE_DIRTYPARENT
)) {
1242 dout(10) << "mark_dirty_parent" << dendl
;
1243 state_set(STATE_DIRTYPARENT
);
1244 get(PIN_DIRTYPARENT
);
1248 state_set(STATE_DIRTYPOOL
);
1250 ls
->dirty_parent_inodes
.push_back(&item_dirty_parent
);
1253 void CInode::clear_dirty_parent()
1255 if (state_test(STATE_DIRTYPARENT
)) {
1256 dout(10) << "clear_dirty_parent" << dendl
;
1257 state_clear(STATE_DIRTYPARENT
);
1258 state_clear(STATE_DIRTYPOOL
);
1259 put(PIN_DIRTYPARENT
);
1260 item_dirty_parent
.remove_myself();
1264 void CInode::verify_diri_backtrace(bufferlist
&bl
, int err
)
1266 if (is_base() || is_dirty_parent() || !is_auth())
1269 dout(10) << "verify_diri_backtrace" << dendl
;
1272 inode_backtrace_t backtrace
;
1273 ::decode(backtrace
, bl
);
1274 CDentry
*pdn
= get_parent_dn();
1275 if (backtrace
.ancestors
.empty() ||
1276 backtrace
.ancestors
[0].dname
!= pdn
->get_name() ||
1277 backtrace
.ancestors
[0].dirino
!= pdn
->get_dir()->ino())
1282 MDSRank
*mds
= mdcache
->mds
;
1283 mds
->clog
->error() << "bad backtrace on directory inode " << ino();
1284 assert(!"bad backtrace" == (g_conf
->mds_verify_backtrace
> 1));
1286 mark_dirty_parent(mds
->mdlog
->get_current_segment(), false);
1287 mds
->mdlog
->flush();
1291 // ------------------
1295 void InodeStoreBase::encode_bare(bufferlist
&bl
, uint64_t features
,
1296 const bufferlist
*snap_blob
) const
1298 ::encode(inode
, bl
, features
);
1300 ::encode(symlink
, bl
);
1301 ::encode(dirfragtree
, bl
);
1302 ::encode(xattrs
, bl
);
1304 ::encode(*snap_blob
, bl
);
1306 ::encode(bufferlist(), bl
);
1307 ::encode(old_inodes
, bl
, features
);
1308 ::encode(oldest_snap
, bl
);
1309 ::encode(damage_flags
, bl
);
1312 void InodeStoreBase::encode(bufferlist
&bl
, uint64_t features
,
1313 const bufferlist
*snap_blob
) const
1315 ENCODE_START(6, 4, bl
);
1316 encode_bare(bl
, features
, snap_blob
);
1320 void CInode::encode_store(bufferlist
& bl
, uint64_t features
)
1322 bufferlist snap_blob
;
1323 encode_snap_blob(snap_blob
);
1324 InodeStoreBase::encode(bl
, mdcache
->mds
->mdsmap
->get_up_features(),
1328 void InodeStoreBase::decode_bare(bufferlist::iterator
&bl
,
1329 bufferlist
& snap_blob
, __u8 struct_v
)
1331 ::decode(inode
, bl
);
1335 symlink
= mempool::mds_co::string(boost::string_view(tmp
));
1337 ::decode(dirfragtree
, bl
);
1338 ::decode(xattrs
, bl
);
1339 ::decode(snap_blob
, bl
);
1341 ::decode(old_inodes
, bl
);
1342 if (struct_v
== 2 && inode
.is_dir()) {
1343 bool default_layout_exists
;
1344 ::decode(default_layout_exists
, bl
);
1345 if (default_layout_exists
) {
1346 ::decode(struct_v
, bl
); // this was a default_file_layout
1347 ::decode(inode
.layout
, bl
); // but we only care about the layout portion
1351 if (struct_v
>= 5) {
1352 // InodeStore is embedded in dentries without proper versioning, so
1353 // we consume up to the end of the buffer
1355 ::decode(oldest_snap
, bl
);
1359 ::decode(damage_flags
, bl
);
1365 void InodeStoreBase::decode(bufferlist::iterator
&bl
, bufferlist
& snap_blob
)
1367 DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl
);
1368 decode_bare(bl
, snap_blob
, struct_v
);
1372 void CInode::decode_store(bufferlist::iterator
& bl
)
1374 bufferlist snap_blob
;
1375 InodeStoreBase::decode(bl
, snap_blob
);
1376 decode_snap_blob(snap_blob
);
1379 // ------------------
1382 void CInode::set_object_info(MDSCacheObjectInfo
&info
)
1388 void CInode::encode_lock_state(int type
, bufferlist
& bl
)
1390 ::encode(first
, bl
);
1393 case CEPH_LOCK_IAUTH
:
1394 ::encode(inode
.version
, bl
);
1395 ::encode(inode
.ctime
, bl
);
1396 ::encode(inode
.mode
, bl
);
1397 ::encode(inode
.uid
, bl
);
1398 ::encode(inode
.gid
, bl
);
1401 case CEPH_LOCK_ILINK
:
1402 ::encode(inode
.version
, bl
);
1403 ::encode(inode
.ctime
, bl
);
1404 ::encode(inode
.nlink
, bl
);
1407 case CEPH_LOCK_IDFT
:
1409 ::encode(inode
.version
, bl
);
1411 // treat flushing as dirty when rejoining cache
1412 bool dirty
= dirfragtreelock
.is_dirty_or_flushing();
1413 ::encode(dirty
, bl
);
1416 // encode the raw tree
1417 ::encode(dirfragtree
, bl
);
1419 // also specify which frags are mine
1420 set
<frag_t
> myfrags
;
1423 for (list
<CDir
*>::iterator p
= dfls
.begin(); p
!= dfls
.end(); ++p
)
1424 if ((*p
)->is_auth()) {
1425 frag_t fg
= (*p
)->get_frag();
1428 ::encode(myfrags
, bl
);
1432 case CEPH_LOCK_IFILE
:
1434 ::encode(inode
.version
, bl
);
1435 ::encode(inode
.ctime
, bl
);
1436 ::encode(inode
.mtime
, bl
);
1437 ::encode(inode
.atime
, bl
);
1438 ::encode(inode
.time_warp_seq
, bl
);
1440 ::encode(inode
.layout
, bl
, mdcache
->mds
->mdsmap
->get_up_features());
1441 ::encode(inode
.size
, bl
);
1442 ::encode(inode
.truncate_seq
, bl
);
1443 ::encode(inode
.truncate_size
, bl
);
1444 ::encode(inode
.client_ranges
, bl
);
1445 ::encode(inode
.inline_data
, bl
);
1448 // treat flushing as dirty when rejoining cache
1449 bool dirty
= filelock
.is_dirty_or_flushing();
1450 ::encode(dirty
, bl
);
1454 dout(15) << "encode_lock_state inode.dirstat is " << inode
.dirstat
<< dendl
;
1455 ::encode(inode
.dirstat
, bl
); // only meaningful if i am auth.
1458 for (const auto &p
: dirfrags
) {
1459 frag_t fg
= p
.first
;
1460 CDir
*dir
= p
.second
;
1461 if (is_auth() || dir
->is_auth()) {
1462 fnode_t
*pf
= dir
->get_projected_fnode();
1463 dout(15) << fg
<< " " << *dir
<< dendl
;
1464 dout(20) << fg
<< " fragstat " << pf
->fragstat
<< dendl
;
1465 dout(20) << fg
<< " accounted_fragstat " << pf
->accounted_fragstat
<< dendl
;
1467 ::encode(dir
->first
, tmp
);
1468 ::encode(pf
->fragstat
, tmp
);
1469 ::encode(pf
->accounted_fragstat
, tmp
);
1474 bl
.claim_append(tmp
);
1478 case CEPH_LOCK_INEST
:
1480 ::encode(inode
.version
, bl
);
1482 // treat flushing as dirty when rejoining cache
1483 bool dirty
= nestlock
.is_dirty_or_flushing();
1484 ::encode(dirty
, bl
);
1487 dout(15) << "encode_lock_state inode.rstat is " << inode
.rstat
<< dendl
;
1488 ::encode(inode
.rstat
, bl
); // only meaningful if i am auth.
1491 for (const auto &p
: dirfrags
) {
1492 frag_t fg
= p
.first
;
1493 CDir
*dir
= p
.second
;
1494 if (is_auth() || dir
->is_auth()) {
1495 fnode_t
*pf
= dir
->get_projected_fnode();
1496 dout(10) << fg
<< " " << *dir
<< dendl
;
1497 dout(10) << fg
<< " " << pf
->rstat
<< dendl
;
1498 dout(10) << fg
<< " " << pf
->rstat
<< dendl
;
1499 dout(10) << fg
<< " " << dir
->dirty_old_rstat
<< dendl
;
1501 ::encode(dir
->first
, tmp
);
1502 ::encode(pf
->rstat
, tmp
);
1503 ::encode(pf
->accounted_rstat
, tmp
);
1504 ::encode(dir
->dirty_old_rstat
, tmp
);
1509 bl
.claim_append(tmp
);
1513 case CEPH_LOCK_IXATTR
:
1514 ::encode(inode
.version
, bl
);
1515 ::encode(inode
.ctime
, bl
);
1516 ::encode(xattrs
, bl
);
1519 case CEPH_LOCK_ISNAP
:
1520 ::encode(inode
.version
, bl
);
1521 ::encode(inode
.ctime
, bl
);
1525 case CEPH_LOCK_IFLOCK
:
1526 ::encode(inode
.version
, bl
);
1527 _encode_file_locks(bl
);
1530 case CEPH_LOCK_IPOLICY
:
1531 if (inode
.is_dir()) {
1532 ::encode(inode
.version
, bl
);
1533 ::encode(inode
.ctime
, bl
);
1534 ::encode(inode
.layout
, bl
, mdcache
->mds
->mdsmap
->get_up_features());
1535 ::encode(inode
.quota
, bl
);
1536 ::encode(inode
.export_pin
, bl
);
1546 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1548 void CInode::decode_lock_state(int type
, bufferlist
& bl
)
1550 bufferlist::iterator p
= bl
.begin();
1554 ::decode(newfirst
, p
);
1556 if (!is_auth() && newfirst
!= first
) {
1557 dout(10) << "decode_lock_state first " << first
<< " -> " << newfirst
<< dendl
;
1558 assert(newfirst
> first
);
1559 if (!is_multiversion() && parent
) {
1560 assert(parent
->first
== first
);
1561 parent
->first
= newfirst
;
1567 case CEPH_LOCK_IAUTH
:
1568 ::decode(inode
.version
, p
);
1570 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1571 ::decode(inode
.mode
, p
);
1572 ::decode(inode
.uid
, p
);
1573 ::decode(inode
.gid
, p
);
1576 case CEPH_LOCK_ILINK
:
1577 ::decode(inode
.version
, p
);
1579 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1580 ::decode(inode
.nlink
, p
);
1583 case CEPH_LOCK_IDFT
:
1586 ::decode(replica_dirty
, p
);
1587 if (replica_dirty
) {
1588 dout(10) << "decode_lock_state setting dftlock dirty flag" << dendl
;
1589 dirfragtreelock
.mark_dirty(); // ok bc we're auth and caller will handle
1592 ::decode(inode
.version
, p
);
1597 set
<frag_t
> authfrags
;
1598 ::decode(authfrags
, p
);
1600 // auth. believe replica's auth frags only.
1601 for (set
<frag_t
>::iterator p
= authfrags
.begin(); p
!= authfrags
.end(); ++p
)
1602 if (!dirfragtree
.is_leaf(*p
)) {
1603 dout(10) << " forcing frag " << *p
<< " to leaf (split|merge)" << dendl
;
1604 dirfragtree
.force_to_leaf(g_ceph_context
, *p
);
1605 dirfragtreelock
.mark_dirty(); // ok bc we're auth and caller will handle
1608 // replica. take the new tree, BUT make sure any open
1609 // dirfrags remain leaves (they may have split _after_ this
1610 // dft was scattered, or we may still be be waiting on the
1611 // notify from the auth)
1612 dirfragtree
.swap(temp
);
1613 for (const auto &p
: dirfrags
) {
1614 if (!dirfragtree
.is_leaf(p
.first
)) {
1615 dout(10) << " forcing open dirfrag " << p
.first
<< " to leaf (racing with split|merge)" << dendl
;
1616 dirfragtree
.force_to_leaf(g_ceph_context
, p
.first
);
1618 if (p
.second
->is_auth())
1619 p
.second
->state_clear(CDir::STATE_DIRTYDFT
);
1622 if (g_conf
->mds_debug_frag
)
1627 case CEPH_LOCK_IFILE
:
1629 ::decode(inode
.version
, p
);
1631 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1632 ::decode(inode
.mtime
, p
);
1633 ::decode(inode
.atime
, p
);
1634 ::decode(inode
.time_warp_seq
, p
);
1636 ::decode(inode
.layout
, p
);
1637 ::decode(inode
.size
, p
);
1638 ::decode(inode
.truncate_seq
, p
);
1639 ::decode(inode
.truncate_size
, p
);
1640 ::decode(inode
.client_ranges
, p
);
1641 ::decode(inode
.inline_data
, p
);
1645 ::decode(replica_dirty
, p
);
1646 if (replica_dirty
) {
1647 dout(10) << "decode_lock_state setting filelock dirty flag" << dendl
;
1648 filelock
.mark_dirty(); // ok bc we're auth and caller will handle
1652 frag_info_t dirstat
;
1653 ::decode(dirstat
, p
);
1655 dout(10) << " taking inode dirstat " << dirstat
<< " for " << *this << dendl
;
1656 inode
.dirstat
= dirstat
; // take inode summation if replica
1660 dout(10) << " ...got " << n
<< " fragstats on " << *this << dendl
;
1664 frag_info_t fragstat
;
1665 frag_info_t accounted_fragstat
;
1667 ::decode(fgfirst
, p
);
1668 ::decode(fragstat
, p
);
1669 ::decode(accounted_fragstat
, p
);
1670 dout(10) << fg
<< " [" << fgfirst
<< ",head] " << dendl
;
1671 dout(10) << fg
<< " fragstat " << fragstat
<< dendl
;
1672 dout(20) << fg
<< " accounted_fragstat " << accounted_fragstat
<< dendl
;
1674 CDir
*dir
= get_dirfrag(fg
);
1676 assert(dir
); // i am auth; i had better have this dir open
1677 dout(10) << fg
<< " first " << dir
->first
<< " -> " << fgfirst
1678 << " on " << *dir
<< dendl
;
1679 dir
->first
= fgfirst
;
1680 dir
->fnode
.fragstat
= fragstat
;
1681 dir
->fnode
.accounted_fragstat
= accounted_fragstat
;
1682 dir
->first
= fgfirst
;
1683 if (!(fragstat
== accounted_fragstat
)) {
1684 dout(10) << fg
<< " setting filelock updated flag" << dendl
;
1685 filelock
.mark_dirty(); // ok bc we're auth and caller will handle
1688 if (dir
&& dir
->is_auth()) {
1689 dout(10) << fg
<< " first " << dir
->first
<< " -> " << fgfirst
1690 << " on " << *dir
<< dendl
;
1691 dir
->first
= fgfirst
;
1692 fnode_t
*pf
= dir
->get_projected_fnode();
1693 finish_scatter_update(&filelock
, dir
,
1694 inode
.dirstat
.version
, pf
->accounted_fragstat
.version
);
1701 case CEPH_LOCK_INEST
:
1704 ::decode(replica_dirty
, p
);
1705 if (replica_dirty
) {
1706 dout(10) << "decode_lock_state setting nestlock dirty flag" << dendl
;
1707 nestlock
.mark_dirty(); // ok bc we're auth and caller will handle
1710 ::decode(inode
.version
, p
);
1716 dout(10) << " taking inode rstat " << rstat
<< " for " << *this << dendl
;
1717 inode
.rstat
= rstat
; // take inode summation if replica
1725 nest_info_t accounted_rstat
;
1726 decltype(CDir::dirty_old_rstat
) dirty_old_rstat
;
1728 ::decode(fgfirst
, p
);
1730 ::decode(accounted_rstat
, p
);
1731 ::decode(dirty_old_rstat
, p
);
1732 dout(10) << fg
<< " [" << fgfirst
<< ",head]" << dendl
;
1733 dout(10) << fg
<< " rstat " << rstat
<< dendl
;
1734 dout(10) << fg
<< " accounted_rstat " << accounted_rstat
<< dendl
;
1735 dout(10) << fg
<< " dirty_old_rstat " << dirty_old_rstat
<< dendl
;
1737 CDir
*dir
= get_dirfrag(fg
);
1739 assert(dir
); // i am auth; i had better have this dir open
1740 dout(10) << fg
<< " first " << dir
->first
<< " -> " << fgfirst
1741 << " on " << *dir
<< dendl
;
1742 dir
->first
= fgfirst
;
1743 dir
->fnode
.rstat
= rstat
;
1744 dir
->fnode
.accounted_rstat
= accounted_rstat
;
1745 dir
->dirty_old_rstat
.swap(dirty_old_rstat
);
1746 if (!(rstat
== accounted_rstat
) || !dir
->dirty_old_rstat
.empty()) {
1747 dout(10) << fg
<< " setting nestlock updated flag" << dendl
;
1748 nestlock
.mark_dirty(); // ok bc we're auth and caller will handle
1751 if (dir
&& dir
->is_auth()) {
1752 dout(10) << fg
<< " first " << dir
->first
<< " -> " << fgfirst
1753 << " on " << *dir
<< dendl
;
1754 dir
->first
= fgfirst
;
1755 fnode_t
*pf
= dir
->get_projected_fnode();
1756 finish_scatter_update(&nestlock
, dir
,
1757 inode
.rstat
.version
, pf
->accounted_rstat
.version
);
1764 case CEPH_LOCK_IXATTR
:
1765 ::decode(inode
.version
, p
);
1767 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1768 ::decode(xattrs
, p
);
1771 case CEPH_LOCK_ISNAP
:
1773 ::decode(inode
.version
, p
);
1775 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1778 seq
= snaprealm
->srnode
.seq
;
1780 if (snaprealm
&& snaprealm
->srnode
.seq
!= seq
)
1781 mdcache
->do_realm_invalidate_and_update_notify(this, seq
? CEPH_SNAP_OP_UPDATE
:CEPH_SNAP_OP_SPLIT
);
1785 case CEPH_LOCK_IFLOCK
:
1786 ::decode(inode
.version
, p
);
1787 _decode_file_locks(p
);
1790 case CEPH_LOCK_IPOLICY
:
1791 if (inode
.is_dir()) {
1792 ::decode(inode
.version
, p
);
1794 if (inode
.ctime
< tm
) inode
.ctime
= tm
;
1795 ::decode(inode
.layout
, p
);
1796 ::decode(inode
.quota
, p
);
1797 mds_rank_t old_pin
= inode
.export_pin
;
1798 ::decode(inode
.export_pin
, p
);
1799 maybe_export_pin(old_pin
!= inode
.export_pin
);
1809 bool CInode::is_dirty_scattered()
1812 filelock
.is_dirty_or_flushing() ||
1813 nestlock
.is_dirty_or_flushing() ||
1814 dirfragtreelock
.is_dirty_or_flushing();
1817 void CInode::clear_scatter_dirty()
1819 filelock
.remove_dirty();
1820 nestlock
.remove_dirty();
1821 dirfragtreelock
.remove_dirty();
1824 void CInode::clear_dirty_scattered(int type
)
1826 dout(10) << "clear_dirty_scattered " << type
<< " on " << *this << dendl
;
1829 case CEPH_LOCK_IFILE
:
1830 item_dirty_dirfrag_dir
.remove_myself();
1833 case CEPH_LOCK_INEST
:
1834 item_dirty_dirfrag_nest
.remove_myself();
1837 case CEPH_LOCK_IDFT
:
1838 item_dirty_dirfrag_dirfragtree
.remove_myself();
1848 * when we initially scatter a lock, we need to check if any of the dirfrags
1849 * have out of date accounted_rstat/fragstat. if so, mark the lock stale.
1851 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
1852 void CInode::start_scatter(ScatterLock
*lock
)
1854 dout(10) << "start_scatter " << *lock
<< " on " << *this << dendl
;
1856 mempool_inode
*pi
= get_projected_inode();
1858 for (const auto &p
: dirfrags
) {
1859 frag_t fg
= p
.first
;
1860 CDir
*dir
= p
.second
;
1861 fnode_t
*pf
= dir
->get_projected_fnode();
1862 dout(20) << fg
<< " " << *dir
<< dendl
;
1864 if (!dir
->is_auth())
1867 switch (lock
->get_type()) {
1868 case CEPH_LOCK_IFILE
:
1869 finish_scatter_update(lock
, dir
, pi
->dirstat
.version
, pf
->accounted_fragstat
.version
);
1872 case CEPH_LOCK_INEST
:
1873 finish_scatter_update(lock
, dir
, pi
->rstat
.version
, pf
->accounted_rstat
.version
);
1876 case CEPH_LOCK_IDFT
:
1877 dir
->state_clear(CDir::STATE_DIRTYDFT
);
1884 class C_Inode_FragUpdate
: public MDSLogContextBase
{
1889 MDSRank
*get_mds() override
{return in
->mdcache
->mds
;}
1890 void finish(int r
) override
{
1891 in
->_finish_frag_update(dir
, mut
);
1895 C_Inode_FragUpdate(CInode
*i
, CDir
*d
, MutationRef
& m
) : in(i
), dir(d
), mut(m
) {}
1898 void CInode::finish_scatter_update(ScatterLock
*lock
, CDir
*dir
,
1899 version_t inode_version
, version_t dir_accounted_version
)
1901 frag_t fg
= dir
->get_frag();
1902 assert(dir
->is_auth());
1904 if (dir
->is_frozen()) {
1905 dout(10) << "finish_scatter_update " << fg
<< " frozen, marking " << *lock
<< " stale " << *dir
<< dendl
;
1906 } else if (dir
->get_version() == 0) {
1907 dout(10) << "finish_scatter_update " << fg
<< " not loaded, marking " << *lock
<< " stale " << *dir
<< dendl
;
1909 if (dir_accounted_version
!= inode_version
) {
1910 dout(10) << "finish_scatter_update " << fg
<< " journaling accounted scatterstat update v" << inode_version
<< dendl
;
1912 MDLog
*mdlog
= mdcache
->mds
->mdlog
;
1913 MutationRef
mut(new MutationImpl());
1914 mut
->ls
= mdlog
->get_current_segment();
1916 mempool_inode
*pi
= get_projected_inode();
1917 fnode_t
*pf
= dir
->project_fnode();
1919 const char *ename
= 0;
1920 switch (lock
->get_type()) {
1921 case CEPH_LOCK_IFILE
:
1922 pf
->fragstat
.version
= pi
->dirstat
.version
;
1923 pf
->accounted_fragstat
= pf
->fragstat
;
1924 ename
= "lock ifile accounted scatter stat update";
1926 case CEPH_LOCK_INEST
:
1927 pf
->rstat
.version
= pi
->rstat
.version
;
1928 pf
->accounted_rstat
= pf
->rstat
;
1929 ename
= "lock inest accounted scatter stat update";
1931 if (!is_auth() && lock
->get_state() == LOCK_MIX
) {
1932 dout(10) << "finish_scatter_update try to assimilate dirty rstat on "
1934 dir
->assimilate_dirty_rstat_inodes();
1942 pf
->version
= dir
->pre_dirty();
1943 mut
->add_projected_fnode(dir
);
1945 EUpdate
*le
= new EUpdate(mdlog
, ename
);
1946 mdlog
->start_entry(le
);
1947 le
->metablob
.add_dir_context(dir
);
1948 le
->metablob
.add_dir(dir
, true);
1950 assert(!dir
->is_frozen());
1953 if (lock
->get_type() == CEPH_LOCK_INEST
&&
1954 !is_auth() && lock
->get_state() == LOCK_MIX
) {
1955 dout(10) << "finish_scatter_update finish assimilating dirty rstat on "
1957 dir
->assimilate_dirty_rstat_inodes_finish(mut
, &le
->metablob
);
1959 if (!(pf
->rstat
== pf
->accounted_rstat
)) {
1960 if (mut
->wrlocks
.count(&nestlock
) == 0) {
1961 mdcache
->mds
->locker
->wrlock_force(&nestlock
, mut
);
1964 mdcache
->mds
->locker
->mark_updated_scatterlock(&nestlock
);
1965 mut
->ls
->dirty_dirfrag_nest
.push_back(&item_dirty_dirfrag_nest
);
1969 mdlog
->submit_entry(le
, new C_Inode_FragUpdate(this, dir
, mut
));
1971 dout(10) << "finish_scatter_update " << fg
<< " accounted " << *lock
1972 << " scatter stat unchanged at v" << dir_accounted_version
<< dendl
;
1977 void CInode::_finish_frag_update(CDir
*dir
, MutationRef
& mut
)
1979 dout(10) << "_finish_frag_update on " << *dir
<< dendl
;
1981 mdcache
->mds
->locker
->drop_locks(mut
.get());
1987 * when we gather a lock, we need to assimilate dirfrag changes into the inode
1988 * state. it's possible we can't update the dirfrag accounted_rstat/fragstat
1989 * because the frag is auth and frozen, or that the replica couldn't for the same
1990 * reason. hopefully it will get updated the next time the lock cycles.
1992 * we have two dimensions of behavior:
1993 * - we may be (auth and !frozen), and able to update, or not.
1994 * - the frag may be stale, or not.
1996 * if the frag is non-stale, we want to assimilate the diff into the
1997 * inode, regardless of whether it's auth or updateable.
1999 * if we update the frag, we want to set accounted_fragstat = frag,
2000 * both if we took the diff or it was stale and we are making it
2003 /* for more info on scatterlocks, see comments by Locker::scatter_writebehind */
2004 void CInode::finish_scatter_gather_update(int type
)
2006 LogChannelRef clog
= mdcache
->mds
->clog
;
2008 dout(10) << "finish_scatter_gather_update " << type
<< " on " << *this << dendl
;
2012 case CEPH_LOCK_IFILE
:
2014 fragtree_t tmpdft
= dirfragtree
;
2015 struct frag_info_t dirstat
;
2016 bool dirstat_valid
= true;
2020 mempool_inode
*pi
= get_projected_inode();
2022 bool touched_mtime
= false, touched_chattr
= false;
2023 dout(20) << " orig dirstat " << pi
->dirstat
<< dendl
;
2024 pi
->dirstat
.version
++;
2025 for (const auto &p
: dirfrags
) {
2026 frag_t fg
= p
.first
;
2027 CDir
*dir
= p
.second
;
2028 dout(20) << fg
<< " " << *dir
<< dendl
;
2031 if (dir
->get_version() != 0) {
2032 update
= dir
->is_auth() && !dir
->is_frozen();
2035 dirstat_valid
= false;
2038 fnode_t
*pf
= dir
->get_projected_fnode();
2040 pf
= dir
->project_fnode();
2042 if (pf
->accounted_fragstat
.version
== pi
->dirstat
.version
- 1) {
2043 dout(20) << fg
<< " fragstat " << pf
->fragstat
<< dendl
;
2044 dout(20) << fg
<< " accounted_fragstat " << pf
->accounted_fragstat
<< dendl
;
2045 pi
->dirstat
.add_delta(pf
->fragstat
, pf
->accounted_fragstat
, &touched_mtime
, &touched_chattr
);
2047 dout(20) << fg
<< " skipping STALE accounted_fragstat " << pf
->accounted_fragstat
<< dendl
;
2050 if (pf
->fragstat
.nfiles
< 0 ||
2051 pf
->fragstat
.nsubdirs
< 0) {
2052 clog
->error() << "bad/negative dir size on "
2053 << dir
->dirfrag() << " " << pf
->fragstat
;
2054 assert(!"bad/negative fragstat" == g_conf
->mds_verify_scatter
);
2056 if (pf
->fragstat
.nfiles
< 0)
2057 pf
->fragstat
.nfiles
= 0;
2058 if (pf
->fragstat
.nsubdirs
< 0)
2059 pf
->fragstat
.nsubdirs
= 0;
2063 pf
->accounted_fragstat
= pf
->fragstat
;
2064 pf
->fragstat
.version
= pf
->accounted_fragstat
.version
= pi
->dirstat
.version
;
2065 dout(10) << fg
<< " updated accounted_fragstat " << pf
->fragstat
<< " on " << *dir
<< dendl
;
2068 tmpdft
.force_to_leaf(g_ceph_context
, fg
);
2069 dirstat
.add(pf
->fragstat
);
2072 pi
->mtime
= pi
->ctime
= pi
->dirstat
.mtime
;
2074 pi
->change_attr
= pi
->dirstat
.change_attr
;
2075 dout(20) << " final dirstat " << pi
->dirstat
<< dendl
;
2077 if (dirstat_valid
&& !dirstat
.same_sums(pi
->dirstat
)) {
2079 tmpdft
.get_leaves_under(frag_t(), ls
);
2080 for (list
<frag_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
)
2081 if (!dirfrags
.count(*p
)) {
2082 dirstat_valid
= false;
2085 if (dirstat_valid
) {
2086 if (state_test(CInode::STATE_REPAIRSTATS
)) {
2087 dout(20) << " dirstat mismatch, fixing" << dendl
;
2089 clog
->error() << "unmatched fragstat on " << ino() << ", inode has "
2090 << pi
->dirstat
<< ", dirfrags have " << dirstat
;
2091 assert(!"unmatched fragstat" == g_conf
->mds_verify_scatter
);
2093 // trust the dirfrags for now
2094 version_t v
= pi
->dirstat
.version
;
2095 if (pi
->dirstat
.mtime
> dirstat
.mtime
)
2096 dirstat
.mtime
= pi
->dirstat
.mtime
;
2097 if (pi
->dirstat
.change_attr
> dirstat
.change_attr
)
2098 dirstat
.change_attr
= pi
->dirstat
.change_attr
;
2099 pi
->dirstat
= dirstat
;
2100 pi
->dirstat
.version
= v
;
2104 if (pi
->dirstat
.nfiles
< 0 || pi
->dirstat
.nsubdirs
< 0)
2107 make_path_string(path
);
2108 clog
->error() << "Inconsistent statistics detected: fragstat on inode "
2109 << ino() << " (" << path
<< "), inode has " << pi
->dirstat
;
2110 assert(!"bad/negative fragstat" == g_conf
->mds_verify_scatter
);
2112 if (pi
->dirstat
.nfiles
< 0)
2113 pi
->dirstat
.nfiles
= 0;
2114 if (pi
->dirstat
.nsubdirs
< 0)
2115 pi
->dirstat
.nsubdirs
= 0;
2120 case CEPH_LOCK_INEST
:
2122 fragtree_t tmpdft
= dirfragtree
;
2125 bool rstat_valid
= true;
2129 mempool_inode
*pi
= get_projected_inode();
2130 dout(20) << " orig rstat " << pi
->rstat
<< dendl
;
2131 pi
->rstat
.version
++;
2132 for (const auto &p
: dirfrags
) {
2133 frag_t fg
= p
.first
;
2134 CDir
*dir
= p
.second
;
2135 dout(20) << fg
<< " " << *dir
<< dendl
;
2138 if (dir
->get_version() != 0) {
2139 update
= dir
->is_auth() && !dir
->is_frozen();
2142 rstat_valid
= false;
2145 fnode_t
*pf
= dir
->get_projected_fnode();
2147 pf
= dir
->project_fnode();
2149 if (pf
->accounted_rstat
.version
== pi
->rstat
.version
-1) {
2150 // only pull this frag's dirty rstat inodes into the frag if
2151 // the frag is non-stale and updateable. if it's stale,
2152 // that info will just get thrown out!
2154 dir
->assimilate_dirty_rstat_inodes();
2156 dout(20) << fg
<< " rstat " << pf
->rstat
<< dendl
;
2157 dout(20) << fg
<< " accounted_rstat " << pf
->accounted_rstat
<< dendl
;
2158 dout(20) << fg
<< " dirty_old_rstat " << dir
->dirty_old_rstat
<< dendl
;
2159 mdcache
->project_rstat_frag_to_inode(pf
->rstat
, pf
->accounted_rstat
,
2160 dir
->first
, CEPH_NOSNAP
, this, true);
2161 for (auto &p
: dir
->dirty_old_rstat
) {
2162 mdcache
->project_rstat_frag_to_inode(p
.second
.rstat
, p
.second
.accounted_rstat
,
2163 p
.second
.first
, p
.first
, this, true);
2165 if (update
) // dir contents not valid if frozen or non-auth
2166 dir
->check_rstats();
2168 dout(20) << fg
<< " skipping STALE accounted_rstat " << pf
->accounted_rstat
<< dendl
;
2171 pf
->accounted_rstat
= pf
->rstat
;
2172 dir
->dirty_old_rstat
.clear();
2173 pf
->rstat
.version
= pf
->accounted_rstat
.version
= pi
->rstat
.version
;
2174 dir
->check_rstats();
2175 dout(10) << fg
<< " updated accounted_rstat " << pf
->rstat
<< " on " << *dir
<< dendl
;
2178 tmpdft
.force_to_leaf(g_ceph_context
, fg
);
2179 rstat
.add(pf
->rstat
);
2181 dout(20) << " final rstat " << pi
->rstat
<< dendl
;
2183 if (rstat_valid
&& !rstat
.same_sums(pi
->rstat
)) {
2185 tmpdft
.get_leaves_under(frag_t(), ls
);
2186 for (list
<frag_t
>::iterator p
= ls
.begin(); p
!= ls
.end(); ++p
)
2187 if (!dirfrags
.count(*p
)) {
2188 rstat_valid
= false;
2192 if (state_test(CInode::STATE_REPAIRSTATS
)) {
2193 dout(20) << " rstat mismatch, fixing" << dendl
;
2195 clog
->error() << "inconsistent rstat on inode " << ino()
2196 << ", inode has " << pi
->rstat
2197 << ", directory fragments have " << rstat
;
2198 assert(!"unmatched rstat" == g_conf
->mds_verify_scatter
);
2200 // trust the dirfrag for now
2201 version_t v
= pi
->rstat
.version
;
2202 if (pi
->rstat
.rctime
> rstat
.rctime
)
2203 rstat
.rctime
= pi
->rstat
.rctime
;
2205 pi
->rstat
.version
= v
;
2209 mdcache
->broadcast_quota_to_client(this);
2213 case CEPH_LOCK_IDFT
:
2221 void CInode::finish_scatter_gather_update_accounted(int type
, MutationRef
& mut
, EMetaBlob
*metablob
)
2223 dout(10) << "finish_scatter_gather_update_accounted " << type
<< " on " << *this << dendl
;
2226 for (const auto &p
: dirfrags
) {
2227 CDir
*dir
= p
.second
;
2228 if (!dir
->is_auth() || dir
->get_version() == 0 || dir
->is_frozen())
2231 if (type
== CEPH_LOCK_IDFT
)
2232 continue; // nothing to do.
2234 dout(10) << " journaling updated frag accounted_ on " << *dir
<< dendl
;
2235 assert(dir
->is_projected());
2236 fnode_t
*pf
= dir
->get_projected_fnode();
2237 pf
->version
= dir
->pre_dirty();
2238 mut
->add_projected_fnode(dir
);
2239 metablob
->add_dir(dir
, true);
2242 if (type
== CEPH_LOCK_INEST
)
2243 dir
->assimilate_dirty_rstat_inodes_finish(mut
, metablob
);
2249 bool CInode::is_frozen() const
2251 if (is_frozen_inode()) return true;
2252 if (parent
&& parent
->dir
->is_frozen()) return true;
2256 bool CInode::is_frozen_dir() const
2258 if (parent
&& parent
->dir
->is_frozen_dir()) return true;
2262 bool CInode::is_freezing() const
2264 if (is_freezing_inode()) return true;
2265 if (parent
&& parent
->dir
->is_freezing()) return true;
2269 void CInode::add_dir_waiter(frag_t fg
, MDSInternalContextBase
*c
)
2271 if (waiting_on_dir
.empty())
2273 waiting_on_dir
[fg
].push_back(c
);
2274 dout(10) << "add_dir_waiter frag " << fg
<< " " << c
<< " on " << *this << dendl
;
2277 void CInode::take_dir_waiting(frag_t fg
, list
<MDSInternalContextBase
*>& ls
)
2279 if (waiting_on_dir
.empty())
2282 auto it
= waiting_on_dir
.find(fg
);
2283 if (it
!= waiting_on_dir
.end()) {
2284 dout(10) << __func__
<< " frag " << fg
<< " on " << *this << dendl
;
2285 ls
.splice(ls
.end(), it
->second
);
2286 waiting_on_dir
.erase(it
);
2288 if (waiting_on_dir
.empty())
2293 void CInode::add_waiter(uint64_t tag
, MDSInternalContextBase
*c
)
2295 dout(10) << "add_waiter tag " << std::hex
<< tag
<< std::dec
<< " " << c
2296 << " !ambig " << !state_test(STATE_AMBIGUOUSAUTH
)
2297 << " !frozen " << !is_frozen_inode()
2298 << " !freezing " << !is_freezing_inode()
2300 // wait on the directory?
2301 // make sure its not the inode that is explicitly ambiguous|freezing|frozen
2302 if (((tag
& WAIT_SINGLEAUTH
) && !state_test(STATE_AMBIGUOUSAUTH
)) ||
2303 ((tag
& WAIT_UNFREEZE
) &&
2304 !is_frozen_inode() && !is_freezing_inode() && !is_frozen_auth_pin())) {
2305 dout(15) << "passing waiter up tree" << dendl
;
2306 parent
->dir
->add_waiter(tag
, c
);
2309 dout(15) << "taking waiter here" << dendl
;
2310 MDSCacheObject::add_waiter(tag
, c
);
2313 void CInode::take_waiting(uint64_t mask
, list
<MDSInternalContextBase
*>& ls
)
2315 if ((mask
& WAIT_DIR
) && !waiting_on_dir
.empty()) {
2316 // take all dentry waiters
2317 while (!waiting_on_dir
.empty()) {
2318 auto it
= waiting_on_dir
.begin();
2319 dout(10) << __func__
<< " dirfrag " << it
->first
<< " on " << *this << dendl
;
2320 ls
.splice(ls
.end(), it
->second
);
2321 waiting_on_dir
.erase(it
);
2327 MDSCacheObject::take_waiting(mask
, ls
);
2330 bool CInode::freeze_inode(int auth_pin_allowance
)
2332 assert(auth_pin_allowance
> 0); // otherwise we need to adjust parent's nested_auth_pins
2333 assert(auth_pins
>= auth_pin_allowance
);
2334 if (auth_pins
> auth_pin_allowance
) {
2335 dout(10) << "freeze_inode - waiting for auth_pins to drop to " << auth_pin_allowance
<< dendl
;
2336 auth_pin_freeze_allowance
= auth_pin_allowance
;
2338 state_set(STATE_FREEZING
);
2342 dout(10) << "freeze_inode - frozen" << dendl
;
2343 assert(auth_pins
== auth_pin_allowance
);
2344 if (!state_test(STATE_FROZEN
)) {
2346 state_set(STATE_FROZEN
);
2351 void CInode::unfreeze_inode(list
<MDSInternalContextBase
*>& finished
)
2353 dout(10) << "unfreeze_inode" << dendl
;
2354 if (state_test(STATE_FREEZING
)) {
2355 state_clear(STATE_FREEZING
);
2357 } else if (state_test(STATE_FROZEN
)) {
2358 state_clear(STATE_FROZEN
);
2362 take_waiting(WAIT_UNFREEZE
, finished
);
2365 void CInode::unfreeze_inode()
2367 list
<MDSInternalContextBase
*> finished
;
2368 unfreeze_inode(finished
);
2369 mdcache
->mds
->queue_waiters(finished
);
2372 void CInode::freeze_auth_pin()
2374 assert(state_test(CInode::STATE_FROZEN
));
2375 state_set(CInode::STATE_FROZENAUTHPIN
);
2378 void CInode::unfreeze_auth_pin()
2380 assert(state_test(CInode::STATE_FROZENAUTHPIN
));
2381 state_clear(CInode::STATE_FROZENAUTHPIN
);
2382 if (!state_test(STATE_FREEZING
|STATE_FROZEN
)) {
2383 list
<MDSInternalContextBase
*> finished
;
2384 take_waiting(WAIT_UNFREEZE
, finished
);
2385 mdcache
->mds
->queue_waiters(finished
);
2389 void CInode::clear_ambiguous_auth(list
<MDSInternalContextBase
*>& finished
)
2391 assert(state_test(CInode::STATE_AMBIGUOUSAUTH
));
2392 state_clear(CInode::STATE_AMBIGUOUSAUTH
);
2393 take_waiting(CInode::WAIT_SINGLEAUTH
, finished
);
2396 void CInode::clear_ambiguous_auth()
2398 list
<MDSInternalContextBase
*> finished
;
2399 clear_ambiguous_auth(finished
);
2400 mdcache
->mds
->queue_waiters(finished
);
2404 bool CInode::can_auth_pin() const {
2405 if (!is_auth() || is_freezing_inode() || is_frozen_inode() || is_frozen_auth_pin())
2408 return parent
->can_auth_pin();
2412 void CInode::auth_pin(void *by
)
2418 #ifdef MDS_AUTHPIN_SET
2419 auth_pin_set
.insert(by
);
2422 dout(10) << "auth_pin by " << by
<< " on " << *this
2423 << " now " << auth_pins
<< "+" << nested_auth_pins
2427 parent
->adjust_nested_auth_pins(1, 1, this);
2430 void CInode::auth_unpin(void *by
)
2434 #ifdef MDS_AUTHPIN_SET
2435 assert(auth_pin_set
.count(by
));
2436 auth_pin_set
.erase(auth_pin_set
.find(by
));
2442 dout(10) << "auth_unpin by " << by
<< " on " << *this
2443 << " now " << auth_pins
<< "+" << nested_auth_pins
2446 assert(auth_pins
>= 0);
2449 parent
->adjust_nested_auth_pins(-1, -1, by
);
2451 if (is_freezing_inode() &&
2452 auth_pins
== auth_pin_freeze_allowance
) {
2453 dout(10) << "auth_unpin freezing!" << dendl
;
2456 state_clear(STATE_FREEZING
);
2457 state_set(STATE_FROZEN
);
2458 finish_waiting(WAIT_FROZEN
);
2462 void CInode::adjust_nested_auth_pins(int a
, void *by
)
2465 nested_auth_pins
+= a
;
2466 dout(35) << "adjust_nested_auth_pins by " << by
2467 << " change " << a
<< " yields "
2468 << auth_pins
<< "+" << nested_auth_pins
<< dendl
;
2469 assert(nested_auth_pins
>= 0);
2471 if (g_conf
->mds_debug_auth_pins
) {
2474 for (const auto &p
: dirfrags
) {
2475 CDir
*dir
= p
.second
;
2476 if (!dir
->is_subtree_root() && dir
->get_cum_auth_pins())
2479 assert(s
== nested_auth_pins
);
2483 parent
->adjust_nested_auth_pins(a
, 0, by
);
2489 mds_authority_t
CInode::authority() const
2491 if (inode_auth
.first
>= 0)
2495 return parent
->dir
->authority();
2497 // new items that are not yet linked in (in the committed plane) belong
2498 // to their first parent.
2499 if (!projected_parent
.empty())
2500 return projected_parent
.front()->dir
->authority();
2502 return CDIR_AUTH_UNDEF
;
2508 snapid_t
CInode::get_oldest_snap()
2511 if (!old_inodes
.empty())
2512 t
= old_inodes
.begin()->second
.first
;
2513 return MIN(t
, oldest_snap
);
2516 CInode::mempool_old_inode
& CInode::cow_old_inode(snapid_t follows
, bool cow_head
)
2518 assert(follows
>= first
);
2520 mempool_inode
*pi
= cow_head
? get_projected_inode() : get_previous_projected_inode();
2521 mempool_xattr_map
*px
= cow_head
? get_projected_xattrs() : get_previous_projected_xattrs();
2523 mempool_old_inode
&old
= old_inodes
[follows
];
2528 if (first
< oldest_snap
)
2529 oldest_snap
= first
;
2531 dout(10) << " " << px
->size() << " xattrs cowed, " << *px
<< dendl
;
2533 old
.inode
.trim_client_ranges(follows
);
2535 if (g_conf
->mds_snap_rstat
&&
2536 !(old
.inode
.rstat
== old
.inode
.accounted_rstat
))
2537 dirty_old_rstats
.insert(follows
);
2541 dout(10) << "cow_old_inode " << (cow_head
? "head" : "previous_head" )
2542 << " to [" << old
.first
<< "," << follows
<< "] on "
2548 void CInode::split_old_inode(snapid_t snap
)
2550 auto it
= old_inodes
.lower_bound(snap
);
2551 assert(it
!= old_inodes
.end() && it
->second
.first
< snap
);
2553 mempool_old_inode
&old
= old_inodes
[snap
- 1];
2556 it
->second
.first
= snap
;
2557 dout(10) << __func__
<< " " << "[" << old
.first
<< "," << it
->first
2558 << "] to [" << snap
<< "," << it
->first
<< "] on " << *this << dendl
;
2561 void CInode::pre_cow_old_inode()
2563 snapid_t follows
= find_snaprealm()->get_newest_seq();
2564 if (first
<= follows
)
2565 cow_old_inode(follows
, true);
2568 void CInode::purge_stale_snap_data(const set
<snapid_t
>& snaps
)
2570 dout(10) << "purge_stale_snap_data " << snaps
<< dendl
;
2572 for (auto it
= old_inodes
.begin(); it
!= old_inodes
.end(); ) {
2573 const snapid_t
&id
= it
->first
;
2574 const auto &s
= snaps
.lower_bound(it
->second
.first
);
2575 if (s
== snaps
.end() || *s
> id
) {
2576 dout(10) << " purging old_inode [" << it
->second
.first
<< "," << id
<< "]" << dendl
;
2577 it
= old_inodes
.erase(it
);
2585 * pick/create an old_inode
2587 CInode::mempool_old_inode
* CInode::pick_old_inode(snapid_t snap
)
2589 auto it
= old_inodes
.lower_bound(snap
); // p is first key >= to snap
2590 if (it
!= old_inodes
.end() && it
->second
.first
<= snap
) {
2591 dout(10) << __func__
<< " snap " << snap
<< " -> [" << it
->second
.first
<< "," << it
->first
<< "]" << dendl
;
2594 dout(10) << "pick_old_inode snap " << snap
<< " -> nothing" << dendl
;
2598 void CInode::open_snaprealm(bool nosplit
)
2601 SnapRealm
*parent
= find_snaprealm();
2602 snaprealm
= new SnapRealm(mdcache
, this);
2604 dout(10) << "open_snaprealm " << snaprealm
2605 << " parent is " << parent
2607 dout(30) << " siblings are " << parent
->open_children
<< dendl
;
2608 snaprealm
->parent
= parent
;
2610 parent
->split_at(snaprealm
);
2611 parent
->open_children
.insert(snaprealm
);
2615 void CInode::close_snaprealm(bool nojoin
)
2618 dout(15) << "close_snaprealm " << *snaprealm
<< dendl
;
2619 snaprealm
->close_parents();
2620 if (snaprealm
->parent
) {
2621 snaprealm
->parent
->open_children
.erase(snaprealm
);
2623 //snaprealm->parent->join(snaprealm);
2630 SnapRealm
*CInode::find_snaprealm() const
2632 const CInode
*cur
= this;
2633 while (!cur
->snaprealm
) {
2634 if (cur
->get_parent_dn())
2635 cur
= cur
->get_parent_dn()->get_dir()->get_inode();
2636 else if (get_projected_parent_dn())
2637 cur
= cur
->get_projected_parent_dn()->get_dir()->get_inode();
2641 return cur
->snaprealm
;
2644 void CInode::encode_snap_blob(bufferlist
&snapbl
)
2647 ::encode(snaprealm
->srnode
, snapbl
);
2648 dout(20) << "encode_snap_blob " << *snaprealm
<< dendl
;
2651 void CInode::decode_snap_blob(bufferlist
& snapbl
)
2653 if (snapbl
.length()) {
2655 bufferlist::iterator p
= snapbl
.begin();
2656 ::decode(snaprealm
->srnode
, p
);
2658 bool ok
= snaprealm
->_open_parents(NULL
);
2661 dout(20) << "decode_snap_blob " << *snaprealm
<< dendl
;
// Serialize this inode's snaprealm blob plus oldest_snap into bl.
// Order must match decode_snap().
// NOTE(review): the local `snapbl` declaration (original line ~2667)
// is not visible in this extract — confirm against the full source.
2665 void CInode::encode_snap(bufferlist
& bl
)
2668 encode_snap_blob(snapbl
);
// the blob is length-prefixed so decode_snap() can detect an empty one
2669 ::encode(snapbl
, bl
);
2670 ::encode(oldest_snap
, bl
);
// Deserialize the snaprealm blob and oldest_snap written by
// encode_snap(), in the same order.
// NOTE(review): the local `snapbl` declaration (original line ~2675)
// is not visible in this extract — confirm against the full source.
2673 void CInode::decode_snap(bufferlist::iterator
& p
)
2676 ::decode(snapbl
, p
);
2677 ::decode(oldest_snap
, p
);
// apply the blob last; decode_snap_blob() tolerates an empty blob
2678 decode_snap_blob(snapbl
);
2681 // =============================================
2683 client_t
CInode::calc_ideal_loner()
2685 if (mdcache
->is_readonly())
2687 if (!mds_caps_wanted
.empty())
2691 client_t loner
= -1;
2692 for (map
<client_t
,Capability
*>::iterator it
= client_caps
.begin();
2693 it
!= client_caps
.end();
2695 if (!it
->second
->is_stale() &&
2696 ((it
->second
->wanted() & (CEPH_CAP_ANY_WR
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_RD
)) ||
2697 (inode
.is_dir() && !has_subtree_root_dirfrag()))) {
2706 bool CInode::choose_ideal_loner()
2708 want_loner_cap
= calc_ideal_loner();
2709 int changed
= false;
2710 if (loner_cap
>= 0 && loner_cap
!= want_loner_cap
) {
2711 if (!try_drop_loner())
2716 if (want_loner_cap
>= 0) {
2717 if (loner_cap
< 0) {
2718 set_loner_cap(want_loner_cap
);
2721 assert(loner_cap
== want_loner_cap
);
2726 bool CInode::try_set_loner()
2728 assert(want_loner_cap
>= 0);
2729 if (loner_cap
>= 0 && loner_cap
!= want_loner_cap
)
2731 set_loner_cap(want_loner_cap
);
// Record client `l` as the loner (the single client allowed exclusive
// caps) and propagate it to the cap-bearing locks so they permit
// exclusive states for that client.
// NOTE(review): the assignment `loner_cap = l;` (original lines
// 2736-2737) is not visible in this extract; the calls below read
// loner_cap, so confirm it is assigned first in the full source.
2735 void CInode::set_loner_cap(client_t l
)
2738 authlock
.set_excl_client(loner_cap
);
2739 filelock
.set_excl_client(loner_cap
);
2740 linklock
.set_excl_client(loner_cap
);
2741 xattrlock
.set_excl_client(loner_cap
);
2744 bool CInode::try_drop_loner()
2749 int other_allowed
= get_caps_allowed_by_type(CAP_ANY
);
2750 Capability
*cap
= get_client_cap(loner_cap
);
2752 (cap
->issued() & ~other_allowed
) == 0) {
2760 // choose new lock state during recovery, based on issued caps
2761 void CInode::choose_lock_state(SimpleLock
*lock
, int allissued
)
2763 int shift
= lock
->get_cap_shift();
2764 int issued
= (allissued
>> shift
) & lock
->get_cap_mask();
2766 if (lock
->is_xlocked()) {
2768 } else if (lock
->get_state() != LOCK_MIX
) {
2769 if (issued
& (CEPH_CAP_GEXCL
| CEPH_CAP_GBUFFER
))
2770 lock
->set_state(LOCK_EXCL
);
2771 else if (issued
& CEPH_CAP_GWR
)
2772 lock
->set_state(LOCK_MIX
);
2773 else if (lock
->is_dirty()) {
2774 if (is_replicated())
2775 lock
->set_state(LOCK_MIX
);
2777 lock
->set_state(LOCK_LOCK
);
2779 lock
->set_state(LOCK_SYNC
);
2782 // our states have already been chosen during rejoin.
2783 if (lock
->is_xlocked())
2784 assert(lock
->get_state() == LOCK_LOCK
);
// During recovery, choose a state for each cap-related lock based on
// the caps actually issued to clients, plus any caps a reconnecting
// client reported as dirty.
2788 void CInode::choose_lock_states(int dirty_caps
)
// treat dirty caps as issued: the client holds data we must not lose
2790 int issued
= get_caps_issued() | dirty_caps
;
// pick a loner first so per-lock state can favor the exclusive client
2791 if (is_auth() && (issued
& (CEPH_CAP_ANY_EXCL
|CEPH_CAP_ANY_WR
)))
2792 choose_ideal_loner();
// each lock extracts its own cap bits from `issued` via its cap shift
2793 choose_lock_state(&filelock
, issued
);
2794 choose_lock_state(&nestlock
, issued
);
2795 choose_lock_state(&dirfragtreelock
, issued
);
2796 choose_lock_state(&authlock
, issued
);
2797 choose_lock_state(&xattrlock
, issued
);
2798 choose_lock_state(&linklock
, issued
);
2801 Capability
*CInode::add_client_cap(client_t client
, Session
*session
, SnapRealm
*conrealm
)
2803 if (client_caps
.empty()) {
2806 containing_realm
= conrealm
;
2808 containing_realm
= find_snaprealm();
2809 containing_realm
->inodes_with_caps
.push_back(&item_caps
);
2810 dout(10) << "add_client_cap first cap, joining realm " << *containing_realm
<< dendl
;
2813 if (client_caps
.empty())
2814 mdcache
->num_inodes_with_caps
++;
2816 Capability
*cap
= new Capability(this, ++mdcache
->last_cap_id
, client
);
2817 assert(client_caps
.count(client
) == 0);
2818 client_caps
[client
] = cap
;
2820 session
->add_cap(cap
);
2821 if (session
->is_stale())
2824 cap
->client_follows
= first
-1;
2826 containing_realm
->add_cap(client
, cap
);
2831 void CInode::remove_client_cap(client_t client
)
2833 assert(client_caps
.count(client
) == 1);
2834 Capability
*cap
= client_caps
[client
];
2836 cap
->item_session_caps
.remove_myself();
2837 cap
->item_revoking_caps
.remove_myself();
2838 cap
->item_client_revoking_caps
.remove_myself();
2839 containing_realm
->remove_cap(client
, cap
);
2841 if (client
== loner_cap
)
2845 client_caps
.erase(client
);
2846 if (client_caps
.empty()) {
2847 dout(10) << "remove_client_cap last cap, leaving realm " << *containing_realm
<< dendl
;
2849 item_caps
.remove_myself();
2850 containing_realm
= NULL
;
2851 item_open_file
.remove_myself(); // unpin logsegment
2852 mdcache
->num_inodes_with_caps
--;
2855 //clean up advisory locks
2856 bool fcntl_removed
= fcntl_locks
? fcntl_locks
->remove_all_from(client
) : false;
2857 bool flock_removed
= flock_locks
? flock_locks
->remove_all_from(client
) : false;
2858 if (fcntl_removed
|| flock_removed
) {
2859 list
<MDSInternalContextBase
*> waiters
;
2860 take_waiting(CInode::WAIT_FLOCK
, waiters
);
2861 mdcache
->mds
->queue_waiters(waiters
);
2865 void CInode::move_to_realm(SnapRealm
*realm
)
2867 dout(10) << "move_to_realm joining realm " << *realm
2868 << ", leaving realm " << *containing_realm
<< dendl
;
2869 for (map
<client_t
,Capability
*>::iterator q
= client_caps
.begin();
2870 q
!= client_caps
.end();
2872 containing_realm
->remove_cap(q
->first
, q
->second
);
2873 realm
->add_cap(q
->first
, q
->second
);
2875 item_caps
.remove_myself();
2876 realm
->inodes_with_caps
.push_back(&item_caps
);
2877 containing_realm
= realm
;
2880 Capability
*CInode::reconnect_cap(client_t client
, const cap_reconnect_t
& icr
, Session
*session
)
2882 Capability
*cap
= get_client_cap(client
);
2885 cap
->merge(icr
.capinfo
.wanted
, icr
.capinfo
.issued
);
2887 cap
= add_client_cap(client
, session
);
2888 cap
->set_cap_id(icr
.capinfo
.cap_id
);
2889 cap
->set_wanted(icr
.capinfo
.wanted
);
2890 cap
->issue_norevoke(icr
.capinfo
.issued
);
2893 cap
->set_last_issue_stamp(ceph_clock_now());
// Drop every client capability after this inode's caps have been
// exported to another MDS, and reset loner/wanted bookkeeping.
2897 void CInode::clear_client_caps_after_export()
// remove_client_cap() erases from client_caps, so this loop terminates
2899 while (!client_caps
.empty())
2900 remove_client_cap(client_caps
.begin()->first
);
2902 want_loner_cap
= -1;
2903 mds_caps_wanted
.clear();
2906 void CInode::export_client_caps(map
<client_t
,Capability::Export
>& cl
)
2908 for (map
<client_t
,Capability
*>::iterator it
= client_caps
.begin();
2909 it
!= client_caps
.end();
2911 cl
[it
->first
] = it
->second
->make_export();
2916 int CInode::get_caps_liked() const
2919 return CEPH_CAP_PIN
| CEPH_CAP_ANY_EXCL
| CEPH_CAP_ANY_SHARED
; // but not, say, FILE_RD|WR|WRBUFFER
2921 return CEPH_CAP_ANY
& ~CEPH_CAP_FILE_LAZYIO
;
2924 int CInode::get_caps_allowed_ever() const
2928 allowed
= CEPH_CAP_PIN
| CEPH_CAP_ANY_EXCL
| CEPH_CAP_ANY_SHARED
;
2930 allowed
= CEPH_CAP_ANY
;
2933 (filelock
.gcaps_allowed_ever() << filelock
.get_cap_shift()) |
2934 (authlock
.gcaps_allowed_ever() << authlock
.get_cap_shift()) |
2935 (xattrlock
.gcaps_allowed_ever() << xattrlock
.get_cap_shift()) |
2936 (linklock
.gcaps_allowed_ever() << linklock
.get_cap_shift()));
2939 int CInode::get_caps_allowed_by_type(int type
) const
2943 (filelock
.gcaps_allowed(type
) << filelock
.get_cap_shift()) |
2944 (authlock
.gcaps_allowed(type
) << authlock
.get_cap_shift()) |
2945 (xattrlock
.gcaps_allowed(type
) << xattrlock
.get_cap_shift()) |
2946 (linklock
.gcaps_allowed(type
) << linklock
.get_cap_shift());
2949 int CInode::get_caps_careful() const
2952 (filelock
.gcaps_careful() << filelock
.get_cap_shift()) |
2953 (authlock
.gcaps_careful() << authlock
.get_cap_shift()) |
2954 (xattrlock
.gcaps_careful() << xattrlock
.get_cap_shift()) |
2955 (linklock
.gcaps_careful() << linklock
.get_cap_shift());
2958 int CInode::get_xlocker_mask(client_t client
) const
2961 (filelock
.gcaps_xlocker_mask(client
) << filelock
.get_cap_shift()) |
2962 (authlock
.gcaps_xlocker_mask(client
) << authlock
.get_cap_shift()) |
2963 (xattrlock
.gcaps_xlocker_mask(client
) << xattrlock
.get_cap_shift()) |
2964 (linklock
.gcaps_xlocker_mask(client
) << linklock
.get_cap_shift());
2967 int CInode::get_caps_allowed_for_client(Session
*session
, mempool_inode
*file_i
) const
2969 client_t client
= session
->info
.inst
.name
.num();
2971 if (client
== get_loner()) {
2972 // as the loner, we get the loner_caps AND any xlocker_caps for things we have xlocked
2974 get_caps_allowed_by_type(CAP_LONER
) |
2975 (get_caps_allowed_by_type(CAP_XLOCKER
) & get_xlocker_mask(client
));
2977 allowed
= get_caps_allowed_by_type(CAP_ANY
);
2981 if ((file_i
->inline_data
.version
!= CEPH_INLINE_NONE
&&
2982 !session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)) ||
2983 (!file_i
->layout
.pool_ns
.empty() &&
2984 !session
->connection
->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2
)))
2985 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2990 // caps issued, wanted
2991 int CInode::get_caps_issued(int *ploner
, int *pother
, int *pxlocker
,
2992 int shift
, int mask
)
2995 int loner
= 0, other
= 0, xlocker
= 0;
3000 for (map
<client_t
,Capability
*>::const_iterator it
= client_caps
.begin();
3001 it
!= client_caps
.end();
3003 int i
= it
->second
->issued();
3005 if (it
->first
== loner_cap
)
3009 xlocker
|= get_xlocker_mask(it
->first
) & i
;
3011 if (ploner
) *ploner
= (loner
>> shift
) & mask
;
3012 if (pother
) *pother
= (other
>> shift
) & mask
;
3013 if (pxlocker
) *pxlocker
= (xlocker
>> shift
) & mask
;
3014 return (c
>> shift
) & mask
;
3017 bool CInode::is_any_caps_wanted() const
3019 for (map
<client_t
,Capability
*>::const_iterator it
= client_caps
.begin();
3020 it
!= client_caps
.end();
3022 if (it
->second
->wanted())
3027 int CInode::get_caps_wanted(int *ploner
, int *pother
, int shift
, int mask
) const
3030 int loner
= 0, other
= 0;
3031 for (map
<client_t
,Capability
*>::const_iterator it
= client_caps
.begin();
3032 it
!= client_caps
.end();
3034 if (!it
->second
->is_stale()) {
3035 int t
= it
->second
->wanted();
3037 if (it
->first
== loner_cap
)
3042 //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
3045 for (const auto &p
: mds_caps_wanted
) {
3048 //cout << " get_caps_wanted mds " << it->first << " " << cap_string(it->second) << endl;
3050 if (ploner
) *ploner
= (loner
>> shift
) & mask
;
3051 if (pother
) *pother
= (other
>> shift
) & mask
;
3052 return (w
>> shift
) & mask
;
3055 bool CInode::issued_caps_need_gather(SimpleLock
*lock
)
3057 int loner_issued
, other_issued
, xlocker_issued
;
3058 get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
3059 lock
->get_cap_shift(), lock
->get_cap_mask());
3060 if ((loner_issued
& ~lock
->gcaps_allowed(CAP_LONER
)) ||
3061 (other_issued
& ~lock
->gcaps_allowed(CAP_ANY
)) ||
3062 (xlocker_issued
& ~lock
->gcaps_allowed(CAP_XLOCKER
)))
// Relax every inode lock toward its replica-friendly stable state.
// Only legal while this inode has no replicas yet (see the assert) —
// used before replicating the inode to another MDS.
3067 void CInode::replicate_relax_locks()
3069 //dout(10) << " relaxing locks on " << *this << dendl;
3071 assert(!is_replicated());
3073 authlock
.replicate_relax();
3074 linklock
.replicate_relax();
3075 dirfragtreelock
.replicate_relax();
3076 filelock
.replicate_relax();
3077 xattrlock
.replicate_relax();
3078 snaplock
.replicate_relax();
3079 nestlock
.replicate_relax();
3080 flocklock
.replicate_relax();
3081 policylock
.replicate_relax();
3086 // =============================================
3088 int CInode::encode_inodestat(bufferlist
& bl
, Session
*session
,
3089 SnapRealm
*dir_realm
,
3094 client_t client
= session
->info
.inst
.name
.num();
3096 assert(session
->connection
);
3101 mempool_inode
*oi
= &inode
;
3102 mempool_inode
*pi
= get_projected_inode();
3104 CInode::mempool_xattr_map
*pxattrs
= nullptr;
3106 if (snapid
!= CEPH_NOSNAP
) {
3108 // for now at least, old_inodes is only defined/valid on the auth
3112 if (is_multiversion()) {
3113 auto it
= old_inodes
.lower_bound(snapid
);
3114 if (it
!= old_inodes
.end()) {
3115 if (it
->second
.first
> snapid
) {
3116 if (it
!= old_inodes
.begin())
3119 if (it
->second
.first
<= snapid
&& snapid
<= it
->first
) {
3120 dout(15) << __func__
<< " snapid " << snapid
3121 << " to old_inode [" << it
->second
.first
<< "," << it
->first
<< "]"
3122 << " " << it
->second
.inode
.rstat
3124 auto &p
= it
->second
;
3126 pxattrs
= &p
.xattrs
;
3128 // snapshoted remote dentry can result this
3129 dout(0) << "encode_inodestat old_inode for snapid " << snapid
3130 << " not found" << dendl
;
3133 } else if (snapid
< first
|| snapid
> last
) {
3134 // snapshoted remote dentry can result this
3135 dout(0) << "encode_inodestat [" << first
<< "," << last
<< "]"
3136 << " not match snapid " << snapid
<< dendl
;
3140 SnapRealm
*realm
= find_snaprealm();
3142 bool no_caps
= !valid
||
3143 session
->is_stale() ||
3144 (dir_realm
&& realm
!= dir_realm
) ||
3146 state_test(CInode::STATE_EXPORTINGCAPS
);
3148 dout(20) << "encode_inodestat no caps"
3149 << (!valid
?", !valid":"")
3150 << (session
->is_stale()?", session stale ":"")
3151 << ((dir_realm
&& realm
!= dir_realm
)?", snaprealm differs ":"")
3152 << (is_frozen()?", frozen inode":"")
3153 << (state_test(CInode::STATE_EXPORTINGCAPS
)?", exporting caps":"")
3157 // "fake" a version that is old (stable) version, +1 if projected.
3158 version_t version
= (oi
->version
* 2) + is_projected();
3160 Capability
*cap
= get_client_cap(client
);
3161 bool pfile
= filelock
.is_xlocked_by_client(client
) || get_loner() == client
;
3162 //(cap && (cap->issued() & CEPH_CAP_FILE_EXCL));
3163 bool pauth
= authlock
.is_xlocked_by_client(client
) || get_loner() == client
;
3164 bool plink
= linklock
.is_xlocked_by_client(client
) || get_loner() == client
;
3165 bool pxattr
= xattrlock
.is_xlocked_by_client(client
) || get_loner() == client
;
3167 bool plocal
= versionlock
.get_last_wrlock_client() == client
;
3168 bool ppolicy
= policylock
.is_xlocked_by_client(client
) || get_loner()==client
;
3170 mempool_inode
*any_i
= (pfile
|pauth
|plink
|pxattr
|plocal
) ? pi
: oi
;
3172 dout(20) << " pfile " << pfile
<< " pauth " << pauth
3173 << " plink " << plink
<< " pxattr " << pxattr
3174 << " plocal " << plocal
3175 << " ctime " << any_i
->ctime
3176 << " valid=" << valid
<< dendl
;
3179 mempool_inode
*file_i
= pfile
? pi
:oi
;
3180 file_layout_t layout
;
3182 layout
= (ppolicy
? pi
: oi
)->layout
;
3184 layout
= file_i
->layout
;
3187 // max_size is min of projected, actual
3189 MIN(oi
->client_ranges
.count(client
) ?
3190 oi
->client_ranges
[client
].range
.last
: 0,
3191 pi
->client_ranges
.count(client
) ?
3192 pi
->client_ranges
[client
].range
.last
: 0);
3195 version_t inline_version
= 0;
3196 bufferlist inline_data
;
3197 if (file_i
->inline_data
.version
== CEPH_INLINE_NONE
) {
3198 inline_version
= CEPH_INLINE_NONE
;
3199 } else if ((!cap
&& !no_caps
) ||
3200 (cap
&& cap
->client_inline_version
< file_i
->inline_data
.version
) ||
3201 (getattr_caps
& CEPH_CAP_FILE_RD
)) { // client requests inline data
3202 inline_version
= file_i
->inline_data
.version
;
3203 if (file_i
->inline_data
.length() > 0)
3204 inline_data
= file_i
->inline_data
.get_data();
3207 // nest (do same as file... :/)
3209 cap
->last_rbytes
= file_i
->rstat
.rbytes
;
3210 cap
->last_rsize
= file_i
->rstat
.rsize();
3214 mempool_inode
*auth_i
= pauth
? pi
:oi
;
3217 mempool_inode
*link_i
= plink
? pi
:oi
;
3220 mempool_inode
*xattr_i
= pxattr
? pi
:oi
;
3224 version_t xattr_version
;
3225 if ((!cap
&& !no_caps
) ||
3226 (cap
&& cap
->client_xattr_version
< xattr_i
->xattr_version
) ||
3227 (getattr_caps
& CEPH_CAP_XATTR_SHARED
)) { // client requests xattrs
3229 pxattrs
= pxattr
? get_projected_xattrs() : &xattrs
;
3230 ::encode(*pxattrs
, xbl
);
3231 xattr_version
= xattr_i
->xattr_version
;
3238 unsigned bytes
= 8 + 8 + 4 + 8 + 8 + sizeof(ceph_mds_reply_cap
) +
3239 sizeof(struct ceph_file_layout
) + 4 + layout
.pool_ns
.size() +
3240 sizeof(struct ceph_timespec
) * 3 +
3241 4 + 8 + 8 + 8 + 4 + 4 + 4 + 4 + 4 +
3242 8 + 8 + 8 + 8 + 8 + sizeof(struct ceph_timespec
) +
3244 bytes
+= sizeof(__u32
);
3245 bytes
+= (sizeof(__u32
) + sizeof(__u32
)) * dirfragtree
._splits
.size();
3246 bytes
+= sizeof(__u32
) + symlink
.length();
3247 bytes
+= sizeof(__u32
) + xbl
.length();
3248 bytes
+= sizeof(version_t
) + sizeof(__u32
) + inline_data
.length();
3249 if (bytes
> max_bytes
)
3255 struct ceph_mds_reply_cap ecap
;
3256 if (snapid
!= CEPH_NOSNAP
) {
3258 * snapped inodes (files or dirs) only get read-only caps. always
3259 * issue everything possible, since it is read only.
3261 * if a snapped inode has caps, limit issued caps based on the
3264 * if it is a live inode, limit issued caps based on the lock
3267 * do NOT adjust cap issued state, because the client always
3268 * tracks caps per-snap and the mds does either per-interval or
3271 ecap
.caps
= valid
? get_caps_allowed_by_type(CAP_ANY
) : CEPH_STAT_CAP_INODE
;
3272 if (last
== CEPH_NOSNAP
|| is_any_caps())
3273 ecap
.caps
= ecap
.caps
& get_caps_allowed_for_client(session
, file_i
);
3278 if (!no_caps
&& !cap
) {
3280 cap
= add_client_cap(client
, session
, realm
);
3282 choose_ideal_loner();
3286 if (!no_caps
&& cap
) {
3287 int likes
= get_caps_liked();
3288 int allowed
= get_caps_allowed_for_client(session
, file_i
);
3289 issue
= (cap
->wanted() | likes
) & allowed
;
3290 cap
->issue_norevoke(issue
);
3291 issue
= cap
->pending();
3292 dout(10) << "encode_inodestat issuing " << ccap_string(issue
)
3293 << " seq " << cap
->get_last_seq() << dendl
;
3294 } else if (cap
&& cap
->is_new() && !dir_realm
) {
3295 // alway issue new caps to client, otherwise the caps get lost
3296 assert(cap
->is_stale());
3297 issue
= cap
->pending() | CEPH_CAP_PIN
;
3298 cap
->issue_norevoke(issue
);
3299 dout(10) << "encode_inodestat issuing " << ccap_string(issue
)
3300 << " seq " << cap
->get_last_seq()
3301 << "(stale|new caps)" << dendl
;
3305 cap
->set_last_issue();
3306 cap
->set_last_issue_stamp(ceph_clock_now());
3309 ecap
.wanted
= cap
->wanted();
3310 ecap
.cap_id
= cap
->get_cap_id();
3311 ecap
.seq
= cap
->get_last_seq();
3312 ecap
.mseq
= cap
->get_mseq();
3313 ecap
.realm
= realm
->inode
->ino();
3323 ecap
.flags
= is_auth() ? CEPH_CAP_FLAG_AUTH
: 0;
3324 dout(10) << "encode_inodestat caps " << ccap_string(ecap
.caps
)
3325 << " seq " << ecap
.seq
<< " mseq " << ecap
.mseq
3326 << " xattrv " << xattr_version
<< " len " << xbl
.length()
3329 if (inline_data
.length() && cap
) {
3330 if ((cap
->pending() | getattr_caps
) & CEPH_CAP_FILE_SHARED
) {
3331 dout(10) << "including inline version " << inline_version
<< dendl
;
3332 cap
->client_inline_version
= inline_version
;
3334 dout(10) << "dropping inline version " << inline_version
<< dendl
;
3336 inline_data
.clear();
3340 // include those xattrs?
3341 if (xbl
.length() && cap
) {
3342 if ((cap
->pending() | getattr_caps
) & CEPH_CAP_XATTR_SHARED
) {
3343 dout(10) << "including xattrs version " << xattr_i
->xattr_version
<< dendl
;
3344 cap
->client_xattr_version
= xattr_i
->xattr_version
;
3346 dout(10) << "dropping xattrs version " << xattr_i
->xattr_version
<< dendl
;
3347 xbl
.clear(); // no xattrs .. XXX what's this about?!?
3353 * note: encoding matches MClientReply::InodeStat
3355 ::encode(oi
->ino
, bl
);
3356 ::encode(snapid
, bl
);
3357 ::encode(oi
->rdev
, bl
);
3358 ::encode(version
, bl
);
3360 ::encode(xattr_version
, bl
);
3364 ceph_file_layout legacy_layout
;
3365 layout
.to_legacy(&legacy_layout
);
3366 ::encode(legacy_layout
, bl
);
3368 ::encode(any_i
->ctime
, bl
);
3369 ::encode(file_i
->mtime
, bl
);
3370 ::encode(file_i
->atime
, bl
);
3371 ::encode(file_i
->time_warp_seq
, bl
);
3372 ::encode(file_i
->size
, bl
);
3373 ::encode(max_size
, bl
);
3374 ::encode(file_i
->truncate_size
, bl
);
3375 ::encode(file_i
->truncate_seq
, bl
);
3377 ::encode(auth_i
->mode
, bl
);
3378 ::encode((uint32_t)auth_i
->uid
, bl
);
3379 ::encode((uint32_t)auth_i
->gid
, bl
);
3381 ::encode(link_i
->nlink
, bl
);
3383 ::encode(file_i
->dirstat
.nfiles
, bl
);
3384 ::encode(file_i
->dirstat
.nsubdirs
, bl
);
3385 ::encode(file_i
->rstat
.rbytes
, bl
);
3386 ::encode(file_i
->rstat
.rfiles
, bl
);
3387 ::encode(file_i
->rstat
.rsubdirs
, bl
);
3388 ::encode(file_i
->rstat
.rctime
, bl
);
3390 dirfragtree
.encode(bl
);
3392 ::encode(symlink
, bl
);
3393 if (session
->connection
->has_feature(CEPH_FEATURE_DIRLAYOUTHASH
)) {
3394 ::encode(file_i
->dir_layout
, bl
);
3397 if (session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)) {
3398 ::encode(inline_version
, bl
);
3399 ::encode(inline_data
, bl
);
3401 if (session
->connection
->has_feature(CEPH_FEATURE_MDS_QUOTA
)) {
3402 mempool_inode
*policy_i
= ppolicy
? pi
: oi
;
3403 ::encode(policy_i
->quota
, bl
);
3405 if (session
->connection
->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2
)) {
3406 ::encode(layout
.pool_ns
, bl
);
3408 if (session
->connection
->has_feature(CEPH_FEATURE_FS_BTIME
)) {
3409 ::encode(any_i
->btime
, bl
);
3410 ::encode(any_i
->change_attr
, bl
);
3416 void CInode::encode_cap_message(MClientCaps
*m
, Capability
*cap
)
3420 client_t client
= cap
->get_client();
3422 bool pfile
= filelock
.is_xlocked_by_client(client
) || (cap
->issued() & CEPH_CAP_FILE_EXCL
);
3423 bool pauth
= authlock
.is_xlocked_by_client(client
);
3424 bool plink
= linklock
.is_xlocked_by_client(client
);
3425 bool pxattr
= xattrlock
.is_xlocked_by_client(client
);
3427 mempool_inode
*oi
= &inode
;
3428 mempool_inode
*pi
= get_projected_inode();
3429 mempool_inode
*i
= (pfile
|pauth
|plink
|pxattr
) ? pi
: oi
;
3431 dout(20) << "encode_cap_message pfile " << pfile
3432 << " pauth " << pauth
<< " plink " << plink
<< " pxattr " << pxattr
3433 << " ctime " << i
->ctime
<< dendl
;
3436 m
->set_layout(i
->layout
);
3438 m
->truncate_seq
= i
->truncate_seq
;
3439 m
->truncate_size
= i
->truncate_size
;
3440 m
->mtime
= i
->mtime
;
3441 m
->atime
= i
->atime
;
3442 m
->ctime
= i
->ctime
;
3443 m
->change_attr
= i
->change_attr
;
3444 m
->time_warp_seq
= i
->time_warp_seq
;
3445 m
->nfiles
= i
->dirstat
.nfiles
;
3446 m
->nsubdirs
= i
->dirstat
.nsubdirs
;
3448 if (cap
->client_inline_version
< i
->inline_data
.version
) {
3449 m
->inline_version
= cap
->client_inline_version
= i
->inline_data
.version
;
3450 if (i
->inline_data
.length() > 0)
3451 m
->inline_data
= i
->inline_data
.get_data();
3453 m
->inline_version
= 0;
3456 // max_size is min of projected, actual.
3457 uint64_t oldms
= oi
->client_ranges
.count(client
) ? oi
->client_ranges
[client
].range
.last
: 0;
3458 uint64_t newms
= pi
->client_ranges
.count(client
) ? pi
->client_ranges
[client
].range
.last
: 0;
3459 m
->max_size
= MIN(oldms
, newms
);
3462 m
->head
.mode
= i
->mode
;
3463 m
->head
.uid
= i
->uid
;
3464 m
->head
.gid
= i
->gid
;
3467 m
->head
.nlink
= i
->nlink
;
3470 auto ix
= pxattr
? get_projected_xattrs() : &xattrs
;
3471 if ((cap
->pending() & CEPH_CAP_XATTR_SHARED
) &&
3472 i
->xattr_version
> cap
->client_xattr_version
) {
3473 dout(10) << " including xattrs v " << i
->xattr_version
<< dendl
;
3474 ::encode(*ix
, m
->xattrbl
);
3475 m
->head
.xattr_version
= i
->xattr_version
;
3476 cap
->client_xattr_version
= i
->xattr_version
;
3482 void CInode::_encode_base(bufferlist
& bl
, uint64_t features
)
3484 ::encode(first
, bl
);
3485 ::encode(inode
, bl
, features
);
3486 ::encode(symlink
, bl
);
3487 ::encode(dirfragtree
, bl
);
3488 ::encode(xattrs
, bl
);
3489 ::encode(old_inodes
, bl
, features
);
3490 ::encode(damage_flags
, bl
);
3493 void CInode::_decode_base(bufferlist::iterator
& p
)
3500 symlink
= mempool::mds_co::string(boost::string_view(tmp
));
3502 ::decode(dirfragtree
, p
);
3503 ::decode(xattrs
, p
);
3504 ::decode(old_inodes
, p
);
3505 ::decode(damage_flags
, p
);
// Serialize the full state of all nine inode locks plus the loner cap
// into bl. The order here must match _decode_locks_full() exactly.
3509 void CInode::_encode_locks_full(bufferlist
& bl
)
3511 ::encode(authlock
, bl
);
3512 ::encode(linklock
, bl
);
3513 ::encode(dirfragtreelock
, bl
);
3514 ::encode(filelock
, bl
);
3515 ::encode(xattrlock
, bl
);
3516 ::encode(snaplock
, bl
);
3517 ::encode(nestlock
, bl
);
3518 ::encode(flocklock
, bl
);
3519 ::encode(policylock
, bl
);
// loner follows the locks so the decoder can re-apply it to them
3521 ::encode(loner_cap
, bl
);
// Deserialize the full lock state written by _encode_locks_full(), in
// the same order, then re-establish the loner client on the locks.
3523 void CInode::_decode_locks_full(bufferlist::iterator
& p
)
3525 ::decode(authlock
, p
);
3526 ::decode(linklock
, p
);
3527 ::decode(dirfragtreelock
, p
);
3528 ::decode(filelock
, p
);
3529 ::decode(xattrlock
, p
);
3530 ::decode(snaplock
, p
);
3531 ::decode(nestlock
, p
);
3532 ::decode(flocklock
, p
);
3533 ::decode(policylock
, p
);
3535 ::decode(loner_cap
, p
);
// push the decoded loner into the per-lock excl-client fields
3536 set_loner_cap(loner_cap
);
3537 want_loner_cap
= loner_cap
; // for now, we'll eval() shortly.
// Encode per-lock replica state for this inode, followed by a flag
// telling the replica whether the auth MDS is still recovering.
// The order must match _decode_locks_state().
3540 void CInode::_encode_locks_state_for_replica(bufferlist
& bl
, bool need_recover
)
3542 authlock
.encode_state_for_replica(bl
);
3543 linklock
.encode_state_for_replica(bl
);
3544 dirfragtreelock
.encode_state_for_replica(bl
);
3545 filelock
.encode_state_for_replica(bl
);
3546 nestlock
.encode_state_for_replica(bl
);
3547 xattrlock
.encode_state_for_replica(bl
);
3548 snaplock
.encode_state_for_replica(bl
);
3549 flocklock
.encode_state_for_replica(bl
);
3550 policylock
.encode_state_for_replica(bl
);
3551 ::encode(need_recover
, bl
);
// Encode lock state for cache rejoin. The scatterlocks
// (dirfragtreelock, filelock, nestlock) get the rejoin-specific
// encoding parameterized by the target replica `rep`; the rest use the
// plain replica encoding.
3554 void CInode::_encode_locks_state_for_rejoin(bufferlist
& bl
, int rep
)
3556 authlock
.encode_state_for_replica(bl
);
3557 linklock
.encode_state_for_replica(bl
);
3558 dirfragtreelock
.encode_state_for_rejoin(bl
, rep
);
3559 filelock
.encode_state_for_rejoin(bl
, rep
);
3560 nestlock
.encode_state_for_rejoin(bl
, rep
);
3561 xattrlock
.encode_state_for_replica(bl
);
3562 snaplock
.encode_state_for_replica(bl
);
3563 flocklock
.encode_state_for_replica(bl
);
3564 policylock
.encode_state_for_replica(bl
);
// Decode per-lock replica state written by
// _encode_locks_state_for_replica(), in the same order, then the
// trailing need_recover flag.
// NOTE(review): the declaration of `need_recover` (original lines
// 3578-3579) is not visible in this extract — confirm against the
// full source.
3567 void CInode::_decode_locks_state(bufferlist::iterator
& p
, bool is_new
)
3569 authlock
.decode_state(p
, is_new
);
3570 linklock
.decode_state(p
, is_new
);
3571 dirfragtreelock
.decode_state(p
, is_new
);
3572 filelock
.decode_state(p
, is_new
);
3573 nestlock
.decode_state(p
, is_new
);
3574 xattrlock
.decode_state(p
, is_new
);
3575 snaplock
.decode_state(p
, is_new
);
3576 flocklock
.decode_state(p
, is_new
);
3577 policylock
.decode_state(p
, is_new
);
3580 ::decode(need_recover
, p
);
3581 if (need_recover
&& is_new
) {
3582 // Auth mds replicated this inode while it's recovering. Auth mds may take xlock on the lock
3583 // and change the object when replaying unsafe requests.
3584 authlock
.mark_need_recover();
3585 linklock
.mark_need_recover();
3586 dirfragtreelock
.mark_need_recover();
3587 filelock
.mark_need_recover();
3588 nestlock
.mark_need_recover();
3589 xattrlock
.mark_need_recover();
3590 snaplock
.mark_need_recover();
3591 flocklock
.mark_need_recover();
3592 policylock
.mark_need_recover();
// Decode rejoin lock state (written by _encode_locks_state_for_rejoin
// on the auth MDS), collecting waiters to wake and locks that need
// re-evaluation. `survivor` distinguishes a surviving MDS from one
// that is itself rejoining after a failure.
3595 void CInode::_decode_locks_rejoin(bufferlist::iterator
& p
, list
<MDSInternalContextBase
*>& waiters
,
3596 list
<SimpleLock
*>& eval_locks
, bool survivor
)
3598 authlock
.decode_state_rejoin(p
, waiters
, survivor
);
3599 linklock
.decode_state_rejoin(p
, waiters
, survivor
);
3600 dirfragtreelock
.decode_state_rejoin(p
, waiters
, survivor
);
3601 filelock
.decode_state_rejoin(p
, waiters
, survivor
);
3602 nestlock
.decode_state_rejoin(p
, waiters
, survivor
);
3603 xattrlock
.decode_state_rejoin(p
, waiters
, survivor
);
3604 snaplock
.decode_state_rejoin(p
, waiters
, survivor
);
3605 flocklock
.decode_state_rejoin(p
, waiters
, survivor
);
3606 policylock
.decode_state_rejoin(p
, waiters
, survivor
);
// scatterlocks left unstable (and not write-locked) must be
// re-evaluated by the caller once rejoin completes
3608 if (!dirfragtreelock
.is_stable() && !dirfragtreelock
.is_wrlocked())
3609 eval_locks
.push_back(&dirfragtreelock
);
3610 if (!filelock
.is_stable() && !filelock
.is_wrlocked())
3611 eval_locks
.push_back(&filelock
);
3612 if (!nestlock
.is_stable() && !nestlock
.is_wrlocked())
3613 eval_locks
.push_back(&nestlock
);
3619 void CInode::encode_export(bufferlist
& bl
)
3621 ENCODE_START(5, 4, bl
);
3622 _encode_base(bl
, mdcache
->mds
->mdsmap
->get_up_features());
3624 ::encode(state
, bl
);
3628 ::encode(get_replicas(), bl
);
3630 // include scatterlock info for any bounding CDirs
3631 bufferlist bounding
;
3633 for (const auto &p
: dirfrags
) {
3634 CDir
*dir
= p
.second
;
3635 if (dir
->state_test(CDir::STATE_EXPORTBOUND
)) {
3636 ::encode(p
.first
, bounding
);
3637 ::encode(dir
->fnode
.fragstat
, bounding
);
3638 ::encode(dir
->fnode
.accounted_fragstat
, bounding
);
3639 ::encode(dir
->fnode
.rstat
, bounding
);
3640 ::encode(dir
->fnode
.accounted_rstat
, bounding
);
3641 dout(10) << " encoded fragstat/rstat info for " << *dir
<< dendl
;
3644 ::encode(bounding
, bl
);
3646 _encode_locks_full(bl
);
3648 _encode_file_locks(bl
);
3652 get(PIN_TEMPEXPORTING
);
3655 void CInode::finish_export(utime_t now
)
3657 state
&= MASK_STATE_EXPORT_KEPT
;
3662 //dirlock.clear_updated();
3666 put(PIN_TEMPEXPORTING
);
3669 void CInode::decode_import(bufferlist::iterator
& p
,
3678 state_set(STATE_AUTH
| (s
& MASK_STATE_EXPORTED
));
3684 if (is_dirty_parent()) {
3685 get(PIN_DIRTYPARENT
);
3686 mark_dirty_parent(ls
);
3689 ::decode(pop
, ceph_clock_now(), p
);
3691 ::decode(get_replicas(), p
);
3692 if (is_replicated())
3693 get(PIN_REPLICATED
);
3696 // decode fragstat info on bounding cdirs
3697 bufferlist bounding
;
3698 ::decode(bounding
, p
);
3699 bufferlist::iterator q
= bounding
.begin();
3703 CDir
*dir
= get_dirfrag(fg
);
3704 assert(dir
); // we should have all bounds open
3706 // Only take the remote's fragstat/rstat if we are non-auth for
3707 // this dirfrag AND the lock is NOT in a scattered (MIX) state.
3708 // We know lock is stable, and MIX is the only state in which
3709 // the inode auth (who sent us this data) may not have the best
3712 // HMM: Are there cases where dir->is_auth() is an insufficient
3713 // check because the dirfrag is under migration? That implies
3714 // it is frozen (and in a SYNC or LOCK state). FIXME.
3716 if (dir
->is_auth() ||
3717 filelock
.get_state() == LOCK_MIX
) {
3718 dout(10) << " skipped fragstat info for " << *dir
<< dendl
;
3723 ::decode(dir
->fnode
.fragstat
, q
);
3724 ::decode(dir
->fnode
.accounted_fragstat
, q
);
3725 dout(10) << " took fragstat info for " << *dir
<< dendl
;
3727 if (dir
->is_auth() ||
3728 nestlock
.get_state() == LOCK_MIX
) {
3729 dout(10) << " skipped rstat info for " << *dir
<< dendl
;
3734 ::decode(dir
->fnode
.rstat
, q
);
3735 ::decode(dir
->fnode
.accounted_rstat
, q
);
3736 dout(10) << " took rstat info for " << *dir
<< dendl
;
3740 _decode_locks_full(p
);
3742 _decode_file_locks(p
);
3748 void InodeStoreBase::dump(Formatter
*f
) const
3751 f
->dump_string("symlink", symlink
);
3752 f
->open_array_section("old_inodes");
3753 for (const auto &p
: old_inodes
) {
3754 f
->open_object_section("old_inode");
3755 // The key is the last snapid, the first is in the mempool_old_inode
3756 f
->dump_int("last", p
.first
);
3758 f
->close_section(); // old_inode
3760 f
->close_section(); // old_inodes
3762 f
->open_object_section("dirfragtree");
3763 dirfragtree
.dump(f
);
3764 f
->close_section(); // dirfragtree
3768 void InodeStore::generate_test_instances(list
<InodeStore
*> &ls
)
3770 InodeStore
*populated
= new InodeStore
;
3771 populated
->inode
.ino
= 0xdeadbeef;
3772 populated
->symlink
= "rhubarb";
3773 ls
.push_back(populated
);
3776 void CInode::validate_disk_state(CInode::validated_data
*results
,
3777 MDSInternalContext
*fin
)
3779 class ValidationContinuation
: public MDSContinuation
{
3781 MDSInternalContext
*fin
;
3783 CInode::validated_data
*results
;
3794 ValidationContinuation(CInode
*i
,
3795 CInode::validated_data
*data_r
,
3796 MDSInternalContext
*fin_
) :
3797 MDSContinuation(i
->mdcache
->mds
->server
),
3802 set_callback(START
, static_cast<Continuation::stagePtr
>(&ValidationContinuation::_start
));
3803 set_callback(BACKTRACE
, static_cast<Continuation::stagePtr
>(&ValidationContinuation::_backtrace
));
3804 set_callback(INODE
, static_cast<Continuation::stagePtr
>(&ValidationContinuation::_inode_disk
));
3805 set_callback(DIRFRAGS
, static_cast<Continuation::stagePtr
>(&ValidationContinuation::_dirfrags
));
3808 ~ValidationContinuation() override
{
3811 in
->mdcache
->num_shadow_inodes
--;
3816 * Fetch backtrace and set tag if tag is non-empty
3818 void fetch_backtrace_and_tag(CInode
*in
, boost::string_view tag
,
3819 Context
*fin
, int *bt_r
, bufferlist
*bt
)
3821 const int64_t pool
= in
->get_backtrace_pool();
3822 object_t oid
= CInode::get_object_name(in
->ino(), frag_t(), "");
3824 ObjectOperation fetch
;
3825 fetch
.getxattr("parent", bt
, bt_r
);
3826 in
->mdcache
->mds
->objecter
->read(oid
, object_locator_t(pool
), fetch
, CEPH_NOSNAP
,
3829 ObjectOperation scrub_tag
;
3831 ::encode(tag
, tag_bl
);
3832 scrub_tag
.setxattr("scrub_tag", tag_bl
);
3834 in
->mdcache
->mds
->objecter
->mutate(oid
, object_locator_t(pool
), scrub_tag
, snapc
,
3835 ceph::real_clock::now(),
3840 bool _start(int rval
) {
3841 if (in
->is_dirty()) {
3842 MDCache
*mdcache
= in
->mdcache
;
3843 mempool_inode
& inode
= in
->inode
;
3844 dout(20) << "validating a dirty CInode; results will be inconclusive"
3847 if (in
->is_symlink()) {
3848 // there's nothing to do for symlinks!
3852 C_OnFinisher
*conf
= new C_OnFinisher(get_io_callback(BACKTRACE
),
3853 in
->mdcache
->mds
->finisher
);
3855 // Whether we have a tag to apply depends on ScrubHeader (if one is
3857 if (in
->scrub_infop
) {
3858 // I'm a non-orphan, so look up my ScrubHeader via my linkage
3859 boost::string_view tag
= in
->scrub_infop
->header
->get_tag();
3860 // Rather than using the usual CInode::fetch_backtrace,
3861 // use a special variant that optionally writes a tag in the same
3863 fetch_backtrace_and_tag(in
, tag
, conf
,
3864 &results
->backtrace
.ondisk_read_retval
, &bl
);
3866 // When we're invoked outside of ScrubStack we might be called
3867 // on an orphaned inode like /
3868 fetch_backtrace_and_tag(in
, {}, conf
,
3869 &results
->backtrace
.ondisk_read_retval
, &bl
);
3874 bool _backtrace(int rval
) {
3875 // set up basic result reporting and make sure we got the data
3876 results
->performed_validation
= true; // at least, some of it!
3877 results
->backtrace
.checked
= true;
3879 const int64_t pool
= in
->get_backtrace_pool();
3880 inode_backtrace_t
& memory_backtrace
= results
->backtrace
.memory_value
;
3881 in
->build_backtrace(pool
, memory_backtrace
);
3882 bool equivalent
, divergent
;
3885 MDCache
*mdcache
= in
->mdcache
; // For the benefit of dout
3886 const mempool_inode
& inode
= in
->inode
; // For the benefit of dout
3888 // Ignore rval because it's the result of a FAILOK operation
3889 // from fetch_backtrace_and_tag: the real result is in
3890 // backtrace.ondisk_read_retval
3891 dout(20) << "ondisk_read_retval: " << results
->backtrace
.ondisk_read_retval
<< dendl
;
3892 if (results
->backtrace
.ondisk_read_retval
!= 0) {
3893 results
->backtrace
.error_str
<< "failed to read off disk; see retval";
3897 // extract the backtrace, and compare it to a newly-constructed one
3899 bufferlist::iterator p
= bl
.begin();
3900 ::decode(results
->backtrace
.ondisk_value
, p
);
3901 dout(10) << "decoded " << bl
.length() << " bytes of backtrace successfully" << dendl
;
3902 } catch (buffer::error
&) {
3903 if (results
->backtrace
.ondisk_read_retval
== 0 && rval
!= 0) {
3904 // Cases where something has clearly gone wrong with the overall
3905 // fetch op, though we didn't get a nonzero rc from the getxattr
3906 // operation. e.g. object missing.
3907 results
->backtrace
.ondisk_read_retval
= rval
;
3909 results
->backtrace
.error_str
<< "failed to decode on-disk backtrace ("
3910 << bl
.length() << " bytes)!";
3914 memory_newer
= memory_backtrace
.compare(results
->backtrace
.ondisk_value
,
3915 &equivalent
, &divergent
);
3917 if (divergent
|| memory_newer
< 0) {
3918 // we're divergent, or on-disk version is newer
3919 results
->backtrace
.error_str
<< "On-disk backtrace is divergent or newer";
3921 results
->backtrace
.passed
= true;
3925 if (!results
->backtrace
.passed
&& in
->scrub_infop
->header
->get_repair()) {
3927 in
->make_path_string(path
);
3928 in
->mdcache
->mds
->clog
->warn() << "bad backtrace on inode " << in
->ino()
3929 << "(" << path
<< "), rewriting it";
3930 in
->mark_dirty_parent(in
->mdcache
->mds
->mdlog
->get_current_segment(),
3932 // Flag that we repaired this BT so that it won't go into damagetable
3933 results
->backtrace
.repaired
= true;
3935 // Flag that we did some repair work so that our repair operation
3936 // can be flushed at end of scrub
3937 in
->scrub_infop
->header
->set_repaired();
3940 // If the inode's number was free in the InoTable, fix that
3943 InoTable
*inotable
= mdcache
->mds
->inotable
;
3945 dout(10) << "scrub: inotable ino = " << inode
.ino
<< dendl
;
3946 dout(10) << "scrub: inotable free says "
3947 << inotable
->is_marked_free(inode
.ino
) << dendl
;
3949 if (inotable
->is_marked_free(inode
.ino
)) {
3950 LogChannelRef clog
= in
->mdcache
->mds
->clog
;
3951 clog
->error() << "scrub: inode wrongly marked free: 0x" << std::hex
3954 if (in
->scrub_infop
->header
->get_repair()) {
3955 bool repaired
= inotable
->repair(inode
.ino
);
3957 clog
->error() << "inode table repaired for inode: 0x" << std::hex
3962 clog
->error() << "Cannot repair inotable while other operations"
3969 // quit if we're a file, or kick off directory checks otherwise
3970 // TODO: validate on-disk inode for non-base directories
3971 if (!in
->is_dir()) {
3975 return validate_directory_data();
3978 bool validate_directory_data() {
3979 assert(in
->is_dir());
3981 if (in
->is_base()) {
3983 shadow_in
= new CInode(in
->mdcache
);
3984 in
->mdcache
->create_unlinked_system_inode(shadow_in
, in
->inode
.ino
, in
->inode
.mode
);
3985 in
->mdcache
->num_shadow_inodes
++;
3987 shadow_in
->fetch(get_internal_callback(INODE
));
3990 results
->inode
.passed
= true;
3991 return check_dirfrag_rstats();
3995 bool _inode_disk(int rval
) {
3996 results
->inode
.checked
= true;
3997 results
->inode
.ondisk_read_retval
= rval
;
3998 results
->inode
.ondisk_value
= shadow_in
->inode
;
3999 results
->inode
.memory_value
= in
->inode
;
4001 mempool_inode
& si
= shadow_in
->inode
;
4002 mempool_inode
& i
= in
->inode
;
4003 if (si
.version
> i
.version
) {
4005 results
->inode
.error_str
<< "On-disk inode is newer than in-memory one!";
4008 bool divergent
= false;
4009 int r
= i
.compare(si
, &divergent
);
4010 results
->inode
.passed
= !divergent
&& r
>= 0;
4011 if (!results
->inode
.passed
) {
4012 results
->inode
.error_str
<<
4013 "On-disk inode is divergent or newer than in-memory one!";
4018 return check_dirfrag_rstats();
4021 bool check_dirfrag_rstats() {
4022 MDSGatherBuilder
gather(g_ceph_context
);
4023 std::list
<frag_t
> frags
;
4024 in
->dirfragtree
.get_leaves(frags
);
4025 for (list
<frag_t
>::iterator p
= frags
.begin();
4028 CDir
*dir
= in
->get_or_open_dirfrag(in
->mdcache
, *p
);
4030 if (!dir
->scrub_infop
->header
)
4031 dir
->scrub_infop
->header
= in
->scrub_infop
->header
;
4032 if (dir
->is_complete()) {
4035 dir
->scrub_infop
->need_scrub_local
= true;
4036 dir
->fetch(gather
.new_sub(), false);
4039 if (gather
.has_subs()) {
4040 gather
.set_finisher(get_internal_callback(DIRFRAGS
));
4044 return immediate(DIRFRAGS
, 0);
4048 bool _dirfrags(int rval
) {
4049 int frags_errors
= 0;
4050 // basic reporting setup
4051 results
->raw_stats
.checked
= true;
4052 results
->raw_stats
.ondisk_read_retval
= rval
;
4054 results
->raw_stats
.memory_value
.dirstat
= in
->inode
.dirstat
;
4055 results
->raw_stats
.memory_value
.rstat
= in
->inode
.rstat
;
4056 frag_info_t
& dir_info
= results
->raw_stats
.ondisk_value
.dirstat
;
4057 nest_info_t
& nest_info
= results
->raw_stats
.ondisk_value
.rstat
;
4060 results
->raw_stats
.error_str
<< "Failed to read dirfrags off disk";
4064 // check each dirfrag...
4065 for (const auto &p
: in
->dirfrags
) {
4066 CDir
*dir
= p
.second
;
4067 assert(dir
->get_version() > 0);
4068 nest_info
.add(dir
->fnode
.accounted_rstat
);
4069 dir_info
.add(dir
->fnode
.accounted_fragstat
);
4070 if (dir
->scrub_infop
&&
4071 dir
->scrub_infop
->pending_scrub_error
) {
4072 dir
->scrub_infop
->pending_scrub_error
= false;
4073 if (dir
->scrub_infop
->header
->get_repair()) {
4074 results
->raw_stats
.repaired
= true;
4075 results
->raw_stats
.error_str
4076 << "dirfrag(" << p
.first
<< ") has bad stats (will be fixed); ";
4078 results
->raw_stats
.error_str
4079 << "dirfrag(" << p
.first
<< ") has bad stats; ";
4084 nest_info
.rsubdirs
++; // it gets one to account for self
4085 // ...and that their sum matches our inode settings
4086 if (!dir_info
.same_sums(in
->inode
.dirstat
) ||
4087 !nest_info
.same_sums(in
->inode
.rstat
)) {
4088 if (in
->scrub_infop
&&
4089 in
->scrub_infop
->header
->get_repair()) {
4090 results
->raw_stats
.error_str
4091 << "freshly-calculated rstats don't match existing ones (will be fixed)";
4092 in
->mdcache
->repair_inode_stats(in
);
4093 results
->raw_stats
.repaired
= true;
4095 results
->raw_stats
.error_str
4096 << "freshly-calculated rstats don't match existing ones";
4100 if (frags_errors
> 0)
4103 results
->raw_stats
.passed
= true;
4108 void _done() override
{
4109 if ((!results
->raw_stats
.checked
|| results
->raw_stats
.passed
) &&
4110 (!results
->backtrace
.checked
|| results
->backtrace
.passed
) &&
4111 (!results
->inode
.checked
|| results
->inode
.passed
))
4112 results
->passed_validation
= true;
4114 fin
->complete(get_rval());
4120 dout(10) << "scrub starting validate_disk_state on " << *this << dendl
;
4121 ValidationContinuation
*vc
= new ValidationContinuation(this,
4127 void CInode::validated_data::dump(Formatter
*f
) const
4129 f
->open_object_section("results");
4131 f
->dump_bool("performed_validation", performed_validation
);
4132 f
->dump_bool("passed_validation", passed_validation
);
4133 f
->open_object_section("backtrace");
4135 f
->dump_bool("checked", backtrace
.checked
);
4136 f
->dump_bool("passed", backtrace
.passed
);
4137 f
->dump_int("read_ret_val", backtrace
.ondisk_read_retval
);
4138 f
->dump_stream("ondisk_value") << backtrace
.ondisk_value
;
4139 f
->dump_stream("memoryvalue") << backtrace
.memory_value
;
4140 f
->dump_string("error_str", backtrace
.error_str
.str());
4142 f
->close_section(); // backtrace
4143 f
->open_object_section("raw_stats");
4145 f
->dump_bool("checked", raw_stats
.checked
);
4146 f
->dump_bool("passed", raw_stats
.passed
);
4147 f
->dump_int("read_ret_val", raw_stats
.ondisk_read_retval
);
4148 f
->dump_stream("ondisk_value.dirstat") << raw_stats
.ondisk_value
.dirstat
;
4149 f
->dump_stream("ondisk_value.rstat") << raw_stats
.ondisk_value
.rstat
;
4150 f
->dump_stream("memory_value.dirrstat") << raw_stats
.memory_value
.dirstat
;
4151 f
->dump_stream("memory_value.rstat") << raw_stats
.memory_value
.rstat
;
4152 f
->dump_string("error_str", raw_stats
.error_str
.str());
4154 f
->close_section(); // raw_stats
4155 // dump failure return code
4157 if (backtrace
.checked
&& backtrace
.ondisk_read_retval
)
4158 rc
= backtrace
.ondisk_read_retval
;
4159 if (inode
.checked
&& inode
.ondisk_read_retval
)
4160 rc
= inode
.ondisk_read_retval
;
4161 if (raw_stats
.checked
&& raw_stats
.ondisk_read_retval
)
4162 rc
= raw_stats
.ondisk_read_retval
;
4163 f
->dump_int("return_code", rc
);
4165 f
->close_section(); // results
4168 bool CInode::validated_data::all_damage_repaired() const
4171 (raw_stats
.checked
&& !raw_stats
.passed
&& !raw_stats
.repaired
)
4173 (backtrace
.checked
&& !backtrace
.passed
&& !backtrace
.repaired
)
4175 (inode
.checked
&& !inode
.passed
&& !inode
.repaired
);
4180 void CInode::dump(Formatter
*f
) const
4182 InodeStoreBase::dump(f
);
4184 MDSCacheObject::dump(f
);
4186 f
->open_object_section("versionlock");
4187 versionlock
.dump(f
);
4190 f
->open_object_section("authlock");
4194 f
->open_object_section("linklock");
4198 f
->open_object_section("dirfragtreelock");
4199 dirfragtreelock
.dump(f
);
4202 f
->open_object_section("filelock");
4206 f
->open_object_section("xattrlock");
4210 f
->open_object_section("snaplock");
4214 f
->open_object_section("nestlock");
4218 f
->open_object_section("flocklock");
4222 f
->open_object_section("policylock");
4226 f
->open_array_section("states");
4227 MDSCacheObject::dump_states(f
);
4228 if (state_test(STATE_EXPORTING
))
4229 f
->dump_string("state", "exporting");
4230 if (state_test(STATE_OPENINGDIR
))
4231 f
->dump_string("state", "openingdir");
4232 if (state_test(STATE_FREEZING
))
4233 f
->dump_string("state", "freezing");
4234 if (state_test(STATE_FROZEN
))
4235 f
->dump_string("state", "frozen");
4236 if (state_test(STATE_AMBIGUOUSAUTH
))
4237 f
->dump_string("state", "ambiguousauth");
4238 if (state_test(STATE_EXPORTINGCAPS
))
4239 f
->dump_string("state", "exportingcaps");
4240 if (state_test(STATE_NEEDSRECOVER
))
4241 f
->dump_string("state", "needsrecover");
4242 if (state_test(STATE_PURGING
))
4243 f
->dump_string("state", "purging");
4244 if (state_test(STATE_DIRTYPARENT
))
4245 f
->dump_string("state", "dirtyparent");
4246 if (state_test(STATE_DIRTYRSTAT
))
4247 f
->dump_string("state", "dirtyrstat");
4248 if (state_test(STATE_STRAYPINNED
))
4249 f
->dump_string("state", "straypinned");
4250 if (state_test(STATE_FROZENAUTHPIN
))
4251 f
->dump_string("state", "frozenauthpin");
4252 if (state_test(STATE_DIRTYPOOL
))
4253 f
->dump_string("state", "dirtypool");
4254 if (state_test(STATE_ORPHAN
))
4255 f
->dump_string("state", "orphan");
4256 if (state_test(STATE_MISSINGOBJS
))
4257 f
->dump_string("state", "missingobjs");
4260 f
->open_array_section("client_caps");
4261 for (map
<client_t
,Capability
*>::const_iterator it
= client_caps
.begin();
4262 it
!= client_caps
.end(); ++it
) {
4263 f
->open_object_section("client_cap");
4264 f
->dump_int("client_id", it
->first
.v
);
4265 f
->dump_string("pending", ccap_string(it
->second
->pending()));
4266 f
->dump_string("issued", ccap_string(it
->second
->issued()));
4267 f
->dump_string("wanted", ccap_string(it
->second
->wanted()));
4268 f
->dump_int("last_sent", it
->second
->get_last_sent());
4273 f
->dump_int("loner", loner_cap
.v
);
4274 f
->dump_int("want_loner", want_loner_cap
.v
);
4276 f
->open_array_section("mds_caps_wanted");
4277 for (const auto &p
: mds_caps_wanted
) {
4278 f
->open_object_section("mds_cap_wanted");
4279 f
->dump_int("rank", p
.first
);
4280 f
->dump_string("cap", ccap_string(p
.second
));
4286 /****** Scrub Stuff *****/
4287 void CInode::scrub_info_create() const
4289 dout(25) << __func__
<< dendl
;
4290 assert(!scrub_infop
);
4292 // break out of const-land to set up implicit initial state
4293 CInode
*me
= const_cast<CInode
*>(this);
4294 mempool_inode
*in
= me
->get_projected_inode();
4296 scrub_info_t
*si
= new scrub_info_t();
4297 si
->scrub_start_stamp
= si
->last_scrub_stamp
= in
->last_scrub_stamp
;
4298 si
->scrub_start_version
= si
->last_scrub_version
= in
->last_scrub_version
;
4300 me
->scrub_infop
= si
;
4303 void CInode::scrub_maybe_delete_info()
4306 !scrub_infop
->scrub_in_progress
&&
4307 !scrub_infop
->last_scrub_dirty
) {
4313 void CInode::scrub_initialize(CDentry
*scrub_parent
,
4314 ScrubHeaderRef
& header
,
4315 MDSInternalContextBase
*f
)
4317 dout(20) << __func__
<< " with scrub_version " << get_version() << dendl
;
4318 if (scrub_is_in_progress()) {
4319 dout(20) << __func__
<< " inode moved during scrub, reinitializing "
4321 assert(scrub_infop
->scrub_parent
);
4322 CDentry
*dn
= scrub_infop
->scrub_parent
;
4323 CDir
*dir
= dn
->dir
;
4324 dn
->put(CDentry::PIN_SCRUBPARENT
);
4325 assert(dir
->scrub_infop
&& dir
->scrub_infop
->directory_scrubbing
);
4326 dir
->scrub_infop
->directories_scrubbing
.erase(dn
->key());
4327 dir
->scrub_infop
->others_scrubbing
.erase(dn
->key());
4331 scrub_infop
= new scrub_info_t();
4333 if (get_projected_inode()->is_dir()) {
4334 // fill in dirfrag_stamps with initial state
4335 std::list
<frag_t
> frags
;
4336 dirfragtree
.get_leaves(frags
);
4337 for (std::list
<frag_t
>::iterator i
= frags
.begin();
4340 if (header
->get_force())
4341 scrub_infop
->dirfrag_stamps
[*i
].reset();
4343 scrub_infop
->dirfrag_stamps
[*i
];
4348 scrub_parent
->get(CDentry::PIN_SCRUBPARENT
);
4349 scrub_infop
->scrub_parent
= scrub_parent
;
4350 scrub_infop
->on_finish
= f
;
4351 scrub_infop
->scrub_in_progress
= true;
4352 scrub_infop
->children_scrubbed
= false;
4353 scrub_infop
->header
= header
;
4355 scrub_infop
->scrub_start_version
= get_version();
4356 scrub_infop
->scrub_start_stamp
= ceph_clock_now();
4357 // right now we don't handle remote inodes
4360 int CInode::scrub_dirfrag_next(frag_t
* out_dirfrag
)
4362 dout(20) << __func__
<< dendl
;
4363 assert(scrub_is_in_progress());
4369 std::map
<frag_t
, scrub_stamp_info_t
>::iterator i
=
4370 scrub_infop
->dirfrag_stamps
.begin();
4372 while (i
!= scrub_infop
->dirfrag_stamps
.end()) {
4373 if (i
->second
.scrub_start_version
< scrub_infop
->scrub_start_version
) {
4374 i
->second
.scrub_start_version
= get_projected_version();
4375 i
->second
.scrub_start_stamp
= ceph_clock_now();
4376 *out_dirfrag
= i
->first
;
4377 dout(20) << " return frag " << *out_dirfrag
<< dendl
;
4383 dout(20) << " no frags left, ENOENT " << dendl
;
4387 void CInode::scrub_dirfrags_scrubbing(list
<frag_t
>* out_dirfrags
)
4389 assert(out_dirfrags
!= NULL
);
4390 assert(scrub_infop
!= NULL
);
4392 out_dirfrags
->clear();
4393 std::map
<frag_t
, scrub_stamp_info_t
>::iterator i
=
4394 scrub_infop
->dirfrag_stamps
.begin();
4396 while (i
!= scrub_infop
->dirfrag_stamps
.end()) {
4397 if (i
->second
.scrub_start_version
>= scrub_infop
->scrub_start_version
) {
4398 if (i
->second
.last_scrub_version
< scrub_infop
->scrub_start_version
)
4399 out_dirfrags
->push_back(i
->first
);
4408 void CInode::scrub_dirfrag_finished(frag_t dirfrag
)
4410 dout(20) << __func__
<< " on frag " << dirfrag
<< dendl
;
4411 assert(scrub_is_in_progress());
4413 std::map
<frag_t
, scrub_stamp_info_t
>::iterator i
=
4414 scrub_infop
->dirfrag_stamps
.find(dirfrag
);
4415 assert(i
!= scrub_infop
->dirfrag_stamps
.end());
4417 scrub_stamp_info_t
&si
= i
->second
;
4418 si
.last_scrub_stamp
= si
.scrub_start_stamp
;
4419 si
.last_scrub_version
= si
.scrub_start_version
;
4422 void CInode::scrub_finished(MDSInternalContextBase
**c
) {
4423 dout(20) << __func__
<< dendl
;
4424 assert(scrub_is_in_progress());
4425 for (std::map
<frag_t
, scrub_stamp_info_t
>::iterator i
=
4426 scrub_infop
->dirfrag_stamps
.begin();
4427 i
!= scrub_infop
->dirfrag_stamps
.end();
4429 if(i
->second
.last_scrub_version
!= i
->second
.scrub_start_version
) {
4430 derr
<< i
->second
.last_scrub_version
<< " != "
4431 << i
->second
.scrub_start_version
<< dendl
;
4433 assert(i
->second
.last_scrub_version
== i
->second
.scrub_start_version
);
4436 scrub_infop
->last_scrub_version
= scrub_infop
->scrub_start_version
;
4437 scrub_infop
->last_scrub_stamp
= scrub_infop
->scrub_start_stamp
;
4438 scrub_infop
->last_scrub_dirty
= true;
4439 scrub_infop
->scrub_in_progress
= false;
4441 if (scrub_infop
->scrub_parent
) {
4442 CDentry
*dn
= scrub_infop
->scrub_parent
;
4443 scrub_infop
->scrub_parent
= NULL
;
4444 dn
->dir
->scrub_dentry_finished(dn
);
4445 dn
->put(CDentry::PIN_SCRUBPARENT
);
4448 *c
= scrub_infop
->on_finish
;
4449 scrub_infop
->on_finish
= NULL
;
4451 if (scrub_infop
->header
->get_origin() == this) {
4452 // We are at the point that a tagging scrub was initiated
4453 LogChannelRef clog
= mdcache
->mds
->clog
;
4454 if (scrub_infop
->header
->get_tag().empty()) {
4455 clog
->info() << "scrub complete";
4457 clog
->info() << "scrub complete with tag '"
4458 << scrub_infop
->header
->get_tag() << "'";
4463 int64_t CInode::get_backtrace_pool() const
4466 return mdcache
->mds
->mdsmap
->get_metadata_pool();
4468 // Files are required to have an explicit layout that specifies
4470 assert(inode
.layout
.pool_id
!= -1);
4471 return inode
.layout
.pool_id
;
4475 void CInode::maybe_export_pin(bool update
)
4477 if (!g_conf
->mds_bal_export_pin
)
4479 if (!is_dir() || !is_normal())
4482 mds_rank_t export_pin
= get_export_pin(false);
4483 if (export_pin
== MDS_RANK_NONE
&& !update
)
4486 if (state_test(CInode::STATE_QUEUEDEXPORTPIN
))
4490 for (auto p
= dirfrags
.begin(); p
!= dirfrags
.end(); p
++) {
4491 CDir
*dir
= p
->second
;
4492 if (!dir
->is_auth())
4494 if (export_pin
!= MDS_RANK_NONE
) {
4495 if (dir
->is_subtree_root()) {
4496 // set auxsubtree bit or export it
4497 if (!dir
->state_test(CDir::STATE_AUXSUBTREE
) ||
4498 export_pin
!= dir
->get_dir_auth().first
)
4501 // create aux subtree or export it
4505 // clear aux subtrees ?
4506 queue
= dir
->state_test(CDir::STATE_AUXSUBTREE
);
4509 state_set(CInode::STATE_QUEUEDEXPORTPIN
);
4510 mdcache
->export_pin_queue
.insert(this);
4516 void CInode::set_export_pin(mds_rank_t rank
)
4519 assert(is_projected());
4520 get_projected_inode()->export_pin
= rank
;
4521 maybe_export_pin(true);
4524 mds_rank_t
CInode::get_export_pin(bool inherit
) const
4526 /* An inode that is export pinned may not necessarily be a subtree root, we
4527 * need to traverse the parents. A base or system inode cannot be pinned.
4528 * N.B. inodes not yet linked into a dir (i.e. anonymous inodes) will not
4529 * have a parent yet.
4531 const CInode
*in
= this;
4533 if (in
->is_system())
4535 const CDentry
*pdn
= in
->get_projected_parent_dn();
4538 const mempool_inode
*pi
= in
->get_projected_inode();
4539 // ignore export pin for unlinked directory
4542 if (pi
->export_pin
>= 0)
4543 return pi
->export_pin
;
4547 in
= pdn
->get_dir()->inode
;
4549 return MDS_RANK_NONE
;
4552 bool CInode::is_exportable(mds_rank_t dest
) const
4554 mds_rank_t pin
= get_export_pin();
4557 } else if (pin
>= 0) {
4564 MEMPOOL_DEFINE_OBJECT_FACTORY(CInode
, co_inode
, mds_co
);