1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
22 #define dout_context cct
23 #define dout_subsys ceph_subsys_osd
25 #define dout_prefix _prefix(_dout, this)
// NOTE(review): this file is a garbled extraction of Ceph's src/osd/PGLog.cc —
// statements are split across lines and some original lines (e.g. the braces
// at original lines 28/30) are missing.  Code text is preserved byte-identical.
// dout prefix helper: every debug line from this file is prefixed with the
// PGLog's gen_prefix() output.
27 static ostream
& _prefix(std::ostream
*_dout
, const PGLog
*pglog
)
29 return *_dout
<< pglog
->gen_prefix();
32 //////////////////// PGLog::IndexedLog ////////////////////
// Split the portion of this log belonging to a child PG (by child_pgid /
// split_bits — those parameter lines are missing from this extraction) into
// *target, then rebuild the rollback-info trim iterator which the
// reassignment above invalidated.
34 void PGLog::IndexedLog::split_out_child(
37 PGLog::IndexedLog
*target
)
40 *target
= IndexedLog(pg_log_t::split_out_child(child_pgid
, split_bits
));
43 reset_rollback_info_trimmed_to_riter();
// Trim log entries up to version s (the cct/eversion_t parameter lines are
// missing from this extraction), recording what was removed in *trimmed and
// *trimmed_dups so the caller can delete the corresponding omap keys.
// Entries newer than earliest_dup_version are preserved as pg_log_dup_t
// records for duplicate-op detection before being dropped from the log.
46 void PGLog::IndexedLog::trim(
49 set
<eversion_t
> *trimmed
,
50 set
<string
>* trimmed_dups
,
// Sanity check: trimming past complete_to would be a bug; log loudly.
53 if (complete_to
!= log
.end() &&
54 complete_to
->version
<= s
) {
55 generic_dout(0) << " bad trim to " << s
<< " when complete_to is "
56 << complete_to
->version
57 << " on " << *this << dendl
;
// Cannot trim entries we might still need to roll back.
60 assert(s
<= can_rollback_to
);
// earliest_dup_version: newest log version minus osd_pg_log_dups_tracked;
// entries at or past it become dup records instead of vanishing outright.
62 auto earliest_dup_version
=
63 log
.rbegin()->version
.version
< cct
->_conf
->osd_pg_log_dups_tracked
65 : log
.rbegin()->version
.version
- cct
->_conf
->osd_pg_log_dups_tracked
;
// Main trim loop (the "<= s" termination condition line is missing here).
67 while (!log
.empty()) {
68 const pg_log_entry_t
&e
= *log
.begin();
71 generic_dout(20) << "trim " << e
<< dendl
;
73 trimmed
->insert(e
.version
);
75 unindex(e
); // remove from index,
// Keep a dup record for recently-trimmed entries (dup detection window).
78 if (e
.version
.version
>= earliest_dup_version
) {
79 if (dirty_dups
) *dirty_dups
= true;
80 dups
.push_back(pg_log_dup_t(e
));
82 for (const auto& extra
: e
.extra_reqids
) {
83 // note: extras have the same version as outer op
84 dups
.push_back(pg_log_dup_t(e
.version
, extra
.second
,
85 extra
.first
, e
.return_code
));
// Keep the rollback-trim iterator valid as its target entry is erased.
90 if (rollback_info_trimmed_to_riter
== log
.rend() ||
91 e
.version
== rollback_info_trimmed_to_riter
->version
) {
93 rollback_info_trimmed_to_riter
= log
.rend();
// Second pass: age out dup records older than the tracked window,
// recording their keys and dropping them from dup_index if indexed.
99 while (!dups
.empty()) {
100 const auto& e
= *dups
.begin();
101 if (e
.version
.version
>= earliest_dup_version
)
103 generic_dout(20) << "trim dup " << e
<< dendl
;
105 trimmed_dups
->insert(e
.get_key_name());
106 if (indexed_data
& PGLOG_INDEXED_DUPS
) {
107 dup_index
.erase(e
.reqid
);
// Debug dump: prints the log summary, then each entry (with whether its
// object is indexed — asserting indexed reqids really are in the index),
// then each dup record.  Several loop-header lines are missing from this
// extraction.
117 ostream
& PGLog::IndexedLog::print(ostream
& out
) const
119 out
<< *this << std::endl
;
120 for (list
<pg_log_entry_t
>::const_iterator p
= log
.begin();
124 (logged_object(p
->soid
) ? "indexed" : "NOT INDEXED") <<
126 assert(!p
->reqid_is_indexed() || logged_req(p
->reqid
));
129 for (list
<pg_log_dup_t
>::const_iterator p
= dups
.begin();
132 out
<< *p
<< std::endl
;
138 //////////////////// PGLog ////////////////////
// reset_backfill: body missing from this extraction (original lines 141-143).
140 void PGLog::reset_backfill()
// Clear PGLog state; only the log_keys_debug.clear() call is visible here —
// the other member resets (original lines 146-150) are missing.
145 void PGLog::clear() {
148 log_keys_debug
.clear();
// Queue removal of the PG's pgmeta object (which holds the info/log omap)
// in the given transaction.
152 void PGLog::clear_info_log(
154 ObjectStore::Transaction
*t
) {
156 t
->remove(coll
, pgid
.make_pgmeta_oid());
// Body of PGLog::trim (its signature, original lines ~160-163, is missing
// from this extraction): if trim_to is past the tail, delegate to
// IndexedLog::trim (collecting trimmed keys and the dirty_dups flag) and
// advance info.log_tail to the new tail.
164 if (trim_to
> log
.tail
) {
165 // We shouldn't be trimming the log past last_complete
166 assert(trim_to
<= info
.last_complete
);
168 dout(10) << "trim " << log
<< " to " << trim_to
<< dendl
;
169 log
.trim(cct
, trim_to
, &trimmed
, &trimmed_dups
, &dirty_dups
);
170 info
.log_tail
= log
.tail
;
// Process a replica's log against our authoritative log: rewind the remote
// log past divergent entries, adjust the peer's last_update/last_complete,
// and fix up its missing set.  Several lines (the oinfo parameter, loop
// increments, the lu declaration around original line 240) are missing from
// this extraction.
174 void PGLog::proc_replica_log(
176 const pg_log_t
&olog
,
177 pg_missing_t
& omissing
,
178 pg_shard_t from
) const
180 dout(10) << "proc_replica_log for osd." << from
<< ": "
181 << oinfo
<< " " << olog
<< " " << omissing
<< dendl
;
// No overlap with our log: nothing we can say about divergence.
183 if (olog
.head
< log
.tail
) {
184 dout(10) << __func__
<< ": osd." << from
<< " does not overlap, not looking "
185 << "for divergent objects" << dendl
;
// Identical head: the peer is already caught up, nothing diverges.
188 if (olog
.head
== log
.head
) {
189 dout(10) << __func__
<< ": osd." << from
<< " same log head, not looking "
190 << "for divergent objects" << dendl
;
193 assert(olog
.head
>= log
.tail
);
196 basically what we're doing here is rewinding the remote log,
197 dropping divergent entries, until we find something that matches
198 our master log. we then reset last_update to reflect the new
199 point up to which missing is accurate.
201 later, in activate(), missing will get wound forward again and
202 we will send the peer enough log to arrive at the same state.
205 for (map
<hobject_t
, pg_missing_item
>::const_iterator i
= omissing
.get_items().begin();
206 i
!= omissing
.get_items().end();
208 dout(20) << " before missing " << i
->first
<< " need " << i
->second
.need
209 << " have " << i
->second
.have
<< dendl
;
// Walk our log backwards looking for the newest entry the peer also has.
212 list
<pg_log_entry_t
>::const_reverse_iterator first_non_divergent
=
215 if (first_non_divergent
== log
.log
.rend())
217 if (first_non_divergent
->version
<= olog
.head
) {
218 dout(20) << "merge_log point (usually last shared) is "
219 << *first_non_divergent
<< dendl
;
222 ++first_non_divergent
;
225 /* Because olog.head >= log.tail, we know that both pgs must at least have
226 * the event represented by log.tail. Similarly, because log.head >= olog.tail,
227 * we know that the event represented by olog.tail must be common to both logs.
228 * Furthermore, the event represented by a log tail was necessarily trimmed,
229 * thus neither olog.tail nor log.tail can be divergent. It's
230 * possible that olog/log contain no actual events between olog.head and
231 * MAX(log.tail, olog.tail), however, since they might have been split out.
232 * Thus, if we cannot find an event e such that
233 * log.tail <= e.version <= log.head, the last_update must actually be
234 * MAX(log.tail, olog.tail).
236 eversion_t limit
= MAX(olog
.tail
, log
.tail
);
238 (first_non_divergent
== log
.log
.rend() ||
239 first_non_divergent
->version
< limit
) ?
241 first_non_divergent
->version
;
// Rewind a copy of the peer's log to lu and merge the divergent entries
// into its missing set.
243 IndexedLog
folog(olog
);
244 auto divergent
= folog
.rewind_from_head(lu
);
245 _merge_divergent_entries(
249 olog
.get_can_rollback_to(),
254 if (lu
< oinfo
.last_update
) {
255 dout(10) << " peer osd." << from
<< " last_update now " << lu
<< dendl
;
256 oinfo
.last_update
= lu
;
// Recompute the peer's last_complete: the version just before its first
// missing object (or last_update if nothing is missing).
259 if (omissing
.have_missing()) {
260 eversion_t first_missing
=
261 omissing
.get_items().at(omissing
.get_rmissing().begin()->second
).need
;
262 oinfo
.last_complete
= eversion_t();
263 list
<pg_log_entry_t
>::const_iterator i
= olog
.log
.begin();
267 if (i
->version
< first_missing
)
268 oinfo
.last_complete
= i
->version
;
273 oinfo
.last_complete
= oinfo
.last_update
;
275 } // proc_replica_log
278 * rewind divergent entries at the head of the log
280 * This rewinds entries off the head of our log that are divergent.
281 * This is used by replicas during activation.
283 * @param newhead new head to rewind to
// Rewind our own log head to newhead, discarding divergent future entries
// (used by replicas during activation).  The divergent entries are handed to
// _merge_divergent_entries; last_complete/last_update are clamped to newhead.
// Some lines (opening brace, _merge_divergent_entries arguments, dirty_info
// assignment) are missing from this extraction.
285 void PGLog::rewind_divergent_log(eversion_t newhead
,
286 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
287 bool &dirty_info
, bool &dirty_big_info
)
289 dout(10) << "rewind_divergent_log truncate divergent future " <<
293 if (info
.last_complete
> newhead
)
294 info
.last_complete
= newhead
;
296 auto divergent
= log
.rewind_from_head(newhead
);
297 if (!divergent
.empty()) {
// Everything from the first divergent entry on must be rewritten to disk.
298 mark_dirty_from(divergent
.front().version
);
300 for (auto &&entry
: divergent
) {
301 dout(10) << "rewind_divergent_log future divergent " << entry
<< dendl
;
303 info
.last_update
= newhead
;
305 _merge_divergent_entries(
309 log
.get_can_rollback_to(),
315 dirty_big_info
= true;
// Merge an authoritative log (olog, from fromosd) into our own: extend our
// tail backwards, rewind our divergent head if needed, splice in new head
// entries, merge dup records, and update info/stats accordingly.  Many lines
// are missing from this extraction (loop increments, splice arguments,
// _merge_divergent_entries arguments, changed/dirty_info assignments).
318 void PGLog::merge_log(pg_info_t
&oinfo
, pg_log_t
&olog
, pg_shard_t fromosd
,
319 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
320 bool &dirty_info
, bool &dirty_big_info
)
322 dout(10) << "merge_log " << olog
<< " from osd." << fromosd
323 << " into " << log
<< dendl
;
325 // Check preconditions
327 // If our log is empty, the incoming log needs to have not been trimmed.
328 assert(!log
.null() || olog
.tail
== eversion_t());
329 // The logs must overlap.
330 assert(log
.head
>= olog
.tail
&& olog
.head
>= log
.tail
);
332 for (map
<hobject_t
, pg_missing_item
>::const_iterator i
= missing
.get_items().begin();
333 i
!= missing
.get_items().end();
335 dout(20) << "pg_missing_t sobject: " << i
->first
<< dendl
;
338 bool changed
= false;
// Extend our tail backwards with the older portion of olog, if it has one.
341 // this is just filling in history. it does not affect our
342 // missing set, as that should already be consistent with our
344 eversion_t orig_tail
= log
.tail
;
345 if (olog
.tail
< log
.tail
) {
346 dout(10) << "merge_log extending tail to " << olog
.tail
<< dendl
;
347 list
<pg_log_entry_t
>::iterator from
= olog
.log
.begin();
348 list
<pg_log_entry_t
>::iterator to
;
351 to
!= olog
.log
.end();
353 if (to
->version
> log
.tail
)
356 dout(15) << *to
<< dendl
;
361 // splice into our log.
362 log
.log
.splice(log
.log
.begin(),
365 info
.log_tail
= log
.tail
= olog
.tail
;
// Propagate reported stats so the peer's reported seq/epoch never regress.
369 if (oinfo
.stats
.reported_seq
< info
.stats
.reported_seq
|| // make sure reported always increases
370 oinfo
.stats
.reported_epoch
< info
.stats
.reported_epoch
) {
371 oinfo
.stats
.reported_seq
= info
.stats
.reported_seq
;
372 oinfo
.stats
.reported_epoch
= info
.stats
.reported_epoch
;
374 if (info
.last_backfill
.is_max())
375 info
.stats
= oinfo
.stats
;
376 info
.hit_set
= oinfo
.hit_set
;
378 // do we have divergent entries to throw out?
379 if (olog
.head
< log
.head
) {
380 rewind_divergent_log(olog
.head
, info
, rollbacker
, dirty_info
, dirty_big_info
);
// Extend our head forwards with the newer portion of olog.
385 if (olog
.head
> log
.head
) {
386 dout(10) << "merge_log extending head to " << olog
.head
<< dendl
;
388 // find start point in olog
389 list
<pg_log_entry_t
>::iterator to
= olog
.log
.end();
390 list
<pg_log_entry_t
>::iterator from
= olog
.log
.end();
391 eversion_t lower_bound
= MAX(olog
.tail
, orig_tail
);
393 if (from
== olog
.log
.begin())
396 dout(20) << " ? " << *from
<< dendl
;
397 if (from
->version
<= log
.head
) {
398 lower_bound
= MAX(lower_bound
, from
->version
);
403 dout(20) << "merge_log cut point (usually last shared) is "
404 << lower_bound
<< dendl
;
405 mark_dirty_from(lower_bound
);
407 auto divergent
= log
.rewind_from_head(lower_bound
);
408 // move aside divergent items
409 for (auto &&oe
: divergent
) {
410 dout(10) << "merge_log divergent " << oe
<< dendl
;
412 log
.roll_forward_to(log
.head
, rollbacker
);
414 mempool::osd_pglog::list
<pg_log_entry_t
> new_entries
;
415 new_entries
.splice(new_entries
.end(), olog
.log
, from
, to
);
416 append_log_entries_update_missing(
418 info
.last_backfill_bitwise
,
426 _merge_divergent_entries(
430 log
.get_can_rollback_to(),
435 info
.last_update
= log
.head
= olog
.head
;
437 // We cannot rollback into the new log entries
438 log
.skip_can_rollback_to_to_head();
440 info
.last_user_version
= oinfo
.last_user_version
;
441 info
.purged_snaps
= oinfo
.purged_snaps
;
447 if (merge_log_dups(olog
)) {
452 dout(10) << "merge_log result " << log
<< " " << missing
<<
453 " changed=" << changed
<< dendl
;
457 dirty_big_info
= true;
// Merge the peer's dup records into ours: copy wholesale if ours are empty,
// otherwise extend at both ends (newer dups at the tail, older at the head),
// then drop any dups that now overlap the live log range.  Lines that index
// the inserted copies, mark dirty ranges, and set changed=true are missing
// from this extraction.
462 // returns true if any changes were made to log.dups
463 bool PGLog::merge_log_dups(const pg_log_t
& olog
) {
464 bool changed
= false;
466 if (!olog
.dups
.empty()) {
467 if (log
.dups
.empty()) {
468 dout(10) << "merge_log copying olog dups to log " <<
469 olog
.dups
.front().version
<< " to " <<
470 olog
.dups
.back().version
<< dendl
;
472 // since our log.dups is empty just copy them
473 for (const auto& i
: olog
.dups
) {
474 log
.dups
.push_back(i
);
475 log
.index(log
.dups
.back());
478 // since our log.dups is not empty try to extend on each end
480 if (olog
.dups
.back().version
> log
.dups
.back().version
) {
481 // extend the dups's tail (i.e., newer dups)
482 dout(10) << "merge_log extending dups tail to " <<
483 olog
.dups
.back().version
<< dendl
;
486 auto log_tail_version
= log
.dups
.back().version
;
// Walk the peer's dups newest-first, inserting (in reverse) those newer
// than our current newest dup.
488 auto insert_cursor
= log
.dups
.end();
489 for (auto i
= olog
.dups
.crbegin(); i
!= olog
.dups
.crend(); ++i
) {
490 if (i
->version
<= log_tail_version
) break;
491 log
.dups
.insert(insert_cursor
, *i
);
493 auto prev
= insert_cursor
;
495 // be sure to pass reference of copy in log.dups
498 --insert_cursor
; // make sure we insert in reverse order
// Symmetrically, prepend peer dups older than our current oldest dup.
502 if (olog
.dups
.front().version
< log
.dups
.front().version
) {
503 // extend the dups's head (i.e., older dups)
504 dout(10) << "merge_log extending dups head to " <<
505 olog
.dups
.front().version
<< dendl
;
508 auto insert_cursor
= log
.dups
.begin();
509 for (auto i
= olog
.dups
.cbegin(); i
!= olog
.dups
.cend(); ++i
) {
510 if (i
->version
>= insert_cursor
->version
) break;
511 log
.dups
.insert(insert_cursor
, *i
);
512 auto prev
= insert_cursor
;
514 // be sure to pass address of copy in log.dups
521 // remove any dup entries that overlap with pglog
522 if (!log
.dups
.empty() && log
.dups
.back().version
>= log
.tail
) {
523 dout(10) << "merge_log removed dups overlapping log entries [" <<
524 log
.tail
<< "," << log
.dups
.back().version
<< "]" << dendl
;
527 while (!log
.dups
.empty() && log
.dups
.back().version
>= log
.tail
) {
528 log
.unindex(log
.dups
.back());
// Debug-only consistency check: every on-disk log key tracked in
// log_keys_debug must correspond 1:1 with an in-memory log entry; dumps both
// sides before asserting on a mismatch.  The early-return guard (presumably
// on pg_log_debug — original lines 537-538) is missing from this extraction.
536 void PGLog::check() {
539 if (log
.log
.size() != log_keys_debug
.size()) {
540 derr
<< "log.log.size() != log_keys_debug.size()" << dendl
;
541 derr
<< "actual log:" << dendl
;
542 for (list
<pg_log_entry_t
>::iterator i
= log
.log
.begin();
545 derr
<< " " << *i
<< dendl
;
547 derr
<< "log_keys_debug:" << dendl
;
548 for (set
<string
>::const_iterator i
= log_keys_debug
.begin();
549 i
!= log_keys_debug
.end();
551 derr
<< " " << *i
<< dendl
;
554 assert(log
.log
.size() == log_keys_debug
.size());
555 for (list
<pg_log_entry_t
>::iterator i
= log
.log
.begin();
558 assert(log_keys_debug
.count(i
->get_key_name()));
// Persist the dirty portion of the log and missing set: logs the dirty
// bounds, then delegates to _write_log_and_missing with this PGLog's
// accumulated dirty/trimmed state.  Several argument lines (dirty_to,
// dirty_from, writeout_from, trimmed sets, missing — original lines 581-588)
// are missing from this extraction, as is the "is dirty" guard branch.
563 void PGLog::write_log_and_missing(
564 ObjectStore::Transaction
& t
,
565 map
<string
,bufferlist
> *km
,
567 const ghobject_t
&log_oid
,
568 bool require_rollback
)
571 dout(5) << "write_log_and_missing with: "
572 << "dirty_to: " << dirty_to
573 << ", dirty_from: " << dirty_from
574 << ", writeout_from: " << writeout_from
575 << ", trimmed: " << trimmed
576 << ", trimmed_dups: " << trimmed_dups
577 << ", clear_divergent_priors: " << clear_divergent_priors
579 _write_log_and_missing(
580 t
, km
, log
, coll
, log_oid
,
589 clear_divergent_priors
,
591 &rebuilt_missing_with_deletes
,
592 (pg_log_debug
? &log_keys_debug
: nullptr));
595 dout(10) << "log is not dirty" << dendl
;
// Full (non-incremental) rewrite of the log without the missing set:
// delegates to _write_log_and_missing_wo_missing with dirty_to=max and empty
// trimmed sets so everything is written out.  The opening brace and some
// argument lines (original lines 607-608, 612-613) are missing from this
// extraction.
600 void PGLog::write_log_and_missing_wo_missing(
601 ObjectStore::Transaction
& t
,
602 map
<string
,bufferlist
> *km
,
604 const coll_t
& coll
, const ghobject_t
&log_oid
,
605 map
<eversion_t
, hobject_t
> &divergent_priors
,
606 bool require_rollback
,
609 _write_log_and_missing_wo_missing(
610 t
, km
, log
, coll
, log_oid
,
611 divergent_priors
, eversion_t::max(), eversion_t(), eversion_t(),
614 true, true, require_rollback
, dirty_dups
, nullptr);
// Full rewrite of the log together with the missing set (overload taking an
// explicit missing tracker): delegates to _write_log_and_missing with
// everything marked for write-out.  Some argument lines (original lines
// 621-622, 626, 631-636) are missing from this extraction.
618 void PGLog::write_log_and_missing(
619 ObjectStore::Transaction
& t
,
620 map
<string
,bufferlist
> *km
,
623 const ghobject_t
&log_oid
,
624 const pg_missing_tracker_t
&missing
,
625 bool require_rollback
,
627 bool *rebuilt_missing_with_deletes
)
629 _write_log_and_missing(
630 t
, km
, log
, coll
, log_oid
,
637 true, require_rollback
, false, dirty_dups
, rebuilt_missing_with_deletes
, nullptr);
// Static worker: serialize the dirty log range (without the missing set)
// into *km as omap key/value pairs and queue key removals for trimmed
// entries on transaction t.  Keys are eversion-ordered log-entry names plus
// "divergent_priors", "can_rollback_to", "rollback_info_trimmed_to".
// Several lines (the IndexedLog/dirty_to parameters, omap_rmkeyrange calls,
// loop increments, dup encoding) are missing from this extraction.
641 void PGLog::_write_log_and_missing_wo_missing(
642 ObjectStore::Transaction
& t
,
643 map
<string
,bufferlist
> *km
,
645 const coll_t
& coll
, const ghobject_t
&log_oid
,
646 map
<eversion_t
, hobject_t
> &divergent_priors
,
648 eversion_t dirty_from
,
649 eversion_t writeout_from
,
650 const set
<eversion_t
> &trimmed
,
651 const set
<string
> &trimmed_dups
,
652 bool dirty_divergent_priors
,
654 bool require_rollback
,
656 set
<string
> *log_keys_debug
// Collect all keys to delete: trimmed dups plus trimmed log entries,
// keeping the debug key set in sync.
659 set
<string
> to_remove(trimmed_dups
);
660 for (set
<eversion_t
>::const_iterator i
= trimmed
.begin();
663 to_remove
.insert(i
->get_key_name());
664 if (log_keys_debug
) {
665 assert(log_keys_debug
->count(i
->get_key_name()));
666 log_keys_debug
->erase(i
->get_key_name());
670 // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
672 t
.touch(coll
, log_oid
);
// Clear the stale key ranges [0, dirty_to) and (dirty_from, max) before
// rewriting them (the omap_rmkeyrange call lines are missing here).
673 if (dirty_to
!= eversion_t()) {
676 eversion_t().get_key_name(), dirty_to
.get_key_name());
677 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
679 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
680 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
683 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
684 clear_after(log_keys_debug
, dirty_from
.get_key_name());
// Re-encode entries up to dirty_to (forward pass) ...
687 for (list
<pg_log_entry_t
>::iterator p
= log
.log
.begin();
688 p
!= log
.log
.end() && p
->version
<= dirty_to
;
690 bufferlist
bl(sizeof(*p
) * 2);
691 p
->encode_with_checksum(bl
);
692 (*km
)[p
->get_key_name()].claim(bl
);
// ... and entries from dirty_from/writeout_from to head (reverse pass).
695 for (list
<pg_log_entry_t
>::reverse_iterator p
= log
.log
.rbegin();
696 p
!= log
.log
.rend() &&
697 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
698 p
->version
>= dirty_to
;
700 bufferlist
bl(sizeof(*p
) * 2);
701 p
->encode_with_checksum(bl
);
702 (*km
)[p
->get_key_name()].claim(bl
);
705 if (log_keys_debug
) {
706 for (map
<string
, bufferlist
>::iterator i
= (*km
).begin();
709 if (i
->first
[0] == '_')
711 assert(!log_keys_debug
->count(i
->first
));
712 log_keys_debug
->insert(i
->first
);
716 // process dirty_dups after log_keys_debug is filled, so dups do not
717 // end up in that set
722 min
.get_key_name(), log
.dups
.begin()->get_key_name());
723 for (const auto& entry
: log
.dups
) {
726 (*km
)[entry
.get_key_name()].claim(bl
);
730 if (dirty_divergent_priors
) {
731 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
732 ::encode(divergent_priors
, (*km
)["divergent_priors"]);
734 if (require_rollback
) {
736 log
.get_can_rollback_to(),
737 (*km
)["can_rollback_to"]);
739 log
.get_rollback_info_trimmed_to(),
740 (*km
)["rollback_info_trimmed_to"]);
743 if (!to_remove
.empty())
744 t
.omap_rmkeys(coll
, log_oid
, to_remove
);
// Static worker: serialize the dirty log range AND the missing set into *km,
// queuing removals for trimmed keys on transaction t.  Missing items are
// stored under "missing/<obj>" keys; a "may_include_deletes_in_missing"
// sentinel key is written once after a rebuild.  Several lines (the
// IndexedLog/dirty_to parameters, omap_rmkeyrange calls, loop increments,
// the get_changed() call that drives the missing-key lambda) are missing
// from this extraction.
748 void PGLog::_write_log_and_missing(
749 ObjectStore::Transaction
& t
,
750 map
<string
,bufferlist
>* km
,
752 const coll_t
& coll
, const ghobject_t
&log_oid
,
754 eversion_t dirty_from
,
755 eversion_t writeout_from
,
756 const set
<eversion_t
> &trimmed
,
757 const set
<string
> &trimmed_dups
,
758 const pg_missing_tracker_t
&missing
,
760 bool require_rollback
,
761 bool clear_divergent_priors
,
763 bool *rebuilt_missing_with_deletes
, // in/out param
764 set
<string
> *log_keys_debug
// Collect keys to delete: trimmed dups plus trimmed log entries, keeping
// the debug key set in sync.
766 set
<string
> to_remove(trimmed_dups
);
767 for (set
<eversion_t
>::const_iterator i
= trimmed
.begin();
770 to_remove
.insert(i
->get_key_name());
771 if (log_keys_debug
) {
772 assert(log_keys_debug
->count(i
->get_key_name()));
773 log_keys_debug
->erase(i
->get_key_name());
778 t
.touch(coll
, log_oid
);
// Clear stale key ranges before rewriting (omap_rmkeyrange lines missing).
779 if (dirty_to
!= eversion_t()) {
782 eversion_t().get_key_name(), dirty_to
.get_key_name());
783 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
785 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
786 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
789 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
790 clear_after(log_keys_debug
, dirty_from
.get_key_name());
// Re-encode entries up to dirty_to (forward pass) ...
793 for (list
<pg_log_entry_t
>::iterator p
= log
.log
.begin();
794 p
!= log
.log
.end() && p
->version
<= dirty_to
;
796 bufferlist
bl(sizeof(*p
) * 2);
797 p
->encode_with_checksum(bl
);
798 (*km
)[p
->get_key_name()].claim(bl
);
// ... and entries from dirty_from/writeout_from to head (reverse pass).
801 for (list
<pg_log_entry_t
>::reverse_iterator p
= log
.log
.rbegin();
802 p
!= log
.log
.rend() &&
803 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
804 p
->version
>= dirty_to
;
806 bufferlist
bl(sizeof(*p
) * 2);
807 p
->encode_with_checksum(bl
);
808 (*km
)[p
->get_key_name()].claim(bl
);
811 if (log_keys_debug
) {
812 for (map
<string
, bufferlist
>::iterator i
= (*km
).begin();
815 if (i
->first
[0] == '_')
817 assert(!log_keys_debug
->count(i
->first
));
818 log_keys_debug
->insert(i
->first
);
822 // process dirty_dups after log_keys_debug is filled, so dups do not
823 // end up in that set
828 min
.get_key_name(), log
.dups
.begin()->get_key_name());
829 for (const auto& entry
: log
.dups
) {
832 (*km
)[entry
.get_key_name()].claim(bl
);
836 if (clear_divergent_priors
) {
837 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
838 to_remove
.insert("divergent_priors");
840 // since we encode individual missing items instead of a whole
841 // missing set, we need another key to store this bit of state
842 if (*rebuilt_missing_with_deletes
) {
843 (*km
)["may_include_deletes_in_missing"] = bufferlist();
844 *rebuilt_missing_with_deletes
= false;
// Per-object missing callback: delete the key if the object is no longer
// missing, otherwise encode its missing item (with the deletes feature bit
// when the tracker may include deletes).
847 [&](const hobject_t
&obj
) {
848 string key
= string("missing/") + obj
.to_str();
849 pg_missing_item item
;
850 if (!missing
.is_missing(obj
, &item
)) {
851 to_remove
.insert(key
);
853 uint64_t features
= missing
.may_include_deletes
? CEPH_FEATURE_OSD_RECOVERY_DELETES
: 0;
854 ::encode(make_pair(obj
, item
), (*km
)[key
], features
);
857 if (require_rollback
) {
859 log
.get_can_rollback_to(),
860 (*km
)["can_rollback_to"]);
862 log
.get_rollback_info_trimmed_to(),
863 (*km
)["rollback_info_trimmed_to"]);
866 if (!to_remove
.empty())
867 t
.omap_rmkeys(coll
, log_oid
, to_remove
);
// Rebuild the missing set so it may include delete entries: preserve items
// not explained by the log, then replay the log newest-first, comparing each
// entry's version against the on-disk object_info_t to decide what is
// missing.  Several lines are missing from this extraction (the missing.clear()
// around original line 884, loop bounds, the getattr attribute name/bv
// argument and its error branch, and the `did` dedup-set declaration).
870 void PGLog::rebuild_missing_set_with_deletes(ObjectStore
*store
,
872 const pg_info_t
&info
)
874 // save entries not generated from the current log (e.g. added due
875 // to repair, EIO handling, or divergent_priors).
876 map
<hobject_t
, pg_missing_item
> extra_missing
;
877 for (const auto& p
: missing
.get_items()) {
878 if (!log
.logged_object(p
.first
)) {
879 dout(20) << __func__
<< " extra missing entry: " << p
.first
880 << " " << p
.second
<< dendl
;
881 extra_missing
[p
.first
] = p
.second
;
885 missing
.may_include_deletes
= true;
887 // go through the log and add items that are not present or older
888 // versions on disk, just as if we were reading the log + metadata
889 // off disk originally
891 for (list
<pg_log_entry_t
>::reverse_iterator i
= log
.log
.rbegin();
// Skip entries already complete, past last_backfill, or already handled
// for this object (newest entry wins — hence the reverse walk and `did`).
894 if (i
->version
<= info
.last_complete
)
896 if (i
->soid
> info
.last_backfill
||
898 did
.find(i
->soid
) != did
.end())
// Read the on-disk object_info to compare versions (attr name/bv lines
// are missing from this extraction).
903 int r
= store
->getattr(
905 ghobject_t(i
->soid
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
908 dout(20) << __func__
<< " check for log entry: " << *i
<< " = " << r
<< dendl
;
911 object_info_t
oi(bv
);
912 dout(20) << __func__
<< " store version = " << oi
.version
<< dendl
;
// On-disk version older than the log entry => the object is missing the
// update; record have=oi.version.  (The not-found branch records have=0.)
913 if (oi
.version
< i
->version
) {
914 missing
.add(i
->soid
, i
->version
, oi
.version
, i
->is_delete());
917 missing
.add(i
->soid
, i
->version
, eversion_t(), i
->is_delete());
// Re-add the preserved extra entries verbatim.
921 for (const auto& p
: extra_missing
) {
922 missing
.add(p
.first
, p
.second
.need
, p
.second
.have
, p
.second
.is_delete());
924 rebuilt_missing_with_deletes
= true;