}
if (state == EXPORT_WARNING) {
// notify bystanders
- export_notify_abort(dir, bounds);
+ export_notify_abort(dir, it->second, bounds);
// process delayed expires
cache->process_delayed_expire(dir);
}
}
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
+ for (auto bd : it->second.residual_dirs) {
+ bd->unfreeze_tree();
+ cache->try_subtree_merge(bd);
+ }
if (notify_peer &&
(!mds->is_cluster_degraded() ||
mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
case EXPORT_EXPORTING:
dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
it->second.state = EXPORT_CANCELLING;
- export_reverse(dir);
+ export_reverse(dir, it->second);
break;
case EXPORT_LOGGINGFINISH:
case IMPORT_PREPPING:
assert(dir);
dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
- import_reverse_prepping(dir);
+ import_reverse_prepping(dir, q->second);
break;
case IMPORT_PREPPED:
cache->adjust_subtree_auth(dir, q->second.peer);
// notify bystanders ; wait in aborting state
- import_state[df].state = IMPORT_ABORTING;
+ q->second.state = IMPORT_ABORTING;
import_notify_abort(dir, bounds);
assert(g_conf->mds_kill_import_at != 10);
}
assert(dir->is_auth());
assert(dest != mds->get_nodeid());
+ if (!(mds->is_active() || mds->is_stopping())) {
+ dout(7) << "i'm not active, no exports for now" << dendl;
+ return;
+ }
if (mds->mdcache->is_readonly()) {
dout(7) << "read-only FS, no exports for now" << dendl;
return;
return;
}
- if (!dir->inode->is_base() && dir->inode->get_projected_parent_dir()->inode->is_stray() &&
- dir->inode->get_projected_parent_dir()->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
- dout(7) << "i won't export anything in stray" << dendl;
- return;
+ CDir* parent_dir = dir->inode->get_projected_parent_dir();
+ if (parent_dir && parent_dir->inode->is_stray()) {
+ if (parent_dir->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
+ dout(7) << "i won't export anything in stray" << dendl;
+ return;
+ }
+ } else {
+ if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
+ dout(7) << "dir is export pinned" << dendl;
+ return;
+ }
}
if (dir->is_frozen() ||
return;
}
- if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
- dout(7) << "dir is export pinned" << dendl;
- return;
- }
-
- if (dest == mds->get_nodeid() || !mds->mdsmap->is_up(dest)) {
- dout(7) << "cannot export: dest " << dest << " is me or is not active" << dendl;
- return;
- }
-
if (g_conf->mds_thrash_exports) {
// create random subtree bound (which will not be exported)
list<CDir*> ls;
// CDir::_freeze_tree() should have forced it into subtree.
assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
+
+ set<client_t> export_client_set;
+ check_export_size(dir, it->second, export_client_set);
+
// note the bounds.
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag(), it->second.tid);
// include list of bystanders
- for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
- p != dir->replicas_end();
- ++p) {
- if (p->first != it->second.peer) {
- dout(10) << "bystander mds." << p->first << dendl;
- prep->add_bystander(p->first);
+ for (const auto &p : dir->get_replicas()) {
+ if (p.first != it->second.peer) {
+ dout(10) << "bystander mds." << p.first << dendl;
+ prep->add_bystander(p.first);
}
}
CDir *bound = *p;
// pin it.
- bound->get(CDir::PIN_EXPORTBOUND);
- bound->state_set(CDir::STATE_EXPORTBOUND);
+ assert(bound->state_test(CDir::STATE_EXPORTBOUND));
dout(7) << " export bound " << *bound << dendl;
prep->add_bound( bound->dirfrag() );
// trace to bound
bufferlist tracebl;
CDir *cur = bound;
-
+
char start = '-';
+ if (it->second.residual_dirs.count(bound)) {
+ start = 'f';
+ cache->replicate_dir(bound, it->second.peer, tracebl);
+ dout(7) << " added " << *bound << dendl;
+ }
+
while (1) {
// don't repeat inodes
if (inodes_added.count(cur->inode->ino()))
// make sure any new instantiations of caps are flushed out
assert(it->second.warning_ack_waiting.empty());
- set<client_t> export_client_set;
- get_export_client_set(dir, export_client_set);
-
MDSGatherBuilder gather(g_ceph_context);
mds->server->flush_client_sessions(export_client_set, gather);
if (gather.has_subs()) {
}
}
-void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
+void Migrator::check_export_size(CDir *dir, export_state_t& stat, set<client_t>& client_set) // sums an approximate size over dir's nested dirfrags, collects cap-holding clients, and bounds whatever is left once the limit is reached
{
+ const unsigned frag_size = 800; // weight added to approx_size once per dirfrag visited
+ const unsigned inode_size = 1000; // weight per dentry that is neither null nor remote
+ const unsigned cap_size = 80; // weight per client cap found on an inode
+ const unsigned link_size = 10; // weight per remote dentry
+ const unsigned null_size = 1; // weight per null dentry
+
+ uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size"); // limit that approx_size is compared against below
+ uint64_t approx_size = 0;
+
list<CDir*> dfs;
dfs.push_back(dir);
while (!dfs.empty()) {
CDir *dir = dfs.front();
dfs.pop_front();
+
+ approx_size += frag_size;
for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
CDentry *dn = p->second;
- if (!dn->get_linkage()->is_primary())
+ if (dn->get_linkage()->is_null()) {
+ approx_size += null_size;
continue;
+ }
+ if (dn->get_linkage()->is_remote()) {
+ approx_size += link_size;
+ continue;
+ }
+
+ approx_size += inode_size; // only reached when the linkage is neither null nor remote
CInode *in = dn->get_linkage()->get_inode();
if (in->is_dir()) {
// directory?
list<CDir*> ls;
in->get_dirfrags(ls);
- for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
- if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
+ for (auto q : ls) {
+ if (q->is_subtree_root()) { // nested subtree root: flag and pin it as an export bound here instead of descending into it
+ q->state_set(CDir::STATE_EXPORTBOUND);
+ q->get(CDir::PIN_EXPORTBOUND);
+ } else {
// include nested dirfrag
- assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
- dfs.push_back(*q); // it's ours, recurse (later)
+ assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
+ dfs.push_front(q); // pushed and popped at the front, so nested dirfrags are visited before older queue entries
}
}
}
for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
q != in->client_caps.end();
- ++q)
+ ++q) {
+ approx_size += cap_size;
client_set.insert(q->first);
+ }
}
+
+ if (approx_size >= max_size) // checked once per dirfrag: stop scanning, dirfrags still queued in dfs are handled by the loop below
+ break;
+ }
+
+ while (!dfs.empty()) { // every dirfrag left unscanned is turned into a bound and recorded in stat.residual_dirs
+ CDir *dir = dfs.front();
+ dfs.pop_front();
+
+ dout(7) << "check_export_size: creating bound " << *dir << dendl;
+ assert(dir->is_auth());
+ dir->state_set(CDir::STATE_EXPORTBOUND);
+ dir->get(CDir::PIN_EXPORTBOUND);
+
+ mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
+ // Another choice here is finishing all WAIT_UNFREEZE contexts and keeping
+ // the newly created subtree unfrozen.
+ dir->_freeze_tree();
+
+ stat.residual_dirs.insert(dir);
}
}
it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
assert(it->second.notify_ack_waiting.empty());
- for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
- p != dir->replicas_end();
- ++p) {
- if (p->first == it->second.peer) continue;
+ for (const auto &p : dir->get_replicas()) {
+ if (p.first == it->second.peer) continue;
if (mds->is_cluster_degraded() &&
- !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first))
+ !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
continue; // only if active
- it->second.warning_ack_waiting.insert(p->first);
- it->second.notify_ack_waiting.insert(p->first); // we'll eventually get a notifyack, too!
+ it->second.warning_ack_waiting.insert(p.first);
+ it->second.notify_ack_waiting.insert(p.first); // we'll eventually get a notifyack, too!
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
mds_authority_t(mds->get_nodeid(),it->second.peer));
for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
notify->get_bounds().push_back((*q)->dirfrag());
- mds->send_message_mds(notify, p->first);
+ mds->send_message_mds(notify, p.first);
}
void Migrator::export_go(CDir *dir)
{
- assert(export_state.count(dir));
- dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
+ auto it = export_state.find(dir); // single lookup; the iterator supplies both peer and tid below
+ assert(it != export_state.end()); // dir must already have an export_state entry
+ dout(7) << "export_go " << *dir << " to " << it->second.peer << dendl;
// first sync log to flush out e.g. any cap imports
- mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
+ mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid)); // the tid is copied into the context now, at queue time
mds->mdlog->flush();
}
if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
// include nested dirfrag
assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
- subdirs.push_back(t); // it's ours, recurse (later)
+ subdirs.push_front(t); // it's ours, recurse (later)
}
}
}
m->put();
}
-void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
+void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
{
dout(7) << "export_notify_abort " << *dir << dendl;
- export_state_t& stat = export_state[dir];
assert(stat.state == EXPORT_CANCELLING);
if (stat.notify_ack_waiting.empty()) {
for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
p != stat.notify_ack_waiting.end();
++p) {
- MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid, true,
- pair<int,int>(mds->get_nodeid(),stat.peer),
- pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
+ MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
+ pair<int,int>(mds->get_nodeid(), stat.peer),
+ pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
mds->send_message_mds(notify, *p);
* that is, we don't know they safely received and logged it, so we reverse our changes
* and go on.
*/
-void Migrator::export_reverse(CDir *dir)
+void Migrator::export_reverse(CDir *dir, export_state_t& stat)
{
dout(7) << "export_reverse " << *dir << dendl;
}
// unpin bounds
- for (const auto &bd : bounds) {
+ for (auto bd : bounds) {
bd->put(CDir::PIN_EXPORTBOUND);
bd->state_clear(CDir::STATE_EXPORTBOUND);
}
// notify bystanders
- export_notify_abort(dir, bounds);
+ export_notify_abort(dir, stat, bounds);
// unfreeze tree, with possible subtree merge.
cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
+ for (auto bd : stat.residual_dirs) {
+ bd->unfreeze_tree();
+ cache->try_subtree_merge(bd);
+ }
// revoke/resume stale caps
for (auto in : to_eval) {
// (we do this _after_ removing EXPORTBOUND pins, to allow merges)
dir->unfreeze_tree();
cache->try_subtree_merge(dir);
+ for (auto bd : it->second.residual_dirs) {
+ export_queue.push_front(pair<dirfrag_t,mds_rank_t>(bd->dirfrag(), it->second.peer));
+ bd->take_waiting(CDir::WAIT_ANY_MASK, finished);
+ bd->unfreeze_tree();
+ cache->try_subtree_merge(bd);
+ }
// no more auth subtree? clear scatter dirty
if (!dir->get_inode()->is_auth() &&
cache->show_subtrees();
audit();
- cache->trim(-1, num_dentries); // try trimming exported dentries
+ cache->trim(num_dentries); // try trimming exported dentries
// send pending import_maps?
mds->mdcache->maybe_send_pending_resolves();
}
// only start discovering on this message once.
+ import_state_t *p_state;
map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
if (!m->started) {
assert(it == import_state.end());
m->started = true;
- import_state[df].state = IMPORT_DISCOVERING;
- import_state[df].peer = from;
- import_state[df].tid = m->get_tid();
+ p_state = &import_state[df];
+ p_state->state = IMPORT_DISCOVERING;
+ p_state->peer = from;
+ p_state->tid = m->get_tid();
} else {
// am i retrying after ancient path_traverse results?
if (it == import_state.end() ||
return;
}
assert(it->second.state == IMPORT_DISCOVERING);
+ p_state = &it->second;
}
if (!mds->mdcache->is_open()) {
// yay
dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
- import_state[df].state = IMPORT_DISCOVERED;
+ p_state->state = IMPORT_DISCOVERED;
// pin inode in the cache (for now)
assert(in->is_dir());
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
- mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
+ mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), p_state->peer);
m->put();
assert (g_conf->mds_kill_import_at != 2);
}
import_state.erase(df);
}
-void Migrator::import_reverse_prepping(CDir *dir)
+void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat) // stat: callers in view pass the ->second of their state-map iterator (q / it), presumably dir's import_state entry
{
set<CDir*> bounds;
- cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
+ cache->map_dirfrag_set(stat.bound_ls, bounds); // presumably resolves the dirfrags in stat.bound_ls into CDir* entries in bounds - TODO confirm against MDCache
import_remove_pins(dir, bounds);
import_reverse_final(dir);
}
} else if (it->second.state == IMPORT_PREPPING) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
- import_reverse_prepping(dir);
+ import_reverse_prepping(dir, it->second);
} else if (it->second.state == IMPORT_PREPPED) {
CDir *dir = mds->mdcache->get_dirfrag(df);
assert(dir);
} else
assert(0 == "unrecognized start char");
- while (start != '-') {
+ while (!q.end()) {
CDentry *dn = cache->add_replica_dentry(q, cur, finished);
dout(10) << " added " << *dn << dendl;
CInode *in = cache->add_replica_inode(q, dn, finished);
}
if (!success)
- import_reverse_prepping(dir);
+ import_reverse_prepping(dir, it->second);
// ok!
dout(7) << " sending export_prep_ack on " << *dir << dendl;
// log our failure
mds->mdlog->start_submit_entry(new EImportFinish(dir, false)); // log failure
- cache->trim(-1, num_dentries); // try trimming dentries
+ cache->trim(num_dentries); // try trimming dentries
// notify bystanders; wait in aborting state
import_notify_abort(dir, bounds);
if (auth_cap)
::decode(in->get_mds_caps_wanted(), blp);
if (!cap_map.empty() ||
- (auth_cap && !in->get_mds_caps_wanted().empty())) {
+ (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
peer_exports[in].swap(cap_map);
in->get(CInode::PIN_IMPORTINGCAPS);
}
assert(in->is_auth());
// FIXME
- if (in->is_frozen())
+ if (!in->can_auth_pin())
return;
+ in->auth_pin(this);
C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
this, in, mds_rank_t(ex->get_source().num()));
// clients will release caps from the exporter when they receive the cap import message.
finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
+ in->auth_unpin(this);
}