*
*/
+#include <boost/utility/string_view.hpp>
#include "include/types.h"
cache(mdcache), inode(in), frag(fg),
first(2),
dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
- projected_version(0), item_dirty(this), item_new(this),
+ projected_version(0),
+ dirty_dentries(member_offset(CDentry, item_dir_dirty)),
+ item_dirty(this), item_new(this),
num_head_items(0), num_head_null(0),
num_snap_items(0), num_snap_null(0),
num_dirty(0), committing_version(0), committed_version(0),
pop_nested(ceph_clock_now()),
pop_auth_subtree(ceph_clock_now()),
pop_auth_subtree_nested(ceph_clock_now()),
+ pop_lru_subdirs(member_offset(CInode, item_pop_lru)),
num_dentries_nested(0), num_dentries_auth_subtree(0),
num_dentries_auth_subtree_nested(0),
dir_auth(CDIR_AUTH_DEFAULT)
{
- state = STATE_INITIAL;
-
memset(&fnode, 0, sizeof(fnode));
// auth
assert(in->is_dir());
- if (auth)
- state |= STATE_AUTH;
+ if (auth) state_set(STATE_AUTH);
}
/**
frag_info_t frag_info;
nest_info_t nest_info;
- for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
+ for (auto i = items.begin(); i != items.end(); ++i) {
if (i->second->last != CEPH_NOSNAP)
continue;
CDentry::linkage_t *dnl = i->second->get_linkage();
if (!good) {
if (!scrub) {
- for (map_t::iterator i = items.begin(); i != items.end(); ++i) {
+ for (auto i = items.begin(); i != items.end(); ++i) {
CDentry *dn = i->second;
if (dn->get_linkage()->is_primary()) {
CInode *in = dn->get_linkage()->inode;
return good;
}
-CDentry *CDir::lookup(const string& name, snapid_t snap)
+CDentry *CDir::lookup(boost::string_view name, snapid_t snap)
{
dout(20) << "lookup (" << snap << ", '" << name << "')" << dendl;
- map_t::iterator iter = items.lower_bound(dentry_key_t(snap, name.c_str(),
- inode->hash_dentry_name(name)));
+ auto iter = items.lower_bound(dentry_key_t(snap, name, inode->hash_dentry_name(name)));
if (iter == items.end())
return 0;
- if (iter->second->name == name &&
+ if (iter->second->get_name() == name &&
iter->second->first <= snap &&
iter->second->last >= snap) {
dout(20) << " hit -> " << iter->first << dendl;
return 0;
}
-CDentry *CDir::lookup_exact_snap(const string& name, snapid_t last) {
- map_t::iterator p = items.find(dentry_key_t(last, name.c_str(),
- inode->hash_dentry_name(name)));
+CDentry *CDir::lookup_exact_snap(boost::string_view name, snapid_t last) {
+ auto p = items.find(dentry_key_t(last, name, inode->hash_dentry_name(name)));
if (p == items.end())
return NULL;
return p->second;
* linking fun
*/
-CDentry* CDir::add_null_dentry(const string& dname,
+CDentry* CDir::add_null_dentry(boost::string_view dname,
snapid_t first, snapid_t last)
{
// foreign
CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
if (is_auth())
dn->state_set(CDentry::STATE_AUTH);
- cache->lru.lru_insert_mid(dn);
+
+ cache->bottom_lru.lru_insert_mid(dn);
+ dn->state_set(CDentry::STATE_BOTTOMLRU);
dn->dir = this;
dn->version = get_projected_version();
// add to dir
assert(items.count(dn->key()) == 0);
- //assert(null_items.count(dn->name) == 0);
+ //assert(null_items.count(dn->get_name()) == 0);
items[dn->key()] = dn;
if (last == CEPH_NOSNAP)
}
-CDentry* CDir::add_primary_dentry(const string& dname, CInode *in,
+CDentry* CDir::add_primary_dentry(boost::string_view dname, CInode *in,
snapid_t first, snapid_t last)
{
// primary
CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), first, last);
if (is_auth())
dn->state_set(CDentry::STATE_AUTH);
- cache->lru.lru_insert_mid(dn);
+ if (is_auth() || !inode->is_stray()) {
+ cache->lru.lru_insert_mid(dn);
+ } else {
+ cache->bottom_lru.lru_insert_mid(dn);
+ dn->state_set(CDentry::STATE_BOTTOMLRU);
+ }
dn->dir = this;
dn->version = get_projected_version();
// add to dir
assert(items.count(dn->key()) == 0);
- //assert(null_items.count(dn->name) == 0);
+ //assert(null_items.count(dn->get_name()) == 0);
items[dn->key()] = dn;
dn->get_linkage()->inode = in;
- in->set_primary_parent(dn);
link_inode_work(dn, in);
return dn;
}
-CDentry* CDir::add_remote_dentry(const string& dname, inodeno_t ino, unsigned char d_type,
+CDentry* CDir::add_remote_dentry(boost::string_view dname, inodeno_t ino, unsigned char d_type,
snapid_t first, snapid_t last)
{
// foreign
// add to dir
assert(items.count(dn->key()) == 0);
- //assert(null_items.count(dn->name) == 0);
+ //assert(null_items.count(dn->get_name()) == 0);
items[dn->key()] = dn;
if (last == CEPH_NOSNAP)
if (dn->is_dirty())
dn->mark_clean();
- cache->lru.lru_remove(dn);
+ if (dn->state_test(CDentry::STATE_BOTTOMLRU))
+ cache->bottom_lru.lru_remove(dn);
+ else
+ cache->lru.lru_remove(dn);
delete dn;
// unpin?
dn->get_linkage()->set_remote(ino, d_type);
+ if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
+ cache->bottom_lru.lru_remove(dn);
+ cache->lru.lru_insert_mid(dn);
+ dn->state_clear(CDentry::STATE_BOTTOMLRU);
+ }
+
if (dn->last == CEPH_NOSNAP) {
num_head_items++;
num_head_null--;
assert(dn->get_linkage()->is_null());
dn->get_linkage()->inode = in;
- in->set_primary_parent(dn);
link_inode_work(dn, in);
+
+ if (dn->state_test(CDentry::STATE_BOTTOMLRU) &&
+ (is_auth() || !inode->is_stray())) {
+ cache->bottom_lru.lru_remove(dn);
+ cache->lru.lru_insert_mid(dn);
+ dn->state_clear(CDentry::STATE_BOTTOMLRU);
+ }
if (dn->last == CEPH_NOSNAP) {
num_head_items++;
void CDir::link_inode_work( CDentry *dn, CInode *in)
{
assert(dn->get_linkage()->get_inode() == in);
- assert(in->get_parent_dn() == dn);
+ in->set_primary_parent(dn);
// set inode version
//in->inode.version = dn->get_version();
in->move_to_realm(inode->find_snaprealm());
}
-void CDir::unlink_inode(CDentry *dn)
+void CDir::unlink_inode(CDentry *dn, bool adjust_lru)
{
if (dn->get_linkage()->is_primary()) {
dout(12) << "unlink_inode " << *dn << " " << *dn->get_linkage()->get_inode() << dendl;
unlink_inode_work(dn);
+ if (adjust_lru && !dn->state_test(CDentry::STATE_BOTTOMLRU)) {
+ cache->lru.lru_remove(dn);
+ cache->bottom_lru.lru_insert_mid(dn);
+ dn->state_set(CDentry::STATE_BOTTOMLRU);
+ }
+
if (dn->last == CEPH_NOSNAP) {
num_head_items--;
num_head_null++;
// unlink auth_pin count
if (in->auth_pins + in->nested_auth_pins)
dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins), 0 - in->auth_pins, NULL);
-
+
// detach inode
in->remove_primary_parent(dn);
+ if (in->is_dir())
+ in->item_pop_lru.remove_myself();
dn->get_linkage()->inode = 0;
} else {
assert(!dn->get_linkage()->is_null());
bloom.reset(new bloom_filter(size, 1.0 / size, 0));
}
/* This size and false positive probability is completely random.*/
- bloom->insert(dn->name.c_str(), dn->name.size());
+ bloom->insert(dn->get_name().data(), dn->get_name().size());
}
-bool CDir::is_in_bloom(const string& name)
+/**
+ * Probe the bloom filter held in `bloom` for a dentry name.
+ *
+ * @param name name to test, handed to the filter as a (pointer, length) pair
+ * @return false when no bloom filter has been allocated, otherwise whatever
+ *         bloom->contains() reports for the name's bytes
+ */
+bool CDir::is_in_bloom(boost::string_view name)
{
if (!bloom)
return false;
- return bloom->contains(name.c_str(), name.size());
+ // a string_view need not be NUL-terminated, so pass data() plus an explicit size
+ return bloom->contains(name.data(), name.size());
}
void CDir::remove_null_dentries() {
dout(12) << "remove_null_dentries " << *this << dendl;
- CDir::map_t::iterator p = items.begin();
+ auto p = items.begin();
while (p != items.end()) {
CDentry *dn = p->second;
++p;
void CDir::try_remove_dentries_for_stray()
{
dout(10) << __func__ << dendl;
- assert(inode->inode.nlink == 0);
+ assert(get_parent_dir()->inode->is_stray());
// clear dirty only when the directory was not snapshotted
bool clear_dirty = !inode->snaprealm;
- CDir::map_t::iterator p = items.begin();
+ auto p = items.begin();
while (p != items.end()) {
CDentry *dn = p->second;
++p;
mark_clean();
}
-void CDir::touch_dentries_bottom() {
- dout(12) << "touch_dentries_bottom " << *this << dendl;
-
- for (CDir::map_t::iterator p = items.begin();
- p != items.end();
- ++p)
- inode->mdcache->touch_dentry_bottom(p->second);
-}
-
bool CDir::try_trim_snap_dentry(CDentry *dn, const set<snapid_t>& snaps)
{
assert(dn->last != CEPH_NOSNAP);
{
dout(10) << "purge_stale_snap_data " << snaps << dendl;
- CDir::map_t::iterator p = items.begin();
+ auto p = items.begin();
while (p != items.end()) {
CDentry *dn = p->second;
++p;
if (dn->get_linkage()->is_primary()) {
CInode *in = dn->get_linkage()->get_inode();
- inode_t *pi = in->get_projected_inode();
- if (dn->get_linkage()->get_inode()->is_dir())
+ auto pi = in->get_projected_inode();
+ if (in->is_dir()) {
fnode.fragstat.nsubdirs++;
- else
+ if (in->item_pop_lru.is_on_list())
+ pop_lru_subdirs.push_back(&in->item_pop_lru);
+ } else {
fnode.fragstat.nfiles++;
+ }
fnode.rstat.rbytes += pi->accounted_rstat.rbytes;
fnode.rstat.rfiles += pi->accounted_rstat.rfiles;
fnode.rstat.rsubdirs += pi->accounted_rstat.rsubdirs;
dn->dir->adjust_nested_auth_pins(-ap, -dap, NULL);
}
- if (dn->is_dirty())
+ if (dn->is_dirty()) {
+ dirty_dentries.push_back(&dn->item_dir_dirty);
num_dirty++;
+ }
dn->dir = this;
}
-void CDir::prepare_old_fragment(bool replay)
+/**
+ * Prepare a fragment that is about to be replaced, and hand its dentry
+ * waiters to the caller.
+ *
+ * @param dentry_waiters out-map; every waiter currently queued in
+ *        waiting_on_dentry is appended to the list stored under the same key
+ * @param replay when true the auth pin below is skipped
+ */
+void CDir::prepare_old_fragment(map<string_snap_t, std::list<MDSInternalContextBase*> >& dentry_waiters, bool replay)
{
// auth_pin old fragment for duration so that any auth_pinning
// during the dentry migration doesn't trigger side effects
if (!replay && is_auth())
auth_pin(this);
+
+ // move (not drop) pending dentry waiters out of this fragment
+ if (!waiting_on_dentry.empty()) {
+ for (const auto &p : waiting_on_dentry) {
+ // operator[] creates the destination list on first use, so waiters from
+ // several fragments accumulate under one key
+ auto &e = dentry_waiters[p.first];
+ for (const auto &waiter : p.second) {
+ e.push_back(waiter);
+ }
+ }
+ waiting_on_dentry.clear();
+ // the map is empty again: release the PIN_DNWAITER reference
+ put(PIN_DNWAITER);
+ }
}
void CDir::prepare_new_fragment(bool replay)
_freeze_dir();
mark_complete();
}
+ inode->add_dirfrag(this);
}
void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
void CDir::init_fragment_pins()
{
- if (!replica_map.empty())
+ if (is_replicated())
get(PIN_REPLICATED);
if (state_test(STATE_DIRTY))
get(PIN_DIRTY);
fragstatdiff.add_delta(fnode.accounted_fragstat, fnode.fragstat);
dout(10) << " rstatdiff " << rstatdiff << " fragstatdiff " << fragstatdiff << dendl;
- prepare_old_fragment(replay);
+ map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
+ prepare_old_fragment(dentry_waiters, replay);
// create subfrag dirs
int n = 0;
for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
CDir *f = new CDir(inode, *p, cache, is_auth());
f->state_set(state & (MASK_STATE_FRAGMENT_KEPT | STATE_COMPLETE));
- f->replica_map = replica_map;
+ f->get_replicas() = get_replicas();
f->dir_auth = dir_auth;
f->init_fragment_pins();
f->set_version(get_version());
dout(10) << " subfrag " << *p << " " << *f << dendl;
subfrags[n++] = f;
subs.push_back(f);
- inode->add_dirfrag(f);
f->set_dir_auth(get_dir_auth());
f->prepare_new_fragment(replay);
// repartition dentries
while (!items.empty()) {
- CDir::map_t::iterator p = items.begin();
+ auto p = items.begin();
CDentry *dn = p->second;
- frag_t subfrag = inode->pick_dirfrag(dn->name);
+ frag_t subfrag = inode->pick_dirfrag(dn->get_name());
int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
dout(15) << " subfrag " << subfrag << " n=" << n << " for " << p->first << dendl;
CDir *f = subfrags[n];
f->steal_dentry(dn);
}
+ for (const auto &p : dentry_waiters) {
+ frag_t subfrag = inode->pick_dirfrag(p.first.name);
+ int n = (subfrag.value() & (subfrag.mask() ^ frag.mask())) >> subfrag.mask_shift();
+ CDir *f = subfrags[n];
+
+ if (f->waiting_on_dentry.empty())
+ f->get(PIN_DNWAITER);
+ auto &e = f->waiting_on_dentry[p.first];
+ for (const auto &waiter : p.second) {
+ e.push_back(waiter);
+ }
+ }
+
// FIXME: handle dirty old rstat
// fix up new frag fragstats
version_t rstat_version = inode->get_projected_inode()->rstat.version;
version_t dirstat_version = inode->get_projected_inode()->dirstat.version;
+ map<string_snap_t, std::list<MDSInternalContextBase*> > dentry_waiters;
+
for (auto dir : subs) {
dout(10) << " subfrag " << dir->get_frag() << " " << *dir << dendl;
assert(!dir->is_auth() || dir->is_complete() || replay);
fragstatdiff.add_delta(dir->fnode.accounted_fragstat, dir->fnode.fragstat,
&touched_mtime, &touched_chattr);
- dir->prepare_old_fragment(replay);
+ dir->prepare_old_fragment(dentry_waiters, replay);
// steal dentries
while (!dir->items.empty())
steal_dentry(dir->items.begin()->second);
// merge replica map
- for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
- p != dir->replicas_end();
- ++p) {
- unsigned cur = replica_map[p->first];
- if (p->second > cur)
- replica_map[p->first] = p->second;
+ for (const auto &p : dir->get_replicas()) {
+ unsigned cur = get_replicas()[p.first];
+ if (p.second > cur)
+ get_replicas()[p.first] = p.second;
}
// merge version
// merge state
state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
- dir_auth = dir->dir_auth;
dir->finish_old_fragment(waiters, replay);
inode->close_dirfrag(dir->get_frag());
}
+ if (!dentry_waiters.empty()) {
+ get(PIN_DNWAITER);
+ for (const auto &p : dentry_waiters) {
+ auto &e = waiting_on_dentry[p.first];
+ for (const auto &waiter : p.second) {
+ e.push_back(waiter);
+ }
+ }
+ }
+
if (is_auth() && !replay)
mark_complete();
void CDir::resync_accounted_fragstat()
{
fnode_t *pf = get_projected_fnode();
- inode_t *pi = inode->get_projected_inode();
+ auto pi = inode->get_projected_inode();
if (pf->accounted_fragstat.version != pi->dirstat.version) {
pf->fragstat.version = pi->dirstat.version;
void CDir::resync_accounted_rstat()
{
fnode_t *pf = get_projected_fnode();
- inode_t *pi = inode->get_projected_inode();
+ auto pi = inode->get_projected_inode();
if (pf->accounted_rstat.version != pi->rstat.version) {
pf->rstat.version = pi->rstat.version;
if (in->is_frozen())
continue;
- inode_t *pi = in->project_inode();
- pi->version = in->pre_dirty();
+ auto &pi = in->project_inode();
+ pi.inode.version = in->pre_dirty();
inode->mdcache->project_rstat_inode_to_frag(in, this, 0, 0, NULL);
}
* WAITING
*/
-void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
+void CDir::add_dentry_waiter(boost::string_view dname, snapid_t snapid, MDSInternalContextBase *c)
{
if (waiting_on_dentry.empty())
get(PIN_DNWAITER);
<< " " << c << " on " << *this << dendl;
}
-void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
+void CDir::take_dentry_waiting(boost::string_view dname, snapid_t first, snapid_t last,
list<MDSInternalContextBase*>& ls)
{
if (waiting_on_dentry.empty())
string_snap_t lb(dname, first);
string_snap_t ub(dname, last);
- compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
- while (p != waiting_on_dentry.end() &&
- !(ub < p->first)) {
+ auto it = waiting_on_dentry.lower_bound(lb);
+ while (it != waiting_on_dentry.end() &&
+ !(ub < it->first)) {
dout(10) << "take_dentry_waiting dentry " << dname
<< " [" << first << "," << last << "] found waiter on snap "
- << p->first.snapid
+ << it->first.snapid
<< " on " << *this << dendl;
- ls.splice(ls.end(), p->second);
- waiting_on_dentry.erase(p++);
+ for (const auto &waiter : it->second) {
+ ls.push_back(waiter);
+ }
+ waiting_on_dentry.erase(it++);
}
if (waiting_on_dentry.empty())
{
dout(10) << "take_sub_waiting" << dendl;
if (!waiting_on_dentry.empty()) {
- for (compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
- p != waiting_on_dentry.end();
- ++p)
- ls.splice(ls.end(), p->second);
+ for (const auto &p : waiting_on_dentry) {
+ for (const auto &waiter : p.second) {
+ ls.push_back(waiter);
+ }
+ }
waiting_on_dentry.clear();
put(PIN_DNWAITER);
}
{
if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
// take all dentry waiters
- while (!waiting_on_dentry.empty()) {
- compact_map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
- dout(10) << "take_waiting dentry " << p->first.name
- << " snap " << p->first.snapid << " on " << *this << dendl;
- ls.splice(ls.end(), p->second);
- waiting_on_dentry.erase(p);
+ for (const auto &p : waiting_on_dentry) {
+ dout(10) << "take_waiting dentry " << p.first.name
+ << " snap " << p.first.snapid << " on " << *this << dendl;
+ for (const auto &waiter : p.second) {
+ ls.push_back(waiter);
+ }
}
+ waiting_on_dentry.clear();
put(PIN_DNWAITER);
}
fnode_t *CDir::project_fnode()
{
assert(get_version() != 0);
- fnode_t *p = new fnode_t;
- *p = *get_projected_fnode();
- projected_fnode.push_back(p);
+ // Append a copy of the current projected fnode to projected_fnode; the
+ // container now holds the value itself instead of a heap pointer.
+ projected_fnode.emplace_back(*get_projected_fnode());
+ auto &p = projected_fnode.back();
+ // If scrub results are pending, fold them into the new projection and
+ // clear the pending flag.
if (scrub_infop && scrub_infop->last_scrub_dirty) {
- p->localized_scrub_stamp = scrub_infop->last_local.time;
- p->localized_scrub_version = scrub_infop->last_local.version;
- p->recursive_scrub_stamp = scrub_infop->last_recursive.time;
- p->recursive_scrub_version = scrub_infop->last_recursive.version;
+ p.localized_scrub_stamp = scrub_infop->last_local.time;
+ p.localized_scrub_version = scrub_infop->last_local.version;
+ p.recursive_scrub_stamp = scrub_infop->last_recursive.time;
+ p.recursive_scrub_version = scrub_infop->last_recursive.version;
scrub_infop->last_scrub_dirty = false;
scrub_maybe_delete_info();
}
- dout(10) << "project_fnode " << p << dendl;
- return p;
+ dout(10) << __func__ << " " << &p << dendl;
+ // NOTE(review): returns the address of a container element; assumes
+ // projected_fnode keeps element addresses stable across later
+ // emplace_back calls (true for std::list/std::deque) -- confirm its type.
+ return &p;
}
void CDir::pop_and_dirty_projected_fnode(LogSegment *ls)
{
assert(!projected_fnode.empty());
- dout(15) << "pop_and_dirty_projected_fnode " << projected_fnode.front()
- << " v" << projected_fnode.front()->version << dendl;
- fnode = *projected_fnode.front();
+ // Copy the oldest projection into fnode and mark the dir dirty against ls.
+ auto &front = projected_fnode.front();
+ dout(15) << __func__ << " " << &front << " v" << front.version << dendl;
+ fnode = front;
_mark_dirty(ls);
- delete projected_fnode.front();
+ // elements are stored by value now, so pop_front() destroys the projection;
+ // `front` must not be used after this line
projected_fnode.pop_front();
}
// caller should hold auth pin of this
void CDir::log_mark_dirty()
{
- if (is_dirty() || is_projected())
+ if (is_dirty() || projected_version > get_version())
return; // noop if it is already dirty or will be dirty
version_t pv = pre_dirty();
return fetch(c, want, ignore_authpinnability);
}
-void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
+void CDir::fetch(MDSInternalContextBase *c, boost::string_view want_dn, bool ignore_authpinnability)
{
dout(10) << "fetch on " << *this << dendl;
}
// unlinked directory inode shouldn't have any entry
- if (inode->inode.nlink == 0 && !inode->snaprealm) {
+ if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
+ !inode->snaprealm) {
dout(7) << "fetch dirfrag for unlinked directory, mark complete" << dendl;
if (get_version() == 0) {
+ assert(inode->is_auth());
set_version(1);
if (state_test(STATE_REJOINUNDEF)) {
}
if (c) add_waiter(WAIT_COMPLETE, c);
- if (!want_dn.empty()) wanted_items.insert(want_dn);
+ if (!want_dn.empty()) wanted_items.insert(mempool::mds_co::string(want_dn));
// already fetching?
if (state_test(CDir::STATE_FETCHING)) {
} else {
assert(c);
std::set<std::string> str_keys;
- for (auto p = keys.begin(); p != keys.end(); ++p) {
+ for (auto p : keys) {
string str;
- p->encode(str);
+ p.encode(str);
str_keys.insert(str);
}
rd.omap_get_vals_by_keys(str_keys, &fin->omap, &fin->ret2);
}
CDentry *CDir::_load_dentry(
- const std::string &key,
- const std::string &dname,
+ boost::string_view key,
+ boost::string_view dname,
const snapid_t last,
bufferlist &bl,
const int pos,
const std::set<snapid_t> *snaps,
- bool *force_dirty,
- list<CInode*> *undef_inodes)
+ bool *force_dirty)
{
bufferlist::iterator q = bl.begin();
if (stale) {
if (!dn) {
- stale_items.insert(key);
+ stale_items.insert(mempool::mds_co::string(key));
*force_dirty = true;
}
return dn;
}
if (dn) {
- if (dn->get_linkage()->get_inode() == 0) {
- dout(12) << "_fetched had NEG dentry " << *dn << dendl;
- } else {
- dout(12) << "_fetched had dentry " << *dn << dendl;
+ CDentry::linkage_t *dnl = dn->get_linkage();
+ dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
+ if (committed_version == 0 &&
+ dnl->is_remote() &&
+ dn->is_dirty() &&
+ ino == dnl->get_remote_ino() &&
+ d_type == dnl->get_remote_d_type()) {
+ // see comment below
+ dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
+ dn->mark_clean();
}
} else {
// (remote) link
if (stale) {
if (!dn) {
- stale_items.insert(key);
+ stale_items.insert(mempool::mds_co::string(key));
*force_dirty = true;
}
return dn;
bool undef_inode = false;
if (dn) {
- CInode *in = dn->get_linkage()->get_inode();
- if (in) {
- dout(12) << "_fetched had dentry " << *dn << dendl;
- if (in->state_test(CInode::STATE_REJOINUNDEF)) {
- undef_inodes->push_back(in);
- undef_inode = true;
- }
- } else
- dout(12) << "_fetched had NEG dentry " << *dn << dendl;
+ CDentry::linkage_t *dnl = dn->get_linkage();
+ dout(12) << "_fetched had " << (dnl->is_null() ? "NEG" : "") << " dentry " << *dn << dendl;
+
+ if (dnl->is_primary()) {
+ CInode *in = dnl->get_inode();
+ if (in->state_test(CInode::STATE_REJOINUNDEF)) {
+ undef_inode = true;
+ } else if (committed_version == 0 &&
+ dn->is_dirty() &&
+ inode_data.inode.ino == in->ino() &&
+ inode_data.inode.version == in->get_version()) {
+ /* clean underwater item?
+ * Underwater item is something that is dirty in our cache from
+ * journal replay, but was previously flushed to disk before the
+ * mds failed.
+ *
+ * We only do this if committed_version == 0. That implies either
+ * - this is a fetch after a clean/empty CDir is created
+ * (and has no effect, since the dn won't exist); or
+ * - this is a fetch after _recovery_, which is what we're worried
+ * about. Items that are marked dirty from the journal should be
+ * marked clean if they appear on disk.
+ */
+ dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
+ dn->mark_clean();
+ dout(10) << "_fetched had underwater inode " << *dnl->get_inode() << ", marking clean" << dendl;
+ in->mark_clean();
+ }
+ }
}
if (!dn || undef_inode) {
//in->hack_accessed = false;
//in->hack_load_stamp = ceph_clock_now();
//num_new_inodes_loaded++;
+ } else if (g_conf->get_val<bool>("mds_hack_allow_loading_invalid_metadata")) {
+ dout(20) << "hack: adding duplicate dentry for " << *in << dendl;
+ dn = add_primary_dentry(dname, in, first, last);
} else {
dout(0) << "_fetched badness: got (but i already had) " << *in
<< " mode " << in->inode.mode
try {
dn = _load_dentry(
p->first, dname, last, p->second, pos, snaps,
- &force_dirty, &undef_inodes);
+ &force_dirty);
} catch (const buffer::error &err) {
cache->mds->clog->warn() << "Corrupt dentry '" << dname << "' in "
"dir frag " << dirfrag() << ": "
continue;
}
- if (dn && (wanted_items.count(dname) > 0 || !complete)) {
- dout(10) << " touching wanted dn " << *dn << dendl;
- inode->mdcache->touch_dentry(dn);
- }
+ if (!dn)
+ continue;
- /** clean underwater item?
- * Underwater item is something that is dirty in our cache from
- * journal replay, but was previously flushed to disk before the
- * mds failed.
- *
- * We only do this is committed_version == 0. that implies either
- * - this is a fetch after from a clean/empty CDir is created
- * (and has no effect, since the dn won't exist); or
- * - this is a fetch after _recovery_, which is what we're worried
- * about. Items that are marked dirty from the journal should be
- * marked clean if they appear on disk.
- */
- if (committed_version == 0 &&
- dn &&
- dn->get_version() <= got_fnode.version &&
- dn->is_dirty()) {
- dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << dendl;
- dn->mark_clean();
+ CDentry::linkage_t *dnl = dn->get_linkage();
+ if (dnl->is_primary() && dnl->get_inode()->state_test(CInode::STATE_REJOINUNDEF))
+ undef_inodes.push_back(dnl->get_inode());
- if (dn->get_linkage()->is_primary()) {
- assert(dn->get_linkage()->get_inode()->get_version() <= got_fnode.version);
- dout(10) << "_fetched had underwater inode " << *dn->get_linkage()->get_inode() << ", marking clean" << dendl;
- dn->get_linkage()->get_inode()->mark_clean();
- }
+ if (!complete || wanted_items.count(mempool::mds_co::string(boost::string_view(dname))) > 0) {
+ dout(10) << " touching wanted dn " << *dn << dendl;
+ inode->mdcache->touch_dentry(dn);
}
}
finish_waiting(WAIT_COMPLETE, -EIO);
}
-void CDir::go_bad_dentry(snapid_t last, const std::string &dname)
+void CDir::go_bad_dentry(snapid_t last, boost::string_view dname)
{
- dout(10) << "go_bad_dentry " << dname << dendl;
+ dout(10) << __func__ << " " << dname << dendl;
+ std::string path(get_path());
+ path += "/";
+ path += std::string(dname);
const bool fatal = cache->mds->damage_table.notify_dentry(
- inode->ino(), frag, last, dname, get_path() + "/" + dname);
+ inode->ino(), frag, last, dname, path);
if (fatal) {
cache->mds->damaged();
ceph_abort(); // unreachable, damaged() respawns us
assert(is_auth());
assert(ignore_authpinnability || can_auth_pin());
- if (inode->inode.nlink == 0 && !inode->snaprealm) {
- dout(7) << "commit dirfrag for unlinked directory, mark clean" << dendl;
- try_remove_dentries_for_stray();
- if (c)
- cache->mds->queue_waiter(c);
- return;
- }
-
// note: queue up a noop if necessary, so that we always
// get an auth_pin.
if (!c)
object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
if (!stale_items.empty()) {
- for (compact_set<string>::iterator p = stale_items.begin();
- p != stale_items.end();
- ++p) {
- to_remove.insert(*p);
- write_size += (*p).length();
+ for (const auto &p : stale_items) {
+ to_remove.insert(std::string(boost::string_view(p)));
+ write_size += p.length();
}
stale_items.clear();
}
- for (map_t::iterator p = items.begin();
- p != items.end(); ) {
- CDentry *dn = p->second;
- ++p;
-
+ auto write_one = [&](CDentry *dn) {
string key;
dn->key().encode(key);
dout(10) << " rm " << key << dendl;
write_size += key.length();
to_remove.insert(key);
- continue;
+ return;
}
- if (!dn->is_dirty() &&
- (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
- continue; // skip clean dentries
-
if (dn->get_linkage()->is_null()) {
- dout(10) << " rm " << dn->name << " " << *dn << dendl;
+ dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
write_size += key.length();
to_remove.insert(key);
} else {
- dout(10) << " set " << dn->name << " " << *dn << dendl;
+ dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
bufferlist dnbl;
_encode_dentry(dn, dnbl, snaps);
write_size += key.length() + dnbl.length();
to_set.clear();
to_remove.clear();
}
+ };
+
+ if (state_test(CDir::STATE_FRAGMENTING)) {
+ for (auto p = items.begin(); p != items.end(); ) {
+ CDentry *dn = p->second;
+ ++p;
+ if (!dn->is_dirty() && dn->get_linkage()->is_null())
+ continue;
+ write_one(dn);
+ }
+ } else {
+ for (auto p = dirty_dentries.begin(); !p.end(); ) {
+ CDentry *dn = *p;
+ ++p;
+ write_one(dn);
+ }
}
ObjectOperation op;
if (dn->linkage.is_remote()) {
inodeno_t ino = dn->linkage.get_remote_ino();
unsigned char d_type = dn->linkage.get_remote_d_type();
- dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' remote ino " << ino << dendl;
+ dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
// marker, name, ino
bl.append('L'); // remote link
CInode *in = dn->linkage.get_inode();
assert(in);
- dout(14) << " pos " << bl.length() << " dn '" << dn->name << "' inode " << *in << dendl;
+ dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
// marker, name, inode, [symlink string]
bl.append('I'); // inode
if (r < 0) {
// the directory could be partly purged during MDS failover
if (r == -ENOENT && committed_version == 0 &&
- inode->inode.nlink == 0 && inode->snaprealm) {
- inode->state_set(CInode::STATE_MISSINGOBJS);
+ !inode->is_base() && get_parent_dir()->inode->is_stray()) {
r = 0;
+ if (inode->snaprealm)
+ inode->state_set(CInode::STATE_MISSINGOBJS);
}
if (r < 0) {
dout(1) << "commit error " << r << " v " << v << dendl;
mark_clean();
// dentries clean?
- for (map_t::iterator it = items.begin();
- it != items.end(); ) {
- CDentry *dn = it->second;
- ++it;
+ for (auto p = dirty_dentries.begin(); !p.end(); ) {
+ CDentry *dn = *p;
+ ++p;
// inode?
if (dn->linkage.is_primary()) {
// dentry
if (committed_version >= dn->get_version()) {
- if (dn->is_dirty()) {
- dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
- dn->mark_clean();
+ dout(15) << " dir " << committed_version << " >= dn " << dn->get_version() << " now clean " << *dn << dendl;
+ dn->mark_clean();
- // drop clean null stray dentries immediately
- if (stray &&
- dn->get_num_ref() == 0 &&
- !dn->is_projected() &&
- dn->get_linkage()->is_null())
- remove_dentry(dn);
- }
+ // drop clean null stray dentries immediately
+ if (stray &&
+ dn->get_num_ref() == 0 &&
+ !dn->is_projected() &&
+ dn->get_linkage()->is_null())
+ remove_dentry(dn);
} else {
dout(15) << " dir " << committed_version << " < dn " << dn->get_version() << " still dirty " << *dn << dendl;
+ assert(dn->is_dirty());
}
}
// finishers?
bool were_waiters = !waiting_for_commit.empty();
- compact_map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
- while (p != waiting_for_commit.end()) {
- compact_map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
- ++n;
- if (p->first > committed_version) {
- dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
- _commit(p->first, -1);
+ auto it = waiting_for_commit.begin();
+ while (it != waiting_for_commit.end()) {
+ auto _it = it;
+ ++_it;
+ if (it->first > committed_version) {
+ dout(10) << " there are waiters for " << it->first << ", committing again" << dendl;
+ _commit(it->first, -1);
break;
}
- cache->mds->queue_waiters(p->second);
- waiting_for_commit.erase(p);
- p = n;
+ std::list<MDSInternalContextBase*> t;
+ for (const auto &waiter : it->second)
+ t.push_back(waiter);
+ cache->mds->queue_waiters(t);
+ waiting_for_commit.erase(it);
+ it = _it;
}
// try drop dentries in this dirfrag if it's about to be purged
- if (inode->inode.nlink == 0 && inode->snaprealm)
+ if (!inode->is_base() && get_parent_dir()->inode->is_stray() &&
+ inode->snaprealm)
cache->maybe_eval_stray(inode, true);
// unpin if we kicked the last waiter.
::encode(pop_auth_subtree, bl);
::encode(dir_rep_by, bl);
- ::encode(replica_map, bl);
+ ::encode(get_replicas(), bl);
get(PIN_TEMPEXPORTING);
}
pop_auth_subtree_nested.add(now, cache->decayrate, pop_auth_subtree);
::decode(dir_rep_by, blp);
- ::decode(replica_map, blp);
- if (!replica_map.empty()) get(PIN_REPLICATED);
+ ::decode(get_replicas(), blp);
+ if (is_replicated()) get(PIN_REPLICATED);
replica_nonce = 0; // no longer defined
inode->adjust_nested_auth_pins(-1, NULL);
// unpin parent of frozen dir/tree?
- if (inode->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
- inode->auth_unpin(this);
+ if (inode->is_auth()) {
+ assert(!is_frozen_tree_root());
+ if (is_frozen_dir())
+ inode->auth_unpin(this);
+ }
}
if (was_subtree && !is_subtree_root()) {
dout(10) << " old subtree root, adjusting auth_pins" << dendl;
inode->adjust_nested_auth_pins(1, NULL);
// pin parent of frozen dir/tree?
- if (inode->is_auth() && (is_frozen_tree_root() || is_frozen_dir()))
- inode->auth_pin(this);
+ if (inode->is_auth()) {
+ assert(!is_frozen_tree_root());
+ if (is_frozen_dir())
+ inode->auth_pin(this);
+ }
}
// newly single auth?
frag_info_t c;
memset(&c, 0, sizeof(c));
- for (map_t::iterator it = items.begin();
+ for (auto it = items.begin();
it != items.end();
++it) {
CDentry *dn = it->second;
state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
--num_freezing_trees;
}
+
+ if (is_auth()) {
+ mds_authority_t auth;
+ bool was_subtree = is_subtree_root();
+ if (was_subtree) {
+ auth = get_dir_auth();
+ } else {
+ // temporarily prevent parent subtree from becoming frozen.
+ inode->auth_pin(this);
+ // create new subtree
+ auth = authority();
+ }
+
+ assert(auth.first >= 0);
+ assert(auth.second == CDIR_AUTH_UNKNOWN);
+ auth.second = auth.first;
+ inode->mdcache->adjust_subtree_auth(this, auth);
+ if (!was_subtree)
+ inode->auth_unpin(this);
+ }
+
state_set(STATE_FROZENTREE);
++num_frozen_trees;
get(PIN_FROZEN);
-
- // auth_pin inode for duration of freeze, if we are not a subtree root.
- if (is_auth() && !is_subtree_root())
- inode->auth_pin(this);
}
void CDir::unfreeze_tree()
put(PIN_FROZEN);
- // unpin (may => FREEZEABLE) FIXME: is this order good?
- if (is_auth() && !is_subtree_root())
- inode->auth_unpin(this);
+ if (is_auth()) {
+ // must be subtree
+ assert(is_subtree_root());
+ // for debugging purposes: the caller should ensure 'dir_auth.second == dir_auth.first'
+ mds_authority_t auth = get_dir_auth();
+ assert(auth.first >= 0);
+ assert(auth.second == auth.first);
+ auth.second = CDIR_AUTH_UNKNOWN;
+ inode->mdcache->adjust_subtree_auth(this, auth);
+ }
// waiters?
finish_waiting(WAIT_UNFREEZE);
MDSCacheObject::dump(f);
}
+/**
+ * Emit this dirfrag's load counters through a Formatter.
+ *
+ * Writes the "path" and "dirfrag" fields, then one object section per
+ * counter (pop_me, pop_nested, pop_auth_subtree, pop_auth_subtree_nested),
+ * each filled in by that counter's own dump(f, now, rate).
+ *
+ * @param f    formatter receiving the output
+ * @param now  timestamp forwarded unchanged to every counter's dump()
+ * @param rate decay rate forwarded unchanged to every counter's dump()
+ */
+void CDir::dump_load(Formatter *f, utime_t now, const DecayRate& rate)
+{
+ // identify which fragment the numbers belong to
+ f->dump_stream("path") << get_path();
+ f->dump_stream("dirfrag") << dirfrag();
+
+ // section names match the member names
+ f->open_object_section("pop_me");
+ pop_me.dump(f, now, rate);
+ f->close_section();
+
+ f->open_object_section("pop_nested");
+ pop_nested.dump(f, now, rate);
+ f->close_section();
+
+ f->open_object_section("pop_auth_subtree");
+ pop_auth_subtree.dump(f, now, rate);
+ f->close_section();
+
+ f->open_object_section("pop_auth_subtree_nested");
+ pop_auth_subtree_nested.dump(f, now, rate);
+ f->close_section();
+}
+
/****** Scrub Stuff *******/
void CDir::scrub_info_create() const
scrub_infop->others_scrubbing.clear();
scrub_infop->others_scrubbed.clear();
- for (map_t::iterator i = items.begin();
+ for (auto i = items.begin();
i != items.end();
++i) {
// TODO: handle snapshot scrubbing
scrub_infop->last_scrub_dirty = true;
}
-int CDir::_next_dentry_on_set(set<dentry_key_t>& dns, bool missing_okay,
+int CDir::_next_dentry_on_set(dentry_key_set &dns, bool missing_okay,
MDSInternalContext *cb, CDentry **dnout)
{
dentry_key_t dnkey;
continue;
}
+ if (!dn->get_linkage()->is_primary()) {
+ dout(15) << " skip dentry " << dnkey.name
+ << ", no longer primary" << dendl;
+ continue;
+ }
+
*dnout = dn;
return 0;
}
return effective_size > fast_limit;
}
+MEMPOOL_DEFINE_OBJECT_FACTORY(CDir, co_dir, mds_co);