]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/client/Client.cc
update sources to 12.2.8
[ceph.git] / ceph / src / client / Client.cc
index 41eff1b4b2a247e64b534a4646a8dfd96172a8df..7043f9058a405acc430f77774054f75da93cafb9 100644 (file)
@@ -448,6 +448,10 @@ void Client::dump_status(Formatter *f)
     f->dump_int("dentry_count", lru.lru_get_size());
     f->dump_int("dentry_pinned_count", lru.lru_get_num_pinned());
     f->dump_int("id", get_nodeid().v);
+    entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr());
+    f->dump_object("inst", inst);
+    f->dump_stream("inst_str") << inst;
+    f->dump_stream("addr_str") << inst.addr;
     f->dump_int("inode_count", inode_map.size());
     f->dump_int("mds_epoch", mdsmap->get_epoch());
     f->dump_int("osd_epoch", osd_epoch);
@@ -674,33 +678,11 @@ void Client::trim_dentry(Dentry *dn)
 }
 
 
-void Client::update_inode_file_bits(Inode *in,
-                                   uint64_t truncate_seq, uint64_t truncate_size,
-                                   uint64_t size, uint64_t change_attr,
-                                   uint64_t time_warp_seq, utime_t ctime,
-                                   utime_t mtime,
-                                   utime_t atime,
-                                   version_t inline_version,
-                                   bufferlist& inline_data,
-                                   int issued)
+void Client::update_inode_file_size(Inode *in, int issued, uint64_t size,
+                                   uint64_t truncate_seq, uint64_t truncate_size)
 {
-  bool warn = false;
-  ldout(cct, 10) << "update_inode_file_bits " << *in << " " << ccap_string(issued)
-          << " mtime " << mtime << dendl;
-  ldout(cct, 25) << "truncate_seq: mds " << truncate_seq <<  " local "
-          << in->truncate_seq << " time_warp_seq: mds " << time_warp_seq
-          << " local " << in->time_warp_seq << dendl;
   uint64_t prior_size = in->size;
 
-  if (inline_version > in->inline_version) {
-    in->inline_data = inline_data;
-    in->inline_version = inline_version;
-  }
-
-  /* always take a newer change attr */
-  if (change_attr > in->change_attr)
-    in->change_attr = change_attr;
-
   if (truncate_seq > in->truncate_seq ||
       (truncate_seq == in->truncate_seq && size > in->size)) {
     ldout(cct, 10) << "size " << in->size << " -> " << size << dendl;
@@ -736,7 +718,20 @@ void Client::update_inode_file_bits(Inode *in,
       ldout(cct, 0) << "Hmmm, truncate_seq && truncate_size changed on non-file inode!" << dendl;
     }
   }
-  
+}
+
+void Client::update_inode_file_time(Inode *in, int issued, uint64_t time_warp_seq,
+                                   utime_t ctime, utime_t mtime, utime_t atime)
+{
+  ldout(cct, 10) << __func__ << " " << *in << " " << ccap_string(issued)
+                << " ctime " << ctime << " mtime " << mtime << dendl;
+
+  if (time_warp_seq > in->time_warp_seq)
+    ldout(cct, 10) << " mds time_warp_seq " << time_warp_seq
+                  << " is higher than local time_warp_seq "
+                  << in->time_warp_seq << dendl;
+
+  int warn = false;
   // be careful with size, mtime, atime
   if (issued & (CEPH_CAP_FILE_EXCL|
                CEPH_CAP_FILE_WR|
@@ -747,9 +742,6 @@ void Client::update_inode_file_bits(Inode *in,
     if (ctime > in->ctime) 
       in->ctime = ctime;
     if (time_warp_seq > in->time_warp_seq) {
-      ldout(cct, 10) << "mds time_warp_seq " << time_warp_seq << " on inode " << *in
-              << " is higher than local time_warp_seq "
-              << in->time_warp_seq << dendl;
       //the mds updated times, so take those!
       in->mtime = mtime;
       in->atime = atime;
@@ -834,53 +826,59 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
   if (in->is_symlink())
     in->symlink = st->symlink;
 
-  if (was_new)
-    ldout(cct, 12) << "add_update_inode adding " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
-
-  if (!st->cap.caps)
-    return in;   // as with readdir returning indoes in different snaprealms (no caps!)
-
   // only update inode if mds info is strictly newer, or it is the same and projected (odd).
-  bool updating_inode = false;
-  int issued = 0;
-  if (st->version == 0 ||
-      (in->version & ~1) < st->version) {
-    updating_inode = true;
+  bool new_version = false;
+  if (in->version == 0 ||
+      ((st->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+       (in->version & ~1) < st->version))
+    new_version = true;
 
-    int implemented = 0;
-    issued = in->caps_issued(&implemented) | in->caps_dirty();
-    issued |= implemented;
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
+  int new_issued = ~issued & (int)st->cap.caps;
 
-    in->version = st->version;
+  if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+      !(issued & CEPH_CAP_AUTH_EXCL)) {
+    in->mode = st->mode;
+    in->uid = st->uid;
+    in->gid = st->gid;
+    in->btime = st->btime;
+  }
 
-    if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
-      in->mode = st->mode;
-      in->uid = st->uid;
-      in->gid = st->gid;
-      in->btime = st->btime;
-    }
+  if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+      !(issued & CEPH_CAP_LINK_EXCL)) {
+    in->nlink = st->nlink;
+  }
 
-    if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
-      in->nlink = st->nlink;
-    }
+  if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+    update_inode_file_time(in, issued, st->time_warp_seq,
+                          st->ctime, st->mtime, st->atime);
+  }
 
-    in->dirstat = st->dirstat;
-    in->rstat = st->rstat;
-    in->quota = st->quota;
+  if (new_version ||
+      (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
     in->layout = st->layout;
+    update_inode_file_size(in, issued, st->size, st->truncate_seq, st->truncate_size);
+  }
 
-    if (in->is_dir()) {
+  if (in->is_dir()) {
+    if (new_version || (new_issued & CEPH_CAP_FILE_SHARED)) {
+      in->dirstat = st->dirstat;
+    }
+    // dir_layout/rstat/quota are not tracked by capability, update them only if
+    // the inode stat is from auth mds
+    if (new_version || (st->cap.flags & CEPH_CAP_FLAG_AUTH)) {
       in->dir_layout = st->dir_layout;
       ldout(cct, 20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
+      in->rstat = st->rstat;
+      in->quota = st->quota;
+    }
+    // move me if/when version reflects fragtree changes.
+    if (in->dirfragtree != st->dirfragtree) {
+      in->dirfragtree = st->dirfragtree;
+      _fragmap_remove_non_leaves(in);
     }
-
-    update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
-                          st->change_attr, st->time_warp_seq, st->ctime,
-                          st->mtime, st->atime, st->inline_version,
-                          st->inline_data, issued);
-  } else if (st->inline_version > in->inline_version) {
-    in->inline_data = st->inline_data;
-    in->inline_version = st->inline_version;
   }
 
   if ((in->xattr_version  == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
@@ -891,12 +889,24 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
     in->xattr_version = st->xattr_version;
   }
 
-  // move me if/when version reflects fragtree changes.
-  if (in->dirfragtree != st->dirfragtree) {
-    in->dirfragtree = st->dirfragtree;
-    _fragmap_remove_non_leaves(in);
+  if (st->inline_version > in->inline_version) {
+    in->inline_data = st->inline_data;
+    in->inline_version = st->inline_version;
   }
 
+  /* always take a newer change attr */
+  if (st->change_attr > in->change_attr)
+    in->change_attr = st->change_attr;
+
+  if (st->version > in->version)
+    in->version = st->version;
+
+  if (was_new)
+    ldout(cct, 12) << __func__ << " adding " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
+
+  if (!st->cap.caps)
+    return in;   // as with readdir returning indoes in different snaprealms (no caps!)
+
   if (in->snapid == CEPH_NOSNAP) {
     add_update_cap(in, session, st->cap.cap_id, st->cap.caps, st->cap.seq,
                   st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags,
@@ -905,30 +915,28 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from,
       in->max_size = st->max_size;
       in->rstat = st->rstat;
     }
-  } else
-    in->snap_caps |= st->cap.caps;
 
-  // setting I_COMPLETE needs to happen after adding the cap
-  if (updating_inode &&
-      in->is_dir() &&
-      (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
-      (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-      in->dirstat.nfiles == 0 &&
-      in->dirstat.nsubdirs == 0) {
-    ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
-    in->flags |= I_COMPLETE | I_DIR_ORDERED;
-    if (in->dir) {
-      ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
-                    << in->dir->dentries.size() << " entries, marking all dentries null" << dendl;
-      in->dir->readdir_cache.clear();
-      for (auto p = in->dir->dentries.begin();
-          p != in->dir->dentries.end();
-          ++p) {
-       unlink(p->second, true, true);  // keep dir, keep dentry
+    // setting I_COMPLETE needs to happen after adding the cap
+    if (in->is_dir() &&
+       (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
+       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+       in->dirstat.nfiles == 0 &&
+       in->dirstat.nsubdirs == 0) {
+      ldout(cct, 10) << " marking (I_COMPLETE|I_DIR_ORDERED) on empty dir " << *in << dendl;
+      in->flags |= I_COMPLETE | I_DIR_ORDERED;
+      if (in->dir) {
+       ldout(cct, 10) << " dir is open on empty dir " << in->ino << " with "
+                      << in->dir->dentries.size() << " entries, marking all dentries null" << dendl;
+       in->dir->readdir_cache.clear();
+       for (const auto& p : in->dir->dentries) {
+         unlink(p.second, true, true);  // keep dir, keep dentry
+       }
+       if (in->dir->dentries.empty())
+         close_dir(in->dir);
       }
-      if (in->dir->dentries.empty())
-       close_dir(in->dir);
     }
+  } else {
+    in->snap_caps |= st->cap.caps;
   }
 
   return in;
@@ -1506,6 +1514,10 @@ void Client::connect_mds_targets(mds_rank_t mds)
 void Client::dump_mds_sessions(Formatter *f)
 {
   f->dump_int("id", get_nodeid().v);
+  entity_inst_t inst(messenger->get_myname(), messenger->get_myaddr());
+  f->dump_object("inst", inst);
+  f->dump_stream("inst_str") << inst;
+  f->dump_stream("addr_str") << inst.addr;
   f->open_array_section("sessions");
   for (map<mds_rank_t,MetaSession*>::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) {
     f->open_object_section("session");
@@ -3794,7 +3806,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
   }
 
   if (objecter->osdmap_pool_full(in->layout.pool_id)) {
-    ldout(cct, 1) << __func__ << ": FULL, purging for ENOSPC" << dendl;
+    ldout(cct, 8) << __func__ << ": FULL, purging for ENOSPC" << dendl;
     objectcacher->purge_set(&in->oset);
     if (onfinish) {
       onfinish->complete(-ENOSPC);
@@ -4839,13 +4851,11 @@ void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m)
           << " size " << in->size << " -> " << m->get_size()
           << dendl;
   
-  int implemented = 0;
-  int issued = in->caps_issued(&implemented) | in->caps_dirty();
-  issued |= implemented;
-  update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(),
-                        m->get_size(), m->get_change_attr(), m->get_time_warp_seq(),
-                        m->get_ctime(), m->get_mtime(), m->get_atime(),
-                         m->inline_version, m->inline_data, issued);
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
+  update_inode_file_size(in, issued, m->get_size(),
+                        m->get_truncate_seq(), m->get_truncate_size());
   m->put();
 }
 
@@ -5041,27 +5051,27 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
   cap->seq = m->get_seq();
   cap->gen = session->cap_gen;
 
-  in->layout = m->get_layout();
-
   // update inode
-  int implemented = 0;
-  int issued = in->caps_issued(&implemented) | in->caps_dirty();
-  issued |= implemented;
+  int issued;
+  in->caps_issued(&issued);
+  issued |= in->caps_dirty();
 
-  if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+  if ((new_caps & CEPH_CAP_AUTH_SHARED) &&
+      !(issued & CEPH_CAP_AUTH_EXCL)) {
     in->mode = m->head.mode;
     in->uid = m->head.uid;
     in->gid = m->head.gid;
     in->btime = m->btime;
   }
   bool deleted_inode = false;
-  if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+  if ((new_caps & CEPH_CAP_LINK_SHARED) &&
+      !(issued & CEPH_CAP_LINK_EXCL)) {
     in->nlink = m->head.nlink;
     if (in->nlink == 0 &&
        (new_caps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
       deleted_inode = true;
   }
-  if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
+  if (!(issued & CEPH_CAP_XATTR_EXCL) &&
       m->xattrbl.length() &&
       m->head.xattr_version > in->xattr_version) {
     bufferlist::iterator p = m->xattrbl.begin();
@@ -5074,14 +5084,30 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
     in->dirstat.nsubdirs = m->get_nsubdirs();
   }
 
-  update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
-                        m->get_change_attr(), m->get_time_warp_seq(), m->get_ctime(),
-                        m->get_mtime(), m->get_atime(),
-                        m->inline_version, m->inline_data, issued);
+  if (new_caps & CEPH_CAP_ANY_RD) {
+    update_inode_file_time(in, issued, m->get_time_warp_seq(),
+                          m->get_ctime(), m->get_mtime(), m->get_atime());
+  }
+
+  if (new_caps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+    in->layout = m->get_layout();
+    update_inode_file_size(in, issued, m->get_size(),
+                          m->get_truncate_seq(), m->get_truncate_size());
+  }
+
+  if (m->inline_version > in->inline_version) {
+    in->inline_data = m->inline_data;
+    in->inline_version = m->inline_version;
+  }
+
+  /* always take a newer change attr */
+  if (m->get_change_attr() > in->change_attr)
+    in->change_attr = m->get_change_attr();
 
   // max_size
   if (cap == in->auth_cap &&
-      m->get_max_size() != in->max_size) {
+      (new_caps & CEPH_CAP_ANY_FILE_WR) &&
+      (m->get_max_size() != in->max_size)) {
     ldout(cct, 10) << "max_size " << in->max_size << " -> " << m->get_max_size() << dendl;
     in->max_size = m->get_max_size();
     if (in->max_size > in->wanted_max_size) {
@@ -5185,7 +5211,7 @@ int Client::xattr_permission(Inode *in, const char *name, unsigned want,
     r = inode_permission(in, perms, want);
   }
 out:
-  ldout(cct, 3) << __func__ << " " << in << " = " << r <<  dendl;
+  ldout(cct, 5) << __func__ << " " << in << " = " << r <<  dendl;
   return r;
 }
 
@@ -7441,7 +7467,7 @@ int Client::_opendir(Inode *in, dir_result_t **dirpp, const UserPerm& perms)
     return -ENOTDIR;
   *dirpp = new dir_result_t(in, perms);
   opened_dirs.insert(*dirpp);
-  ldout(cct, 3) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
+  ldout(cct, 8) << "_opendir(" << in->ino << ") = " << 0 << " (" << *dirpp << ")" << dendl;
   return 0;
 }
 
@@ -8243,10 +8269,9 @@ int Client::lookup_hash(inodeno_t ino, inodeno_t dirino, const char *name,
  * the resulting Inode object in one operation, so that caller
  * can safely assume inode will still be there after return.
  */
-int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+int Client::_lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
 {
-  Mutex::Locker lock(client_lock);
-  ldout(cct, 3) << "lookup_ino enter(" << ino << ")" << dendl;
+  ldout(cct, 8) << "lookup_ino enter(" << ino << ")" << dendl;
 
   if (unmounting)
     return -ENOTCONN;
@@ -8263,21 +8288,24 @@ int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
     *inode = p->second;
     _ll_get(*inode);
   }
-  ldout(cct, 3) << "lookup_ino exit(" << ino << ") = " << r << dendl;
+  ldout(cct, 8) << "lookup_ino exit(" << ino << ") = " << r << dendl;
   return r;
 }
 
-
+int Client::lookup_ino(inodeno_t ino, const UserPerm& perms, Inode **inode)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_ino(ino, perms, inode);
+}
 
 /**
  * Find the parent inode of `ino` and insert it into
  * our cache.  Conditionally also set `parent` to a referenced
  * Inode* if caller provides non-NULL value.
  */
-int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
+int Client::_lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
 {
-  Mutex::Locker lock(client_lock);
-  ldout(cct, 3) << "lookup_parent enter(" << ino->ino << ")" << dendl;
+  ldout(cct, 8) << "lookup_parent enter(" << ino->ino << ")" << dendl;
 
   if (unmounting)
     return -ENOTCONN;
@@ -8285,13 +8313,13 @@ int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
   if (!ino->dn_set.empty()) {
     // if we exposed the parent here, we'd need to check permissions,
     // but right now we just rely on the MDS doing so in make_request
-    ldout(cct, 3) << "lookup_parent dentry already present" << dendl;
+    ldout(cct, 8) << "lookup_parent dentry already present" << dendl;
     return 0;
   }
   
   if (ino->is_root()) {
     *parent = NULL;
-    ldout(cct, 3) << "ino is root, no parent" << dendl;
+    ldout(cct, 8) << "ino is root, no parent" << dendl;
     return -EINVAL;
   }
 
@@ -8306,25 +8334,28 @@ int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
     if (r == 0) {
       *parent = target.get();
       _ll_get(*parent);
-      ldout(cct, 3) << "lookup_parent found parent " << (*parent)->ino << dendl;
+      ldout(cct, 8) << "lookup_parent found parent " << (*parent)->ino << dendl;
     } else {
       *parent = NULL;
     }
   }
-  ldout(cct, 3) << "lookup_parent exit(" << ino->ino << ") = " << r << dendl;
+  ldout(cct, 8) << "lookup_parent exit(" << ino->ino << ") = " << r << dendl;
   return r;
 }
 
+int Client::lookup_parent(Inode *ino, const UserPerm& perms, Inode **parent)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_parent(ino, perms, parent);
+}
 
 /**
  * Populate the parent dentry for `ino`, provided it is
  * a child of `parent`.
  */
-int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
+int Client::_lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
 {
   assert(parent->is_dir());
-
-  Mutex::Locker lock(client_lock);
   ldout(cct, 3) << "lookup_name enter(" << ino->ino << ")" << dendl;
 
   if (unmounting)
@@ -8340,6 +8371,11 @@ int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
   return r;
 }
 
+int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
+{
+  Mutex::Locker lock(client_lock);
+  return _lookup_name(ino, parent, perms);
+}
 
  Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
 {
@@ -8383,7 +8419,7 @@ int Client::_release_fh(Fh *f)
   //ldout(cct, 3) << "op: client->close(open_files[ " << fh << " ]);" << dendl;
   //ldout(cct, 3) << "op: open_files.erase( " << fh << " );" << dendl;
   Inode *in = f->inode.get();
-  ldout(cct, 5) << "_release_fh " << f << " mode " << f->mode << " on " << *in << dendl;
+  ldout(cct, 8) << "_release_fh " << f << " mode " << f->mode << " on " << *in << dendl;
 
   in->unset_deleg(f);
 
@@ -8477,7 +8513,7 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
 
       result = get_caps(in, need, want, &have, -1);
       if (result < 0) {
-       ldout(cct, 1) << "Unable to get caps after open of inode " << *in <<
+       ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
                          " . Denying open: " <<
                          cpp_strerror(result) << dendl;
        in->put_open_ref(cmode);
@@ -8608,7 +8644,7 @@ loff_t Client::_lseek(Fh *f, loff_t offset, int whence)
     ceph_abort();
   }
 
-  ldout(cct, 3) << "_lseek(" << f << ", " << offset << ", " << whence << ") = " << f->pos << dendl;
+  ldout(cct, 8) << "_lseek(" << f << ", " << offset << ", " << whence << ") = " << f->pos << dendl;
   return f->pos;
 }
 
@@ -9416,14 +9452,14 @@ int Client::fsync(int fd, bool syncdataonly)
     // The IOs in this fsync were okay, but maybe something happened
     // in the background that we shoudl be reporting?
     r = f->take_async_err();
-    ldout(cct, 3) << "fsync(" << fd << ", " << syncdataonly
+    ldout(cct, 5) << "fsync(" << fd << ", " << syncdataonly
                   << ") = 0, async_err = " << r << dendl;
   } else {
     // Assume that an error we encountered during fsync, even reported
     // synchronously, would also have applied the error to the Fh, and we
     // should clear it here to avoid returning the same error again on next
     // call.
-    ldout(cct, 3) << "fsync(" << fd << ", " << syncdataonly << ") = "
+    ldout(cct, 5) << "fsync(" << fd << ", " << syncdataonly << ") = "
                   << r << dendl;
     f->take_async_err();
   }
@@ -9440,7 +9476,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   ceph_tid_t flush_tid = 0;
   InodeRef tmp_ref;
 
-  ldout(cct, 3) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
+  ldout(cct, 8) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
   
   if (cct->_conf->client_oc) {
     object_cacher_completion = new C_SafeCond(&lock, &cond, &done, &r);
@@ -9490,7 +9526,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
 
     ldout(cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl;
   } else {
-    ldout(cct, 1) << "ino " << in->ino << " failed to commit to disk! "
+    ldout(cct, 8) << "ino " << in->ino << " failed to commit to disk! "
                  << cpp_strerror(-r) << dendl;
   }
 
@@ -9499,7 +9535,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
 
 int Client::_fsync(Fh *f, bool syncdataonly)
 {
-  ldout(cct, 3) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
+  ldout(cct, 8) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
   return _fsync(f->inode.get(), syncdataonly);
 }
 
@@ -9519,7 +9555,7 @@ int Client::fstat(int fd, struct stat *stbuf, const UserPerm& perms, int mask)
   if (r < 0)
     return r;
   fill_stat(f->inode, stbuf, NULL);
-  ldout(cct, 3) << "fstat(" << fd << ", " << stbuf << ") = " << r << dendl;
+  ldout(cct, 5) << "fstat(" << fd << ", " << stbuf << ") = " << r << dendl;
   return r;
 }
 
@@ -10325,6 +10361,51 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
   return r;
 }
 
+int Client::ll_lookup_inode(
+    struct inodeno_t ino,
+    const UserPerm& perms,
+    Inode **inode)
+{
+  Mutex::Locker lock(client_lock);
+  ldout(cct, 3) << "ll_lookup_inode " << ino  << dendl;
+   
+  // Num1: get inode and *inode
+  int r = _lookup_ino(ino, perms, inode);
+  if (r) {
+    return r;
+  }
+  assert(inode != NULL);
+  assert(*inode != NULL);
+
+  // Num2: Request the parent inode, so that we can look up the name
+  Inode *parent;
+  r = _lookup_parent(*inode, perms, &parent);
+  if (r && r != -EINVAL) {
+    // Unexpected error
+    _ll_forget(*inode, 1);  
+    return r;
+  } else if (r == -EINVAL) {
+    // EINVAL indicates node without parents (root), drop out now
+    // and don't try to look up the non-existent dentry.
+    return 0;
+  }
+  // FIXME: I don't think this works; lookup_parent() returns 0 if the parent
+  // is already in cache
+  assert(parent != NULL);
+
+  // Num3: Finally, get the name (dentry) of the requested inode
+  r = _lookup_name(*inode, parent, perms);
+  if (r) {
+    // Unexpected error
+    _ll_forget(parent, 1);
+    _ll_forget(*inode, 1);
+    return r;
+  }
+
+  _ll_forget(parent, 1);
+  return 0;
+}
+
 int Client::ll_lookupx(Inode *parent, const char *name, Inode **out,
                       struct ceph_statx *stx, unsigned want, unsigned flags,
                       const UserPerm& perms)
@@ -10431,6 +10512,7 @@ int Client::_ll_put(Inode *in, int num)
 void Client::_ll_drop_pins()
 {
   ldout(cct, 10) << "_ll_drop_pins" << dendl;
+  std::set<InodeRef> to_be_put; //this set will be deconstructed item by item when exit
   ceph::unordered_map<vinodeno_t, Inode*>::iterator next;
   for (ceph::unordered_map<vinodeno_t, Inode*>::iterator it = inode_map.begin();
        it != inode_map.end();
@@ -10438,17 +10520,18 @@ void Client::_ll_drop_pins()
     Inode *in = it->second;
     next = it;
     ++next;
-    if (in->ll_ref)
+    if (in->ll_ref){
+      to_be_put.insert(in);
       _ll_put(in, in->ll_ref);
+    }
   }
 }
 
-bool Client::ll_forget(Inode *in, int count)
+bool Client::_ll_forget(Inode *in, int count)
 {
-  Mutex::Locker lock(client_lock);
   inodeno_t ino = _get_inodeno(in);
 
-  ldout(cct, 3) << "ll_forget " << ino << " " << count << dendl;
+  ldout(cct, 8) << "ll_forget " << ino << " " << count << dendl;
   tout(cct) << "ll_forget" << std::endl;
   tout(cct) << ino.val << std::endl;
   tout(cct) << count << std::endl;
@@ -10473,6 +10556,12 @@ bool Client::ll_forget(Inode *in, int count)
   return last;
 }
 
+bool Client::ll_forget(Inode *in, int count)
+{
+  Mutex::Locker lock(client_lock);
+  return _ll_forget(in, count);
+}
+
 bool Client::ll_put(Inode *in)
 {
   /* ll_forget already takes the lock */
@@ -10520,7 +10609,7 @@ int Client::_ll_getattr(Inode *in, int caps, const UserPerm& perms)
 {
   vinodeno_t vino = _get_vino(in);
 
-  ldout(cct, 3) << "ll_getattr " << vino << dendl;
+  ldout(cct, 8) << "ll_getattr " << vino << dendl;
   tout(cct) << "ll_getattr" << std::endl;
   tout(cct) << vino.ino.val << std::endl;
 
@@ -10570,7 +10659,7 @@ int Client::_ll_setattrx(Inode *in, struct ceph_statx *stx, int mask,
 {
   vinodeno_t vino = _get_vino(in);
 
-  ldout(cct, 3) << "ll_setattrx " << vino << " mask " << hex << mask << dec
+  ldout(cct, 8) << "ll_setattrx " << vino << " mask " << hex << mask << dec
                << dendl;
   tout(cct) << "ll_setattrx" << std::endl;
   tout(cct) << vino.ino.val << std::endl;
@@ -10878,7 +10967,7 @@ int Client::_getxattr(Inode *in, const char *name, void *value, size_t size,
     }
   }
  out:
-  ldout(cct, 3) << "_getxattr(" << in->ino << ", \"" << name << "\", " << size << ") = " << r << dendl;
+  ldout(cct, 8) << "_getxattr(" << in->ino << ", \"" << name << "\", " << size << ") = " << r << dendl;
   return r;
 }
 
@@ -10958,7 +11047,7 @@ int Client::_listxattr(Inode *in, char *name, size_t size,
        r = -ERANGE;
     }
   }
-  ldout(cct, 3) << "_listxattr(" << in->ino << ", " << size << ") = " << r << dendl;
+  ldout(cct, 8) << "_listxattr(" << in->ino << ", " << size << ") = " << r << dendl;
   return r;
 }
 
@@ -11198,7 +11287,7 @@ int Client::_removexattr(Inode *in, const char *name, const UserPerm& perms)
   int res = make_request(req, perms);
 
   trim_cache();
-  ldout(cct, 3) << "_removexattr(" << in->ino << ", \"" << name << "\") = " << res << dendl;
+  ldout(cct, 8) << "_removexattr(" << in->ino << ", \"" << name << "\") = " << res << dendl;
   return res;
 }
 
@@ -11495,7 +11584,7 @@ int Client::ll_readlink(Inode *in, char *buf, size_t buflen, const UserPerm& per
 int Client::_mknod(Inode *dir, const char *name, mode_t mode, dev_t rdev,
                   const UserPerm& perms, InodeRef *inp)
 {
-  ldout(cct, 3) << "_mknod(" << dir->ino << " " << name << ", 0" << oct
+  ldout(cct, 8) << "_mknod(" << dir->ino << " " << name << ", 0" << oct
                << mode << dec << ", " << rdev << ", uid " << perms.uid()
                << ", gid " << perms.gid() << ")" << dendl;
 
@@ -11538,7 +11627,7 @@ int Client::_mknod(Inode *dir, const char *name, mode_t mode, dev_t rdev,
 
   trim_cache();
 
-  ldout(cct, 3) << "mknod(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
+  ldout(cct, 8) << "mknod(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11627,7 +11716,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
                    int object_size, const char *data_pool, bool *created,
                    const UserPerm& perms)
 {
-  ldout(cct, 3) << "_create(" << dir->ino << " " << name << ", 0" << oct <<
+  ldout(cct, 8) << "_create(" << dir->ino << " " << name << ", 0" << oct <<
     mode << dec << ")" << dendl;
 
   if (strlen(name) > NAME_MAX)
@@ -11703,7 +11792,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
  reply_error:
   trim_cache();
 
-  ldout(cct, 3) << "create(" << path << ", 0" << oct << mode << dec 
+  ldout(cct, 8) << "create(" << path << ", 0" << oct << mode << dec
                << " layout " << stripe_unit
                << ' ' << stripe_count
                << ' ' << object_size
@@ -11719,7 +11808,7 @@ int Client::_create(Inode *dir, const char *name, int flags, mode_t mode,
 int Client::_mkdir(Inode *dir, const char *name, mode_t mode, const UserPerm& perm,
                   InodeRef *inp)
 {
-  ldout(cct, 3) << "_mkdir(" << dir->ino << " " << name << ", 0" << oct
+  ldout(cct, 8) << "_mkdir(" << dir->ino << " " << name << ", 0" << oct
                << mode << dec << ", uid " << perm.uid()
                << ", gid " << perm.gid() << ")" << dendl;
 
@@ -11764,7 +11853,7 @@ int Client::_mkdir(Inode *dir, const char *name, mode_t mode, const UserPerm& pe
 
   trim_cache();
 
-  ldout(cct, 3) << "_mkdir(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
+  ldout(cct, 8) << "_mkdir(" << path << ", 0" << oct << mode << dec << ") = " << res << dendl;
   return res;
 
  fail:
@@ -11849,7 +11938,7 @@ int Client::ll_mkdirx(Inode *parent, const char *name, mode_t mode, Inode **out,
 int Client::_symlink(Inode *dir, const char *name, const char *target,
                     const UserPerm& perms, InodeRef *inp)
 {
-  ldout(cct, 3) << "_symlink(" << dir->ino << " " << name << ", " << target
+  ldout(cct, 8) << "_symlink(" << dir->ino << " " << name << ", " << target
                << ", uid " << perms.uid() << ", gid " << perms.gid() << ")"
                << dendl;
 
@@ -11883,7 +11972,7 @@ int Client::_symlink(Inode *dir, const char *name, const char *target,
   res = make_request(req, perms, inp);
 
   trim_cache();
-  ldout(cct, 3) << "_symlink(\"" << path << "\", \"" << target << "\") = " <<
+  ldout(cct, 8) << "_symlink(\"" << path << "\", \"" << target << "\") = " <<
     res << dendl;
   return res;
 
@@ -11967,7 +12056,7 @@ int Client::ll_symlinkx(Inode *parent, const char *name, const char *value,
 
 int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
 {
-  ldout(cct, 3) << "_unlink(" << dir->ino << " " << name
+  ldout(cct, 8) << "_unlink(" << dir->ino << " " << name
                << " uid " << perm.uid() << " gid " << perm.gid()
                << ")" << dendl;
 
@@ -12007,7 +12096,7 @@ int Client::_unlink(Inode *dir, const char *name, const UserPerm& perm)
   res = make_request(req, perm);
 
   trim_cache();
-  ldout(cct, 3) << "unlink(" << path << ") = " << res << dendl;
+  ldout(cct, 8) << "unlink(" << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -12039,7 +12128,7 @@ int Client::ll_unlink(Inode *in, const char *name, const UserPerm& perm)
 
 int Client::_rmdir(Inode *dir, const char *name, const UserPerm& perms)
 {
-  ldout(cct, 3) << "_rmdir(" << dir->ino << " " << name << " uid "
+  ldout(cct, 8) << "_rmdir(" << dir->ino << " " << name << " uid "
                << perms.uid() << " gid " << perms.gid() << ")" << dendl;
 
   if (dir->snapid != CEPH_NOSNAP && dir->snapid != CEPH_SNAPDIR) {
@@ -12083,7 +12172,7 @@ int Client::_rmdir(Inode *dir, const char *name, const UserPerm& perms)
   res = make_request(req, perms);
 
   trim_cache();
-  ldout(cct, 3) << "rmdir(" << path << ") = " << res << dendl;
+  ldout(cct, 8) << "rmdir(" << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -12116,7 +12205,7 @@ int Client::ll_rmdir(Inode *in, const char *name, const UserPerm& perms)
 
 int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const char *toname, const UserPerm& perm)
 {
-  ldout(cct, 3) << "_rename(" << fromdir->ino << " " << fromname << " to "
+  ldout(cct, 8) << "_rename(" << fromdir->ino << " " << fromname << " to "
                << todir->ino << " " << toname
                << " uid " << perm.uid() << " gid " << perm.gid() << ")"
                << dendl;
@@ -12211,7 +12300,7 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
   // renamed item from our cache
 
   trim_cache();
-  ldout(cct, 3) << "_rename(" << from << ", " << to << ") = " << res << dendl;
+  ldout(cct, 8) << "_rename(" << from << ", " << to << ") = " << res << dendl;
   return res;
 
  fail:
@@ -12252,7 +12341,7 @@ int Client::ll_rename(Inode *parent, const char *name, Inode *newparent,
 
 int Client::_link(Inode *in, Inode *dir, const char *newname, const UserPerm& perm, InodeRef *inp)
 {
-  ldout(cct, 3) << "_link(" << in->ino << " to " << dir->ino << " " << newname
+  ldout(cct, 8) << "_link(" << in->ino << " to " << dir->ino << " " << newname
                << " uid " << perm.uid() << " gid " << perm.gid() << ")" << dendl;
 
   if (strlen(newname) > NAME_MAX)
@@ -12287,7 +12376,7 @@ int Client::_link(Inode *in, Inode *dir, const char *newname, const UserPerm& pe
   ldout(cct, 10) << "link result is " << res << dendl;
 
   trim_cache();
-  ldout(cct, 3) << "link(" << existing << ", " << path << ") = " << res << dendl;
+  ldout(cct, 8) << "link(" << existing << ", " << path << ") = " << res << dendl;
   return res;
 
  fail:
@@ -12527,7 +12616,7 @@ int Client::_ll_create(Inode *parent, const char *name, mode_t mode,
 
   vinodeno_t vparent = _get_vino(parent);
 
-  ldout(cct, 3) << "_ll_create " << vparent << " " << name << " 0" << oct <<
+  ldout(cct, 8) << "_ll_create " << vparent << " " << name << " 0" << oct <<
     mode << dec << " " << ceph_flags_sys2wire(flags) << ", uid " << perms.uid()
                << ", gid " << perms.gid() << dendl;
   tout(cct) << "ll_create" << std::endl;
@@ -12594,7 +12683,7 @@ out:
 
   tout(cct) << (unsigned long)*fhp << std::endl;
   tout(cct) << ino << std::endl;
-  ldout(cct, 3) << "_ll_create " << vparent << " " << name << " 0" << oct <<
+  ldout(cct, 8) << "_ll_create " << vparent << " " << name << " 0" << oct <<
     mode << dec << " " << ceph_flags_sys2wire(flags) << " = " << r << " (" <<
     *fhp << " " << hex << ino << dec << ")" << dendl;