]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/tools/cephfs/JournalTool.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / tools / cephfs / JournalTool.cc
index 0f3b15752501c70fb4a003d3b896b35cc2a6f172..6bca9bb08167b960111efe2b2f727cffef15f950 100644 (file)
@@ -37,7 +37,7 @@
 #undef dout_prefix
 #define dout_prefix *_dout << __func__ << ": "
 
-
+using namespace std;
 
 void JournalTool::usage()
 {
@@ -45,26 +45,31 @@ void JournalTool::usage()
     << "  cephfs-journal-tool [options] journal <command>\n"
     << "    <command>:\n"
     << "      inspect\n"
-    << "      import <path>\n"
+    << "      import <path> [--force]\n"
     << "      export <path>\n"
     << "      reset [--force]\n"
-    << "  cephfs-journal-tool [options] header <get|set <field> <value>\n"
-    << "  cephfs-journal-tool [options] event <effect> <selector> <output>\n"
+    << "  cephfs-journal-tool [options] header <get|set> <field> <value>\n"
+    << "    <field>: [trimmed_pos|expire_pos|write_pos|pool_id]\n"
+    << "  cephfs-journal-tool [options] event <effect> <selector> <output> [special options]\n"
     << "    <selector>:\n"
     << "      --range=<start>..<end>\n"
     << "      --path=<substring>\n"
     << "      --inode=<integer>\n"
     << "      --type=<UPDATE|OPEN|SESSION...><\n"
     << "      --frag=<ino>.<frag> [--dname=<dentry string>]\n"
-    << "      --alternate-pool=pool-name\n"
     << "      --client=<session id integer>\n"
-    << "    <effect>: [get|apply|recover_dentries|splice]\n"
+    << "    <effect>: [get|recover_dentries|splice]\n"
     << "    <output>: [summary|list|binary|json] [--path <path>]\n"
     << "\n"
-    << "Options:\n"
-    << "  --rank=filesystem:mds-rank  Journal rank (required if multiple\n"
-    << "                              file systems, default is rank 0 on\n"
-    << "                              the only filesystem otherwise.\n";
+    << "General options:\n"
+    << "  --rank=filesystem:{mds-rank|all} journal rank or \"all\" ranks (mandatory)\n"
+    << "  --journal=<mdlog|purge_queue>  Journal type (purge_queue means\n"
+    << "                                 this journal is used to queue for purge operation,\n"
+    << "                                 default is mdlog, and only mdlog support event mode)\n"
+    << "\n"
+    << "Special options\n"
+    << "  --alternate-pool <name>     Alternative metadata pool to target\n"
+    << "                              when using recover_dentries.\n";
 
   generic_client_usage();
 }
@@ -81,20 +86,30 @@ int JournalTool::main(std::vector<const char*> &argv)
   // Common arg parsing
   // ==================
   if (argv.empty()) {
-    usage();
+    cerr << "missing positional argument" << std::endl;
     return -EINVAL;
   }
 
   std::vector<const char*>::iterator arg = argv.begin();
 
   std::string rank_str;
-  if(!ceph_argparse_witharg(argv, arg, &rank_str, "--rank", (char*)NULL)) {
-    // Default: act on rank 0.  Will give the user an error if they
-    // try invoking this way when they have more than one filesystem.
-    rank_str = "0";
+  if (!ceph_argparse_witharg(argv, arg, &rank_str, "--rank", (char*)NULL)) {
+    derr << "missing mandatory \"--rank\" argument" << dendl;
+    return -EINVAL;
+  }
+
+  if (!ceph_argparse_witharg(argv, arg, &type, "--journal", (char*)NULL)) {
+    // Default is mdlog
+    type = "mdlog";
+  }
+  
+  r = validate_type(type);
+  if (r != 0) {
+    derr << "journal type is not correct." << dendl;
+    return r;
   }
 
-  r = role_selector.parse(*fsmap, rank_str);
+  r = role_selector.parse(*fsmap, rank_str, false);
   if (r != 0) {
     derr << "Couldn't determine MDS rank." << dendl;
     return r;
@@ -124,7 +139,7 @@ int JournalTool::main(std::vector<const char*> &argv)
   }
  
   auto fs = fsmap->get_filesystem(role_selector.get_ns());
-  assert(fs != nullptr);
+  ceph_assert(fs != nullptr);
   int64_t const pool_id = fs->mds_map.get_metadata_pool();
   dout(4) << "JournalTool: resolving pool " << pool_id << dendl;
   std::string pool_name;
@@ -136,23 +151,36 @@ int JournalTool::main(std::vector<const char*> &argv)
 
   dout(4) << "JournalTool: creating IoCtx.." << dendl;
   r = rados.ioctx_create(pool_name.c_str(), input);
-  assert(r == 0);
+  ceph_assert(r == 0);
   output.dup(input);
 
   // Execution
   // =========
-  for (auto role : role_selector.get_roles()) {
+  // journal and header are general journal mode
+  // event mode is only specific for mdlog
+  auto roles = role_selector.get_roles();
+  if (roles.size() > 1) {
+    const std::string &command = argv[0];
+    bool allowed = can_execute_for_all_ranks(mode, command);
+    if (!allowed) {
+      derr << "operation not allowed for all ranks" << dendl;
+      return -EINVAL;
+    }
+
+    all_ranks = true;
+  }
+  for (auto role : roles) {
     rank = role.rank;
+    std::vector<const char *> rank_argv(argv);
     dout(4) << "Executing for rank " << rank << dendl;
     if (mode == std::string("journal")) {
-      r = main_journal(argv);
+      r = main_journal(rank_argv);
     } else if (mode == std::string("header")) {
-      r = main_header(argv);
+      r = main_header(rank_argv);
     } else if (mode == std::string("event")) {
-      r = main_event(argv);
+      r = main_event(rank_argv);
     } else {
-      derr << "Bad command '" << mode << "'" << dendl;
-      usage();
+      cerr << "Bad command '" << mode << "'" << std::endl;
       return -EINVAL;
     }
 
@@ -164,6 +192,30 @@ int JournalTool::main(std::vector<const char*> &argv)
   return r;
 }
 
+int JournalTool::validate_type(const std::string &type)
+{
+  if (type == "mdlog" || type == "purge_queue") {
+    return 0;
+  }
+  return -1;
+}
+
+std::string JournalTool::gen_dump_file_path(const std::string &prefix) {
+  if (!all_ranks) {
+    return prefix;
+  }
+
+  return prefix + "." + std::to_string(rank);
+}
+
+bool JournalTool::can_execute_for_all_ranks(const std::string &mode,
+                                            const std::string &command) {
+  if (mode == "journal" && command == "import") {
+    return false;
+  }
+
+  return true;
+}
 
 /**
  * Handle arguments for 'journal' mode
@@ -172,13 +224,27 @@ int JournalTool::main(std::vector<const char*> &argv)
  */
 int JournalTool::main_journal(std::vector<const char*> &argv)
 {
+  if (argv.empty()) {
+    derr << "Missing journal command, please see help" << dendl;
+    return -EINVAL;
+  }
+
   std::string command = argv[0];
   if (command == "inspect") {
     return journal_inspect();
   } else if (command == "export" || command == "import") {
+    bool force = false;
     if (argv.size() >= 2) {
       std::string const path = argv[1];
-      return journal_export(path, command == "import");
+      if (argv.size() == 3) {
+        if (std::string(argv[2]) == "--force") {
+          force = true;
+        } else {
+          std::cerr << "Unknown argument " << argv[1] << std::endl;
+          return -EINVAL;
+        }
+      }
+      return journal_export(path, command == "import", force);
     } else {
       derr << "Missing path" << dendl;
       return -EINVAL;
@@ -190,12 +256,10 @@ int JournalTool::main_journal(std::vector<const char*> &argv)
         force = true;
       } else {
         std::cerr << "Unknown argument " << argv[1] << std::endl;
-        usage();
         return -EINVAL;
       }
     } else if (argv.size() > 2) {
       std::cerr << "Too many arguments!" << std::endl;
-      usage();
       return -EINVAL;
     }
     return journal_reset(force);
@@ -213,8 +277,8 @@ int JournalTool::main_journal(std::vector<const char*> &argv)
  */
 int JournalTool::main_header(std::vector<const char*> &argv)
 {
-  JournalFilter filter;
-  JournalScanner js(input, rank, filter);
+  JournalFilter filter(type);
+  JournalScanner js(input, rank, type, filter);
   int r = js.scan(false);
   if (r < 0) {
     std::cerr << "Unable to scan journal" << std::endl;
@@ -229,11 +293,11 @@ int JournalTool::main_header(std::vector<const char*> &argv)
     derr << "Header could not be read!" << dendl;
     return -ENOENT;
   } else {
-    assert(js.header != NULL);
+    ceph_assert(js.header != NULL);
   }
 
-  if (argv.size() == 0) {
-    derr << "Invalid header command, must be [get|set]" << dendl;
+  if (argv.empty()) {
+    derr << "Missing header command, must be [get|set]" << dendl;
     return -EINVAL;
   }
   std::vector<const char *>::iterator arg = argv.begin();
@@ -258,7 +322,7 @@ int JournalTool::main_header(std::vector<const char*> &argv)
 
     std::string const value_str = *arg;
     arg = argv.erase(arg);
-    assert(argv.empty());
+    ceph_assert(argv.empty());
 
     std::string parse_err;
     uint64_t new_val = strict_strtoll(value_str.c_str(), 0, &parse_err);
@@ -274,6 +338,8 @@ int JournalTool::main_header(std::vector<const char*> &argv)
       field = &(js.header->expire_pos);
     } else if (field_name == "write_pos") {
       field = &(js.header->write_pos);
+    } else if (field_name == "pool_id") {
+      field = (uint64_t*)(&(js.header->layout.pool_id));
     } else {
       derr << "Invalid field '" << field_name << "'" << dendl;
       return -EINVAL;
@@ -284,7 +350,7 @@ int JournalTool::main_header(std::vector<const char*> &argv)
 
     dout(4) << "Writing object..." << dendl;
     bufferlist header_bl;
-    ::encode(*(js.header), header_bl);
+    encode(*(js.header), header_bl);
     output.write_full(js.obj_name(0), header_bl);
     dout(4) << "Write complete." << dendl;
     std::cout << "Successfully updated header." << std::endl;
@@ -306,24 +372,39 @@ int JournalTool::main_event(std::vector<const char*> &argv)
 {
   int r;
 
+  if (argv.empty()) {
+    derr << "Missing event command, please see help" << dendl;
+    return -EINVAL;
+  }
+
   std::vector<const char*>::iterator arg = argv.begin();
+  bool dry_run = false;
 
   std::string command = *(arg++);
-  if (command != "get" && command != "apply" && command != "splice" && command != "recover_dentries") {
+  if (command != "get" && command != "splice" && command != "recover_dentries") {
     derr << "Unknown argument '" << command << "'" << dendl;
-    usage();
     return -EINVAL;
   }
 
+  if (command == "recover_dentries") {
+    if (type != "mdlog") {
+      derr << "journaler for " << type << " can't do \"recover_dentries\"." << dendl;
+      return -EINVAL;
+    } else {
+      if (arg != argv.end() && ceph_argparse_flag(argv, arg, "--dry_run", (char*)NULL)) {
+        dry_run = true;
+      }
+    }
+  }
+
   if (arg == argv.end()) {
     derr << "Incomplete command line" << dendl;
-    usage();
     return -EINVAL;
   }
 
   // Parse filter options
   // ====================
-  JournalFilter filter;
+  JournalFilter filter(type);
   r = filter.parse_args(argv, arg);
   if (r) {
     return r;
@@ -332,15 +413,14 @@ int JournalTool::main_event(std::vector<const char*> &argv)
   // Parse output options
   // ====================
   if (arg == argv.end()) {
-    derr << "Missing output command" << dendl;
-    usage();
+    cerr << "Missing output command" << std::endl;
+    return -EINVAL;
   }
   std::string output_style = *(arg++);
   if (output_style != "binary" && output_style != "json" &&
       output_style != "summary" && output_style != "list") {
-      derr << "Unknown argument: '" << output_style << "'" << dendl;
-      usage();
-      return -EINVAL;
+    cerr << "Unknown argument: '" << output_style << "'" << std::endl;
+    return -EINVAL;
   }
 
   std::string output_path = "dump";
@@ -352,43 +432,25 @@ int JournalTool::main_event(std::vector<const char*> &argv)
                                     nullptr)) {
       dout(1) << "Using alternate pool " << arg_str << dendl;
       int r = rados.ioctx_create(arg_str.c_str(), output);
-      assert(r == 0);
+      ceph_assert(r == 0);
       other_pool = true;
     } else {
-      derr << "Unknown argument: '" << *arg << "'" << dendl;
-      usage();
+      cerr << "Unknown argument: '" << *arg << "'" << std::endl;
       return -EINVAL;
     }
   }
 
+  const std::string dump_path = gen_dump_file_path(output_path);
+
   // Execute command
   // ===============
-  JournalScanner js(input, rank, filter);
+  JournalScanner js(input, rank, type, filter);
   if (command == "get") {
     r = js.scan();
     if (r) {
       derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
       return r;
     }
-  } else if (command == "apply") {
-    r = js.scan();
-    if (r) {
-      derr << "Failed to scan journal (" << cpp_strerror(r) << ")" << dendl;
-      return r;
-    }
-
-    bool dry_run = false;
-    if (arg != argv.end() && ceph_argparse_flag(argv, arg, "--dry_run", (char*)NULL)) {
-      dry_run = true;
-    }
-
-    for (JournalScanner::EventMap::iterator i = js.events.begin(); i != js.events.end(); ++i) {
-      LogEvent *le = i->second.log_event;
-      EMetaBlob const *mb = le->get_metablob();
-      if (mb) {
-        replay_offline(*mb, dry_run);
-      }
-    }
   } else if (command == "recover_dentries") {
     r = js.scan();
     if (r) {
@@ -396,21 +458,16 @@ int JournalTool::main_event(std::vector<const char*> &argv)
       return r;
     }
 
-    bool dry_run = false;
-    if (arg != argv.end() && ceph_argparse_flag(argv, arg, "--dry_run", (char*)NULL)) {
-      dry_run = true;
-    }
-
     /**
      * Iterate over log entries, attempting to scavenge from each one
      */
     std::set<inodeno_t> consumed_inos;
     for (JournalScanner::EventMap::iterator i = js.events.begin();
          i != js.events.end(); ++i) {
-      LogEvent *le = i->second.log_event;
+      auto& le = i->second.log_event;
       EMetaBlob const *mb = le->get_metablob();
       if (mb) {
-        int scav_r = scavenge_dentries(*mb, dry_run, &consumed_inos);
+        int scav_r = recover_dentries(*mb, dry_run, &consumed_inos);
         if (scav_r) {
           dout(1) << "Error processing event 0x" << std::hex << i->first << std::dec
                   << ": " << cpp_strerror(scav_r) << ", continuing..." << dendl;
@@ -490,14 +547,13 @@ int JournalTool::main_event(std::vector<const char*> &argv)
 
 
   } else {
-    derr << "Unknown argument '" << command << "'" << dendl;
-    usage();
+    cerr << "Unknown argument '" << command << "'" << std::endl;
     return -EINVAL;
   }
 
   // Generate output
   // ===============
-  EventOutput output(js, output_path);
+  EventOutput output(js, dump_path);
   int output_result = 0;
   if (output_style == "binary") {
       output_result = output.binary();
@@ -528,8 +584,8 @@ int JournalTool::journal_inspect()
 {
   int r;
 
-  JournalFilter filter;
-  JournalScanner js(input, rank, filter);
+  JournalFilter filter(type);
+  JournalScanner js(input, rank, type, filter);
   r = js.scan();
   if (r) {
     std::cerr << "Failed to scan journal (" << cpp_strerror(r) << ")" << std::endl;
@@ -550,10 +606,10 @@ int JournalTool::journal_inspect()
  * back to manually listing RADOS objects and extracting them, which
  * they can do with the ``rados`` CLI.
  */
-int JournalTool::journal_export(std::string const &path, bool import)
+int JournalTool::journal_export(std::string const &path, bool import, bool force)
 {
   int r = 0;
-  JournalScanner js(input, rank);
+  JournalScanner js(input, rank, type);
 
   if (!import) {
     /*
@@ -576,17 +632,17 @@ int JournalTool::journal_export(std::string const &path, bool import)
    */
   {
     Dumper dumper;
-    r = dumper.init(mds_role_t(role_selector.get_ns(), rank));
+    r = dumper.init(mds_role_t(role_selector.get_ns(), rank), type);
     if (r < 0) {
       derr << "dumper::init failed: " << cpp_strerror(r) << dendl;
       return r;
     }
     if (import) {
-      r = dumper.undump(path.c_str());
+      r = dumper.undump(path.c_str(), force);
     } else {
-      r = dumper.dump(path.c_str());
+      const std::string ex_path = gen_dump_file_path(path);
+      r = dumper.dump(ex_path.c_str());
     }
-    dumper.shutdown();
   }
 
   return r;
@@ -600,18 +656,17 @@ int JournalTool::journal_reset(bool hard)
 {
   int r = 0;
   Resetter resetter;
-  r = resetter.init();
+  r = resetter.init(mds_role_t(role_selector.get_ns(), rank), type, hard);
   if (r < 0) {
     derr << "resetter::init failed: " << cpp_strerror(r) << dendl;
     return r;
   }
 
   if (hard) {
-    r = resetter.reset_hard(mds_role_t(role_selector.get_ns(), rank));
+    r = resetter.reset_hard();
   } else {
-    r = resetter.reset(mds_role_t(role_selector.get_ns(), rank));
+    r = resetter.reset();
   }
-  resetter.shutdown();
 
   return r;
 }
@@ -634,20 +689,17 @@ int JournalTool::journal_reset(bool hard)
  * @param consumed_inos output, populated with any inos inserted
  * @returns 0 on success, else negative error code
  */
-int JournalTool::scavenge_dentries(
+int JournalTool::recover_dentries(
     EMetaBlob const &metablob,
     bool const dry_run,
     std::set<inodeno_t> *consumed_inos)
 {
-  assert(consumed_inos != NULL);
+  ceph_assert(consumed_inos != NULL);
 
   int r = 0;
 
   // Replay fullbits (dentry+inode)
-  for (list<dirfrag_t>::const_iterator lp = metablob.lump_order.begin();
-       lp != metablob.lump_order.end(); ++lp)
-  {
-    dirfrag_t const &frag = *lp;
+  for (const auto& frag : metablob.lump_order) {
     EMetaBlob::dirlump const &lump = metablob.lump_map.find(frag)->second;
     lump._decode_bits();
     object_t frag_oid = InodeStore::get_object_name(frag.ino, frag.frag, "");
@@ -674,13 +726,13 @@ int JournalTool::scavenge_dentries(
     } else if (r == 0) {
       // Conditionally update existing omap header
       fnode_t old_fnode;
-      bufferlist::iterator old_fnode_iter = old_fnode_bl.begin();
+      auto old_fnode_iter = old_fnode_bl.cbegin();
       try {
         old_fnode.decode(old_fnode_iter);
         dout(4) << "frag " << frag_oid.name << " fnode old v" <<
-          old_fnode.version << " vs new v" << lump.fnode.version << dendl;
+          old_fnode.version << " vs new v" << lump.fnode->version << dendl;
         old_fnode_version = old_fnode.version;
-        write_fnode = old_fnode_version < lump.fnode.version;
+        write_fnode = old_fnode_version < lump.fnode->version;
       } catch (const buffer::error &err) {
         dout(1) << "frag " << frag_oid.name
                 << " is corrupt, overwriting" << dendl;
@@ -696,7 +748,7 @@ int JournalTool::scavenge_dentries(
     if ((other_pool || write_fnode) && !dry_run) {
       dout(4) << "writing fnode to omap header" << dendl;
       bufferlist fnode_bl;
-      lump.fnode.encode(fnode_bl);
+      lump.fnode->encode(fnode_bl);
       if (!other_pool || frag.ino >= MDS_INO_SYSTEM_BASE) {
        r = output.omap_set_header(frag_oid.name, fnode_bl);
       }
@@ -710,12 +762,7 @@ int JournalTool::scavenge_dentries(
     std::set<std::string> read_keys;
 
     // Compose list of potentially-existing dentries we would like to fetch
-    list<ceph::shared_ptr<EMetaBlob::fullbit> > const &fb_list =
-      lump.get_dfull();
-    for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi =
-        fb_list.begin(); fbi != fb_list.end(); ++fbi) {
-      EMetaBlob::fullbit const &fb = *(*fbi);
-
+    for (const auto& fb : lump.get_dfull()) {
       // Get a key like "foobar_head"
       std::string key;
       dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
@@ -723,12 +770,7 @@ int JournalTool::scavenge_dentries(
       read_keys.insert(key);
     }
 
-    list<EMetaBlob::remotebit> const &rb_list =
-      lump.get_dremote();
-    for (list<EMetaBlob::remotebit>::const_iterator rbi =
-        rb_list.begin(); rbi != rb_list.end(); ++rbi) {
-      EMetaBlob::remotebit const &rb = *rbi;
-
+    for(const auto& rb : lump.get_dremote()) {
       // Get a key like "foobar_head"
       std::string key;
       dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
@@ -736,6 +778,14 @@ int JournalTool::scavenge_dentries(
       read_keys.insert(key);
     }
 
+    for (const auto& nb : lump.get_dnull()) {
+      // Get a key like "foobar_head"
+      std::string key;
+      dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
+      dn_key.encode(key);
+      read_keys.insert(key);
+    }
+
     // Perform bulk read of existing dentries
     std::map<std::string, bufferlist> read_vals;
     r = input.omap_get_vals_by_keys(frag_oid.name, read_keys, &read_vals);
@@ -750,10 +800,7 @@ int JournalTool::scavenge_dentries(
 
     // Compose list of dentries we will write back
     std::map<std::string, bufferlist> write_vals;
-    for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi =
-        fb_list.begin(); fbi != fb_list.end(); ++fbi) {
-      EMetaBlob::fullbit const &fb = *(*fbi);
-
+    for (const auto& fb : lump.get_dfull()) {
       // Get a key like "foobar_head"
       std::string key;
       dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
@@ -770,30 +817,40 @@ int JournalTool::scavenge_dentries(
         dout(4) << "dentry exists, checking versions..." << dendl;
         bufferlist &old_dentry = read_vals[key];
         // Decode dentry+inode
-        bufferlist::iterator q = old_dentry.begin();
+        auto q = old_dentry.cbegin();
 
         snapid_t dnfirst;
-        ::decode(dnfirst, q);
+        decode(dnfirst, q);
         char dentry_type;
-        ::decode(dentry_type, q);
+        decode(dentry_type, q);
 
-        if (dentry_type == 'L') {
+        if (dentry_type == 'L' || dentry_type == 'l') {
           // leave write_dentry false, we have no version to
           // compare with in a hardlink, so it's not safe to
           // squash over it with what's in this fullbit
           dout(10) << "Existing remote inode in slot to be (maybe) written "
                << "by a full inode from the journal dn '" << fb.dn.c_str()
-               << "' with lump fnode version " << lump.fnode.version
+               << "' with lump fnode version " << lump.fnode->version
                << "vs existing fnode version " << old_fnode_version << dendl;
-          write_dentry = old_fnode_version < lump.fnode.version;
-        } else if (dentry_type == 'I') {
+          write_dentry = old_fnode_version < lump.fnode->version;
+        } else if (dentry_type == 'I' || dentry_type == 'i') {
           // Read out inode version to compare with backing store
           InodeStore inode;
-          inode.decode_bare(q);
+          if (dentry_type == 'i') {
+            mempool::mds_co::string alternate_name;
+
+            DECODE_START(2, q);
+            if (struct_v >= 2)
+              decode(alternate_name, q);
+            inode.decode(q);
+            DECODE_FINISH(q);
+         } else {
+            inode.decode_bare(q);
+         }
           dout(4) << "decoded embedded inode version "
-            << inode.inode.version << " vs fullbit version "
-            << fb.inode.version << dendl;
-          if (inode.inode.version < fb.inode.version) {
+            << inode.inode->version << " vs fullbit version "
+            << fb.inode->version << dendl;
+          if (inode.inode->version < fb.inode->version) {
             write_dentry = true;
           }
         } else {
@@ -809,20 +866,17 @@ int JournalTool::scavenge_dentries(
 
         // Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
         bufferlist dentry_bl;
-        ::encode(fb.dnfirst, dentry_bl);
-        ::encode('I', dentry_bl);
+        encode(fb.dnfirst, dentry_bl);
+        encode('I', dentry_bl);
         encode_fullbit_as_inode(fb, true, &dentry_bl);
 
         // Record for writing to RADOS
         write_vals[key] = dentry_bl;
-        consumed_inos->insert(fb.inode.ino);
+        consumed_inos->insert(fb.inode->ino);
       }
     }
 
-    for (list<EMetaBlob::remotebit>::const_iterator rbi =
-        rb_list.begin(); rbi != rb_list.end(); ++rbi) {
-      EMetaBlob::remotebit const &rb = *rbi;
-
+    for(const auto& rb : lump.get_dremote()) {
       // Get a key like "foobar_head"
       std::string key;
       dentry_key_t dn_key(rb.dnlast, rb.dn.c_str());
@@ -839,25 +893,25 @@ int JournalTool::scavenge_dentries(
         dout(4) << "dentry exists, checking versions..." << dendl;
         bufferlist &old_dentry = read_vals[key];
         // Decode dentry+inode
-        bufferlist::iterator q = old_dentry.begin();
+        auto q = old_dentry.cbegin();
 
         snapid_t dnfirst;
-        ::decode(dnfirst, q);
+        decode(dnfirst, q);
         char dentry_type;
-        ::decode(dentry_type, q);
+        decode(dentry_type, q);
 
-        if (dentry_type == 'L') {
+        if (dentry_type == 'L' || dentry_type == 'l') {
           dout(10) << "Existing hardlink inode in slot to be (maybe) written "
                << "by a remote inode from the journal dn '" << rb.dn.c_str()
-               << "' with lump fnode version " << lump.fnode.version
+               << "' with lump fnode version " << lump.fnode->version
                << "vs existing fnode version " << old_fnode_version << dendl;
-          write_dentry = old_fnode_version < lump.fnode.version;
-        } else if (dentry_type == 'I') {
+          write_dentry = old_fnode_version < lump.fnode->version;
+        } else if (dentry_type == 'I' || dentry_type == 'i') {
           dout(10) << "Existing full inode in slot to be (maybe) written "
                << "by a remote inode from the journal dn '" << rb.dn.c_str()
-               << "' with lump fnode version " << lump.fnode.version
+               << "' with lump fnode version " << lump.fnode->version
                << "vs existing fnode version " << old_fnode_version << dendl;
-          write_dentry = old_fnode_version < lump.fnode.version;
+          write_dentry = old_fnode_version < lump.fnode->version;
         } else {
           dout(4) << "corrupt dentry in backing store, overwriting from "
             "journal" << dendl;
@@ -871,10 +925,10 @@ int JournalTool::scavenge_dentries(
 
         // Compose: Dentry format is dnfirst, [I|L], InodeStore(bare=true)
         bufferlist dentry_bl;
-        ::encode(rb.dnfirst, dentry_bl);
-        ::encode('L', dentry_bl);
-        ::encode(rb.ino, dentry_bl);
-        ::encode(rb.d_type, dentry_bl);
+        encode(rb.dnfirst, dentry_bl);
+        encode('L', dentry_bl);
+        encode(rb.ino, dentry_bl);
+        encode(rb.d_type, dentry_bl);
 
         // Record for writing to RADOS
         write_vals[key] = dentry_bl;
@@ -882,6 +936,48 @@ int JournalTool::scavenge_dentries(
       }
     }
 
+    std::set<std::string> null_vals;
+    for (const auto& nb : lump.get_dnull()) {
+      std::string key;
+      dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
+      dn_key.encode(key);
+
+      dout(4) << "inspecting nullbit " << frag_oid.name << "/" << nb.dn
+       << dendl;
+
+      auto it = read_vals.find(key);
+      if (it != read_vals.end()) {
+       dout(4) << "dentry exists, will remove" << dendl;
+
+       auto q = it->second.cbegin();
+       snapid_t dnfirst;
+       decode(dnfirst, q);
+       char dentry_type;
+       decode(dentry_type, q);
+
+       bool remove_dentry = false;
+       if (dentry_type == 'L' || dentry_type == 'l') {
+         dout(10) << "Existing hardlink inode in slot to be (maybe) removed "
+           << "by null journal dn '" << nb.dn.c_str()
+           << "' with lump fnode version " << lump.fnode->version
+           << "vs existing fnode version " << old_fnode_version << dendl;
+         remove_dentry = old_fnode_version < lump.fnode->version;
+       } else if (dentry_type == 'I' || dentry_type == 'i') {
+         dout(10) << "Existing full inode in slot to be (maybe) removed "
+           << "by null journal dn '" << nb.dn.c_str()
+           << "' with lump fnode version " << lump.fnode->version
+           << "vs existing fnode version " << old_fnode_version << dendl;
+         remove_dentry = old_fnode_version < lump.fnode->version;
+       } else {
+         dout(4) << "corrupt dentry in backing store, will remove" << dendl;
+         remove_dentry = true;
+       }
+
+       if (remove_dentry)
+         null_vals.insert(key);
+      }
+    }
+
     // Write back any new/changed dentries
     if (!write_vals.empty()) {
       r = output.omap_set(frag_oid.name, write_vals);
@@ -891,6 +987,16 @@ int JournalTool::scavenge_dentries(
        return r;
       }
     }
+
+    // remove any null dentries
+    if (!null_vals.empty()) {
+      r = output.omap_rm_keys(frag_oid.name, null_vals);
+      if (r != 0) {
+       derr << "error removing dentries from " << frag_oid.name
+         << ": " << cpp_strerror(r) << dendl;
+       return r;
+      }
+    }
   }
 
   /* Now that we've looked at the dirlumps, we finally pay attention to
@@ -899,10 +1005,8 @@ int JournalTool::scavenge_dentries(
    * important because clients use them to infer completeness
    * of directories
    */
-  for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator p =
-       metablob.roots.begin(); p != metablob.roots.end(); ++p) {
-    EMetaBlob::fullbit const &fb = *(*p);
-    inodeno_t ino = fb.inode.ino;
+  for (const auto& fb : metablob.roots) {
+    inodeno_t ino = fb.inode->ino;
     dout(4) << "updating root 0x" << std::hex << ino << std::dec << dendl;
 
     object_t root_oid = InodeStore::get_object_name(ino, frag_t(), ".inode");
@@ -919,14 +1023,14 @@ int JournalTool::scavenge_dentries(
       InodeStore old_inode;
       dout(4) << "root exists, will modify (" << old_root_ino_bl.length()
         << ")" << dendl;
-      bufferlist::iterator inode_bl_iter = old_root_ino_bl.begin(); 
+      auto inode_bl_iter = old_root_ino_bl.cbegin(); 
       std::string magic;
-      ::decode(magic, inode_bl_iter);
+      decode(magic, inode_bl_iter);
       if (magic == CEPH_FS_ONDISK_MAGIC) {
         dout(4) << "magic ok" << dendl;
         old_inode.decode(inode_bl_iter);
 
-        if (old_inode.inode.version < fb.inode.version) {
+        if (old_inode.inode->version < fb.inode->version) {
           write_root_ino = true;
         }
       } else {
@@ -941,11 +1045,11 @@ int JournalTool::scavenge_dentries(
 
     if (write_root_ino && !dry_run) {
       dout(4) << "writing root ino " << root_oid.name
-               << " version " << fb.inode.version << dendl;
+               << " version " << fb.inode->version << dendl;
 
       // Compose: root ino format is magic,InodeStore(bare=false)
       bufferlist new_root_ino_bl;
-      ::encode(std::string(CEPH_FS_ONDISK_MAGIC), new_root_ino_bl);
+      encode(std::string(CEPH_FS_ONDISK_MAGIC), new_root_ino_bl);
       encode_fullbit_as_inode(fb, false, &new_root_ino_bl);
 
       // Write to RADOS
@@ -962,176 +1066,6 @@ int JournalTool::scavenge_dentries(
 }
 
 
-int JournalTool::replay_offline(EMetaBlob const &metablob, bool const dry_run)
-{
-  int r;
-
-  // Replay roots
-  for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator p = metablob.roots.begin(); p != metablob.roots.end(); ++p) {
-    EMetaBlob::fullbit const &fb = *(*p);
-    inodeno_t ino = fb.inode.ino;
-    dout(4) << "updating root 0x" << std::hex << ino << std::dec << dendl;
-
-    object_t root_oid = InodeStore::get_object_name(ino, frag_t(), ".inode");
-    dout(4) << "object id " << root_oid.name << dendl;
-
-    bufferlist inode_bl;
-    r = input.read(root_oid.name, inode_bl, (1<<22), 0);
-    InodeStore inode;
-    if (r == -ENOENT) {
-      dout(4) << "root does not exist, will create" << dendl;
-    } else {
-      dout(4) << "root exists, will modify (" << inode_bl.length() << ")" << dendl;
-      // TODO: add some kind of force option so that we can overwrite bad inodes
-      // from the journal if needed
-      bufferlist::iterator inode_bl_iter = inode_bl.begin(); 
-      std::string magic;
-      ::decode(magic, inode_bl_iter);
-      if (magic == CEPH_FS_ONDISK_MAGIC) {
-        dout(4) << "magic ok" << dendl;
-      } else {
-        dout(4) << "magic bad: '" << magic << "'" << dendl;
-      }
-      inode.decode(inode_bl_iter);
-    }
-
-    // This is a distant cousin of EMetaBlob::fullbit::update_inode, but for use
-    // on an offline InodeStore instance.  It's way simpler, because we are just
-    // uncritically hauling the data between structs.
-    inode.inode = fb.inode;
-    inode.xattrs = fb.xattrs;
-    inode.dirfragtree = fb.dirfragtree;
-    inode.snap_blob = fb.snapbl;
-    inode.symlink = fb.symlink;
-    inode.old_inodes = fb.old_inodes;
-
-    inode_bl.clear();
-    std::string magic = CEPH_FS_ONDISK_MAGIC;
-    ::encode(magic, inode_bl);
-    inode.encode(inode_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
-
-    if (!dry_run) {
-      r = output.write_full(root_oid.name, inode_bl);
-      assert(r == 0);
-    }
-  }
-
-
-
-  // TODO: respect metablob.renamed_dirino (cues us as to which dirlumps
-  // indicate renamed directories)
-
-  // Replay fullbits (dentry+inode)
-  for (list<dirfrag_t>::const_iterator lp = metablob.lump_order.begin(); lp != metablob.lump_order.end(); ++lp) {
-    dirfrag_t const &frag = *lp;
-    EMetaBlob::dirlump const &lump = metablob.lump_map.find(frag)->second;
-    lump._decode_bits();
-    object_t frag_object_id = InodeStore::get_object_name(frag.ino, frag.frag, "");
-
-    // Check for presence of dirfrag object
-    uint64_t psize;
-    time_t pmtime;
-    r = input.stat(frag_object_id.name, &psize, &pmtime);
-    if (r == -ENOENT) {
-      dout(4) << "Frag object " << frag_object_id.name << " did not exist, will create" << dendl;
-    } else if (r != 0) {
-      // FIXME: what else can happen here?  can I deal?
-      assert(r == 0);
-    } else {
-      dout(4) << "Frag object " << frag_object_id.name << " exists, will modify" << dendl;
-    }
-
-    // Write fnode to omap header of dirfrag object
-    bufferlist fnode_bl;
-    lump.fnode.encode(fnode_bl);
-    if (!dry_run) {
-      r = output.omap_set_header(frag_object_id.name, fnode_bl);
-      if (r != 0) {
-        derr << "Failed to write fnode for frag object " << frag_object_id.name << dendl;
-        return r;
-      }
-    }
-
-    // Try to get the existing dentry
-    list<ceph::shared_ptr<EMetaBlob::fullbit> > const &fb_list = lump.get_dfull();
-    for (list<ceph::shared_ptr<EMetaBlob::fullbit> >::const_iterator fbi = fb_list.begin(); fbi != fb_list.end(); ++fbi) {
-      EMetaBlob::fullbit const &fb = *(*fbi);
-
-      // Get a key like "foobar_head"
-      std::string key;
-      dentry_key_t dn_key(fb.dnlast, fb.dn.c_str());
-      dn_key.encode(key);
-
-      // See if the dentry is present
-      std::set<std::string> keys;
-      keys.insert(key);
-      std::map<std::string, bufferlist> vals;
-      r = input.omap_get_vals_by_keys(frag_object_id.name, keys, &vals);
-      assert (r == 0);  // I assume success because I checked object existed and absence of 
-                        // dentry gives me empty map instead of failure
-                        // FIXME handle failures so we can replay other events
-                        // if this one is causing some unexpected issue
-    
-      if (vals.find(key) == vals.end()) {
-        dout(4) << "dentry " << key << " does not exist, will be created" << dendl;
-      } else {
-        dout(4) << "dentry " << key << " existed already" << dendl;
-        // TODO: read out existing dentry before adding new one so that
-        // we can print a bit of info about what we're overwriting
-      }
-    
-      bufferlist dentry_bl;
-      ::encode(fb.dnfirst, dentry_bl);
-      ::encode('I', dentry_bl);
-
-      InodeStore inode;
-      inode.inode = fb.inode;
-      inode.xattrs = fb.xattrs;
-      inode.dirfragtree = fb.dirfragtree;
-      inode.snap_blob = fb.snapbl;
-      inode.symlink = fb.symlink;
-      inode.old_inodes = fb.old_inodes;
-      inode.encode_bare(dentry_bl, CEPH_FEATURES_SUPPORTED_DEFAULT);
-      
-      vals[key] = dentry_bl;
-      if (!dry_run) {
-        r = output.omap_set(frag_object_id.name, vals);
-        assert(r == 0);  // FIXME handle failures
-      }
-    }
-
-    // Replay nullbits: removal of dentries
-    list<EMetaBlob::nullbit> const &nb_list = lump.get_dnull();
-    for (list<EMetaBlob::nullbit>::const_iterator
-       iter = nb_list.begin(); iter != nb_list.end(); ++iter) {
-      EMetaBlob::nullbit const &nb = *iter;
-
-      // Get a key like "foobar_head"
-      std::string key;
-      dentry_key_t dn_key(nb.dnlast, nb.dn.c_str());
-      dn_key.encode(key);
-
-      // Remove it from the dirfrag
-      dout(4) << "Removing dentry " << key << dendl;
-      std::set<std::string> keys;
-      keys.insert(key);
-      if (!dry_run) {
-        r = output.omap_rm_keys(frag_object_id.name, keys);
-        assert(r == 0);
-      }
-    }
-  }
-
-  for (std::vector<inodeno_t>::const_iterator i = metablob.destroyed_inodes.begin();
-       i != metablob.destroyed_inodes.end(); ++i) {
-    dout(4) << "Destroyed inode: " << *i << dendl;
-    // TODO: if it was a dir, then delete its dirfrag objects
-  }
-
-  return 0;
-}
-
-
 /**
  * Erase a region of the log by overwriting it with ENoOp
  *
@@ -1142,8 +1076,13 @@ int JournalTool::erase_region(JournalScanner const &js, uint64_t const pos, uint
   // of an ENoOp, and our trailing start ptr.  Calculate how much padding
   // is needed inside the ENoOp to make up the difference.
   bufferlist tmp;
-  ENoOp enoop(0);
-  enoop.encode_with_header(tmp, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  if (type == "mdlog") {
+    ENoOp enoop(0);
+    enoop.encode_with_header(tmp, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  } else if (type == "purge_queue") {
+    PurgeItem pi;
+    pi.encode(tmp);
+  }
 
   dout(4) << "erase_region " << pos << " len=" << length << dendl;
 
@@ -1156,22 +1095,27 @@ int JournalTool::erase_region(JournalScanner const &js, uint64_t const pos, uint
     return -EINVAL;
   }
 
-  // Serialize an ENoOp with the correct amount of padding
-  enoop = ENoOp(padding);
   bufferlist entry;
-  enoop.encode_with_header(entry, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  if (type == "mdlog") {
+    // Serialize an ENoOp with the correct amount of padding
+    ENoOp enoop(padding);
+    enoop.encode_with_header(entry, CEPH_FEATURES_SUPPORTED_DEFAULT);
+  } else if (type == "purge_queue") {
+    PurgeItem pi;
+    pi.pad_size = padding;
+    pi.encode(entry);
+  }
   JournalStream stream(JOURNAL_FORMAT_RESILIENT);
-
   // Serialize region of log stream
   bufferlist log_data;
   stream.write(entry, &log_data, pos);
 
   dout(4) << "erase_region data length " << log_data.length() << dendl;
-  assert(log_data.length() == length);
+  ceph_assert(log_data.length() == length);
 
   // Write log stream region to RADOS
   // FIXME: get object size somewhere common to scan_events
-  uint32_t object_size = g_conf->mds_log_segment_size;
+  uint32_t object_size = g_conf()->mds_log_segment_size;
   if (object_size == 0) {
     // Default layout object size
     object_size = file_layout_t::get_default().object_size;
@@ -1219,7 +1163,7 @@ void JournalTool::encode_fullbit_as_inode(
   const bool bare,
   bufferlist *out_bl)
 {
-  assert(out_bl != NULL);
+  ceph_assert(out_bl != NULL);
 
   // Compose InodeStore
   InodeStore new_inode;
@@ -1282,8 +1226,8 @@ int JournalTool::consume_inos(const std::set<inodeno_t> &inos)
 
     // Deserialize InoTable
     version_t inotable_ver;
-    bufferlist::iterator q = inotable_bl.begin();
-    ::decode(inotable_ver, q);
+    auto q = inotable_bl.cbegin();
+    decode(inotable_ver, q);
     InoTable ino_table(NULL);
     ino_table.decode(q);
     
@@ -1305,7 +1249,7 @@ int JournalTool::consume_inos(const std::set<inodeno_t> &inos)
       inotable_ver += 1;
       dout(4) << "writing modified inotable version " << inotable_ver << dendl;
       bufferlist inotable_new_bl;
-      ::encode(inotable_ver, inotable_new_bl);
+      encode(inotable_ver, inotable_new_bl);
       ino_table.encode_state(inotable_new_bl);
       int write_r = output.write_full(inotable_oid.name, inotable_new_bl);
       if (write_r != 0) {