update source to 12.2.11
diff --git a/ceph/src/osd/PGLog.h b/ceph/src/osd/PGLog.h
index e4d0fa304ab326489342d6c72f900608ac6705c9..6f85ee1f1a5505cc6585c0d1b55332f810cabbea 100644
--- a/ceph/src/osd/PGLog.h
+++ b/ceph/src/osd/PGLog.h
@@ -14,8 +14,7 @@
  * Foundation.  See file COPYING.
  * 
  */
-#ifndef CEPH_PG_LOG_H
-#define CEPH_PG_LOG_H
+#pragma once
 
 // re-include our assert to clobber boost's
 #include "include/assert.h"
@@ -27,7 +26,11 @@ using namespace std;
 #define PGLOG_INDEXED_OBJECTS          (1 << 0)
 #define PGLOG_INDEXED_CALLER_OPS       (1 << 1)
 #define PGLOG_INDEXED_EXTRA_CALLER_OPS (1 << 2)
-#define PGLOG_INDEXED_ALL              (PGLOG_INDEXED_OBJECTS | PGLOG_INDEXED_CALLER_OPS | PGLOG_INDEXED_EXTRA_CALLER_OPS)
+#define PGLOG_INDEXED_DUPS             (1 << 3)
+#define PGLOG_INDEXED_ALL              (PGLOG_INDEXED_OBJECTS | \
+                                       PGLOG_INDEXED_CALLER_OPS | \
+                                       PGLOG_INDEXED_EXTRA_CALLER_OPS | \
+                                       PGLOG_INDEXED_DUPS)
 
 class CephContext;
 
@@ -82,6 +85,7 @@ public:
     mutable ceph::unordered_map<hobject_t,pg_log_entry_t*> objects;  // ptrs into log.  be careful!
     mutable ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
     mutable ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
+    mutable ceph::unordered_map<osd_reqid_t,pg_log_dup_t*> dup_index;
 
     // recovery pointers
     list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
@@ -97,7 +101,7 @@ public:
      * It's a reverse_iterator because rend() is a natural representation for
      * tail, and rbegin() works nicely for head.
      */
-    mempool::osd::list<pg_log_entry_t>::reverse_iterator
+    mempool::osd_pglog::list<pg_log_entry_t>::reverse_iterator
       rollback_info_trimmed_to_riter;
 
     template <typename F>
@@ -132,7 +136,7 @@ public:
       last_requested(0),
       indexed_data(0),
       rollback_info_trimmed_to_riter(log.rbegin())
-      {}
+    { }
 
     template <typename... Args>
     IndexedLog(Args&&... args) :
@@ -140,7 +144,8 @@ public:
       complete_to(log.end()),
       last_requested(0),
       indexed_data(0),
-      rollback_info_trimmed_to_riter(log.rbegin()) {
+      rollback_info_trimmed_to_riter(log.rbegin())
+    {
       reset_rollback_info_trimmed_to_riter();
       index();
     }
@@ -150,10 +155,12 @@ public:
       complete_to(log.end()),
       last_requested(rhs.last_requested),
       indexed_data(0),
-      rollback_info_trimmed_to_riter(log.rbegin()) {
+      rollback_info_trimmed_to_riter(log.rbegin())
+    {
       reset_rollback_info_trimmed_to_riter();
       index(rhs.indexed_data);
     }
+
     IndexedLog &operator=(const IndexedLog &rhs) {
       this->~IndexedLog();
       new (this) IndexedLog(rhs);
@@ -179,7 +186,7 @@ public:
       advance_can_rollback_to(head, [&](const pg_log_entry_t &entry) {});
     }
 
-    mempool::osd::list<pg_log_entry_t> rewind_from_head(eversion_t newhead) {
+    mempool::osd_pglog::list<pg_log_entry_t> rewind_from_head(eversion_t newhead) {
       auto divergent = pg_log_t::rewind_from_head(newhead);
       index();
       reset_rollback_info_trimmed_to_riter();
@@ -261,7 +268,8 @@ public:
       const osd_reqid_t &r,
       eversion_t *version,
       version_t *user_version,
-      int *return_code) const {
+      int *return_code) const
+    {
       assert(version);
       assert(user_version);
       assert(return_code);
@@ -284,8 +292,7 @@ public:
       }
       p = extra_caller_ops.find(r);
       if (p != extra_caller_ops.end()) {
-       for (vector<pair<osd_reqid_t, version_t> >::const_iterator i =
-              p->second->extra_reqids.begin();
+       for (auto i = p->second->extra_reqids.begin();
             i != p->second->extra_reqids.end();
             ++i) {
          if (i->first == r) {
@@ -297,12 +304,24 @@ public:
        }
        assert(0 == "in extra_caller_ops but not extra_reqids");
       }
+
+      if (!(indexed_data & PGLOG_INDEXED_DUPS)) {
+        index_dups();
+      }
+      auto q = dup_index.find(r);
+      if (q != dup_index.end()) {
+       *version = q->second->version;
+       *user_version = q->second->user_version;
+       *return_code = q->second->return_code;
+       return true;
+      }
+
       return false;
     }
 
     /// get a (bounded) list of recent reqids for the given object
     void get_object_reqids(const hobject_t& oid, unsigned max,
-                          vector<pair<osd_reqid_t, version_t> > *pls) const {
+                          mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *pls) const {
        // make sure object is present at least once before we do an
        // O(n) search.
       if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
@@ -326,41 +345,58 @@ public:
        }
       }
     }
-    
+
     void index(__u16 to_index = PGLOG_INDEXED_ALL) const {
+      // if to_index is 0, no need to run any of this code, especially
+      // the loop below; this can happen with the copy constructor for
+      // IndexedLog (and indirectly through the assignment operator)
+      if (!to_index) return;
+
       if (to_index & PGLOG_INDEXED_OBJECTS)
        objects.clear();
       if (to_index & PGLOG_INDEXED_CALLER_OPS)
        caller_ops.clear();
       if (to_index & PGLOG_INDEXED_EXTRA_CALLER_OPS)
        extra_caller_ops.clear();
+      if (to_index & PGLOG_INDEXED_DUPS) {
+       dup_index.clear();
+       for (auto& i : dups) {
+         dup_index[i.reqid] = const_cast<pg_log_dup_t*>(&i);
+       }
+      }
 
-      for (list<pg_log_entry_t>::const_iterator i = log.begin();
-          i != log.end();
-          ++i) {
-       if (to_index & PGLOG_INDEXED_OBJECTS) {
-         if (i->object_is_indexed()) {
-           objects[i->soid] = const_cast<pg_log_entry_t*>(&(*i));
+      constexpr __u16 any_log_entry_index =
+       PGLOG_INDEXED_OBJECTS |
+       PGLOG_INDEXED_CALLER_OPS |
+       PGLOG_INDEXED_EXTRA_CALLER_OPS;
+
+      if (to_index & any_log_entry_index) {
+       for (list<pg_log_entry_t>::const_iterator i = log.begin();
+            i != log.end();
+            ++i) {
+         if (to_index & PGLOG_INDEXED_OBJECTS) {
+           if (i->object_is_indexed()) {
+             objects[i->soid] = const_cast<pg_log_entry_t*>(&(*i));
+           }
          }
-       }
 
-       if (to_index & PGLOG_INDEXED_CALLER_OPS) {
-         if (i->reqid_is_indexed()) {
-           caller_ops[i->reqid] = const_cast<pg_log_entry_t*>(&(*i));
+         if (to_index & PGLOG_INDEXED_CALLER_OPS) {
+           if (i->reqid_is_indexed()) {
+             caller_ops[i->reqid] = const_cast<pg_log_entry_t*>(&(*i));
+           }
          }
-       }
-        
-       if (to_index & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
-         for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
-                i->extra_reqids.begin();
-              j != i->extra_reqids.end();
-              ++j) {
-            extra_caller_ops.insert(
-             make_pair(j->first, const_cast<pg_log_entry_t*>(&(*i))));
+
+         if (to_index & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
+           for (auto j = i->extra_reqids.begin();
+                j != i->extra_reqids.end();
+                ++j) {
+             extra_caller_ops.insert(
+               make_pair(j->first, const_cast<pg_log_entry_t*>(&(*i))));
+           }
          }
        }
       }
-        
+
       indexed_data |= to_index;
     }
 
@@ -376,6 +412,10 @@ public:
       index(PGLOG_INDEXED_EXTRA_CALLER_OPS);
     }
 
+    void index_dups() const {
+      index(PGLOG_INDEXED_DUPS);
+    }
+
     void index(pg_log_entry_t& e) {
       if ((indexed_data & PGLOG_INDEXED_OBJECTS) && e.object_is_indexed()) {
         if (objects.count(e.soid) == 0 ||
@@ -389,21 +429,23 @@ public:
         }
       }
       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
-        for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
-              e.extra_reqids.begin();
+        for (auto j = e.extra_reqids.begin();
             j != e.extra_reqids.end();
             ++j) {
          extra_caller_ops.insert(make_pair(j->first, &e));
         }
       }
     }
+
     void unindex() {
       objects.clear();
       caller_ops.clear();
       extra_caller_ops.clear();
+      dup_index.clear();
       indexed_data = 0;
     }
-    void unindex(pg_log_entry_t& e) {
+
+    void unindex(const pg_log_entry_t& e) {
       // NOTE: this only works if we remove from the _tail_ of the log!
       if (indexed_data & PGLOG_INDEXED_OBJECTS) {
         if (objects.count(e.soid) && objects[e.soid]->version == e.version)
@@ -413,12 +455,11 @@ public:
         if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
          // divergent merge_log indexes new before unindexing old
           if (caller_ops.count(e.reqid) && caller_ops[e.reqid] == &e)
-            caller_ops.erase(e.reqid);    
+            caller_ops.erase(e.reqid);
         }
       }
       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
-        for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
-              e.extra_reqids.begin();
+        for (auto j = e.extra_reqids.begin();
              j != e.extra_reqids.end();
              ++j) {
           for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
@@ -434,12 +475,30 @@ public:
       }
     }
 
+    void index(pg_log_dup_t& e) {
+      if (indexed_data & PGLOG_INDEXED_DUPS) {
+       dup_index[e.reqid] = &e;
+      }
+    }
+
+    void unindex(const pg_log_dup_t& e) {
+      if (indexed_data & PGLOG_INDEXED_DUPS) {
+       auto i = dup_index.find(e.reqid);
+       if (i != dup_index.end()) {
+         dup_index.erase(i);
+       }
+      }
+    }
+
     // actors
     void add(const pg_log_entry_t& e, bool applied = true) {
       if (!applied) {
        assert(get_can_rollback_to() == head);
       }
 
+      // make sure our buffers don't pin bigger buffers
+      e.mod_desc.trim_bl();
+
       // add to log
       log.push_back(e);
 
@@ -460,10 +519,9 @@ public:
          caller_ops[e.reqid] = &(log.back());
         }
       }
-      
+
       if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
-        for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
-              e.extra_reqids.begin();
+        for (auto j = e.extra_reqids.begin();
             j != e.extra_reqids.end();
             ++j) {
          extra_caller_ops.insert(make_pair(j->first, &(log.back())));
@@ -473,15 +531,17 @@ public:
       if (!applied) {
        skip_can_rollback_to_to_head();
       }
-    }
+    } // add
 
     void trim(
       CephContext* cct,
       eversion_t s,
-      set<eversion_t> *trimmed);
+      set<eversion_t> *trimmed,
+      set<string>* trimmed_dups,
+      eversion_t *write_from_dups);
 
     ostream& print(ostream& out) const;
-  };
+  }; // IndexedLog
 
 
 protected:
@@ -494,11 +554,16 @@ protected:
   eversion_t dirty_from;       ///< must clear/writeout all keys >= dirty_from
   eversion_t writeout_from;    ///< must writout keys >= writeout_from
   set<eversion_t> trimmed;     ///< must clear keys in trimmed
+  eversion_t dirty_to_dups;    ///< must clear/writeout all dups <= dirty_to_dups
+  eversion_t dirty_from_dups;  ///< must clear/writeout all dups >= dirty_from_dups
+  eversion_t write_from_dups;  ///< must write keys >= write_from_dups
+  set<string> trimmed_dups;    ///< must clear keys in trimmed_dups
   CephContext *cct;
   bool pg_log_debug;
   /// Log is clean on [dirty_to, dirty_from)
   bool touched_log;
   bool clear_divergent_priors;
+  bool rebuilt_missing_with_deletes = false;
 
   void mark_dirty_to(eversion_t to) {
     if (to > dirty_to)
@@ -512,6 +577,14 @@ protected:
     if (from < writeout_from)
       writeout_from = from;
   }
+  void mark_dirty_to_dups(eversion_t to) {
+    if (to > dirty_to_dups)
+      dirty_to_dups = to;
+  }
+  void mark_dirty_from_dups(eversion_t from) {
+    if (from < dirty_from_dups)
+      dirty_from_dups = from;
+  }
 public:
   bool is_dirty() const {
     return !touched_log ||
@@ -519,13 +592,23 @@ public:
       (dirty_from != eversion_t::max()) ||
       (writeout_from != eversion_t::max()) ||
       !(trimmed.empty()) ||
-      !missing.is_clean();
+      !missing.is_clean() ||
+      !(trimmed_dups.empty()) ||
+      (dirty_to_dups != eversion_t()) ||
+      (dirty_from_dups != eversion_t::max()) ||
+      (write_from_dups != eversion_t::max()) ||
+      rebuilt_missing_with_deletes;
   }
   void mark_log_for_rewrite() {
     mark_dirty_to(eversion_t::max());
     mark_dirty_from(eversion_t());
+    mark_dirty_to_dups(eversion_t::max());
+    mark_dirty_from_dups(eversion_t());
     touched_log = false;
   }
+  bool get_rebuilt_missing_with_deletes() const {
+    return rebuilt_missing_with_deletes;
+  }
 protected:
 
   /// DEBUG
@@ -551,21 +634,28 @@ protected:
     dirty_from = eversion_t::max();
     touched_log = true;
     trimmed.clear();
+    trimmed_dups.clear();
     writeout_from = eversion_t::max();
     check();
     missing.flush();
+    dirty_to_dups = eversion_t();
+    dirty_from_dups = eversion_t::max();
+    write_from_dups = eversion_t::max();
   }
 public:
+
   // cppcheck-suppress noExplicitConstructor
-  PGLog(CephContext *cct, DoutPrefixProvider *dpp = 0) :
+  PGLog(CephContext *cct, DoutPrefixProvider *dpp = nullptr) :
     prefix_provider(dpp),
     dirty_from(eversion_t::max()),
     writeout_from(eversion_t::max()),
+    dirty_from_dups(eversion_t::max()),
+    write_from_dups(eversion_t::max()),
     cct(cct),
     pg_log_debug(!(cct && !(cct->_conf->osd_debug_pg_log_writeout))),
     touched_log(false),
-    clear_divergent_priors(false) {}
-
+    clear_divergent_priors(false)
+  { }
 
   void reset_backfill();
 
@@ -578,16 +668,8 @@ public:
     missing.revise_have(oid, have);
   }
 
-  void revise_need(hobject_t oid, eversion_t need) {
-    missing.revise_need(oid, need);
-  }
-
   void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) {
-    missing.add(oid, need, have);
-  }
-
-  void missing_add_event(const pg_log_entry_t &e) {
-    missing.add_next_event(e);
+    missing.add(oid, need, have, false);
   }
 
   //////////////////// get or set log ////////////////////
@@ -623,7 +705,8 @@ public:
 
   void trim(
     eversion_t trim_to,
-    pg_info_t &info);
+    pg_info_t &info,
+    bool transaction_applied = true);
 
   void roll_forward_to(
     eversion_t roll_forward_to,
@@ -650,22 +733,27 @@ public:
     log.claim_log_and_clear_rollback_info(o);
     missing.clear();
     mark_dirty_to(eversion_t::max());
+    mark_dirty_to_dups(eversion_t::max());
   }
 
   void split_into(
       pg_t child_pgid,
       unsigned split_bits,
-      PGLog *opg_log) { 
+      PGLog *opg_log) {
     log.split_out_child(child_pgid, split_bits, &opg_log->log);
     missing.split_into(child_pgid, split_bits, &(opg_log->missing));
     opg_log->mark_dirty_to(eversion_t::max());
+    opg_log->mark_dirty_to_dups(eversion_t::max());
     mark_dirty_to(eversion_t::max());
+    mark_dirty_to_dups(eversion_t::max());
+    if (missing.may_include_deletes)
+      opg_log->rebuilt_missing_with_deletes = true;
   }
 
   void recover_got(hobject_t oid, eversion_t v, pg_info_t &info) {
     if (missing.is_missing(oid, v)) {
       missing.got(oid, v);
-      
+
       // raise last_complete?
       if (missing.get_items().empty()) {
        log.complete_to = log.log.end();
@@ -685,21 +773,29 @@ public:
     assert(log.get_can_rollback_to() >= v);
   }
 
-  void activate_not_complete(pg_info_t &info) {
+  void reset_complete_to(pg_info_t *info) {
     log.complete_to = log.log.begin();
-    while (log.complete_to->version <
+    while (!missing.get_items().empty() && log.complete_to->version <
           missing.get_items().at(
             missing.get_rmissing().begin()->second
-            ).need)
+            ).need) {
+      assert(log.complete_to != log.log.end());
       ++log.complete_to;
+    }
     assert(log.complete_to != log.log.end());
     if (log.complete_to == log.log.begin()) {
-      info.last_complete = eversion_t();
+      if (info)
+       info->last_complete = eversion_t();
     } else {
       --log.complete_to;
-      info.last_complete = log.complete_to->version;
+      if (info)
+       info->last_complete = log.complete_to->version;
       ++log.complete_to;
     }
+  }
+
+  void activate_not_complete(pg_info_t &info) {
+    reset_complete_to(&info);
     log.last_requested = 0;
   }
 
@@ -707,12 +803,16 @@ public:
                        const pg_log_t &olog,
                        pg_missing_t& omissing, pg_shard_t from) const;
 
+  void rebuild_missing_set_with_deletes(ObjectStore *store,
+                                       coll_t pg_coll,
+                                       const pg_info_t &info);
+
 protected:
   static void split_by_object(
-    mempool::osd::list<pg_log_entry_t> &entries,
-    map<hobject_t, mempool::osd::list<pg_log_entry_t>> *out_entries) {
+    mempool::osd_pglog::list<pg_log_entry_t> &entries,
+    map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t>> *out_entries) {
     while (!entries.empty()) {
-      mempool::osd::list<pg_log_entry_t> &out_list = (*out_entries)[entries.front().soid];
+      auto &out_list = (*out_entries)[entries.front().soid];
       out_list.splice(out_list.end(), entries, entries.begin());
     }
   }
@@ -741,15 +841,15 @@ protected:
   static void _merge_object_divergent_entries(
     const IndexedLog &log,               ///< [in] log to merge against
     const hobject_t &hoid,               ///< [in] object we are merging
-    const mempool::osd::list<pg_log_entry_t> &entries, ///< [in] entries for hoid to merge
+    const mempool::osd_pglog::list<pg_log_entry_t> &orig_entries, ///< [in] entries for hoid to merge
     const pg_info_t &info,              ///< [in] info for merging entries
     eversion_t olog_can_rollback_to,     ///< [in] rollback boundary
-    missing_type &missing,              ///< [in,out] missing to adjust, use
+    missing_type &missing,               ///< [in,out] missing to adjust, use
     LogEntryHandler *rollbacker,         ///< [in] optional rollbacker object
     const DoutPrefixProvider *dpp        ///< [in] logging provider
     ) {
     ldpp_dout(dpp, 20) << __func__ << ": merging hoid " << hoid
-                      << " entries: " << entries << dendl;
+                      << " entries: " << orig_entries << dendl;
 
     if (hoid > info.last_backfill) {
       ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid << " after last_backfill"
@@ -758,20 +858,47 @@ protected:
     }
 
     // entries is non-empty
-    assert(!entries.empty());
+    assert(!orig_entries.empty());
+    // strip out and ignore ERROR entries
+    mempool::osd_pglog::list<pg_log_entry_t> entries;
     eversion_t last;
-    for (list<pg_log_entry_t>::const_iterator i = entries.begin();
-        i != entries.end();
+    bool seen_non_error = false;
+    for (list<pg_log_entry_t>::const_iterator i = orig_entries.begin();
+        i != orig_entries.end();
         ++i) {
       // all entries are on hoid
       assert(i->soid == hoid);
-      if (i != entries.begin() && i->prior_version != eversion_t()) {
+      // if we have not yet seen a non-error entry and this entry is not an
+      // error, then this is the first non-error entry
+      bool first_non_error = ! seen_non_error && ! i->is_error();
+      if (! i->is_error() ) {
+        // we have now seen a non-error entry
+        seen_non_error = true;
+      }
+      
+      // No need to check the first entry since its prior_version is unavailable
+      // in the list
+      // No need to check if the prior_version is the minimal version
+      // No need to check the first non-error entry since the leading error
+      // entries are not its prior version
+      if (i != orig_entries.begin() && i->prior_version != eversion_t() &&
+          ! first_non_error) {
        // in increasing order of version
        assert(i->version > last);
-       // prior_version correct
-       assert(i->prior_version == last);
+       // prior_version correct (unless it is an ERROR entry)
+       assert(i->prior_version == last || i->is_error());
+      }
+      if (i->is_error()) {
+       ldpp_dout(dpp, 20) << __func__ << ": ignoring " << *i << dendl;
+      } else {
+       ldpp_dout(dpp, 20) << __func__ << ": keeping " << *i << dendl;
+       entries.push_back(*i);
+       last = i->version;
       }
-      last = i->version;
+    }
+    if (entries.empty()) {
+      ldpp_dout(dpp, 10) << __func__ << ": no non-ERROR entries" << dendl;
+      return;
     }
 
     const eversion_t prior_version = entries.begin()->prior_version;
@@ -797,7 +924,8 @@ protected:
       assert(objiter->second->version > last_divergent_update);
 
       // ensure missing has been updated appropriately
-      if (objiter->second->is_update()) {
+      if (objiter->second->is_update() ||
+         (missing.may_include_deletes && objiter->second->is_delete())) {
        assert(missing.is_missing(hoid) &&
               missing.get_items().at(hoid).need == objiter->second->version);
       } else {
@@ -851,7 +979,7 @@ protected:
        ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                           << " missing.have is " << missing.get_items().at(hoid).have
                           << ", adjusting" << dendl;
-       missing.revise_need(hoid, prior_version);
+       missing.revise_need(hoid, prior_version, false);
        if (prior_version <= info.log_tail) {
          ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                             << " prior_version " << prior_version
@@ -909,7 +1037,7 @@ protected:
          rollbacker->trim(i);
        }
       }
-      missing.add(hoid, prior_version, eversion_t());
+      missing.add(hoid, prior_version, eversion_t(), false);
       if (prior_version <= info.log_tail) {
        ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                           << " prior_version " << prior_version
@@ -923,16 +1051,16 @@ protected:
   template <typename missing_type>
   static void _merge_divergent_entries(
     const IndexedLog &log,               ///< [in] log to merge against
-    mempool::osd::list<pg_log_entry_t> &entries,       ///< [in] entries to merge
+    mempool::osd_pglog::list<pg_log_entry_t> &entries,       ///< [in] entries to merge
     const pg_info_t &oinfo,              ///< [in] info for merging entries
     eversion_t olog_can_rollback_to,     ///< [in] rollback boundary
     missing_type &omissing,              ///< [in,out] missing to adjust, use
     LogEntryHandler *rollbacker,         ///< [in] optional rollbacker object
     const DoutPrefixProvider *dpp        ///< [in] logging provider
     ) {
-    map<hobject_t, mempool::osd::list<pg_log_entry_t> > split;
+    map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t> > split;
     split_by_object(entries, &split);
-    for (map<hobject_t, mempool::osd::list<pg_log_entry_t>>::iterator i = split.begin();
+    for (map<hobject_t, mempool::osd_pglog::list<pg_log_entry_t>>::iterator i = split.begin();
         i != split.end();
         ++i) {
       _merge_object_divergent_entries(
@@ -956,7 +1084,7 @@ protected:
     const pg_log_entry_t& oe,
     const pg_info_t& info,
     LogEntryHandler *rollbacker) {
-    mempool::osd::list<pg_log_entry_t> entries;
+    mempool::osd_pglog::list<pg_log_entry_t> entries;
     entries.push_back(oe);
     _merge_object_divergent_entries(
       log,
@@ -968,7 +1096,11 @@ protected:
       rollbacker,
       this);
   }
+
+  bool merge_log_dups(const pg_log_t& olog);
+
 public:
+
   void rewind_divergent_log(eversion_t newhead,
                             pg_info_t &info,
                             LogEntryHandler *rollbacker,
@@ -985,7 +1117,7 @@ public:
   static bool append_log_entries_update_missing(
     const hobject_t &last_backfill,
     bool last_backfill_bitwise,
-    const mempool::osd::list<pg_log_entry_t> &entries,
+    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
     bool maintain_rollback,
     IndexedLog *log,
     missing_type &missing,
@@ -1005,13 +1137,21 @@ public:
       }
       if (p->soid <= last_backfill &&
          !p->is_error()) {
-       missing.add_next_event(*p);
-       if (rollbacker) {
-         // hack to match PG::mark_all_unfound_lost
-         if (maintain_rollback && p->is_lost_delete() && p->can_rollback()) {
-           rollbacker->try_stash(p->soid, p->version.version);
-         } else if (p->is_delete()) {
-           rollbacker->remove(p->soid);
+       if (missing.may_include_deletes) {
+         missing.add_next_event(*p);
+       } else {
+         if (p->is_delete()) {
+           missing.rm(p->soid, p->version);
+         } else {
+           missing.add_next_event(*p);
+         }
+         if (rollbacker) {
+           // hack to match PG::mark_all_unfound_lost
+           if (maintain_rollback && p->is_lost_delete() && p->can_rollback()) {
+             rollbacker->try_stash(p->soid, p->version.version);
+           } else if (p->is_delete()) {
+             rollbacker->remove(p->soid);
+           }
          }
        }
       }
@@ -1021,7 +1161,7 @@ public:
   bool append_new_log_entries(
     const hobject_t &last_backfill,
     bool last_backfill_bitwise,
-    const mempool::osd::list<pg_log_entry_t> &entries,
+    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
     LogEntryHandler *rollbacker) {
     bool invalidate_stats = append_log_entries_update_missing(
       last_backfill,
@@ -1034,15 +1174,27 @@ public:
       this);
     if (!entries.empty()) {
       mark_writeout_from(entries.begin()->version);
+      if (entries.begin()->is_lost_delete()) {
+       // hack: since lost deletes queue recovery directly, and don't
+       // go through activate_not_complete() again, our complete_to
+       // iterator may still point at log.end(). Reset it to point
+       // before these new lost_delete entries.  This only occurs
+       // when lost+delete entries are initially added, which is
+       // always in a list of solely lost_delete entries, so it is
+       // sufficient to check whether the first entry is a
+       // lost_delete
+       reset_complete_to(nullptr);
+      }
     }
     return invalidate_stats;
   }
 
-  void write_log_and_missing(ObjectStore::Transaction& t,
-                map<string,bufferlist> *km,
-                const coll_t& coll,
-                const ghobject_t &log_oid,
-                bool require_rollback);
+  void write_log_and_missing(
+    ObjectStore::Transaction& t,
+    map<string,bufferlist> *km,
+    const coll_t& coll,
+    const ghobject_t &log_oid,
+    bool require_rollback);
 
   static void write_log_and_missing_wo_missing(
     ObjectStore::Transaction& t,
@@ -1059,7 +1211,8 @@ public:
     const coll_t& coll,
     const ghobject_t &log_oid,
     const pg_missing_tracker_t &missing,
-    bool require_rollback);
+    bool require_rollback,
+    bool *rebuilt_missing_set_with_deletes);
 
   static void _write_log_and_missing_wo_missing(
     ObjectStore::Transaction& t,
@@ -1071,9 +1224,13 @@ public:
     eversion_t dirty_from,
     eversion_t writeout_from,
     const set<eversion_t> &trimmed,
+    const set<string> &trimmed_dups,
     bool dirty_divergent_priors,
     bool touch_log,
     bool require_rollback,
+    eversion_t dirty_to_dups,
+    eversion_t dirty_from_dups,
+    eversion_t write_from_dups,
     set<string> *log_keys_debug
     );
 
@@ -1086,41 +1243,54 @@ public:
     eversion_t dirty_from,
     eversion_t writeout_from,
     const set<eversion_t> &trimmed,
+    const set<string> &trimmed_dups,
     const pg_missing_tracker_t &missing,
     bool touch_log,
     bool require_rollback,
     bool clear_divergent_priors,
+    eversion_t dirty_to_dups,
+    eversion_t dirty_from_dups,
+    eversion_t write_from_dups,
+    bool *rebuilt_missing_with_deletes,
     set<string> *log_keys_debug
     );
 
   void read_log_and_missing(
-    ObjectStore *store, coll_t pg_coll,
-    coll_t log_coll, ghobject_t log_oid,
+    ObjectStore *store,
+    coll_t pg_coll,
+    coll_t log_coll,
+    ghobject_t log_oid,
     const pg_info_t &info,
+    bool force_rebuild_missing,
     ostringstream &oss,
     bool tolerate_divergent_missing_log,
     bool debug_verify_stored_missing = false
     ) {
     return read_log_and_missing(
       store, pg_coll, log_coll, log_oid, info,
-      log, missing, oss,
+      log, missing, force_rebuild_missing, oss,
       tolerate_divergent_missing_log,
       &clear_divergent_priors,
       this,
-      (pg_log_debug ? &log_keys_debug : 0),
+      (pg_log_debug ? &log_keys_debug : nullptr),
       debug_verify_stored_missing);
   }
 
   template <typename missing_type>
-  static void read_log_and_missing(ObjectStore *store, coll_t pg_coll,
-    coll_t log_coll, ghobject_t log_oid,
+  static void read_log_and_missing(
+    ObjectStore *store,
+    coll_t pg_coll,
+    coll_t log_coll,
+    ghobject_t log_oid,
     const pg_info_t &info,
     IndexedLog &log,
-    missing_type &missing, ostringstream &oss,
+    missing_type &missing,
+    bool force_rebuild_missing,
+    ostringstream &oss,
     bool tolerate_divergent_missing_log,
-    bool *clear_divergent_priors = NULL,
-    const DoutPrefixProvider *dpp = NULL,
-    set<string> *log_keys_debug = 0,
+    bool *clear_divergent_priors = nullptr,
+    const DoutPrefixProvider *dpp = nullptr,
+    set<string> *log_keys_debug = nullptr,
     bool debug_verify_stored_missing = false
     ) {
     ldpp_dout(dpp, 20) << "read_log_and_missing coll " << pg_coll
@@ -1137,8 +1307,10 @@ public:
     eversion_t on_disk_rollback_info_trimmed_to = eversion_t();
     ObjectMap::ObjectMapIterator p = store->get_omap_iterator(log_coll, log_oid);
     map<eversion_t, hobject_t> divergent_priors;
-    bool has_divergent_priors = false;
+    bool must_rebuild = force_rebuild_missing;
+    missing.may_include_deletes = false;
     list<pg_log_entry_t> entries;
+    list<pg_log_dup_t> dups;
     if (p) {
       for (p->seek_to_first(); p->valid() ; p->next(false)) {
        // non-log pgmeta_oid keys are prefixed with _; skip those
@@ -1150,16 +1322,30 @@ public:
          ::decode(divergent_priors, bp);
          ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
                             << " divergent_priors" << dendl;
-         has_divergent_priors = true;
+         must_rebuild = true;
          debug_verify_stored_missing = false;
        } else if (p->key() == "can_rollback_to") {
          ::decode(on_disk_can_rollback_to, bp);
        } else if (p->key() == "rollback_info_trimmed_to") {
          ::decode(on_disk_rollback_info_trimmed_to, bp);
+       } else if (p->key() == "may_include_deletes_in_missing") {
+         missing.may_include_deletes = true;
        } else if (p->key().substr(0, 7) == string("missing")) {
-         pair<hobject_t, pg_missing_item> p;
-         ::decode(p, bp);
-         missing.add(p.first, p.second.need, p.second.have);
+         hobject_t oid;
+         pg_missing_item item;
+         ::decode(oid, bp);
+         ::decode(item, bp);
+         if (item.is_delete()) {
+           assert(missing.may_include_deletes);
+         }
+         missing.add(oid, item.need, item.have, item.is_delete());
+       } else if (p->key().substr(0, 4) == string("dup_")) {
+         pg_log_dup_t dup;
+         ::decode(dup, bp);
+         if (!dups.empty()) {
+           assert(dups.back().version < dup.version);
+         }
+         dups.push_back(dup);
        } else {
          pg_log_entry_t e;
          e.decode_with_checksum(bp);
@@ -1180,14 +1366,16 @@ public:
       info.log_tail,
       on_disk_can_rollback_to,
       on_disk_rollback_info_trimmed_to,
-      std::move(entries));
+      std::move(entries),
+      std::move(dups));
 
-    if (has_divergent_priors || debug_verify_stored_missing) {
+    if (must_rebuild || debug_verify_stored_missing) {
       // build missing
       if (debug_verify_stored_missing || info.last_complete < info.last_update) {
-       ldpp_dout(dpp, 10) << "read_log_and_missing checking for missing items over interval ("
-                          << info.last_complete
-                          << "," << info.last_update << "]" << dendl;
+       ldpp_dout(dpp, 10)
+         << "read_log_and_missing checking for missing items over interval ("
+         << info.last_complete
+         << "," << info.last_update << "]" << dendl;
 
        set<hobject_t> did;
        set<hobject_t> checked;
@@ -1203,7 +1391,8 @@ public:
          if (did.count(i->soid)) continue;
          did.insert(i->soid);
 
-         if (i->is_delete()) continue;
+         if (!missing.may_include_deletes && i->is_delete())
+           continue;
 
          bufferlist bv;
          int r = store->getattr(
@@ -1220,22 +1409,30 @@ public:
                auto miter = missing.get_items().find(i->soid);
                assert(miter != missing.get_items().end());
                assert(miter->second.need == i->version);
-               assert(miter->second.have == oi.version);
+               // the 'have' version is reset if an object is deleted,
+               // then created again
+               assert(miter->second.have == oi.version || miter->second.have == eversion_t());
                checked.insert(i->soid);
              } else {
-               missing.add(i->soid, i->version, oi.version);
+               missing.add(i->soid, i->version, oi.version, i->is_delete());
              }
            }
          } else {
            ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
            if (debug_verify_stored_missing) {
              auto miter = missing.get_items().find(i->soid);
-             assert(miter != missing.get_items().end());
-             assert(miter->second.need == i->version);
-             assert(miter->second.have == eversion_t());
+             if (i->is_delete()) {
+               assert(miter == missing.get_items().end() ||
+                      (miter->second.need == i->version &&
+                       miter->second.have == eversion_t()));
+             } else {
+               assert(miter != missing.get_items().end());
+               assert(miter->second.need == i->version);
+               assert(miter->second.have == eversion_t());
+             }
              checked.insert(i->soid);
            } else {
-             missing.add(i->soid, i->version, eversion_t());
+             missing.add(i->soid, i->version, eversion_t(), i->is_delete());
            }
          }
        }
@@ -1243,11 +1440,12 @@ public:
          for (auto &&i: missing.get_items()) {
            if (checked.count(i.first))
              continue;
-           if (i.second.need > log.tail ||
-             i.first > info.last_backfill) {
-             ldpp_dout(dpp, -1) << __func__ << ": invalid missing set entry found "
-                                << i.first
-                                << dendl;
+           if (i.first > info.last_backfill) {
+             ldpp_dout(dpp, -1) << __func__ << ": invalid missing set entry "
+                               << "found before last_backfill: "
+                               << i.first << " " << i.second
+                               << " last_backfill = " << info.last_backfill
+                               << dendl;
              assert(0 == "invalid missing set entry found");
            }
            bufferlist bv;
@@ -1258,13 +1456,13 @@ public:
              bv);
            if (r >= 0) {
              object_info_t oi(bv);
-             assert(oi.version == i.second.have);
+             assert(oi.version == i.second.have || eversion_t() == i.second.have);
            } else {
-             assert(eversion_t() == i.second.have);
+             assert(i.second.is_delete() || eversion_t() == i.second.have);
            }
          }
        } else {
-         assert(has_divergent_priors);
+         assert(must_rebuild);
          for (map<eversion_t, hobject_t>::reverse_iterator i =
                 divergent_priors.rbegin();
               i != divergent_priors.rend();
@@ -1309,7 +1507,7 @@ public:
              }
            } else {
              ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
-             missing.add(i->second, i->first, eversion_t());
+             missing.add(i->second, i->first, eversion_t(), false);
            }
          }
        }
@@ -1318,13 +1516,11 @@ public:
       }
     }
 
-    if (!has_divergent_priors) {
+    if (!must_rebuild) {
       if (clear_divergent_priors)
        (*clear_divergent_priors) = false;
       missing.flush();
     }
     ldpp_dout(dpp, 10) << "read_log_and_missing done" << dendl;
-  }
-};
-
-#endif // CEPH_PG_LOG_H
+  } // static read_log_and_missing
+}; // struct PGLog