1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
19 // re-include our assert to clobber boost's
20 #include "include/ceph_assert.h"
21 #include "osd_types.h"
22 #include "os/ObjectStore.h"
// Bit flags selecting which in-memory indexes of the PG log are built
// (see IndexedLog::index()/unindex() below).
constexpr auto PGLOG_INDEXED_OBJECTS          = 1 << 0;
constexpr auto PGLOG_INDEXED_CALLER_OPS       = 1 << 1;
constexpr auto PGLOG_INDEXED_EXTRA_CALLER_OPS = 1 << 2;
constexpr auto PGLOG_INDEXED_DUPS             = 1 << 3;
// All of the above; the trailing "| PGLOG_INDEXED_DUPS" was lost in the
// mangled paste — IndexedLog::index_dups() shows dups are an index like
// the others, so ALL must include them.
constexpr auto PGLOG_INDEXED_ALL = PGLOG_INDEXED_OBJECTS
  | PGLOG_INDEXED_CALLER_OPS
  | PGLOG_INDEXED_EXTRA_CALLER_OPS
  | PGLOG_INDEXED_DUPS;
36 struct PGLog
: DoutPrefixProvider
{
37 std::ostream
& gen_prefix(std::ostream
& out
) const override
{
40 unsigned get_subsys() const override
{
41 return static_cast<unsigned>(ceph_subsys_osd
);
43 CephContext
*get_cct() const override
{
47 ////////////////////////////// sub classes //////////////////////////////
48 struct LogEntryHandler
{
49 virtual void rollback(
50 const pg_log_entry_t
&entry
) = 0;
51 virtual void rollforward(
52 const pg_log_entry_t
&entry
) = 0;
54 const pg_log_entry_t
&entry
) = 0;
56 const hobject_t
&hoid
) = 0;
57 virtual void try_stash(
58 const hobject_t
&hoid
,
60 virtual ~LogEntryHandler() {}
65 * IndexLog - adds in-memory index of the log, by oid.
66 * plus some methods to manipulate it all.
68 struct IndexedLog
: public pg_log_t
{
69 mutable ceph::unordered_map
<hobject_t
,pg_log_entry_t
*> objects
; // ptrs into log. be careful!
70 mutable ceph::unordered_map
<osd_reqid_t
,pg_log_entry_t
*> caller_ops
;
71 mutable ceph::unordered_multimap
<osd_reqid_t
,pg_log_entry_t
*> extra_caller_ops
;
72 mutable ceph::unordered_map
<osd_reqid_t
,pg_log_dup_t
*> dup_index
;
75 list
<pg_log_entry_t
>::iterator complete_to
; // not inclusive of referenced item
76 version_t last_requested
= 0; // last object requested by primary
80 mutable __u16 indexed_data
= 0;
82 * rollback_info_trimmed_to_riter points to the first log entry <=
83 * rollback_info_trimmed_to
85 * It's a reverse_iterator because rend() is a natural representation for
86 * tail, and rbegin() works nicely for head.
88 mempool::osd_pglog::list
<pg_log_entry_t
>::reverse_iterator
89 rollback_info_trimmed_to_riter
;
92 void advance_can_rollback_to(eversion_t to
, F
&&f
) {
93 if (to
> can_rollback_to
)
96 if (to
> rollback_info_trimmed_to
)
97 rollback_info_trimmed_to
= to
;
99 while (rollback_info_trimmed_to_riter
!= log
.rbegin()) {
100 --rollback_info_trimmed_to_riter
;
101 if (rollback_info_trimmed_to_riter
->version
> rollback_info_trimmed_to
) {
102 ++rollback_info_trimmed_to_riter
;
105 f(*rollback_info_trimmed_to_riter
);
109 void reset_rollback_info_trimmed_to_riter() {
110 rollback_info_trimmed_to_riter
= log
.rbegin();
111 while (rollback_info_trimmed_to_riter
!= log
.rend() &&
112 rollback_info_trimmed_to_riter
->version
> rollback_info_trimmed_to
)
113 ++rollback_info_trimmed_to_riter
;
116 // indexes objects, caller ops and extra caller ops
119 complete_to(log
.end()),
122 rollback_info_trimmed_to_riter(log
.rbegin())
125 template <typename
... Args
>
126 explicit IndexedLog(Args
&&... args
) :
127 pg_log_t(std::forward
<Args
>(args
)...),
128 complete_to(log
.end()),
131 rollback_info_trimmed_to_riter(log
.rbegin())
133 reset_rollback_info_trimmed_to_riter();
137 IndexedLog(const IndexedLog
&rhs
) :
139 complete_to(log
.end()),
140 last_requested(rhs
.last_requested
),
142 rollback_info_trimmed_to_riter(log
.rbegin())
144 reset_rollback_info_trimmed_to_riter();
145 index(rhs
.indexed_data
);
148 IndexedLog
&operator=(const IndexedLog
&rhs
) {
150 new (this) IndexedLog(rhs
);
154 void trim_rollback_info_to(eversion_t to
, LogEntryHandler
*h
) {
155 advance_can_rollback_to(
157 [&](pg_log_entry_t
&entry
) {
161 void roll_forward_to(eversion_t to
, LogEntryHandler
*h
) {
162 advance_can_rollback_to(
164 [&](pg_log_entry_t
&entry
) {
165 h
->rollforward(entry
);
169 void skip_can_rollback_to_to_head() {
170 advance_can_rollback_to(head
, [&](const pg_log_entry_t
&entry
) {});
173 mempool::osd_pglog::list
<pg_log_entry_t
> rewind_from_head(eversion_t newhead
) {
174 auto divergent
= pg_log_t::rewind_from_head(newhead
);
176 reset_rollback_info_trimmed_to_riter();
180 template <typename T
>
182 const eversion_t
&bound
, ///< [in] scan entries > bound
184 auto iter
= log
.rbegin();
185 while (iter
!= log
.rend() && iter
->version
> bound
)
189 if (iter
== log
.rbegin())
196 void claim_log_and_clear_rollback_info(const pg_log_t
& o
) {
197 // we must have already trimmed the old entries
198 ceph_assert(rollback_info_trimmed_to
== head
);
199 ceph_assert(rollback_info_trimmed_to_riter
== log
.rbegin());
201 *this = IndexedLog(o
);
203 skip_can_rollback_to_to_head();
207 void split_out_child(
213 // we must have already trimmed the old entries
214 ceph_assert(rollback_info_trimmed_to
== head
);
215 ceph_assert(rollback_info_trimmed_to_riter
== log
.rbegin());
219 rollback_info_trimmed_to_riter
= log
.rbegin();
220 reset_recovery_pointers();
223 skip_can_rollback_to_to_head();
226 void reset_recovery_pointers() {
227 complete_to
= log
.end();
231 bool logged_object(const hobject_t
& oid
) const {
232 if (!(indexed_data
& PGLOG_INDEXED_OBJECTS
)) {
235 return objects
.count(oid
);
238 bool logged_req(const osd_reqid_t
&r
) const {
239 if (!(indexed_data
& PGLOG_INDEXED_CALLER_OPS
)) {
242 if (!caller_ops
.count(r
)) {
243 if (!(indexed_data
& PGLOG_INDEXED_EXTRA_CALLER_OPS
)) {
244 index_extra_caller_ops();
246 return extra_caller_ops
.count(r
);
252 const osd_reqid_t
&r
,
254 version_t
*user_version
,
255 int *return_code
) const
257 ceph_assert(version
);
258 ceph_assert(user_version
);
259 ceph_assert(return_code
);
260 ceph::unordered_map
<osd_reqid_t
,pg_log_entry_t
*>::const_iterator p
;
261 if (!(indexed_data
& PGLOG_INDEXED_CALLER_OPS
)) {
264 p
= caller_ops
.find(r
);
265 if (p
!= caller_ops
.end()) {
266 *version
= p
->second
->version
;
267 *user_version
= p
->second
->user_version
;
268 *return_code
= p
->second
->return_code
;
272 // warning: we will return *a* request for this reqid, but not
273 // necessarily the most recent.
274 if (!(indexed_data
& PGLOG_INDEXED_EXTRA_CALLER_OPS
)) {
275 index_extra_caller_ops();
277 p
= extra_caller_ops
.find(r
);
278 if (p
!= extra_caller_ops
.end()) {
280 for (auto i
= p
->second
->extra_reqids
.begin();
281 i
!= p
->second
->extra_reqids
.end();
284 *version
= p
->second
->version
;
285 *user_version
= i
->second
;
286 *return_code
= p
->second
->return_code
;
287 if (*return_code
>= 0) {
288 auto it
= p
->second
->extra_reqid_return_codes
.find(idx
);
289 if (it
!= p
->second
->extra_reqid_return_codes
.end()) {
290 *return_code
= it
->second
;
296 ceph_abort_msg("in extra_caller_ops but not extra_reqids");
299 if (!(indexed_data
& PGLOG_INDEXED_DUPS
)) {
302 auto q
= dup_index
.find(r
);
303 if (q
!= dup_index
.end()) {
304 *version
= q
->second
->version
;
305 *user_version
= q
->second
->user_version
;
306 *return_code
= q
->second
->return_code
;
313 /// get a (bounded) list of recent reqids for the given object
314 void get_object_reqids(const hobject_t
& oid
, unsigned max
,
315 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *pls
,
316 mempool::osd_pglog::map
<uint32_t, int> *return_codes
) const {
317 // make sure object is present at least once before we do an
319 if (!(indexed_data
& PGLOG_INDEXED_OBJECTS
)) {
322 if (objects
.count(oid
) == 0)
325 for (list
<pg_log_entry_t
>::const_reverse_iterator i
= log
.rbegin();
328 if (i
->soid
== oid
) {
329 if (i
->reqid_is_indexed()) {
330 if (i
->op
== pg_log_entry_t::ERROR
) {
331 // propagate op errors to the cache tier's PG log
332 return_codes
->emplace(pls
->size(), i
->return_code
);
334 pls
->push_back(make_pair(i
->reqid
, i
->user_version
));
337 pls
->insert(pls
->end(), i
->extra_reqids
.begin(), i
->extra_reqids
.end());
338 if (pls
->size() >= max
) {
339 if (pls
->size() > max
) {
348 void index(__u16 to_index
= PGLOG_INDEXED_ALL
) const {
349 // if to_index is 0, no need to run any of this code, especially
350 // loop below; this can happen with copy constructor for
351 // IndexedLog (and indirectly through assignment operator)
352 if (!to_index
) return;
354 if (to_index
& PGLOG_INDEXED_OBJECTS
)
356 if (to_index
& PGLOG_INDEXED_CALLER_OPS
)
358 if (to_index
& PGLOG_INDEXED_EXTRA_CALLER_OPS
)
359 extra_caller_ops
.clear();
360 if (to_index
& PGLOG_INDEXED_DUPS
) {
362 for (auto& i
: dups
) {
363 dup_index
[i
.reqid
] = const_cast<pg_log_dup_t
*>(&i
);
367 constexpr __u16 any_log_entry_index
=
368 PGLOG_INDEXED_OBJECTS
|
369 PGLOG_INDEXED_CALLER_OPS
|
370 PGLOG_INDEXED_EXTRA_CALLER_OPS
;
372 if (to_index
& any_log_entry_index
) {
373 for (list
<pg_log_entry_t
>::const_iterator i
= log
.begin();
376 if (to_index
& PGLOG_INDEXED_OBJECTS
) {
377 if (i
->object_is_indexed()) {
378 objects
[i
->soid
] = const_cast<pg_log_entry_t
*>(&(*i
));
382 if (to_index
& PGLOG_INDEXED_CALLER_OPS
) {
383 if (i
->reqid_is_indexed()) {
384 caller_ops
[i
->reqid
] = const_cast<pg_log_entry_t
*>(&(*i
));
388 if (to_index
& PGLOG_INDEXED_EXTRA_CALLER_OPS
) {
389 for (auto j
= i
->extra_reqids
.begin();
390 j
!= i
->extra_reqids
.end();
392 extra_caller_ops
.insert(
393 make_pair(j
->first
, const_cast<pg_log_entry_t
*>(&(*i
))));
399 indexed_data
|= to_index
;
402 void index_objects() const {
403 index(PGLOG_INDEXED_OBJECTS
);
406 void index_caller_ops() const {
407 index(PGLOG_INDEXED_CALLER_OPS
);
410 void index_extra_caller_ops() const {
411 index(PGLOG_INDEXED_EXTRA_CALLER_OPS
);
414 void index_dups() const {
415 index(PGLOG_INDEXED_DUPS
);
418 void index(pg_log_entry_t
& e
) {
419 if ((indexed_data
& PGLOG_INDEXED_OBJECTS
) && e
.object_is_indexed()) {
420 if (objects
.count(e
.soid
) == 0 ||
421 objects
[e
.soid
]->version
< e
.version
)
422 objects
[e
.soid
] = &e
;
424 if (indexed_data
& PGLOG_INDEXED_CALLER_OPS
) {
425 // divergent merge_log indexes new before unindexing old
426 if (e
.reqid_is_indexed()) {
427 caller_ops
[e
.reqid
] = &e
;
430 if (indexed_data
& PGLOG_INDEXED_EXTRA_CALLER_OPS
) {
431 for (auto j
= e
.extra_reqids
.begin();
432 j
!= e
.extra_reqids
.end();
434 extra_caller_ops
.insert(make_pair(j
->first
, &e
));
442 extra_caller_ops
.clear();
447 void unindex(const pg_log_entry_t
& e
) {
448 // NOTE: this only works if we remove from the _tail_ of the log!
449 if (indexed_data
& PGLOG_INDEXED_OBJECTS
) {
450 auto it
= objects
.find(e
.soid
);
451 if (it
!= objects
.end() && it
->second
->version
== e
.version
)
454 if (e
.reqid_is_indexed()) {
455 if (indexed_data
& PGLOG_INDEXED_CALLER_OPS
) {
456 auto it
= caller_ops
.find(e
.reqid
);
457 // divergent merge_log indexes new before unindexing old
458 if (it
!= caller_ops
.end() && it
->second
== &e
)
459 caller_ops
.erase(it
);
462 if (indexed_data
& PGLOG_INDEXED_EXTRA_CALLER_OPS
) {
463 for (auto j
= e
.extra_reqids
.begin();
464 j
!= e
.extra_reqids
.end();
466 for (ceph::unordered_multimap
<osd_reqid_t
,pg_log_entry_t
*>::iterator k
=
467 extra_caller_ops
.find(j
->first
);
468 k
!= extra_caller_ops
.end() && k
->first
== j
->first
;
470 if (k
->second
== &e
) {
471 extra_caller_ops
.erase(k
);
479 void index(pg_log_dup_t
& e
) {
480 if (indexed_data
& PGLOG_INDEXED_DUPS
) {
481 dup_index
[e
.reqid
] = &e
;
485 void unindex(const pg_log_dup_t
& e
) {
486 if (indexed_data
& PGLOG_INDEXED_DUPS
) {
487 auto i
= dup_index
.find(e
.reqid
);
488 if (i
!= dup_index
.end()) {
495 void add(const pg_log_entry_t
& e
, bool applied
= true) {
497 ceph_assert(get_can_rollback_to() == head
);
500 // make sure our buffers don't pin bigger buffers
501 e
.mod_desc
.trim_bl();
506 // riter previously pointed to the previous entry
507 if (rollback_info_trimmed_to_riter
== log
.rbegin())
508 ++rollback_info_trimmed_to_riter
;
510 ceph_assert(e
.version
> head
);
511 ceph_assert(head
.version
== 0 || e
.version
.version
> head
.version
);
515 if ((indexed_data
& PGLOG_INDEXED_OBJECTS
) && e
.object_is_indexed()) {
516 objects
[e
.soid
] = &(log
.back());
518 if (indexed_data
& PGLOG_INDEXED_CALLER_OPS
) {
519 if (e
.reqid_is_indexed()) {
520 caller_ops
[e
.reqid
] = &(log
.back());
524 if (indexed_data
& PGLOG_INDEXED_EXTRA_CALLER_OPS
) {
525 for (auto j
= e
.extra_reqids
.begin();
526 j
!= e
.extra_reqids
.end();
528 extra_caller_ops
.insert(make_pair(j
->first
, &(log
.back())));
533 skip_can_rollback_to_to_head();
540 set
<eversion_t
> *trimmed
,
541 set
<string
>* trimmed_dups
,
542 eversion_t
*write_from_dups
);
544 ostream
& print(ostream
& out
) const;
549 //////////////////// data members ////////////////////
551 pg_missing_tracker_t missing
;
554 eversion_t dirty_to
; ///< must clear/writeout all keys <= dirty_to
555 eversion_t dirty_from
; ///< must clear/writeout all keys >= dirty_from
556 eversion_t writeout_from
; ///< must writout keys >= writeout_from
557 set
<eversion_t
> trimmed
; ///< must clear keys in trimmed
558 eversion_t dirty_to_dups
; ///< must clear/writeout all dups <= dirty_to_dups
559 eversion_t dirty_from_dups
; ///< must clear/writeout all dups >= dirty_from_dups
560 eversion_t write_from_dups
; ///< must write keys >= write_from_dups
561 set
<string
> trimmed_dups
; ///< must clear keys in trimmed_dups
564 /// Log is clean on [dirty_to, dirty_from)
566 bool clear_divergent_priors
;
567 bool rebuilt_missing_with_deletes
= false;
569 void mark_dirty_to(eversion_t to
) {
573 void mark_dirty_from(eversion_t from
) {
574 if (from
< dirty_from
)
577 void mark_writeout_from(eversion_t from
) {
578 if (from
< writeout_from
)
579 writeout_from
= from
;
581 void mark_dirty_to_dups(eversion_t to
) {
582 if (to
> dirty_to_dups
)
585 void mark_dirty_from_dups(eversion_t from
) {
586 if (from
< dirty_from_dups
)
587 dirty_from_dups
= from
;
590 bool is_dirty() const {
591 return !touched_log
||
592 (dirty_to
!= eversion_t()) ||
593 (dirty_from
!= eversion_t::max()) ||
594 (writeout_from
!= eversion_t::max()) ||
595 !(trimmed
.empty()) ||
596 !missing
.is_clean() ||
597 !(trimmed_dups
.empty()) ||
598 (dirty_to_dups
!= eversion_t()) ||
599 (dirty_from_dups
!= eversion_t::max()) ||
600 (write_from_dups
!= eversion_t::max()) ||
601 rebuilt_missing_with_deletes
;
603 void mark_log_for_rewrite() {
604 mark_dirty_to(eversion_t::max());
605 mark_dirty_from(eversion_t());
606 mark_dirty_to_dups(eversion_t::max());
607 mark_dirty_from_dups(eversion_t());
610 bool get_rebuilt_missing_with_deletes() const {
611 return rebuilt_missing_with_deletes
;
616 set
<string
> log_keys_debug
;
617 static void clear_after(set
<string
> *log_keys_debug
, const string
&lb
) {
620 for (set
<string
>::iterator i
= log_keys_debug
->lower_bound(lb
);
621 i
!= log_keys_debug
->end();
622 log_keys_debug
->erase(i
++));
624 static void clear_up_to(set
<string
> *log_keys_debug
, const string
&ub
) {
627 for (set
<string
>::iterator i
= log_keys_debug
->begin();
628 i
!= log_keys_debug
->end() && *i
< ub
;
629 log_keys_debug
->erase(i
++));
634 dirty_to
= eversion_t();
635 dirty_from
= eversion_t::max();
638 trimmed_dups
.clear();
639 writeout_from
= eversion_t::max();
642 dirty_to_dups
= eversion_t();
643 dirty_from_dups
= eversion_t::max();
644 write_from_dups
= eversion_t::max();
648 // cppcheck-suppress noExplicitConstructor
649 PGLog(CephContext
*cct
) :
650 dirty_from(eversion_t::max()),
651 writeout_from(eversion_t::max()),
652 dirty_from_dups(eversion_t::max()),
653 write_from_dups(eversion_t::max()),
655 pg_log_debug(!(cct
&& !(cct
->_conf
->osd_debug_pg_log_writeout
))),
657 clear_divergent_priors(false)
660 void reset_backfill();
664 //////////////////// get or set missing ////////////////////
666 const pg_missing_tracker_t
& get_missing() const { return missing
; }
668 void missing_add(const hobject_t
& oid
, eversion_t need
, eversion_t have
, bool is_delete
=false) {
669 missing
.add(oid
, need
, have
, is_delete
);
672 void missing_add_next_entry(const pg_log_entry_t
& e
) {
673 missing
.add_next_event(e
);
676 //////////////////// get or set log ////////////////////
678 const IndexedLog
&get_log() const { return log
; }
680 const eversion_t
&get_tail() const { return log
.tail
; }
682 void set_tail(eversion_t tail
) { log
.tail
= tail
; }
684 const eversion_t
&get_head() const { return log
.head
; }
686 void set_head(eversion_t head
) { log
.head
= head
; }
688 void set_last_requested(version_t last_requested
) {
689 log
.last_requested
= last_requested
;
692 void index() { log
.index(); }
694 void unindex() { log
.unindex(); }
696 void add(const pg_log_entry_t
& e
, bool applied
= true) {
697 mark_writeout_from(e
.version
);
701 void reset_recovery_pointers() { log
.reset_recovery_pointers(); }
703 static void clear_info_log(
705 ObjectStore::Transaction
*t
);
710 bool transaction_applied
= true,
713 void roll_forward_to(
714 eversion_t roll_forward_to
,
715 LogEntryHandler
*h
) {
721 eversion_t
get_can_rollback_to() const {
722 return log
.get_can_rollback_to();
725 void roll_forward(LogEntryHandler
*h
) {
731 void skip_rollforward() {
732 log
.skip_can_rollback_to_to_head();
735 //////////////////// get or set log & missing ////////////////////
737 void reset_backfill_claim_log(const pg_log_t
&o
, LogEntryHandler
*h
) {
738 log
.trim_rollback_info_to(log
.head
, h
);
739 log
.claim_log_and_clear_rollback_info(o
);
741 mark_dirty_to(eversion_t::max());
742 mark_dirty_to_dups(eversion_t::max());
749 log
.split_out_child(child_pgid
, split_bits
, &opg_log
->log
);
750 missing
.split_into(child_pgid
, split_bits
, &(opg_log
->missing
));
751 opg_log
->mark_dirty_to(eversion_t::max());
752 opg_log
->mark_dirty_to_dups(eversion_t::max());
753 mark_dirty_to(eversion_t::max());
754 mark_dirty_to_dups(eversion_t::max());
755 if (missing
.may_include_deletes
)
756 opg_log
->rebuilt_missing_with_deletes
= true;
760 const vector
<PGLog
*>& sources
,
761 eversion_t last_update
) {
765 vector
<pg_log_t
*> slogs
;
766 for (auto s
: sources
) {
767 slogs
.push_back(&s
->log
);
769 log
.merge_from(slogs
, last_update
);
773 mark_log_for_rewrite();
776 void recover_got(hobject_t oid
, eversion_t v
, pg_info_t
&info
) {
777 if (missing
.is_missing(oid
, v
)) {
779 info
.stats
.stats
.sum
.num_objects_missing
= missing
.num_missing();
781 // raise last_complete?
782 if (missing
.get_items().empty()) {
783 log
.complete_to
= log
.log
.end();
784 info
.last_complete
= info
.last_update
;
786 auto oldest_need
= missing
.get_oldest_need();
787 while (log
.complete_to
!= log
.log
.end()) {
788 if (oldest_need
<= log
.complete_to
->version
)
790 if (info
.last_complete
< log
.complete_to
->version
)
791 info
.last_complete
= log
.complete_to
->version
;
796 ceph_assert(log
.get_can_rollback_to() >= v
);
799 void reset_complete_to(pg_info_t
*info
) {
800 if (log
.log
.empty()) // caller is split_into()
802 log
.complete_to
= log
.log
.begin();
803 ceph_assert(log
.complete_to
!= log
.log
.end());
804 auto oldest_need
= missing
.get_oldest_need();
805 if (oldest_need
!= eversion_t()) {
806 while (log
.complete_to
->version
< oldest_need
) {
808 ceph_assert(log
.complete_to
!= log
.log
.end());
813 if (log
.complete_to
== log
.log
.begin()) {
814 info
->last_complete
= eversion_t();
817 info
->last_complete
= log
.complete_to
->version
;
822 void activate_not_complete(pg_info_t
&info
) {
823 reset_complete_to(&info
);
824 log
.last_requested
= 0;
827 void proc_replica_log(pg_info_t
&oinfo
,
828 const pg_log_t
&olog
,
829 pg_missing_t
& omissing
, pg_shard_t from
) const;
831 void rebuild_missing_set_with_deletes(ObjectStore
*store
,
832 ObjectStore::CollectionHandle
& ch
,
833 const pg_info_t
&info
);
836 static void split_by_object(
837 mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
838 map
<hobject_t
, mempool::osd_pglog::list
<pg_log_entry_t
>> *out_entries
) {
839 while (!entries
.empty()) {
840 auto &out_list
= (*out_entries
)[entries
.front().soid
];
841 out_list
.splice(out_list
.end(), entries
, entries
.begin());
846 * _merge_object_divergent_entries
848 * There are 5 distinct cases:
849 * 1) There is a more recent update: in this case we assume we adjusted the
850 * store and missing during merge_log
851 * 2) The first entry in the divergent sequence is a create. This might
852 * either be because the object is a clone or because prior_version is
853 * eversion_t(). In this case the object does not exist and we must
854 * adjust missing and the store to match.
855 * 3) We are currently missing the object. In this case, we adjust the
856 * missing to our prior_version taking care to add a divergent_prior
858 * 4) We can rollback all of the entries. In this case, we do so using
859 * the rollbacker and return -- the object does not go into missing.
860 * 5) We cannot rollback at least 1 of the entries. In this case, we
861 * clear the object out of the store and add a missing entry at
862 * prior_version taking care to add a divergent_prior if
865 template <typename missing_type
>
866 static void _merge_object_divergent_entries(
867 const IndexedLog
&log
, ///< [in] log to merge against
868 const hobject_t
&hoid
, ///< [in] object we are merging
869 const mempool::osd_pglog::list
<pg_log_entry_t
> &orig_entries
, ///< [in] entries for hoid to merge
870 const pg_info_t
&info
, ///< [in] info for merging entries
871 eversion_t olog_can_rollback_to
, ///< [in] rollback boundary
872 eversion_t original_can_rollback_to
, ///< [in] original rollback boundary
873 missing_type
&missing
, ///< [in,out] missing to adjust, use
874 LogEntryHandler
*rollbacker
, ///< [in] optional rollbacker object
875 const DoutPrefixProvider
*dpp
///< [in] logging provider
877 ldpp_dout(dpp
, 20) << __func__
<< ": merging hoid " << hoid
878 << " entries: " << orig_entries
<< dendl
;
880 if (hoid
> info
.last_backfill
) {
881 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
<< " after last_backfill"
886 // entries is non-empty
887 ceph_assert(!orig_entries
.empty());
888 // strip out and ignore ERROR entries
889 mempool::osd_pglog::list
<pg_log_entry_t
> entries
;
891 bool seen_non_error
= false;
892 for (list
<pg_log_entry_t
>::const_iterator i
= orig_entries
.begin();
893 i
!= orig_entries
.end();
895 // all entries are on hoid
896 ceph_assert(i
->soid
== hoid
);
897 // did not see error entries before this entry and this entry is not error
898 // then this entry is the first non error entry
899 bool first_non_error
= ! seen_non_error
&& ! i
->is_error();
900 if (! i
->is_error() ) {
901 // see a non error entry now
902 seen_non_error
= true;
905 // No need to check the first entry since it prior_version is unavailable
907 // No need to check if the prior_version is the minimal version
908 // No need to check the first non-error entry since the leading error
909 // entries are not its prior version
910 if (i
!= orig_entries
.begin() && i
->prior_version
!= eversion_t() &&
912 // in increasing order of version
913 ceph_assert(i
->version
> last
);
914 // prior_version correct (unless it is an ERROR entry)
915 ceph_assert(i
->prior_version
== last
|| i
->is_error());
918 ldpp_dout(dpp
, 20) << __func__
<< ": ignoring " << *i
<< dendl
;
920 ldpp_dout(dpp
, 20) << __func__
<< ": keeping " << *i
<< dendl
;
921 entries
.push_back(*i
);
925 if (entries
.empty()) {
926 ldpp_dout(dpp
, 10) << __func__
<< ": no non-ERROR entries" << dendl
;
930 const eversion_t prior_version
= entries
.begin()->prior_version
;
931 const eversion_t first_divergent_update
= entries
.begin()->version
;
932 const eversion_t last_divergent_update
= entries
.rbegin()->version
;
933 const bool object_not_in_store
=
934 !missing
.is_missing(hoid
) &&
935 entries
.rbegin()->is_delete();
936 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << " object_not_in_store: "
937 << object_not_in_store
<< dendl
;
938 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
939 << " prior_version: " << prior_version
940 << " first_divergent_update: " << first_divergent_update
941 << " last_divergent_update: " << last_divergent_update
944 ceph::unordered_map
<hobject_t
, pg_log_entry_t
*>::const_iterator objiter
=
945 log
.objects
.find(hoid
);
946 if (objiter
!= log
.objects
.end() &&
947 objiter
->second
->version
>= first_divergent_update
) {
949 ldpp_dout(dpp
, 10) << __func__
<< ": more recent entry found: "
950 << *objiter
->second
<< ", already merged" << dendl
;
952 ceph_assert(objiter
->second
->version
> last_divergent_update
);
954 // ensure missing has been updated appropriately
955 if (objiter
->second
->is_update() ||
956 (missing
.may_include_deletes
&& objiter
->second
->is_delete())) {
957 ceph_assert(missing
.is_missing(hoid
) &&
958 missing
.get_items().at(hoid
).need
== objiter
->second
->version
);
960 ceph_assert(!missing
.is_missing(hoid
));
962 missing
.revise_have(hoid
, eversion_t());
964 if (!object_not_in_store
) {
965 rollbacker
->remove(hoid
);
967 for (auto &&i
: entries
) {
974 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
975 <<" has no more recent entries in log" << dendl
;
976 if (prior_version
== eversion_t() || entries
.front().is_clone()) {
978 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
979 << " prior_version or op type indicates creation,"
982 if (missing
.is_missing(hoid
))
983 missing
.rm(missing
.get_items().find(hoid
));
985 if (!object_not_in_store
) {
986 rollbacker
->remove(hoid
);
988 for (auto &&i
: entries
) {
995 if (missing
.is_missing(hoid
)) {
997 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
998 << " missing, " << missing
.get_items().at(hoid
)
999 << " adjusting" << dendl
;
1001 if (missing
.get_items().at(hoid
).have
== prior_version
) {
1002 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1003 << " missing.have is prior_version " << prior_version
1004 << " removing from missing" << dendl
;
1005 missing
.rm(missing
.get_items().find(hoid
));
1007 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1008 << " missing.have is " << missing
.get_items().at(hoid
).have
1009 << ", adjusting" << dendl
;
1010 missing
.revise_need(hoid
, prior_version
, false);
1011 if (prior_version
<= info
.log_tail
) {
1012 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1013 << " prior_version " << prior_version
1014 << " <= info.log_tail "
1015 << info
.log_tail
<< dendl
;
1019 for (auto &&i
: entries
) {
1020 rollbacker
->trim(i
);
1026 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1027 << " must be rolled back or recovered,"
1028 << " attempting to rollback"
1030 bool can_rollback
= true;
1031 // We are going to make an important decision based on the
1032 // olog_can_rollback_to value we have received, better known it.
1033 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1034 << " olog_can_rollback_to: "
1035 << olog_can_rollback_to
<< dendl
;
1036 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1037 << " original_crt: "
1038 << original_can_rollback_to
<< dendl
;
1039 /// Distinguish between 4) and 5)
1040 for (list
<pg_log_entry_t
>::const_reverse_iterator i
= entries
.rbegin();
1041 i
!= entries
.rend();
1043 /// Use original_can_rollback_to instead of olog_can_rollback_to to check
1044 // if we can rollback or not. This is to ensure that we don't try to rollback
1045 // to an object that has been deleted and doesn't exist.
1046 if (!i
->can_rollback() || i
->version
<= original_can_rollback_to
) {
1047 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
<< " cannot rollback "
1049 can_rollback
= false;
1056 for (list
<pg_log_entry_t
>::const_reverse_iterator i
= entries
.rbegin();
1057 i
!= entries
.rend();
1059 ceph_assert(i
->can_rollback() && i
->version
> original_can_rollback_to
);
1060 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1061 << " rolling back " << *i
<< dendl
;
1063 rollbacker
->rollback(*i
);
1065 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1066 << " rolled back" << dendl
;
1070 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
<< " cannot roll back, "
1071 << "removing and adding to missing" << dendl
;
1073 if (!object_not_in_store
)
1074 rollbacker
->remove(hoid
);
1075 for (auto &&i
: entries
) {
1076 rollbacker
->trim(i
);
1079 missing
.add(hoid
, prior_version
, eversion_t(), false);
1080 if (prior_version
<= info
.log_tail
) {
1081 ldpp_dout(dpp
, 10) << __func__
<< ": hoid " << hoid
1082 << " prior_version " << prior_version
1083 << " <= info.log_tail "
1084 << info
.log_tail
<< dendl
;
1089 /// Merge all entries using above
// Buckets the divergent entries by the object they touch (split_by_object)
// and then reconciles each object's divergent history independently via
// _merge_object_divergent_entries, forwarding both rollback boundaries
// unchanged.
// NOTE(review): this listing is a lossy extraction -- original source lines
// are missing wherever the embedded line numbers jump (e.g. 1099 -> 1101,
// 1103 -> 1106, 1106 -> 1111); treat as an excerpt, not compilable text.
1090 template <typename missing_type
>
1091 static void _merge_divergent_entries(
1092 const IndexedLog
&log
, ///< [in] log to merge against
1093 mempool::osd_pglog::list
<pg_log_entry_t
> &entries
, ///< [in] entries to merge
1094 const pg_info_t
&oinfo
, ///< [in] info for merging entries
1095 eversion_t olog_can_rollback_to
, ///< [in] rollback boundary
1096 eversion_t original_can_rollback_to
, ///< [in] original rollback boundary
1097 missing_type
&omissing
, ///< [in,out] missing to adjust, use
1098 LogEntryHandler
*rollbacker
, ///< [in] optional rollbacker object
1099 const DoutPrefixProvider
*dpp
///< [in] logging provider
// Group the flat entry list into one per-object bucket each.
1101 map
<hobject_t
, mempool::osd_pglog::list
<pg_log_entry_t
> > split
;
1102 split_by_object(entries
, &split
);
// Merge each bucket against the authoritative log.
1103 for (map
<hobject_t
, mempool::osd_pglog::list
<pg_log_entry_t
>>::iterator i
= split
.begin();
1106 _merge_object_divergent_entries(
1111 olog_can_rollback_to
,
1112 original_can_rollback_to
,
1120 * Exists for use in TestPGLog for simply testing single divergent log
// Test helper: wraps the single old entry `oe` in a one-element list and
// feeds it through _merge_object_divergent_entries, using the current
// log's can_rollback_to for both rollback boundaries.
// NOTE(review): lines elided in this listing (original 1121-1122,
// 1131-1134, and the trailing call arguments / closing brace).
1123 void merge_old_entry(
1124 ObjectStore::Transaction
& t
,
1125 const pg_log_entry_t
& oe
,
1126 const pg_info_t
& info
,
1127 LogEntryHandler
*rollbacker
) {
// Single-entry list so the per-object merge helper can be reused as-is.
1128 mempool::osd_pglog::list
<pg_log_entry_t
> entries
;
1129 entries
.push_back(oe
);
1130 _merge_object_divergent_entries(
1135 log
.get_can_rollback_to(),
1136 log
.get_can_rollback_to(),
// Declarations of the log-merge entry points; definitions live elsewhere
// (PGLog.cc).  NOTE(review): some parameter lines are elided in this
// listing (original 1143-1145, 1147, 1149, 1151, 1153-1154).
// Merge duplicate-op records from a peer's log; returns whether dups changed.
1142 bool merge_log_dups(const pg_log_t
& olog
);
// Rewind our log to newhead, handling the now-divergent tail entries.
1146 void rewind_divergent_log(eversion_t newhead
,
1148 LogEntryHandler
*rollbacker
,
1150 bool &dirty_big_info
);
// Merge an authoritative peer log/info into ours, marking dirty state.
1152 void merge_log(pg_info_t
&oinfo
,
1155 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
1156 bool &dirty_info
, bool &dirty_big_info
);
// Append new log entries and fold them into the missing set.  Returns true
// if the appended entries invalidate cached stats (any non-error entry
// does).  Entries past last_backfill are presumably skipped -- the guard at
// original line 1180 is truncated in this listing; confirm against the
// upstream source.
// NOTE(review): lossy listing -- loop condition, else-branches and closing
// braces are elided where the embedded line numbers jump.
1158 template <typename missing_type
>
1159 static bool append_log_entries_update_missing(
1160 const hobject_t
&last_backfill
,
1161 bool last_backfill_bitwise
,
1162 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
1163 bool maintain_rollback
,
1165 missing_type
&missing
,
1166 LogEntryHandler
*rollbacker
,
1167 const DoutPrefixProvider
*dpp
) {
1168 bool invalidate_stats
= false;
// New entries must strictly follow the current log head.
1169 if (log
&& !entries
.empty()) {
1170 ceph_assert(log
->head
< entries
.begin()->version
);
1172 for (list
<pg_log_entry_t
>::const_iterator p
= entries
.begin();
// Any non-error entry invalidates cached stats.
1175 invalidate_stats
= invalidate_stats
|| !p
->is_error();
1177 ldpp_dout(dpp
, 20) << "update missing, append " << *p
<< dendl
;
1180 if (p
->soid
<= last_backfill
&&
// When the missing set tracks deletes, every event is recorded directly;
// otherwise deletes are dropped from missing via rm().
1182 if (missing
.may_include_deletes
) {
1183 missing
.add_next_event(*p
);
1185 if (p
->is_delete()) {
1186 missing
.rm(p
->soid
, p
->version
);
1188 missing
.add_next_event(*p
);
1191 // hack to match PG::mark_all_unfound_lost
1192 if (maintain_rollback
&& p
->is_lost_delete() && p
->can_rollback()) {
1193 rollbacker
->try_stash(p
->soid
, p
->version
.version
);
1194 } else if (p
->is_delete()) {
1195 rollbacker
->remove(p
->soid
);
1201 return invalidate_stats
;
// Non-template wrapper: appends entries via
// append_log_entries_update_missing (arguments partially elided in this
// listing), then marks the writeout range and resets complete_to when the
// batch is a lost_delete batch.  Returns whether stats were invalidated.
1203 bool append_new_log_entries(
1204 const hobject_t
&last_backfill
,
1205 bool last_backfill_bitwise
,
1206 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
1207 LogEntryHandler
*rollbacker
) {
1208 bool invalidate_stats
= append_log_entries_update_missing(
1210 last_backfill_bitwise
,
// Record the earliest appended version so the on-disk log is rewritten
// from there.
1217 if (!entries
.empty()) {
1218 mark_writeout_from(entries
.begin()->version
);
1219 if (entries
.begin()->is_lost_delete()) {
1220 // hack: since lost deletes queue recovery directly, and don't
1221 // go through activate_not_complete() again, our complete_to
1222 // iterator may still point at log.end(). Reset it to point
1223 // before these new lost_delete entries. This only occurs
1224 // when lost+delete entries are initially added, which is
1225 // always in a list of solely lost_delete entries, so it is
1226 // sufficient to check whether the first entry is a
1228 reset_complete_to(nullptr);
1231 return invalidate_stats
;
// Declarations: serialize the log (and optionally the missing set /
// divergent_priors) into omap key-value pairs in *km for transaction t.
// Definitions are elsewhere; some parameter lines are elided in this
// listing (original 1237, 1240, 1244-1245, 1248, 1252-1253, 1258).
1234 void write_log_and_missing(
1235 ObjectStore::Transaction
& t
,
1236 map
<string
,bufferlist
> *km
,
1238 const ghobject_t
&log_oid
,
1239 bool require_rollback
);
// Variant that writes divergent_priors instead of a missing set.
1241 static void write_log_and_missing_wo_missing(
1242 ObjectStore::Transaction
& t
,
1243 map
<string
,bufferlist
>* km
,
1246 const ghobject_t
&log_oid
, map
<eversion_t
, hobject_t
> &divergent_priors
,
1247 bool require_rollback
);
// Static variant taking an explicit missing tracker; may flag that the
// missing set was rebuilt with deletes included.
1249 static void write_log_and_missing(
1250 ObjectStore::Transaction
& t
,
1251 map
<string
,bufferlist
>* km
,
1254 const ghobject_t
&log_oid
,
1255 const pg_missing_tracker_t
&missing
,
1256 bool require_rollback
,
1257 bool *rebuilt_missing_set_with_deletes
);
// Low-level writers: emit the dirty ranges of the log ([dirty_from,
// dirty_to], writeout_from, plus the dup-entry equivalents) as omap
// updates.  Declarations only; several parameter lines are elided in this
// listing (original 1262, 1269, 1275-1276, 1280, 1288, 1296-1297).
1259 static void _write_log_and_missing_wo_missing(
1260 ObjectStore::Transaction
& t
,
1261 map
<string
,bufferlist
>* km
,
1263 const coll_t
& coll
, const ghobject_t
&log_oid
,
1264 map
<eversion_t
, hobject_t
> &divergent_priors
,
1265 eversion_t dirty_to
,
1266 eversion_t dirty_from
,
1267 eversion_t writeout_from
,
1268 bool dirty_divergent_priors
,
1270 bool require_rollback
,
1271 eversion_t dirty_to_dups
,
1272 eversion_t dirty_from_dups
,
1273 eversion_t write_from_dups
,
1274 set
<string
> *log_keys_debug
// Variant that also persists the missing tracker and trimmed key sets.
1277 static void _write_log_and_missing(
1278 ObjectStore::Transaction
& t
,
1279 map
<string
,bufferlist
>* km
,
1281 const coll_t
& coll
, const ghobject_t
&log_oid
,
1282 eversion_t dirty_to
,
1283 eversion_t dirty_from
,
1284 eversion_t writeout_from
,
1285 set
<eversion_t
> &&trimmed
,
1286 set
<string
> &&trimmed_dups
,
1287 const pg_missing_tracker_t
&missing
,
1289 bool require_rollback
,
1290 bool clear_divergent_priors
,
1291 eversion_t dirty_to_dups
,
1292 eversion_t dirty_from_dups
,
1293 eversion_t write_from_dups
,
1294 bool *rebuilt_missing_with_deletes
,
1295 set
<string
> *log_keys_debug
// Member convenience wrapper: forwards to the static template
// read_log_and_missing below, targeting this PGLog's own log/missing and
// passing the debug key set only when pg_log_debug is enabled.
// NOTE(review): some argument lines are elided in this listing
// (original 1299, 1303, 1306, 1309, 1312, 1315-1316).
1298 void read_log_and_missing(
1300 ObjectStore::CollectionHandle
& ch
,
1301 ghobject_t pgmeta_oid
,
1302 const pg_info_t
&info
,
1304 bool tolerate_divergent_missing_log
,
1305 bool debug_verify_stored_missing
= false
1307 return read_log_and_missing(
1308 store
, ch
, pgmeta_oid
, info
,
1310 tolerate_divergent_missing_log
,
1311 &clear_divergent_priors
,
1313 (pg_log_debug
? &log_keys_debug
: nullptr),
1314 debug_verify_stored_missing
);
// Static template: rebuild the in-memory log, dup list and missing set from
// the pgmeta object's omap.  Phases visible below: (1) scan all omap keys,
// decoding divergent_priors / can_rollback_to / rollback_info_trimmed_to /
// may_include_deletes_in_missing / "missing*" items / "dup_*" records /
// checksummed log entries; (2) if a persisted missing set must be rebuilt
// (legacy divergent_priors present) or verification was requested, walk the
// log backwards comparing on-disk object_info versions against log
// versions; (3) replay divergent_priors the same way and finally signal
// whether the caller should clear them.
// NOTE(review): this listing is a lossy extraction -- many original lines
// (loop conditions, else/closing braces, decode calls) are elided wherever
// the embedded line numbers jump.  Comments below describe only what the
// surviving fragments show.
1317 template <typename missing_type
>
1318 static void read_log_and_missing(
1320 ObjectStore::CollectionHandle
&ch
,
1321 ghobject_t pgmeta_oid
,
1322 const pg_info_t
&info
,
1324 missing_type
&missing
,
1326 bool tolerate_divergent_missing_log
,
1327 bool *clear_divergent_priors
= nullptr,
1328 const DoutPrefixProvider
*dpp
= nullptr,
1329 set
<string
> *log_keys_debug
= nullptr,
1330 bool debug_verify_stored_missing
= false
1332 ldpp_dout(dpp
, 20) << "read_log_and_missing coll " << ch
->cid
1333 << " " << pgmeta_oid
<< dendl
;
// The pgmeta object must exist and carry no byte payload -- all state is
// in its omap.
1337 int r
= store
->stat(ch
, pgmeta_oid
, &st
);
1338 ceph_assert(r
== 0);
1339 ceph_assert(st
.st_size
== 0);
1341 // will get overridden below if it had been recorded
1342 eversion_t on_disk_can_rollback_to
= info
.last_update
;
1343 eversion_t on_disk_rollback_info_trimmed_to
= eversion_t();
1344 ObjectMap::ObjectMapIterator p
= store
->get_omap_iterator(ch
,
1346 map
<eversion_t
, hobject_t
> divergent_priors
;
1347 bool must_rebuild
= false;
1348 missing
.may_include_deletes
= false;
1349 list
<pg_log_entry_t
> entries
;
1350 list
<pg_log_dup_t
> dups
;
// Phase 1: decode every omap key into the matching piece of state.
1352 for (p
->seek_to_first(); p
->valid() ; p
->next()) {
1353 // non-log pgmeta_oid keys are prefixed with _; skip those
1354 if (p
->key()[0] == '_')
1356 bufferlist bl
= p
->value();//Copy bufferlist before creating iterator
1357 auto bp
= bl
.cbegin();
1358 if (p
->key() == "divergent_priors") {
1359 decode(divergent_priors
, bp
);
1360 ldpp_dout(dpp
, 20) << "read_log_and_missing " << divergent_priors
.size()
1361 << " divergent_priors" << dendl
;
// Legacy divergent_priors force a rebuild and disable verification.
1362 must_rebuild
= true;
1363 debug_verify_stored_missing
= false;
1364 } else if (p
->key() == "can_rollback_to") {
1365 decode(on_disk_can_rollback_to
, bp
);
1366 } else if (p
->key() == "rollback_info_trimmed_to") {
1367 decode(on_disk_rollback_info_trimmed_to
, bp
);
1368 } else if (p
->key() == "may_include_deletes_in_missing") {
1369 missing
.may_include_deletes
= true;
1370 } else if (p
->key().substr(0, 7) == string("missing")) {
1372 pg_missing_item item
;
// Delete records may only appear when the tracker supports them.
1375 if (item
.is_delete()) {
1376 ceph_assert(missing
.may_include_deletes
);
1378 missing
.add(oid
, item
.need
, item
.have
, item
.is_delete());
1379 } else if (p
->key().substr(0, 4) == string("dup_")) {
// Dup records must arrive in strictly increasing version order.
1382 if (!dups
.empty()) {
1383 ceph_assert(dups
.back().version
< dup
.version
);
1385 dups
.push_back(dup
);
1388 e
.decode_with_checksum(bp
);
1389 ldpp_dout(dpp
, 20) << "read_log_and_missing " << e
<< dendl
;
// Log entries must be strictly ordered by version, non-decreasing epoch.
1390 if (!entries
.empty()) {
1391 pg_log_entry_t
last_e(entries
.back());
1392 ceph_assert(last_e
.version
.version
< e
.version
.version
);
1393 ceph_assert(last_e
.version
.epoch
<= e
.version
.epoch
);
1395 entries
.push_back(e
);
1397 log_keys_debug
->insert(e
.get_key_name());
1404 on_disk_can_rollback_to
,
1405 on_disk_rollback_info_trimmed_to
,
// Phase 2: rebuild or verify the missing set from the log tail.
1409 if (must_rebuild
|| debug_verify_stored_missing
) {
1411 if (debug_verify_stored_missing
|| info
.last_complete
< info
.last_update
) {
1413 << "read_log_and_missing checking for missing items over interval ("
1414 << info
.last_complete
1415 << "," << info
.last_update
<< "]" << dendl
;
1418 set
<hobject_t
> checked
;
1419 set
<hobject_t
> skipped
;
// Walk the log newest-first; each object is considered only once (did).
1420 for (list
<pg_log_entry_t
>::reverse_iterator i
= log
.log
.rbegin();
1421 i
!= log
.log
.rend();
1423 if (!debug_verify_stored_missing
&& i
->version
<= info
.last_complete
) break;
1424 if (i
->soid
> info
.last_backfill
)
1428 if (did
.count(i
->soid
)) continue;
1429 did
.insert(i
->soid
);
1431 if (!missing
.may_include_deletes
&& i
->is_delete())
// Compare the on-disk object_info version with the log's version.
1435 int r
= store
->getattr(
1437 ghobject_t(i
->soid
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
1441 object_info_t
oi(bv
);
1442 if (oi
.version
< i
->version
) {
1443 ldpp_dout(dpp
, 15) << "read_log_and_missing missing " << *i
1444 << " (have " << oi
.version
<< ")" << dendl
;
1445 if (debug_verify_stored_missing
) {
1446 auto miter
= missing
.get_items().find(i
->soid
);
1447 ceph_assert(miter
!= missing
.get_items().end());
1448 ceph_assert(miter
->second
.need
== i
->version
);
1449 // the 'have' version is reset if an object is deleted,
1450 // then created again
1451 ceph_assert(miter
->second
.have
== oi
.version
|| miter
->second
.have
== eversion_t());
1452 checked
.insert(i
->soid
);
1454 missing
.add(i
->soid
, i
->version
, oi
.version
, i
->is_delete());
// Object absent on disk entirely: missing with empty 'have'.
1458 ldpp_dout(dpp
, 15) << "read_log_and_missing missing " << *i
<< dendl
;
1459 if (debug_verify_stored_missing
) {
1460 auto miter
= missing
.get_items().find(i
->soid
);
1461 if (i
->is_delete()) {
1462 ceph_assert(miter
== missing
.get_items().end() ||
1463 (miter
->second
.need
== i
->version
&&
1464 miter
->second
.have
== eversion_t()));
1466 ceph_assert(miter
!= missing
.get_items().end());
1467 ceph_assert(miter
->second
.need
== i
->version
);
1468 ceph_assert(miter
->second
.have
== eversion_t());
1470 checked
.insert(i
->soid
);
1472 missing
.add(i
->soid
, i
->version
, eversion_t(), i
->is_delete());
// Verification pass: every stored missing item not covered by the log
// walk must still be consistent with on-disk state and last_backfill.
1476 if (debug_verify_stored_missing
) {
1477 for (auto &&i
: missing
.get_items()) {
1478 if (checked
.count(i
.first
))
1480 if (i
.first
> info
.last_backfill
) {
1481 ldpp_dout(dpp
, -1) << __func__
<< ": invalid missing set entry "
1482 << "found before last_backfill: "
1483 << i
.first
<< " " << i
.second
1484 << " last_backfill = " << info
.last_backfill
1486 ceph_abort_msg("invalid missing set entry found");
1489 int r
= store
->getattr(
1491 ghobject_t(i
.first
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
1495 object_info_t
oi(bv
);
1496 ceph_assert(oi
.version
== i
.second
.have
|| eversion_t() == i
.second
.have
);
1498 ceph_assert(i
.second
.is_delete() || eversion_t() == i
.second
.have
);
// Phase 3: legacy path -- replay divergent_priors newest-first.
1502 ceph_assert(must_rebuild
);
1503 for (map
<eversion_t
, hobject_t
>::reverse_iterator i
=
1504 divergent_priors
.rbegin();
1505 i
!= divergent_priors
.rend();
1507 if (i
->first
<= info
.last_complete
) break;
1508 if (i
->second
> info
.last_backfill
)
1510 if (did
.count(i
->second
)) continue;
1511 did
.insert(i
->second
);
1513 int r
= store
->getattr(
1515 ghobject_t(i
->second
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
1519 object_info_t
oi(bv
);
1521 * 1) we see this entry in the divergent priors mapping
1522 * 2) we didn't see an entry for this object in the log
1524 * From 1 & 2 we know that either the object does not exist
1525 * or it is at the version specified in the divergent_priors
1526 * map since the object would have been deleted atomically
1527 * with the addition of the divergent_priors entry, an older
1528 * version would not have been recovered, and a newer version
1529 * would show up in the log above.
1532 * Unfortunately the assessment above is incorrect because of
1533 * http://tracker.ceph.com/issues/17916 (we were incorrectly
1534 * not removing the divergent_priors set from disk state!),
1535 * so let's check that.
1537 if (oi
.version
> i
->first
&& tolerate_divergent_missing_log
) {
1538 ldpp_dout(dpp
, 0) << "read_log divergent_priors entry (" << *i
1539 << ") inconsistent with disk state (" << oi
1540 << "), assuming it is tracker.ceph.com/issues/17916"
1543 ceph_assert(oi
.version
== i
->first
);
1546 ldpp_dout(dpp
, 15) << "read_log_and_missing missing " << *i
<< dendl
;
1547 missing
.add(i
->second
, i
->first
, eversion_t(), false);
// Tell the caller to drop the legacy divergent_priors from disk.
1551 if (clear_divergent_priors
)
1552 (*clear_divergent_priors
) = true;
1556 if (!must_rebuild
) {
1557 if (clear_divergent_priors
)
1558 (*clear_divergent_priors
) = false;
1561 ldpp_dout(dpp
, 10) << "read_log_and_missing done" << dendl
;
1562 } // static read_log_and_missing