eversion_t s,
set<eversion_t> *trimmed,
set<string>* trimmed_dups,
- bool* dirty_dups)
+ eversion_t *write_from_dups)
{
if (complete_to != log.end() &&
complete_to->version <= s) {
unindex(e); // remove from index,
// add to dup list
+ generic_dout(20) << "earliest_dup_version = " << earliest_dup_version << dendl;
if (e.version.version >= earliest_dup_version) {
- if (dirty_dups) *dirty_dups = true;
+ if (write_from_dups != nullptr && *write_from_dups > e.version) {
+ generic_dout(20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl;
+ *write_from_dups = e.version;
+ }
dups.push_back(pg_log_dup_t(e));
index(dups.back());
for (const auto& extra : e.extra_reqids) {
assert(trim_to <= info.last_complete);
dout(10) << "trim " << log << " to " << trim_to << dendl;
- log.trim(cct, trim_to, &trimmed, &trimmed_dups, &dirty_dups);
+ log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
info.log_tail = log.tail;
}
}
// now handle dups
if (merge_log_dups(olog)) {
- dirty_dups = true;
changed = true;
}
olog.dups.front().version << " to " <<
olog.dups.back().version << dendl;
changed = true;
+ dirty_from_dups = eversion_t();
+ dirty_to_dups = eversion_t::max();
// since our log.dups is empty just copy them
for (const auto& i : olog.dups) {
log.dups.push_back(i);
auto log_tail_version = log.dups.back().version;
auto insert_cursor = log.dups.end();
+ eversion_t last_shared = eversion_t::max();
for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
if (i->version <= log_tail_version) break;
log.dups.insert(insert_cursor, *i);
+ last_shared = i->version;
auto prev = insert_cursor;
--prev;
--insert_cursor; // make sure we insert in reverse order
}
+ mark_dirty_from_dups(last_shared);
}
if (olog.dups.front().version < log.dups.front().version) {
olog.dups.front().version << dendl;
changed = true;
+ eversion_t last;
auto insert_cursor = log.dups.begin();
for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
if (i->version >= insert_cursor->version) break;
log.dups.insert(insert_cursor, *i);
+ last = i->version;
auto prev = insert_cursor;
--prev;
// be sure to pass address of copy in log.dups
log.index(*prev);
}
+ mark_dirty_to_dups(last);
}
}
}
while (!log.dups.empty() && log.dups.back().version >= log.tail) {
log.unindex(log.dups.back());
+ mark_dirty_from_dups(log.dups.back().version);
log.dups.pop_back();
}
}
!touched_log,
require_rollback,
clear_divergent_priors,
- dirty_dups,
+ dirty_to_dups,
+ dirty_from_dups,
+ write_from_dups,
&rebuilt_missing_with_deletes,
(pg_log_debug ? &log_keys_debug : nullptr));
undirty();
pg_log_t &log,
const coll_t& coll, const ghobject_t &log_oid,
map<eversion_t, hobject_t> &divergent_priors,
- bool require_rollback,
- bool dirty_dups)
+ bool require_rollback
+ )
{
_write_log_and_missing_wo_missing(
t, km, log, coll, log_oid,
divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
set<eversion_t>(),
set<string>(),
- true, true, require_rollback, dirty_dups, nullptr);
+ true, true, require_rollback,
+ eversion_t::max(), eversion_t(), eversion_t(), nullptr);
}
// static
const ghobject_t &log_oid,
const pg_missing_tracker_t &missing,
bool require_rollback,
- bool dirty_dups,
bool *rebuilt_missing_with_deletes)
{
_write_log_and_missing(
set<eversion_t>(),
set<string>(),
missing,
- true, require_rollback, false, dirty_dups, rebuilt_missing_with_deletes, nullptr);
+ true, require_rollback, false,
+ eversion_t::max(),
+ eversion_t(),
+ eversion_t(),
+ rebuilt_missing_with_deletes, nullptr);
}
// static
bool dirty_divergent_priors,
bool touch_log,
bool require_rollback,
- bool dirty_dups,
+ eversion_t dirty_to_dups,
+ eversion_t dirty_from_dups,
+ eversion_t write_from_dups,
set<string> *log_keys_debug
)
{
}
}
- // process dirty_dups after log_keys_debug is filled, so dups do not
+ // process dups after log_keys_debug is filled, so dups do not
// end up in that set
- if (dirty_dups) {
- pg_log_dup_t min;
+ if (dirty_to_dups != eversion_t()) {
+ pg_log_dup_t min, dirty_to_dup;
+ dirty_to_dup.version = dirty_to_dups;
t.omap_rmkeyrange(
coll, log_oid,
- min.get_key_name(), log.dups.begin()->get_key_name());
- for (const auto& entry : log.dups) {
- bufferlist bl;
- ::encode(entry, bl);
- (*km)[entry.get_key_name()].claim(bl);
- }
+ min.get_key_name(), dirty_to_dup.get_key_name());
+ }
+ if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
+ pg_log_dup_t max, dirty_from_dup;
+ max.version = eversion_t::max();
+ dirty_from_dup.version = dirty_from_dups;
+ t.omap_rmkeyrange(
+ coll, log_oid,
+ dirty_from_dup.get_key_name(), max.get_key_name());
+ }
+
+ for (const auto& entry : log.dups) {
+ if (entry.version > dirty_to_dups)
+ break;
+ bufferlist bl;
+ ::encode(entry, bl);
+ (*km)[entry.get_key_name()].claim(bl);
+ }
+
+ for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
+ p != log.dups.rend() &&
+ (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
+ p->version >= dirty_to_dups;
+ ++p) {
+ bufferlist bl;
+ ::encode(*p, bl);
+ (*km)[p->get_key_name()].claim(bl);
}
if (dirty_divergent_priors) {
bool touch_log,
bool require_rollback,
bool clear_divergent_priors,
- bool dirty_dups,
+ eversion_t dirty_to_dups,
+ eversion_t dirty_from_dups,
+ eversion_t write_from_dups,
bool *rebuilt_missing_with_deletes, // in/out param
set<string> *log_keys_debug
) {
}
}
- // process dirty_dups after log_keys_debug is filled, so dups do not
+ // process dups after log_keys_debug is filled, so dups do not
// end up in that set
- if (dirty_dups) {
- pg_log_dup_t min;
+ if (dirty_to_dups != eversion_t()) {
+ pg_log_dup_t min, dirty_to_dup;
+ dirty_to_dup.version = dirty_to_dups;
t.omap_rmkeyrange(
coll, log_oid,
- min.get_key_name(), log.dups.begin()->get_key_name());
- for (const auto& entry : log.dups) {
- bufferlist bl;
- ::encode(entry, bl);
- (*km)[entry.get_key_name()].claim(bl);
- }
+ min.get_key_name(), dirty_to_dup.get_key_name());
+ }
+ if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
+ pg_log_dup_t max, dirty_from_dup;
+ max.version = eversion_t::max();
+ dirty_from_dup.version = dirty_from_dups;
+ t.omap_rmkeyrange(
+ coll, log_oid,
+ dirty_from_dup.get_key_name(), max.get_key_name());
+ }
+
+ for (const auto& entry : log.dups) {
+ if (entry.version > dirty_to_dups)
+ break;
+ bufferlist bl;
+ ::encode(entry, bl);
+ (*km)[entry.get_key_name()].claim(bl);
+ }
+
+ for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
+ p != log.dups.rend() &&
+ (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
+ p->version >= dirty_to_dups;
+ ++p) {
+ bufferlist bl;
+ ::encode(*p, bl);
+ (*km)[p->get_key_name()].claim(bl);
}
if (clear_divergent_priors) {