rootdir->mark_dirty(rootdir->pre_dirty(), mds->mdlog->get_current_segment());
rootdir->commit(0, gather->new_sub());
- root->store(gather->new_sub());
+ root->mark_clean();
+ root->mark_dirty(root->pre_dirty(), mds->mdlog->get_current_segment());
+ root->mark_dirty_parent(mds->mdlog->get_current_segment(), true);
+ root->flush(gather->new_sub());
}
void MDCache::create_mydir_hierarchy(MDSGather *gather)
straydir->mark_complete();
straydir->mark_dirty(straydir->pre_dirty(), ls);
straydir->commit(0, gather->new_sub());
- stray->_mark_dirty_parent(ls, true);
+ stray->mark_dirty_parent(ls, true);
stray->store_backtrace(gather->new_sub());
}
gather.activate();
}
+void MDCache::open_mydir_frag(MDSInternalContextBase *c)
+{
+ open_mydir_inode(
+ new MDSInternalContextWrapper(mds,
+ new FunctionContext([this, c](int r) {
+ if (r < 0) {
+ c->complete(r);
+ return;
+ }
+ CDir *mydir = myin->get_or_open_dirfrag(this, frag_t());
+ assert(mydir);
+ adjust_subtree_auth(mydir, mds->get_nodeid());
+ mydir->fetch(c);
+ })
+ )
+ );
+}
+
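A standalone sketch of the continuation style the new `open_mydir_frag()` uses: each asynchronous step takes a completion callback, and a lambda inspects the result before handing off to the caller's context. `Ctx`, `open_inode`, and `fetch_dirfrag` below are simplified stand-ins for illustration, not the MDS types:

```cpp
#include <functional>
#include <iostream>

// Stand-in for the MDS Context types; receives the completion code.
using Ctx = std::function<void(int)>;

// Stubs modeling open_mydir_inode() and CDir::fetch().
void open_inode(Ctx fin)    { fin(0); }
void fetch_dirfrag(Ctx fin) { fin(0); }

// Mirrors open_mydir_frag(): bail out early on error, otherwise keep
// chaining and hand the caller's context to the final step.
void open_frag(Ctx c) {
  open_inode([c](int r) {
    if (r < 0) {          // propagate the failure to the caller
      c(r);
      return;
    }
    fetch_dirfrag(c);     // success: continue with the dirfrag fetch
  });
}

int main() {
  open_frag([](int r) { std::cout << "completed, r=" << r << "\n"; });
}
```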
void MDCache::open_root()
{
dout(10) << "open_root" << dendl;
 * merge with parent and/or child subtrees, if it is appropriate.
* merge can ONLY happen if both parent and child have unambiguous auth.
*/
-void MDCache::adjust_subtree_auth(CDir *dir, mds_authority_t auth)
+void MDCache::adjust_subtree_auth(CDir *dir, mds_authority_t auth, bool adjust_pop)
{
dout(7) << "adjust_subtree_auth " << dir->get_dir_auth() << " -> " << auth
<< " on " << *dir << dendl;
root = dir;
// adjust recursive pop counters
- if (dir->is_auth()) {
+ if (adjust_pop && dir->is_auth()) {
utime_t now = ceph_clock_now();
CDir *p = dir->get_parent_dir();
while (p) {
}
};
-void MDCache::try_subtree_merge_at(CDir *dir, set<CInode*> *to_eval)
+void MDCache::try_subtree_merge_at(CDir *dir, set<CInode*> *to_eval, bool adjust_pop)
{
dout(10) << "try_subtree_merge_at " << *dir << dendl;
subtrees[parent].erase(dir);
// adjust popularity?
- if (dir->is_auth()) {
+ if (adjust_pop && dir->is_auth()) {
utime_t now = ceph_clock_now();
+ CDir *cur = dir;
CDir *p = dir->get_parent_dir();
while (p) {
p->pop_auth_subtree.add(now, decayrate, dir->pop_auth_subtree);
+ p->pop_lru_subdirs.push_front(&cur->get_inode()->item_pop_lru);
if (p->is_subtree_root()) break;
+ cur = p;
p = p->inode->get_parent_dir();
}
}
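Both pop-adjusting hunks walk the ancestor chain, folding the child's popularity into each ancestor until the enclosing subtree root is reached. A toy model of that walk, with a plain double standing in for `DecayCounter` and a minimal `Dir` in place of `CDir`:

```cpp
// Toy stand-in for CDir: only the fields the walk needs.
struct Dir {
  Dir *parent = nullptr;      // nullptr at the filesystem root
  bool subtree_root = false;  // is_subtree_root() stand-in
  double pop_auth_subtree = 0;
};

// Fold dir's popularity into every ancestor up to and including the
// first subtree root, mirroring the while (p) loops above.
void merge_pop_upwards(Dir *dir) {
  for (Dir *p = dir->parent; p; p = p->parent) {
    p->pop_auth_subtree += dir->pop_auth_subtree; // DecayCounter::add() stand-in
    if (p->subtree_root)
      break;                                      // stop at the subtree boundary
  }
}

int main() {
  Dir root, mid, leaf;
  root.subtree_root = true;
  mid.parent = &root;
  leaf.parent = &mid;
  leaf.pop_auth_subtree = 4.2;
  merge_pop_upwards(&leaf);
  return (root.pop_auth_subtree == 4.2 && mid.pop_auth_subtree == 4.2) ? 0 : 1;
}
```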
dout(10) << "adjust_subtree_after_rename " << *diri << " from " << *olddir << dendl;
//show_subtrees();
+ utime_t now = ceph_clock_now();
CDir *newdir = diri->get_parent_dir();
CDir *newparent = get_subtree_root(newdir);
dout(10) << " new parent " << *newparent << dendl;
+ if (olddir != newdir)
+ mds->balancer->adjust_pop_for_rename(olddir, dir, now, false);
+
if (oldparent == newparent) {
dout(10) << "parent unchanged for " << *dir << " at " << *oldparent << dendl;
- continue;
- }
-
- if (dir->is_subtree_root()) {
+ } else if (dir->is_subtree_root()) {
// children are fine. change parent.
dout(10) << "moving " << *dir << " from " << *oldparent << " to " << *newparent << dendl;
assert(subtrees[oldparent].count(dir));
assert(subtrees.count(newparent));
subtrees[newparent].insert(dir);
// caller is responsible for 'eval diri'
- try_subtree_merge_at(dir, NULL);
+ try_subtree_merge_at(dir, NULL, false);
} else {
// mid-subtree.
// did auth change?
if (oldparent->authority() != newparent->authority()) {
- adjust_subtree_auth(dir, oldparent->authority());
+ adjust_subtree_auth(dir, oldparent->authority(), false);
// caller is responsible for 'eval diri'
- try_subtree_merge_at(dir, NULL);
+ try_subtree_merge_at(dir, NULL, false);
}
}
+
+ if (olddir != newdir)
+ mds->balancer->adjust_pop_for_rename(newdir, dir, now, true);
}
show_subtrees();
}
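The paired `adjust_pop_for_rename()` calls bracket the subtree surgery: assuming the final bool selects subtract-versus-add (the parameter name is not visible in this hunk), load is removed from `olddir`'s ancestry before the maps change and added along `newdir`'s ancestry afterwards. A minimal sketch of that bracket, with a hypothetical `adjust_pop` standing in for the balancer call (the real call also takes a `utime_t`):

```cpp
#include <cstdio>

struct Dir { const char *name; };

// Hypothetical stand-in for MDBalancer::adjust_pop_for_rename().
void adjust_pop(Dir *pdir, Dir *dir, bool inc) {
  std::printf("%s load on ancestry of %s\n", inc ? "add" : "drop", pdir->name);
}

void move_dir(Dir *dir, Dir *olddir, Dir *newdir) {
  adjust_pop(olddir, dir, /*inc=*/false); // before: drop load from old ancestry
  // ... reparent dir and fix the subtree maps (elided) ...
  adjust_pop(newdir, dir, /*inc=*/true);  // after: add load along new ancestry
}

int main() {
  Dir d{"d"}, olddir{"olddir"}, newdir{"newdir"};
  move_dir(&d, &olddir, &newdir);
}
```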
}
-void MDCache::broadcast_quota_to_client(CInode *in)
+void MDCache::broadcast_quota_to_client(CInode *in, client_t exclude_ct)
{
if (!in->is_auth() || in->is_frozen())
return;
continue;
Capability *cap = it->second;
+
+ if (exclude_ct >= 0 && exclude_ct != it->first)
+ goto update;
+
if (cap->last_rbytes == i->rstat.rbytes &&
cap->last_rsize == i->rstat.rsize())
continue;
im.cap_id = ++last_cap_id; // assign a new cap ID
im.issue_seq = 1;
im.mseq = q->second.mseq;
+
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+ if (session)
+ rejoin_client_map.emplace(q->first, session->info.inst);
}
// will process these caps in rejoin stage
if (mds->is_rejoin()) {
map<client_t, set<mds_rank_t> > client_exports;
for (auto p = cap_exports.begin(); p != cap_exports.end(); ++p) {
- assert(cap_export_targets.count(p->first));
- mds_rank_t target = cap_export_targets[p->first];
+ mds_rank_t target = p->second.first;
if (rejoins.count(target) == 0)
continue;
- rejoins[target]->cap_exports[p->first] = p->second;
- for (auto q = p->second.begin(); q != p->second.end(); ++q)
+ rejoins[target]->cap_exports[p->first] = p->second.second;
+ for (auto q = p->second.second.begin(); q != p->second.second.end(); ++q)
client_exports[q->first].insert(target);
}
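This hunk drops the parallel `cap_export_targets` map; judging from the accesses (`p->second.first` as the target rank, `p->second.second` as the per-client exports), the container presumably now has the shape below. This is an inference from the usage here, not the actual declaration in MDCache.h:

```cpp
#include <cstdint>
#include <map>
#include <utility>

// Stand-ins for the Ceph types used as keys/values.
using inodeno_t  = uint64_t;
using client_t   = int64_t;
using mds_rank_t = int32_t;
struct CapExport {};  // stands in for Capability::Export

// Inferred shape: target rank and per-client exports under one key, so a
// single lookup replaces the old cap_exports + cap_export_targets pair.
std::map<inodeno_t,
         std::pair<mds_rank_t, std::map<client_t, CapExport>>> cap_exports;

int main() {
  cap_exports[42] = { mds_rank_t(1), { { client_t(100), CapExport{} } } };
  return cap_exports.at(42).first == 1 ? 0 : 1;
}
```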
for (map<client_t, set<mds_rank_t> >::iterator p = client_exports.begin();
rejoins_pending = false;
// nothing?
- if (mds->is_rejoin() && rejoins.empty()) {
+ if (mds->is_rejoin() && rejoin_gather.empty()) {
dout(10) << "nothing to rejoin" << dendl;
rejoin_gather_finish();
}
}
} else {
// done?
- if (rejoin_gather.empty()) {
+ if (rejoin_gather.empty() && rejoin_ack_gather.count(mds->get_nodeid())) {
rejoin_gather_finish();
} else {
dout(7) << "still need rejoin from (" << rejoin_gather << ")" << dendl;
}
}
-class C_MDC_RejoinGatherFinish : public MDCacheContext {
-public:
- explicit C_MDC_RejoinGatherFinish(MDCache *c) : MDCacheContext(c) {}
- void finish(int r) override {
- mdcache->rejoin_gather_finish();
- }
-};
-
/*
* rejoin_scour_survivor_replica - remove source from replica list on unmentioned objects
*
// done?
assert(rejoin_gather.count(from));
rejoin_gather.erase(from);
- if (rejoin_gather.empty()) {
+ if (rejoin_gather.empty() && rejoin_ack_gather.count(mds->get_nodeid())) {
rejoin_gather_finish();
} else {
dout(7) << "still need rejoin from (" << rejoin_gather << ")" << dendl;
for (map<inodeno_t,map<client_t,Capability::Import> >::iterator p = peer_imported.begin();
p != peer_imported.end();
++p) {
- assert(cap_exports.count(p->first));
- assert(cap_export_targets.count(p->first));
- assert(cap_export_targets[p->first] == from);
+ auto& ex = cap_exports.at(p->first);
+ assert(ex.first == from);
for (map<client_t,Capability::Import>::iterator q = p->second.begin();
q != p->second.end();
++q) {
- assert(cap_exports[p->first].count(q->first));
+ auto r = ex.second.find(q->first);
+ assert(r != ex.second.end());
dout(10) << " exporting caps for client." << q->first << " ino " << p->first << dendl;
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ if (!session) {
+ dout(10) << " no session for client." << p->first << dendl;
+ ex.second.erase(r);
+ continue;
+ }
// mark client caps stale.
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
- cap_exports[p->first][q->first].capinfo.cap_id, 0,
+ r->second.capinfo.cap_id, 0,
mds->get_osd_epoch_barrier());
m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
(q->second.cap_id > 0 ? from : -1), 0);
mds->send_message_client_counted(m, session);
- cap_exports[p->first].erase(q->first);
+ ex.second.erase(r);
}
- assert(cap_exports[p->first].empty());
+ assert(ex.second.empty());
}
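The rewritten loop reuses the iterator returned by `ex.second.find()` for the assert, the `capinfo` read, and the erase, instead of re-indexing `cap_exports[ino][client]` each time. The idiom in isolation:

```cpp
#include <cassert>
#include <map>

int main() {
  std::map<int, int> m{{1, 10}, {2, 20}};

  // Find once, then reuse the iterator for both the read and the erase;
  // this replaces several separate lookups of m[1].
  auto it = m.find(1);
  assert(it != m.end());
  int value = it->second;  // read through the iterator
  m.erase(it);             // O(1) erase given the iterator

  return value == 10 ? 0 : 1;
}
```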
// done?
{
dout(10) << "rejoin_gather_finish" << dendl;
assert(mds->is_rejoin());
+ assert(rejoin_ack_gather.count(mds->get_nodeid()));
if (open_undef_inodes_dirfrags())
return;
rejoin_send_acks();
// signal completion of fetches, rejoin_gather_finish, etc.
- assert(rejoin_ack_gather.count(mds->get_nodeid()));
rejoin_ack_gather.erase(mds->get_nodeid());
// did we already get our acks too?
class C_MDC_RejoinSessionsOpened : public MDCacheLogContext {
public:
- map<client_t,entity_inst_t> client_map;
- map<client_t,uint64_t> sseqmap;
-
- C_MDC_RejoinSessionsOpened(MDCache *c, map<client_t,entity_inst_t>& cm) :
- MDCacheLogContext(c), client_map(cm) {}
+ map<client_t,pair<Session*,uint64_t> > session_map;
+ C_MDC_RejoinSessionsOpened(MDCache *c) : MDCacheLogContext(c) {}
void finish(int r) override {
assert(r == 0);
- mdcache->rejoin_open_sessions_finish(client_map, sseqmap);
+ mdcache->rejoin_open_sessions_finish(session_map);
}
};
-void MDCache::rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
- map<client_t,uint64_t>& sseqmap)
+void MDCache::rejoin_open_sessions_finish(map<client_t,pair<Session*,uint64_t> >& session_map)
{
dout(10) << "rejoin_open_sessions_finish" << dendl;
- mds->server->finish_force_open_sessions(client_map, sseqmap);
+ mds->server->finish_force_open_sessions(session_map);
+ rejoin_session_map.swap(session_map);
if (rejoin_gather.empty())
rejoin_gather_finish();
}
cap_imports_num_opening++;
dout(10) << " opening missing ino " << p->first << dendl;
open_ino(p->first, (int64_t)-1, new C_MDC_RejoinOpenInoFinish(this, p->first), false);
+ if (!(cap_imports_num_opening % 1000))
+ mds->heartbeat_reset();
}
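The modulo guard keeps the heartbeat fresh while a large number of `open_ino()` calls are queued, presumably so a long rejoin does not trip the internal heartbeat timeout. The generic shape of that pattern, with `heartbeat_reset()` and the work item stubbed out:

```cpp
#include <cstddef>
#include <vector>

void heartbeat_reset() { /* stand-in for mds->heartbeat_reset() */ }
void process(int)      { /* stand-in for queueing one open_ino() */ }

void drain(const std::vector<int> &work) {
  std::size_t n = 0;
  for (int item : work) {
    process(item);
    if (!(++n % 1000))   // every 1000 items, as in the hunk above
      heartbeat_reset(); // tell the beacon we're alive mid-loop
  }
}

int main() {
  drain(std::vector<int>(2500, 7)); // resets the heartbeat at items 1000, 2000
}
```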
if (cap_imports_num_opening > 0)
// called by rejoin_gather_finish() ?
if (rejoin_gather.count(mds->get_nodeid()) == 0) {
- // if sessions for imported caps are all open ?
- for (map<client_t,entity_inst_t>::iterator p = rejoin_client_map.begin();
- p != rejoin_client_map.end();
- ++p) {
- if (!mds->sessionmap.have_session(entity_name_t::CLIENT(p->first.v))) {
- C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this, rejoin_client_map);
- version_t pv = mds->server->prepare_force_open_sessions(rejoin_client_map, finish->sseqmap);
- ESessions *le = new ESessions(pv, rejoin_client_map);
- mds->mdlog->start_submit_entry(le, finish);
- mds->mdlog->flush();
- rejoin_client_map.clear();
- return true;
- }
+ if (!rejoin_client_map.empty() &&
+ rejoin_session_map.empty()) {
+ C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this);
+ version_t pv = mds->server->prepare_force_open_sessions(rejoin_client_map,
+ finish->session_map);
+ mds->mdlog->start_submit_entry(new ESessions(pv, rejoin_client_map), finish);
+ mds->mdlog->flush();
+ rejoin_client_map.clear();
+ return true;
}
- rejoin_client_map.clear();
// process caps that were exported by slave rename
for (map<inodeno_t,pair<mds_rank_t,map<client_t,Capability::Export> > >::iterator p = rejoin_slave_exports.begin();
for (map<client_t,Capability::Export>::iterator q = p->second.second.begin();
q != p->second.second.end();
++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ auto r = rejoin_session_map.find(q->first);
+ if (r == rejoin_session_map.end())
+ continue;
+ Session *session = r->second.first;
Capability *cap = in->get_client_cap(q->first);
if (!cap)
cap = in->add_client_cap(q->first, session);
}
assert(in->is_auth());
for (auto q = p->second.begin(); q != p->second.end(); ++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ Session *session;
+ {
+ auto r = rejoin_session_map.find(q->first);
+ session = (r != rejoin_session_map.end() ? r->second.first : nullptr);
+ }
+
for (auto r = q->second.begin(); r != q->second.end(); ++r) {
+ if (!session) {
+ if (r->first >= 0)
+ (void)rejoin_imported_caps[r->first][p->first][q->first]; // all are zero
+ continue;
+ }
+
Capability *cap = in->reconnect_cap(q->first, r->second, session);
add_reconnected_cap(q->first, in->ino(), r->second);
if (r->first >= 0) {
} else {
trim_non_auth();
+ assert(rejoin_gather.count(mds->get_nodeid()));
rejoin_gather.erase(mds->get_nodeid());
+ assert(!rejoin_ack_gather.count(mds->get_nodeid()));
maybe_send_pending_rejoins();
-
- if (rejoin_gather.empty() && rejoin_ack_gather.count(mds->get_nodeid()))
- rejoin_gather_finish();
}
return false;
}
if (fetch_queue.empty())
return false;
- MDSGatherBuilder gather(g_ceph_context, new C_MDC_RejoinGatherFinish(this));
+ MDSGatherBuilder gather(g_ceph_context,
+ new MDSInternalContextWrapper(mds,
+ new FunctionContext([this](int r) {
+ if (rejoin_gather.empty())
+ rejoin_gather_finish();
+ })
+ )
+ );
+
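The dedicated `C_MDC_RejoinGatherFinish` class removed above is replaced by this inline `FunctionContext` finisher. The gather idiom itself, N sub-contexts sharing one finisher that fires when the last sub completes, can be modeled as follows (simplified and single-threaded; the real `MDSGatherBuilder` also defers firing via `activate()` and tracks result codes):

```cpp
#include <functional>
#include <iostream>
#include <memory>

// Minimal single-threaded gather: new_sub() hands out completion callbacks,
// and the finisher runs once every outstanding sub has completed.
class Gather {
public:
  explicit Gather(std::function<void(int)> fin)
    : state_(std::make_shared<State>(std::move(fin))) {}

  std::function<void(int)> new_sub() {
    ++state_->outstanding;
    auto s = state_;
    return [s](int r) {
      if (--s->outstanding == 0)
        s->fin(r);  // last sub done: fire the finisher
    };
  }

private:
  struct State {
    explicit State(std::function<void(int)> f) : fin(std::move(f)) {}
    int outstanding = 0;
    std::function<void(int)> fin;
  };
  std::shared_ptr<State> state_;
};

int main() {
  Gather gather([](int) { std::cout << "all fetches done\n"; });
  auto a = gather.new_sub(), b = gather.new_sub();
  a(0);
  b(0);  // second completion triggers the finisher
}
```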
for (set<CDir*>::iterator p = fetch_queue.begin();
p != fetch_queue.end();
++p) {
  // This is because unconnected replicas are problematic for
// subtree migration.
//
- if (!in->is_auth() && !in->dirfragtreelock.can_read(-1))
+ if (!in->is_auth() && !mds->locker->rdlock_try(&in->dirfragtreelock, -1, nullptr)) {
return true;
+ }
// DIR
list<CDir*> dfls;
}
// empty stray dir
- if (!shutdown_export_strays()) {
- dout(7) << "waiting for strays to migrate" << dendl;
- return false;
- }
-
- // drop our reference to our stray dir inode
- for (int i = 0; i < NUM_STRAY; ++i) {
- if (strays[i] &&
- strays[i]->state_test(CInode::STATE_STRAYPINNED)) {
- strays[i]->state_clear(CInode::STATE_STRAYPINNED);
- strays[i]->put(CInode::PIN_STRAY);
- strays[i]->put_stickydirs();
- }
- }
+ bool strays_all_exported = shutdown_export_strays();
// trim cache
trim(UINT64_MAX);
dout(5) << "lru size now " << lru.lru_get_size() << "/" << bottom_lru.lru_get_size() << dendl;
- // SUBTREES
+ // Export all subtrees to another active MDS (usually rank 0) if we are not rank 0
int num_auth_subtree = 0;
if (!subtrees.empty() &&
- mds->get_nodeid() != 0 &&
- migrator->get_export_queue_size() == 0) {
+ mds->get_nodeid() != 0) {
dout(7) << "looking for subtrees to export to mds0" << dendl;
list<CDir*> ls;
for (map<CDir*, set<CDir*> >::iterator it = subtrees.begin();
ls.push_back(dir);
}
}
+
+ migrator->clear_export_queue();
for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
CDir *dir = *p;
mds_rank_t dest = dir->get_inode()->authority().first;
}
}
+ if (!strays_all_exported) {
+ dout(7) << "waiting for strays to migrate" << dendl;
+ return false;
+ }
+
if (num_auth_subtree > 0) {
+ assert(mds->get_nodeid() > 0);
dout(7) << "still have " << num_auth_subtree << " auth subtrees" << dendl;
show_subtrees();
return false;
return false;
}
+ // Fully trim the log so that all objects in cache are clean and may be
+ // trimmed by a future MDCache::trim. Note that MDSRank::tick does not
+ // trim the log, so we do it here to ensure the cache eventually
+ // becomes clean.
+ mds->mdlog->trim(0);
+ if (mds->mdlog->get_num_segments() > 1) {
+ dout(7) << "still >1 segments, waiting for log to trim" << dendl;
+ return false;
+ }
+
+ // drop our reference to our stray dir inode
+ for (int i = 0; i < NUM_STRAY; ++i) {
+ if (strays[i] &&
+ strays[i]->state_test(CInode::STATE_STRAYPINNED)) {
+ strays[i]->state_clear(CInode::STATE_STRAYPINNED);
+ strays[i]->put(CInode::PIN_STRAY);
+ strays[i]->put_stickydirs();
+ }
+ }
+
CDir *mydir = myin ? myin->get_dirfrag(frag_t()) : NULL;
if (mydir && !mydir->is_subtree_root())
mydir = NULL;
assert(!migrator->is_exporting());
assert(!migrator->is_importing());
- // flush what we can from the log
- mds->mdlog->trim(0);
- if (mds->mdlog->get_num_segments() > 1) {
- dout(7) << "still >1 segments, waiting for log to trim" << dendl;
- return false;
- }
-
if ((myin && myin->is_auth_pinned()) ||
(mydir && mydir->is_auth_pinned())) {
dout(7) << "still have auth pinned objects" << dendl;
list<CDir*> dfs;
for (int i = 0; i < NUM_STRAY; ++i) {
- if (!strays[i]) {
+ if (!strays[i] ||
+ !strays[i]->state_test(CInode::STATE_STRAYPINNED))
continue;
- }
strays[i]->get_dirfrags(dfs);
}
for (auto &p : dir->items) {
CDentry *dn = p.second;
- CDentry::linkage_t *dnl = dn->get_linkage();
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (dnl->is_null())
continue;
done = false;
// If the scrub did some repair, then flush the journal at the end of
  // the scrub. Otherwise, in a case such as rewriting a backtrace,
  // the on-disk state would still look damaged.
- auto expiry_fin = new FunctionContext([this, header, fin](int r){
- if (header->get_repaired()) {
- dout(4) << "Flushing journal because scrub did some repairs" << dendl;
- mds->mdlog->start_new_segment();
- mds->mdlog->trim_all();
- if (fin) {
- MDSGatherBuilder expiry_gather(g_ceph_context);
- const std::set<LogSegment*> &expiring_segments = mds->mdlog->get_expiring_segments();
- for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
- i != expiring_segments.end(); ++i) {
- (*i)->wait_for_expiry(expiry_gather.new_sub());
- }
- expiry_gather.set_finisher(new MDSInternalContextWrapper(mds, fin));
- expiry_gather.activate();
- }
- } else {
- if (fin) {
- fin->complete(r);
- }
+ auto scrub_finish = new FunctionContext([this, header, fin](int r){
+ if (!header->get_repaired()) {
+ if (fin)
+ fin->complete(r);
+ return;
+ }
+
+ auto flush_finish = new FunctionContext([this, fin](int r){
+ dout(4) << "Expiring log segments because scrub did some repairs" << dendl;
+ mds->mdlog->trim_all();
+
+ if (fin) {
+ MDSGatherBuilder gather(g_ceph_context);
+ auto& expiring_segments = mds->mdlog->get_expiring_segments();
+ for (auto logseg : expiring_segments)
+ logseg->wait_for_expiry(gather.new_sub());
+ assert(gather.has_subs());
+ gather.set_finisher(new MDSInternalContextWrapper(mds, fin));
+ gather.activate();
}
+ });
+
+ dout(4) << "Flushing journal because scrub did some repairs" << dendl;
+ mds->mdlog->start_new_segment();
+ mds->mdlog->flush();
+ mds->mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, flush_finish));
});
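The rewrite splits the epilogue into two contexts so the ordering is explicit: `trim_all()` and the expiry gather run only from `flush_finish`, i.e. after `wait_for_safe()` reports the journal flush durable, and `fin` completes only after expiry. A compressed stand-in of that sequencing (stub journal operations, not the MDLog API):

```cpp
#include <functional>
#include <iostream>

using Ctx = std::function<void(int)>;

// Stubs standing in for the journal operations; each "completes" at once.
void flush_and_wait_safe(Ctx on_safe) { on_safe(0); }
void wait_for_expiry(Ctx on_done)     { on_done(0); }

// Mirrors scrub_finish -> flush_finish above: trimming and the expiry wait
// are deferred until the flush is durable; fin fires only after expiry.
void repair_epilogue(Ctx fin) {
  flush_and_wait_safe([fin](int) {
    // mdlog->trim_all() would run here, once the repairs are safe on disk
    wait_for_expiry(fin);
  });
}

int main() {
  repair_epilogue([](int r) { std::cout << "repair epilogue done, r=" << r << "\n"; });
}
```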
if (!header->get_recursive()) {
mds->scrubstack->enqueue_inode_top(in, header,
- new MDSInternalContextWrapper(mds,
- expiry_fin));
+ new MDSInternalContextWrapper(mds, scrub_finish));
} else {
mds->scrubstack->enqueue_inode_bottom(in, header,
- new MDSInternalContextWrapper(mds,
- expiry_fin));
+ new MDSInternalContextWrapper(mds, scrub_finish));
}
mds->server->respond_to_request(mdr, 0);