]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/Migrator.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mds / Migrator.cc
index 779a87dc2f955f40ff6baaf1917663a282cdac53..b8379250d6f5995e7a4e50435521c41f76156769 100644 (file)
@@ -321,13 +321,17 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
       }
       if (state == EXPORT_WARNING) {
        // notify bystanders
-       export_notify_abort(dir, bounds);
+       export_notify_abort(dir, it->second, bounds);
        // process delayed expires
        cache->process_delayed_expire(dir);
       }
     }
     dir->unfreeze_tree();
     cache->try_subtree_merge(dir);
+    for (auto bd : it->second.residual_dirs) {
+      bd->unfreeze_tree();
+      cache->try_subtree_merge(bd);
+    }
     if (notify_peer &&
        (!mds->is_cluster_degraded() ||
         mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer))) // tell them.
@@ -337,7 +341,7 @@ void Migrator::export_try_cancel(CDir *dir, bool notify_peer)
   case EXPORT_EXPORTING:
     dout(10) << "export state=exporting : reversing, and unfreezing" << dendl;
     it->second.state = EXPORT_CANCELLING;
-    export_reverse(dir);
+    export_reverse(dir, it->second);
     break;
 
   case EXPORT_LOGGINGFINISH:
@@ -506,7 +510,7 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
       case IMPORT_PREPPING:
        assert(dir);
        dout(10) << "import state=prepping : unpinning base+bounds " << *dir << dendl;
-       import_reverse_prepping(dir);
+       import_reverse_prepping(dir, q->second);
        break;
 
       case IMPORT_PREPPED:
@@ -521,7 +525,7 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
          cache->adjust_subtree_auth(dir, q->second.peer);
 
          // notify bystanders ; wait in aborting state
-         import_state[df].state = IMPORT_ABORTING;
+         q->second.state = IMPORT_ABORTING;
          import_notify_abort(dir, bounds);
          assert(g_conf->mds_kill_import_at != 10);
        }
@@ -776,6 +780,10 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
   assert(dir->is_auth());
   assert(dest != mds->get_nodeid());
    
+  if (!(mds->is_active() || mds->is_stopping())) {
+    dout(7) << "i'm not active, no exports for now" << dendl;
+    return;
+  }
   if (mds->mdcache->is_readonly()) {
     dout(7) << "read-only FS, no exports for now" << dendl;
     return;
@@ -794,10 +802,17 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
     return;
   }
 
-  if (!dir->inode->is_base() && dir->inode->get_projected_parent_dir()->inode->is_stray() &&
-      dir->inode->get_projected_parent_dir()->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
-    dout(7) << "i won't export anything in stray" << dendl;
-    return;
+  CDir* parent_dir = dir->inode->get_projected_parent_dir();
+  if (parent_dir && parent_dir->inode->is_stray()) {
+    if (parent_dir->get_parent_dir()->ino() != MDS_INO_MDSDIR(dest)) {
+      dout(7) << "i won't export anything in stray" << dendl;
+      return;
+    }
+  } else {
+    if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
+      dout(7) << "dir is export pinned" << dendl;
+      return;
+    }
   }
 
   if (dir->is_frozen() ||
@@ -810,16 +825,6 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
     return;
   }
 
-  if (!mds->is_stopping() && !dir->inode->is_exportable(dest)) {
-    dout(7) << "dir is export pinned" << dendl;
-    return;
-  }
-
-  if (dest == mds->get_nodeid() || !mds->mdsmap->is_up(dest)) {
-    dout(7) << "cannot export: dest " << dest << " is me or is not active" << dendl;
-    return;
-  }
-
   if (g_conf->mds_thrash_exports) {
     // create random subtree bound (which will not be exported)
     list<CDir*> ls;
@@ -977,15 +982,22 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
     dout(7) << "must have aborted" << dendl;
   } else {
     assert(it->second.state == EXPORT_DISCOVERING);
-    // release locks to avoid deadlock
-    MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
-    assert(mdr);
-    mds->mdcache->request_finish(mdr);
-    it->second.mut.reset();
-    // freeze the subtree
-    it->second.state = EXPORT_FREEZING;
-    dir->auth_unpin(this);
-    assert(g_conf->mds_kill_export_at != 3);
+
+    if (m->is_success()) {
+      // release locks to avoid deadlock
+      MDRequestRef mdr = static_cast<MDRequestImpl*>(it->second.mut.get());
+      assert(mdr);
+      mds->mdcache->request_finish(mdr);
+      it->second.mut.reset();
+      // freeze the subtree
+      it->second.state = EXPORT_FREEZING;
+      dir->auth_unpin(this);
+      assert(g_conf->mds_kill_export_at != 3);
+
+    } else {
+      dout(7) << "peer failed to discover (not active?), canceling" << dendl;
+      export_try_cancel(dir, false);
+    }
   }
   
   m->put();  // done
@@ -1072,6 +1084,10 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
 
   // CDir::_freeze_tree() should have forced it into subtree.
   assert(dir->get_dir_auth() == mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
+
+  set<client_t> export_client_set;
+  check_export_size(dir, it->second, export_client_set);
+
   // note the bounds.
   set<CDir*> bounds;
   cache->get_subtree_bounds(dir, bounds);
@@ -1080,12 +1096,10 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
   MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag(), it->second.tid);
 
   // include list of bystanders
-  for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
-       p != dir->replicas_end();
-       ++p) {
-    if (p->first != it->second.peer) {
-      dout(10) << "bystander mds." << p->first << dendl;
-      prep->add_bystander(p->first);
+  for (const auto &p : dir->get_replicas()) {
+    if (p.first != it->second.peer) {
+      dout(10) << "bystander mds." << p.first << dendl;
+      prep->add_bystander(p.first);
     }
   }
 
@@ -1111,8 +1125,7 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
     CDir *bound = *p;
 
     // pin it.
-    bound->get(CDir::PIN_EXPORTBOUND);
-    bound->state_set(CDir::STATE_EXPORTBOUND);
+    assert(bound->state_test(CDir::STATE_EXPORTBOUND));
     
     dout(7) << "  export bound " << *bound << dendl;
     prep->add_bound( bound->dirfrag() );
@@ -1120,8 +1133,14 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
     // trace to bound
     bufferlist tracebl;
     CDir *cur = bound;
-    
+
     char start = '-';
+    if (it->second.residual_dirs.count(bound)) {
+      start = 'f';
+      cache->replicate_dir(bound, it->second.peer, tracebl);
+      dout(7) << "  added " << *bound << dendl;
+    }
+
     while (1) {
       // don't repeat inodes
       if (inodes_added.count(cur->inode->ino()))
@@ -1173,9 +1192,6 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
   // make sure any new instantiations of caps are flushed out
   assert(it->second.warning_ack_waiting.empty());
 
-  set<client_t> export_client_set;
-  get_export_client_set(dir, export_client_set);
-
   MDSGatherBuilder gather(g_ceph_context);
   mds->server->flush_client_sessions(export_client_set, gather);
   if (gather.has_subs()) {
@@ -1185,35 +1201,79 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
   }
 }
 
-void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
+void Migrator::check_export_size(CDir *dir, export_state_t& stat, set<client_t>& client_set)
 {
+  const unsigned frag_size = 800;
+  const unsigned inode_size = 1000;
+  const unsigned cap_size = 80;
+  const unsigned link_size = 10;
+  const unsigned null_size = 1;
+
+  uint64_t max_size = g_conf->get_val<uint64_t>("mds_max_export_size");
+  uint64_t approx_size = 0;
+
   list<CDir*> dfs;
   dfs.push_back(dir);
   while (!dfs.empty()) {
     CDir *dir = dfs.front();
     dfs.pop_front();
+
+    approx_size += frag_size;
     for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
       CDentry *dn = p->second;
-      if (!dn->get_linkage()->is_primary())
+      if (dn->get_linkage()->is_null()) {
+       approx_size += null_size;
+       continue;
+      }
+      if (dn->get_linkage()->is_remote()) {
+       approx_size += link_size;
        continue;
+      }
+
+      approx_size += inode_size;
       CInode *in = dn->get_linkage()->get_inode();
       if (in->is_dir()) {
        // directory?
        list<CDir*> ls;
        in->get_dirfrags(ls);
-       for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
-         if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
+       for (auto q : ls) {
+         if (q->is_subtree_root()) {
+           q->state_set(CDir::STATE_EXPORTBOUND);
+           q->get(CDir::PIN_EXPORTBOUND);
+         } else {
            // include nested dirfrag
-           assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
-           dfs.push_back(*q); // it's ours, recurse (later)
+           assert(q->get_dir_auth().first == CDIR_AUTH_PARENT);
+           dfs.push_front(q);
          }
        }
       }
       for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
           q != in->client_caps.end();
-          ++q)
+          ++q) {
+       approx_size += cap_size;
        client_set.insert(q->first);
+      }
     }
+
+    if (approx_size >= max_size)
+      break;
+  }
+
+  while (!dfs.empty()) {
+    CDir *dir = dfs.front();
+    dfs.pop_front();
+
+    dout(7) << "check_export_size: creating bound " << *dir << dendl;
+    assert(dir->is_auth());
+    dir->state_set(CDir::STATE_EXPORTBOUND);
+    dir->get(CDir::PIN_EXPORTBOUND);
+
+    mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
+    // Another choice here is finishing all WAIT_UNFREEZE contexts and keeping
+    // the newly created subtree unfreeze.
+    dir->_freeze_tree();
+
+    stat.residual_dirs.insert(dir);
   }
 }
 
@@ -1249,7 +1309,7 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
   assert(it->second.state == EXPORT_PREPPING);
 
   if (!m->is_success()) {
-    dout(7) << "peer couldn't acquire all needed locks, canceling" << dendl;
+    dout(7) << "peer couldn't acquire all needed locks or wasn't active, canceling" << dendl;
     export_try_cancel(dir, false);
     m->put();
     return;
@@ -1265,22 +1325,20 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
          it->second.warning_ack_waiting.count(MDS_RANK_NONE) > 0));
   assert(it->second.notify_ack_waiting.empty());
 
-  for (compact_map<mds_rank_t,unsigned>::iterator p = dir->replicas_begin();
-       p != dir->replicas_end();
-       ++p) {
-    if (p->first == it->second.peer) continue;
+  for (const auto &p : dir->get_replicas()) {
+    if (p.first == it->second.peer) continue;
     if (mds->is_cluster_degraded() &&
-       !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first))
+       !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first))
       continue;  // only if active
-    it->second.warning_ack_waiting.insert(p->first);
-    it->second.notify_ack_waiting.insert(p->first);  // we'll eventually get a notifyack, too!
+    it->second.warning_ack_waiting.insert(p.first);
+    it->second.notify_ack_waiting.insert(p.first);  // we'll eventually get a notifyack, too!
 
     MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
                                                    mds_authority_t(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
                                                    mds_authority_t(mds->get_nodeid(),it->second.peer));
     for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
       notify->get_bounds().push_back((*q)->dirfrag());
-    mds->send_message_mds(notify, p->first);
+    mds->send_message_mds(notify, p.first);
     
   }
 
@@ -1311,11 +1369,12 @@ public:
 
 void Migrator::export_go(CDir *dir)
 {
-  assert(export_state.count(dir));
-  dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
+  auto it = export_state.find(dir);
+  assert(it != export_state.end());
+  dout(7) << "export_go " << *dir << " to " << it->second.peer << dendl;
 
   // first sync log to flush out e.g. any cap imports
-  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
+  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, it->second.tid));
   mds->mdlog->flush();
 }
 
@@ -1587,7 +1646,7 @@ uint64_t Migrator::encode_export_dir(bufferlist& exportbl,
       if (!t->state_test(CDir::STATE_EXPORTBOUND)) {
        // include nested dirfrag
        assert(t->get_dir_auth().first == CDIR_AUTH_PARENT);
-       subdirs.push_back(t);  // it's ours, recurse (later)
+       subdirs.push_front(t);  // it's ours, recurse (later)
       }
     }
   }
@@ -1719,11 +1778,10 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   m->put();
 }
 
-void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
+void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
 {
   dout(7) << "export_notify_abort " << *dir << dendl;
 
-  export_state_t& stat = export_state[dir];
   assert(stat.state == EXPORT_CANCELLING);
 
   if (stat.notify_ack_waiting.empty()) {
@@ -1736,9 +1794,9 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
   for (set<mds_rank_t>::iterator p = stat.notify_ack_waiting.begin();
        p != stat.notify_ack_waiting.end();
        ++p) {
-    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid, true,
-                                                   pair<int,int>(mds->get_nodeid(),stat.peer),
-                                                   pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
+    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
+                                                   pair<int,int>(mds->get_nodeid(), stat.peer),
+                                                   pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
     for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
       notify->get_bounds().push_back((*i)->dirfrag());
     mds->send_message_mds(notify, *p);
@@ -1750,7 +1808,7 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
  * that is, we don't know they safely received and logged it, so we reverse our changes
  * and go on.
  */
-void Migrator::export_reverse(CDir *dir)
+void Migrator::export_reverse(CDir *dir, export_state_t& stat)
 {
   dout(7) << "export_reverse " << *dir << dendl;
 
@@ -1782,13 +1840,13 @@ void Migrator::export_reverse(CDir *dir)
   }
   
   // unpin bounds
-  for (const auto &bd : bounds) {
+  for (auto bd : bounds) {
     bd->put(CDir::PIN_EXPORTBOUND);
     bd->state_clear(CDir::STATE_EXPORTBOUND);
   }
 
   // notify bystanders
-  export_notify_abort(dir, bounds);
+  export_notify_abort(dir, stat, bounds);
 
   // unfreeze tree, with possible subtree merge.
   cache->adjust_subtree_auth(dir, mds->get_nodeid(), mds->get_nodeid());
@@ -1798,6 +1856,10 @@ void Migrator::export_reverse(CDir *dir)
 
   dir->unfreeze_tree();
   cache->try_subtree_merge(dir);
+  for (auto bd : stat.residual_dirs) {
+    bd->unfreeze_tree();
+    cache->try_subtree_merge(bd);
+  }
 
   // revoke/resume stale caps
   for (auto in : to_eval) {
@@ -1984,6 +2046,12 @@ void Migrator::export_finish(CDir *dir)
   //  (we do this _after_ removing EXPORTBOUND pins, to allow merges)
   dir->unfreeze_tree();
   cache->try_subtree_merge(dir);
+  for (auto bd : it->second.residual_dirs) {
+    export_queue.push_front(pair<dirfrag_t,mds_rank_t>(bd->dirfrag(), it->second.peer));
+    bd->take_waiting(CDir::WAIT_ANY_MASK, finished);
+    bd->unfreeze_tree();
+    cache->try_subtree_merge(bd);
+  }
 
   // no more auth subtree? clear scatter dirty
   if (!dir->get_inode()->is_auth() &&
@@ -2004,7 +2072,7 @@ void Migrator::export_finish(CDir *dir)
   cache->show_subtrees();
   audit();
 
-  cache->trim(-1, num_dentries); // try trimming exported dentries
+  cache->trim(num_dentries); // try trimming exported dentries
 
   // send pending import_maps?
   mds->mdcache->maybe_send_pending_resolves();
@@ -2037,14 +2105,24 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
 
   // note import state
   dirfrag_t df = m->get_dirfrag();
+
+  if (!mds->is_active()) {
+    dout(7) << " not active, send NACK " << dendl;
+    mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid(), false), from);
+    m->put();
+    return;
+  }
+
   // only start discovering on this message once.
+  import_state_t *p_state;
   map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
   if (!m->started) {
     assert(it == import_state.end());
     m->started = true;
-    import_state[df].state = IMPORT_DISCOVERING;
-    import_state[df].peer = from;
-    import_state[df].tid = m->get_tid();
+    p_state = &import_state[df];
+    p_state->state = IMPORT_DISCOVERING;
+    p_state->peer = from;
+    p_state->tid = m->get_tid();
   } else {
     // am i retrying after ancient path_traverse results?
     if (it == import_state.end() ||
@@ -2055,6 +2133,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
       return;
     }
     assert(it->second.state == IMPORT_DISCOVERING);
+    p_state = &it->second;
   }
 
   if (!mds->mdcache->is_open()) {
@@ -2085,7 +2164,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
   // yay
   dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
   
-  import_state[df].state = IMPORT_DISCOVERED;
+  p_state->state = IMPORT_DISCOVERED;
 
   // pin inode in the cache (for now)
   assert(in->is_dir());
@@ -2093,7 +2172,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
 
   // reply
   dout(7) << " sending export_discover_ack on " << *in << dendl;
-  mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
+  mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), p_state->peer);
   m->put();
   assert (g_conf->mds_kill_import_at != 2);  
 }
@@ -2110,10 +2189,10 @@ void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
   import_state.erase(df);
 }
 
-void Migrator::import_reverse_prepping(CDir *dir)
+void Migrator::import_reverse_prepping(CDir *dir, import_state_t& stat)
 {
   set<CDir*> bounds;
-  cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
+  cache->map_dirfrag_set(stat.bound_ls, bounds);
   import_remove_pins(dir, bounds);
   import_reverse_final(dir);
 }
@@ -2135,7 +2214,7 @@ void Migrator::handle_export_cancel(MExportDirCancel *m)
   } else if (it->second.state == IMPORT_PREPPING) {
     CDir *dir = mds->mdcache->get_dirfrag(df);
     assert(dir);
-    import_reverse_prepping(dir);
+    import_reverse_prepping(dir, it->second);
   } else if (it->second.state == IMPORT_PREPPED) {
     CDir *dir = mds->mdcache->get_dirfrag(df);
     assert(dir);
@@ -2248,7 +2327,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
       } else
        assert(0 == "unrecognized start char");
 
-      while (start != '-') {
+      while (!q.end()) {
        CDentry *dn = cache->add_replica_dentry(q, cur, finished);
        dout(10) << "  added " << *dn << dendl;
        CInode *in = cache->add_replica_inode(q, dn, finished);
@@ -2278,68 +2357,75 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
     mds->queue_waiters(finished);
 
 
-  // open all bounds
-  set<CDir*> import_bounds;
-  for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
-       p != import_bound_fragset.end();
-       ++p) {
-    CInode *in = cache->get_inode(p->first);
-    assert(in);
+  bool success = true;
+  if (mds->is_active()) {
+    // open all bounds
+    set<CDir*> import_bounds;
+    for (map<inodeno_t,fragset_t>::iterator p = import_bound_fragset.begin();
+        p != import_bound_fragset.end();
+        ++p) {
+      CInode *in = cache->get_inode(p->first);
+      assert(in);
 
-    // map fragset into a frag_t list, based on the inode fragtree
-    list<frag_t> fglist;
-    for (set<frag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
-      in->dirfragtree.get_leaves_under(*q, fglist);
-    dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << fglist << dendl;
-    
-    for (list<frag_t>::iterator q = fglist.begin();
-        q != fglist.end();
-        ++q) {
-      CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, *q));
-      if (!bound) {
-       dout(7) << "  opening bounding dirfrag " << *q << " on " << *in << dendl;
-       cache->open_remote_dirfrag(in, *q,
-                                  new C_MDS_RetryMessage(mds, m));
-       return;
-      }
+      // map fragset into a frag_t list, based on the inode fragtree
+      list<frag_t> fglist;
+      for (set<frag_t>::iterator q = p->second.begin(); q != p->second.end(); ++q)
+       in->dirfragtree.get_leaves_under(*q, fglist);
+      dout(10) << " bound inode " << p->first << " fragset " << p->second << " maps to " << fglist << dendl;
+
+      for (list<frag_t>::iterator q = fglist.begin();
+          q != fglist.end();
+          ++q) {
+       CDir *bound = cache->get_dirfrag(dirfrag_t(p->first, *q));
+       if (!bound) {
+         dout(7) << "  opening bounding dirfrag " << *q << " on " << *in << dendl;
+         cache->open_remote_dirfrag(in, *q,
+             new C_MDS_RetryMessage(mds, m));
+         return;
+       }
 
-      if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
-       dout(7) << "  pinning import bound " << *bound << dendl;
-       bound->get(CDir::PIN_IMPORTBOUND);
-       bound->state_set(CDir::STATE_IMPORTBOUND);
-      } else {
-       dout(7) << "  already pinned import bound " << *bound << dendl;
+       if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
+         dout(7) << "  pinning import bound " << *bound << dendl;
+         bound->get(CDir::PIN_IMPORTBOUND);
+         bound->state_set(CDir::STATE_IMPORTBOUND);
+       } else {
+         dout(7) << "  already pinned import bound " << *bound << dendl;
+       }
+       import_bounds.insert(bound);
       }
-      import_bounds.insert(bound);
     }
-  }
-
-  dout(7) << " all ready, noting auth and freezing import region" << dendl;
 
-  bool success = true;
-  if (!mds->mdcache->is_readonly() &&
-      dir->get_inode()->filelock.can_wrlock(-1) &&
-      dir->get_inode()->nestlock.can_wrlock(-1)) {
-    it->second.mut = new MutationImpl();
-    // force some locks.  hacky.
-    mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
-    mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
-
-    // note that i am an ambiguous auth for this subtree.
-    // specify bounds, since the exporter explicitly defines the region.
-    cache->adjust_bounded_subtree_auth(dir, import_bounds,
-                                      pair<int,int>(oldauth, mds->get_nodeid()));
-    cache->verify_subtree_bounds(dir, import_bounds);
-    // freeze.
-    dir->_freeze_tree();
-    // note new state
-    it->second.state = IMPORT_PREPPED;
+    dout(7) << " all ready, noting auth and freezing import region" << dendl;
+
+    if (!mds->mdcache->is_readonly() &&
+       dir->get_inode()->filelock.can_wrlock(-1) &&
+       dir->get_inode()->nestlock.can_wrlock(-1)) {
+      it->second.mut = new MutationImpl();
+      // force some locks.  hacky.
+      mds->locker->wrlock_force(&dir->inode->filelock, it->second.mut);
+      mds->locker->wrlock_force(&dir->inode->nestlock, it->second.mut);
+
+      // note that i am an ambiguous auth for this subtree.
+      // specify bounds, since the exporter explicitly defines the region.
+      cache->adjust_bounded_subtree_auth(dir, import_bounds,
+                                        pair<int,int>(oldauth, mds->get_nodeid()));
+      cache->verify_subtree_bounds(dir, import_bounds);
+      // freeze.
+      dir->_freeze_tree();
+      // note new state
+      it->second.state = IMPORT_PREPPED;
+    } else {
+      dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
+      success = false;
+    }
   } else {
-    dout(7) << " couldn't acquire all needed locks, failing. " << *dir << dendl;
+    dout(7) << " not active, failing. " << *dir << dendl;
     success = false;
-    import_reverse_prepping(dir);
   }
 
+  if (!success)
+    import_reverse_prepping(dir, it->second);
+
   // ok!
   dout(7) << " sending export_prep_ack on " << *dir << dendl;
   mds->send_message(new MExportDirPrepAck(dir->dirfrag(), success, m->get_tid()), m->get_connection());
@@ -2632,7 +2718,7 @@ void Migrator::import_reverse(CDir *dir)
   // log our failure
   mds->mdlog->start_submit_entry(new EImportFinish(dir, false));       // log failure
 
-  cache->trim(-1, num_dentries); // try trimming dentries
+  cache->trim(num_dentries); // try trimming dentries
 
   // notify bystanders; wait in aborting state
   import_notify_abort(dir, bounds);
@@ -2965,7 +3051,7 @@ void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
   if (auth_cap)
     ::decode(in->get_mds_caps_wanted(), blp);
   if (!cap_map.empty() ||
-      (auth_cap && !in->get_mds_caps_wanted().empty())) {
+      (auth_cap && (in->get_caps_wanted() & ~CEPH_CAP_PIN))) {
     peer_exports[in].swap(cap_map);
     in->get(CInode::PIN_IMPORTINGCAPS);
   }
@@ -3242,8 +3328,9 @@ void Migrator::handle_export_caps(MExportCaps *ex)
   assert(in->is_auth());
 
   // FIXME
-  if (in->is_frozen())
+  if (!in->can_auth_pin())
     return;
+  in->auth_pin(this);
 
   C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
       this, in, mds_rank_t(ex->get_source().num()));
@@ -3284,4 +3371,5 @@ void Migrator::logged_import_caps(CInode *in,
   // clients will release caps from the exporter when they receive the cap import message.
   finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
   mds->locker->eval(in, CEPH_CAP_LOCKS, true);
+  in->auth_unpin(this);
 }