1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
28 using ceph::bufferlist
;
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_osd
35 #define dout_prefix _prefix(_dout, this)
// Log-prefix helper used by the dout_prefix macro defined just above
// (`_prefix(_dout, this)`): hands the output stream to the PGLog's
// gen_prefix() and returns whatever stream reference that call returns.
//
// _dout  stream the prefix is written to (dereferenced unconditionally)
// pglog  PGLog whose gen_prefix() produces the prefix (dereferenced unconditionally)
37 static ostream
& _prefix(std::ostream
*_dout
, const PGLog
*pglog
)
39 return pglog
->gen_prefix(*_dout
);
42 //////////////////// PGLog::IndexedLog ////////////////////
// Splits the part of this log that belongs to a child PG into *target.
//
// *target is overwritten with a fresh IndexedLog built from the result of
// pg_log_t::split_out_child(child_pgid, split_bits); afterwards this log's
// rollback_info_trimmed_to reverse iterator is reset via
// reset_rollback_info_trimmed_to_riter().
// NOTE(review): child_pgid and split_bits are used below but their parameter
// declarations are not among the lines shown here.
44 void PGLog::IndexedLog::split_out_child(
47 PGLog::IndexedLog
*target
)
50 *target
= IndexedLog(pg_log_t::split_out_child(child_pgid
, split_bits
));
53 reset_rollback_info_trimmed_to_riter();
// Trims the indexed log, recording what was trimmed so the caller can remove
// the matching on-disk keys.
//
// The main loop looks at the entry at log.begin() while the log is not empty.
// For each entry visited:
//   * its version is added to *trimmed and the entry is unindexed;
//   * if its version number is >= earliest_dup_version it is appended to
//     `dups` as a pg_log_dup_t, and so is one dup per element of its
//     extra_reqids (those share the outer entry's version);
//   * complete_to and rollback_info_trimmed_to_riter are reset when the
//     trimmed entry reaches them.
// A second loop then handles an oversized `dups`: while dups.size() exceeds
// osd_pg_log_dups_tracked, and for at most osd_pg_log_trim_max iterations,
// the key name of the dup at dups.begin() is added to *trimmed_dups.
//
// trimmed          out: versions of the log entries visited by the main loop
// trimmed_dups     out: key names of the dups visited by the second loop
// write_from_dups  in/out, may be nullptr: lowered to the version of an entry
//                  converted to a dup when that version is smaller
//
// NOTE(review): `cct`, `s` and `idx` are used below but are declared on lines
// not shown here; the loop-exit test and the statements that actually pop
// entries are likewise not visible, so they are not described.
56 void PGLog::IndexedLog::trim(
59 set
<eversion_t
> *trimmed
,
60 set
<string
>* trimmed_dups
,
61 eversion_t
*write_from_dups
)
63 lgeneric_subdout(cct
, osd
, 10) << "IndexedLog::trim s=" << s
<< dendl
;
64 ceph_assert(s
<= can_rollback_to
);
65 if (complete_to
!= log
.end())
66 lgeneric_subdout(cct
, osd
, 20) << " complete_to " << complete_to
->version
<< dendl
;
// NOTE(review): log.rbegin() is dereferenced here, before the !log.empty()
// test of the loop below -- presumably callers never pass an empty log; confirm.
68 auto earliest_dup_version
=
69 log
.rbegin()->version
.version
< cct
->_conf
->osd_pg_log_dups_tracked
71 : log
.rbegin()->version
.version
- cct
->_conf
->osd_pg_log_dups_tracked
+ 1;
73 lgeneric_subdout(cct
, osd
, 20) << "earliest_dup_version = " << earliest_dup_version
<< dendl
;
74 while (!log
.empty()) {
75 const pg_log_entry_t
&e
= *log
.begin();
78 lgeneric_subdout(cct
, osd
, 20) << "trim " << e
<< dendl
;
80 trimmed
->emplace(e
.version
);
82 unindex(e
); // remove from index,
85 if (e
.version
.version
>= earliest_dup_version
) {
86 if (write_from_dups
!= nullptr && *write_from_dups
> e
.version
) {
87 lgeneric_subdout(cct
, osd
, 20) << "updating write_from_dups from " << *write_from_dups
<< " to " << e
.version
<< dendl
;
88 *write_from_dups
= e
.version
;
90 dups
.push_back(pg_log_dup_t(e
));
93 for (const auto& extra
: e
.extra_reqids
) {
94 int return_code
= e
.return_code
;
95 if (return_code
>= 0) {
96 auto it
= e
.extra_reqid_return_codes
.find(idx
);
97 if (it
!= e
.extra_reqid_return_codes
.end()) {
98 return_code
= it
->second
;
99 // FIXME: we aren't setting op_returns for these extra_reqids
104 // note: extras have the same version as outer op
105 dups
.push_back(pg_log_dup_t(e
.version
, extra
.second
,
106 extra
.first
, return_code
));
111 bool reset_complete_to
= false;
112 // we are trimming past complete_to, so reset complete_to
113 if (complete_to
!= log
.end() && e
.version
>= complete_to
->version
)
114 reset_complete_to
= true;
115 if (rollback_info_trimmed_to_riter
== log
.rend() ||
116 e
.version
== rollback_info_trimmed_to_riter
->version
) {
118 rollback_info_trimmed_to_riter
= log
.rend();
123 // reset complete_to to the beginning of the log
124 if (reset_complete_to
) {
125 complete_to
= log
.begin();
126 if (complete_to
!= log
.end()) {
127 lgeneric_subdout(cct
, osd
, 20) << " moving complete_to to "
128 << log
.begin()->version
<< dendl
;
130 lgeneric_subdout(cct
, osd
, 20) << " log is now empty" << dendl
;
135 // we can hit an inflated `dups` b/c of https://tracker.ceph.com/issues/53729
136 // the idea is to slowly trim them over a prolonged period of time and mix
137 // omap deletes with writes (if we're here, a new log entry got added) to
138 // neither: 1) blow size of single Transaction nor 2) generate-n-accumulate
139 // large amount of tombstones in BlueStore's RocksDB.
140 // if trimming immediately is a must, then the ceph-objectstore-tool is
// Bounded dup trim: the per-call budget is osd_pg_log_trim_max and the loop
// also stops as soon as dups.size() is back at or under osd_pg_log_dups_tracked.
142 const size_t max_dups
= cct
->_conf
->osd_pg_log_dups_tracked
;
143 for (size_t max_dups_to_trim
= cct
->_conf
->osd_pg_log_trim_max
;
144 max_dups_to_trim
> 0 && dups
.size() > max_dups
;
145 max_dups_to_trim
--) {
146 const auto& e
= *dups
.begin();
147 lgeneric_subdout(cct
, osd
, 20) << "trim dup " << e
<< dendl
;
149 trimmed_dups
->insert(e
.get_key_name());
157 lgeneric_subdout(cct
, osd
, 20) << "IndexedLog::trim after trim"
158 << " dups.size()=" << dups
.size()
160 << " s=" << s
<< dendl
;
// Debug dump of the indexed log to `out`.
//
// Writes *this followed by std::endl, then walks log: for every entry the
// output includes "indexed" or "NOT INDEXED" depending on logged_object() for
// the entry's soid, and the function asserts that an entry whose reqid is
// meant to be indexed (reqid_is_indexed()) is found by logged_req().
// Finally every element of dups is written on its own line.
// NOTE(review): part of the per-entry output expression and the return
// statement are on lines not shown here.
163 ostream
& PGLog::IndexedLog::print(ostream
& out
) const
165 out
<< *this << std::endl
;
166 for (auto p
= log
.begin(); p
!= log
.end(); ++p
) {
168 (logged_object(p
->soid
) ? "indexed" : "NOT INDEXED") <<
170 ceph_assert(!p
->reqid_is_indexed() || logged_req(p
->reqid
));
173 for (auto p
= dups
.begin(); p
!= dups
.end(); ++p
) {
174 out
<< *p
<< std::endl
;
180 //////////////////// PGLog ////////////////////
182 void PGLog::reset_backfill()
// Resets PGLog state. Of the body, only the emptying of log_keys_debug is
// visible here; any other members cleared are on lines not shown.
187 void PGLog::clear() {
190 log_keys_debug
.clear();
// Adds to transaction *t the removal of the PG's pgmeta object
// (pgid.make_pgmeta_oid()) from collection `coll`.
//
// t  transaction the remove operation is appended to (dereferenced unconditionally)
// NOTE(review): `pgid` and `coll` are introduced on lines not shown here.
194 void PGLog::clear_info_log(
196 ObjectStore::Transaction
*t
) {
198 t
->remove(coll
, pgid
.make_pgmeta_oid());
// Trim routine operating on the PGLog's own `log` and the caller's `info`
// (the opening line of this definition is not visible here).
//
// Nothing is trimmed unless trim_to is beyond log.tail. In that case the
// function:
//   * logs the current number of missing objects;
//   * asserts trim_to <= info.last_complete, but only when the transaction
//     was applied, `async` is false and nothing is missing (see the comment
//     below on why the other cases are exempt);
//   * calls log.trim() with trim_to, collecting results into trimmed,
//     trimmed_dups and write_from_dups;
//   * copies the new log.tail into info.log_tail.
204 bool transaction_applied
,
207 dout(10) << __func__
<< " proposed trim_to = " << trim_to
<< dendl
;
209 if (trim_to
> log
.tail
) {
210 dout(10) << __func__
<< " missing = " << missing
.num_missing() << dendl
;
211 // Don't assert for async_recovery_targets or backfill_targets
212 // or whenever there are missing items
213 if (transaction_applied
&& !async
&& (missing
.num_missing() == 0))
214 ceph_assert(trim_to
<= info
.last_complete
);
216 dout(10) << "trim " << log
<< " to " << trim_to
<< dendl
;
217 log
.trim(cct
, trim_to
, &trimmed
, &trimmed_dups
, &write_from_dups
);
218 info
.log_tail
= log
.tail
;
219 if (log
.complete_to
!= log
.log
.end())
220 dout(10) << " after trim complete_to " << log
.complete_to
->version
<< dendl
;
// Reconciles a peer's log (olog) and missing set (omissing), received from
// shard `from`, against our own log, adjusting the peer's info (oinfo).
//
// Visible steps:
//   * two short-circuit cases are logged: olog.head < log.tail ("does not
//     overlap") and olog.head == log.head ("same log head"); in both the
//     message says divergent objects are not looked for;
//   * our log is scanned from its newest entry (log.log.rbegin()) for the
//     first entry whose version is <= olog.head (first_non_divergent);
//   * limit = max(olog.tail, log.tail) bounds the resulting version `lu`
//     (see the long comment below for the reasoning);
//   * olog is copied into a local IndexedLog (folog), rewound from its head
//     to lu, and the divergent entries are handed to
//     _merge_divergent_entries() together with olog.get_can_rollback_to();
//   * if lu < oinfo.last_update, oinfo.last_update is lowered to lu;
//   * oinfo.last_complete is recomputed: when omissing has entries it starts
//     at eversion_t() and is advanced over olog entries whose version is
//     below the `need` of the first missing item (per get_rmissing());
//     otherwise it is set to oinfo.last_update.
// NOTE(review): `oinfo` and `lu` are declared on lines not shown here, and the
// early-return statements of the two short-circuit cases are not visible.
224 void PGLog::proc_replica_log(
226 const pg_log_t
&olog
,
227 pg_missing_t
& omissing
,
228 pg_shard_t from
) const
230 dout(10) << "proc_replica_log for osd." << from
<< ": "
231 << oinfo
<< " " << olog
<< " " << omissing
<< dendl
;
233 if (olog
.head
< log
.tail
) {
234 dout(10) << __func__
<< ": osd." << from
<< " does not overlap, not looking "
235 << "for divergent objects" << dendl
;
238 if (olog
.head
== log
.head
) {
239 dout(10) << __func__
<< ": osd." << from
<< " same log head, not looking "
240 << "for divergent objects" << dendl
;
245 basically what we're doing here is rewinding the remote log,
246 dropping divergent entries, until we find something that matches
247 our master log. we then reset last_update to reflect the new
248 point up to which missing is accurate.
250 later, in activate(), missing will get wound forward again and
251 we will send the peer enough log to arrive at the same state.
254 for (auto i
= omissing
.get_items().begin();
255 i
!= omissing
.get_items().end();
257 dout(20) << " before missing " << i
->first
<< " need " << i
->second
.need
258 << " have " << i
->second
.have
<< dendl
;
261 auto first_non_divergent
= log
.log
.rbegin();
263 if (first_non_divergent
== log
.log
.rend())
265 if (first_non_divergent
->version
<= olog
.head
) {
266 dout(20) << "merge_log point (usually last shared) is "
267 << *first_non_divergent
<< dendl
;
270 ++first_non_divergent
;
273 /* Because olog.head >= log.tail, we know that both pgs must at least have
274 * the event represented by log.tail. Similarly, because log.head >= olog.tail,
275 * we know that the event represented by olog.tail must be common to both logs.
276 * Furthermore, the event represented by a log tail was necessarily trimmed,
277 * thus neither olog.tail nor log.tail can be divergent. It's
278 * possible that olog/log contain no actual events between olog.head and
279 * max(log.tail, olog.tail), however, since they might have been split out.
280 * Thus, if we cannot find an event e such that
281 * log.tail <= e.version <= log.head, the last_update must actually be
282 * max(log.tail, olog.tail).
284 eversion_t limit
= std::max(olog
.tail
, log
.tail
);
286 (first_non_divergent
== log
.log
.rend() ||
287 first_non_divergent
->version
< limit
) ?
289 first_non_divergent
->version
;
291 // we merge and adjust the replica's log, rollback the rollbackable divergent entry,
292 // remove the unrollbackable divergent entry and mark the according object as missing.
293 // the rollback boundary must choose crt of the olog which going to be merged.
294 // The replica log's(olog) crt will not be modified, so it could get passed
295 // to _merge_divergent_entries() directly.
296 IndexedLog
folog(olog
);
297 auto divergent
= folog
.rewind_from_head(lu
);
298 _merge_divergent_entries(
302 olog
.get_can_rollback_to(),
307 if (lu
< oinfo
.last_update
) {
308 dout(10) << " peer osd." << from
<< " last_update now " << lu
<< dendl
;
309 oinfo
.last_update
= lu
;
312 if (omissing
.have_missing()) {
313 eversion_t first_missing
=
314 omissing
.get_items().at(omissing
.get_rmissing().begin()->second
).need
;
315 oinfo
.last_complete
= eversion_t();
316 for (auto i
= olog
.log
.begin(); i
!= olog
.log
.end(); ++i
) {
317 if (i
->version
< first_missing
)
318 oinfo
.last_complete
= i
->version
;
323 oinfo
.last_complete
= oinfo
.last_update
;
325 } // proc_replica_log
// Summary of the visible body, in order:
//   1. remember log.get_can_rollback_to() as original_crt before the rewind
//      changes it (the reason is given in the comment inside the body);
//   2. clamp info.last_complete down to newhead;
//   3. rewind the log to newhead, obtaining the divergent entries, and mark
//      the log dirty from the first divergent version when there is one;
//   4. set info.last_update = newhead;
//   5. pass the divergent entries to _merge_divergent_entries();
//   6. set dirty_big_info = true.
// NOTE(review): dirty_info is a parameter, but no assignment to it is among
// the lines shown here.
328 * rewind divergent entries at the head of the log
330 * This rewinds entries off the head of our log that are divergent.
331 * This is used by replicas during activation.
333 * @param newhead new head to rewind to
335 void PGLog::rewind_divergent_log(eversion_t newhead
,
336 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
337 bool &dirty_info
, bool &dirty_big_info
)
339 dout(10) << "rewind_divergent_log truncate divergent future " <<
342 // We need to preserve the original crt before it gets updated in rewind_from_head().
343 // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
344 // a divergent entry or not.
345 eversion_t original_crt
= log
.get_can_rollback_to();
346 dout(20) << __func__
<< " original_crt = " << original_crt
<< dendl
;
347 if (info
.last_complete
> newhead
)
348 info
.last_complete
= newhead
;
350 auto divergent
= log
.rewind_from_head(newhead
);
351 if (!divergent
.empty()) {
352 mark_dirty_from(divergent
.front().version
);
354 for (auto &&entry
: divergent
) {
355 dout(10) << "rewind_divergent_log future divergent " << entry
<< dendl
;
357 info
.last_update
= newhead
;
359 _merge_divergent_entries(
369 dirty_big_info
= true;
// Merges a log received from shard `fromosd` (olog, with its info oinfo) into
// our own log, updating `info` as it goes.
//
// Visible steps:
//   * preconditions are asserted: an empty local log requires that olog was
//     never trimmed (olog.tail == eversion_t()), and the two logs must overlap;
//   * tail extension -- when olog.tail < log.tail, a leading range of olog
//     (the scan tests each entry against log.tail) is spliced onto the front
//     of our log and info.log_tail / log.tail move back to olog.tail;
//   * oinfo.stats.reported_seq / reported_epoch are raised to ours when
//     either is behind, so the reported values never go backwards;
//     info.stats is taken from oinfo when info.last_backfill is max, and
//     info.hit_set is taken from oinfo;
//   * when olog.head < log.head our divergent head is rewound via
//     rewind_divergent_log();
//   * head extension -- when olog.head > log.head, a cut point (lower_bound,
//     starting from max(olog.tail, orig_tail)) is located in olog, the log is
//     marked dirty from it and rewound to it, rolled forward, the new olog
//     entries are spliced into new_entries and appended through
//     append_log_entries_update_missing(), divergent entries are merged, and
//     then info.last_update = log.head = olog.head,
//     log.skip_can_rollback_to_to_head(), last_user_version and purged_snaps
//     are copied from oinfo and num_objects_missing is refreshed;
//   * dup entries are merged through merge_log_dups();
//   * dirty_big_info is set to true near the end (its guarding condition is
//     not among the lines shown here).
// NOTE(review): the declaration of `to` for the tail-extension scan and the
// loop that walks `from` backwards for the head extension are on lines not
// shown here.
372 void PGLog::merge_log(pg_info_t
&oinfo
, pg_log_t
&& olog
, pg_shard_t fromosd
,
373 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
374 bool &dirty_info
, bool &dirty_big_info
)
376 dout(10) << "merge_log " << olog
<< " from osd." << fromosd
377 << " into " << log
<< dendl
;
379 // Check preconditions
381 // If our log is empty, the incoming log needs to have not been trimmed.
382 ceph_assert(!log
.null() || olog
.tail
== eversion_t());
383 // The logs must overlap.
384 ceph_assert(log
.head
>= olog
.tail
&& olog
.head
>= log
.tail
);
386 for (auto i
= missing
.get_items().begin();
387 i
!= missing
.get_items().end();
389 dout(20) << "pg_missing_t sobject: " << i
->first
<< dendl
;
392 bool changed
= false;
395 // this is just filling in history. it does not affect our
396 // missing set, as that should already be consistent with our
398 eversion_t orig_tail
= log
.tail
;
399 if (olog
.tail
< log
.tail
) {
400 dout(10) << "merge_log extending tail to " << olog
.tail
<< dendl
;
401 auto from
= olog
.log
.begin();
404 for (; to
!= olog
.log
.end(); ++to
) {
405 if (to
->version
> log
.tail
)
408 dout(15) << *to
<< dendl
;
413 // splice into our log.
414 log
.log
.splice(log
.log
.begin(),
415 std::move(olog
.log
), from
, to
);
417 info
.log_tail
= log
.tail
= olog
.tail
;
421 if (oinfo
.stats
.reported_seq
< info
.stats
.reported_seq
|| // make sure reported always increases
422 oinfo
.stats
.reported_epoch
< info
.stats
.reported_epoch
) {
423 oinfo
.stats
.reported_seq
= info
.stats
.reported_seq
;
424 oinfo
.stats
.reported_epoch
= info
.stats
.reported_epoch
;
426 if (info
.last_backfill
.is_max())
427 info
.stats
= oinfo
.stats
;
428 info
.hit_set
= oinfo
.hit_set
;
430 // do we have divergent entries to throw out?
431 if (olog
.head
< log
.head
) {
432 rewind_divergent_log(olog
.head
, info
, rollbacker
, dirty_info
, dirty_big_info
);
437 if (olog
.head
> log
.head
) {
438 dout(10) << "merge_log extending head to " << olog
.head
<< dendl
;
440 // find start point in olog
441 auto to
= olog
.log
.end();
442 auto from
= olog
.log
.end();
443 eversion_t lower_bound
= std::max(olog
.tail
, orig_tail
);
445 if (from
== olog
.log
.begin())
448 dout(20) << " ? " << *from
<< dendl
;
449 if (from
->version
<= log
.head
) {
450 lower_bound
= std::max(lower_bound
, from
->version
);
455 dout(20) << "merge_log cut point (usually last shared) is "
456 << lower_bound
<< dendl
;
457 mark_dirty_from(lower_bound
);
459 // We need to preserve the original crt before it gets updated in rewind_from_head().
460 // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
461 // a divergent entry or not.
462 eversion_t original_crt
= log
.get_can_rollback_to();
463 dout(20) << __func__
<< " original_crt = " << original_crt
<< dendl
;
464 auto divergent
= log
.rewind_from_head(lower_bound
);
465 // move aside divergent items
466 for (auto &&oe
: divergent
) {
467 dout(10) << "merge_log divergent " << oe
<< dendl
;
469 log
.roll_forward_to(log
.head
, rollbacker
);
471 mempool::osd_pglog::list
<pg_log_entry_t
> new_entries
;
472 new_entries
.splice(new_entries
.end(), olog
.log
, from
, to
);
473 append_log_entries_update_missing(
482 _merge_divergent_entries(
491 info
.last_update
= log
.head
= olog
.head
;
493 // We cannot rollback into the new log entries
494 log
.skip_can_rollback_to_to_head();
496 info
.last_user_version
= oinfo
.last_user_version
;
497 info
.purged_snaps
= oinfo
.purged_snaps
;
498 // update num_missing too
499 // we might have appended some more missing objects above
500 info
.stats
.stats
.sum
.num_objects_missing
= missing
.num_missing();
506 if (merge_log_dups(olog
)) {
510 dout(10) << "merge_log result " << log
<< " " << missing
<<
511 " changed=" << changed
<< dendl
;
515 dirty_big_info
= true;
// Folds the dup entries of olog into log.dups.
//
// Visible cases when olog.dups is not empty:
//   * log.dups empty: the whole dup range is marked dirty
//     (dirty_from_dups = eversion_t(), dirty_to_dups = eversion_t::max()) and
//     every olog dup is copied and indexed;
//   * otherwise the newer end is extended with olog dups whose version is
//     above our newest dup (walked newest-first and inserted in front of a
//     moving cursor, so order is preserved), marking dups dirty from the last
//     version inserted; and the older end is extended with olog dups older
//     than our oldest dup, marking dups dirty up to `last`.
// Afterwards any dups at the back of log.dups whose version is above log.tail
// are unindexed and marked dirty, because they overlap real log entries.
//
// NOTE(review): the opening log message emits log.dups.size() immediately
// followed by "olog.dups.size()=" with no separating space, unlike the
// closing message which uses " olog.dups.size()=" -- presumably an oversight.
// NOTE(review): assignments to `changed`, the declaration of `last` and the
// return statement are on lines not shown here.
520 // returns true if any changes were made to log.dups
521 bool PGLog::merge_log_dups(const pg_log_t
& olog
) {
523 << " log.dups.size()=" << log
.dups
.size()
524 << "olog.dups.size()=" << olog
.dups
.size() << dendl
;
525 bool changed
= false;
527 if (!olog
.dups
.empty()) {
528 if (log
.dups
.empty()) {
529 dout(10) << "merge_log copying olog dups to log " <<
530 olog
.dups
.front().version
<< " to " <<
531 olog
.dups
.back().version
<< dendl
;
533 dirty_from_dups
= eversion_t();
534 dirty_to_dups
= eversion_t::max();
535 // since our log.dups is empty just copy them
536 for (const auto& i
: olog
.dups
) {
537 log
.dups
.push_back(i
);
538 log
.index(log
.dups
.back());
541 // since our log.dups is not empty try to extend on each end
543 if (olog
.dups
.back().version
> log
.dups
.back().version
) {
544 // extend the dups's tail (i.e., newer dups)
545 dout(10) << "merge_log extending dups tail to " <<
546 olog
.dups
.back().version
<< dendl
;
549 auto log_tail_version
= log
.dups
.back().version
;
551 auto insert_cursor
= log
.dups
.end();
552 eversion_t last_shared
= eversion_t::max();
553 for (auto i
= olog
.dups
.crbegin(); i
!= olog
.dups
.crend(); ++i
) {
554 if (i
->version
<= log_tail_version
) break;
555 log
.dups
.insert(insert_cursor
, *i
);
556 last_shared
= i
->version
;
558 auto prev
= insert_cursor
;
560 // be sure to pass reference of copy in log.dups
563 --insert_cursor
; // make sure we insert in reverse order
565 mark_dirty_from_dups(last_shared
);
568 if (olog
.dups
.front().version
< log
.dups
.front().version
) {
569 // extend the dups's head (i.e., older dups)
570 dout(10) << "merge_log extending dups head to " <<
571 olog
.dups
.front().version
<< dendl
;
575 auto insert_cursor
= log
.dups
.begin();
576 for (auto i
= olog
.dups
.cbegin(); i
!= olog
.dups
.cend(); ++i
) {
577 if (i
->version
>= insert_cursor
->version
) break;
578 log
.dups
.insert(insert_cursor
, *i
);
580 auto prev
= insert_cursor
;
582 // be sure to pass address of copy in log.dups
585 mark_dirty_to_dups(last
);
590 // remove any dup entries that overlap with pglog
591 if (!log
.dups
.empty() && log
.dups
.back().version
> log
.tail
) {
592 dout(10) << "merge_log removed dups overlapping log entries (" <<
593 log
.tail
<< "," << log
.dups
.back().version
<< "]" << dendl
;
596 while (!log
.dups
.empty() && log
.dups
.back().version
> log
.tail
) {
597 log
.unindex(log
.dups
.back());
598 mark_dirty_from_dups(log
.dups
.back().version
);
603 dout(5) << "end of " << __func__
<< " changed=" << changed
604 << " log.dups.size()=" << log
.dups
.size()
605 << " olog.dups.size()=" << olog
.dups
.size() << dendl
;
// Debug consistency check between the in-memory log and log_keys_debug.
//
// When log.log and log_keys_debug differ in size, both are dumped through
// derr (every log entry, then every debug key). The function then asserts
// that the sizes are equal and that the key name of every log entry is
// present in log_keys_debug.
// NOTE(review): any early-exit guard at the top of the body is on lines not
// shown here.
610 void PGLog::check() {
613 if (log
.log
.size() != log_keys_debug
.size()) {
614 derr
<< "log.log.size() != log_keys_debug.size()" << dendl
;
615 derr
<< "actual log:" << dendl
;
616 for (auto i
= log
.log
.begin(); i
!= log
.log
.end(); ++i
) {
617 derr
<< " " << *i
<< dendl
;
619 derr
<< "log_keys_debug:" << dendl
;
620 for (auto i
= log_keys_debug
.begin();
621 i
!= log_keys_debug
.end();
623 derr
<< " " << *i
<< dendl
;
626 ceph_assert(log
.log
.size() == log_keys_debug
.size());
627 for (auto i
= log
.log
.begin(); i
!= log
.log
.end(); ++i
) {
628 ceph_assert(log_keys_debug
.count(i
->get_key_name()));
// Member entry point for writing the log and missing set into transaction `t`
// and key/value map *km.
//
// It logs the current dirty bounds (dirty_to, dirty_from, writeout_from,
// trimmed, trimmed_dups, clear_divergent_priors) and forwards to
// _write_log_and_missing() using this object's state: trimmed_dups is moved
// into the call, and log_keys_debug is passed only when pg_log_debug is set
// (nullptr otherwise). "log is not dirty" is logged on the alternative path.
// NOTE(review): the condition choosing between the two paths, and several of
// the forwarded arguments, are on lines not shown here.
633 void PGLog::write_log_and_missing(
634 ObjectStore::Transaction
& t
,
635 map
<string
,bufferlist
> *km
,
637 const ghobject_t
&log_oid
,
638 bool require_rollback
)
641 dout(6) << "write_log_and_missing with: "
642 << "dirty_to: " << dirty_to
643 << ", dirty_from: " << dirty_from
644 << ", writeout_from: " << writeout_from
645 << ", trimmed: " << trimmed
646 << ", trimmed_dups: " << trimmed_dups
647 << ", clear_divergent_priors: " << clear_divergent_priors
649 _write_log_and_missing(
650 t
, km
, log
, coll
, log_oid
,
655 std::move(trimmed_dups
),
659 clear_divergent_priors
,
663 &may_include_deletes_in_missing_dirty
,
664 (pg_log_debug
? &log_keys_debug
: nullptr),
668 dout(10) << "log is not dirty" << dendl
;
// Thin wrapper around _write_log_and_missing_wo_missing().
//
// Forwards t, km, log, coll, log_oid and divergent_priors, and fills the
// remaining arguments with fixed values: eversion_t::max() / eversion_t()
// for the version bounds, `true, true` for the two flags preceding
// require_rollback, and nullptr for log_keys_debug.
// NOTE(review): presumably these bounds mean "treat everything as dirty";
// confirm against the callee. The `log` parameter is declared on a line not
// shown here.
673 void PGLog::write_log_and_missing_wo_missing(
674 ObjectStore::Transaction
& t
,
675 map
<string
,bufferlist
> *km
,
677 const coll_t
& coll
, const ghobject_t
&log_oid
,
678 map
<eversion_t
, hobject_t
> &divergent_priors
,
679 bool require_rollback
,
680 const DoutPrefixProvider
*dpp
683 _write_log_and_missing_wo_missing(
684 t
, km
, log
, coll
, log_oid
,
685 divergent_priors
, eversion_t::max(), eversion_t(), eversion_t(),
686 true, true, require_rollback
,
687 eversion_t::max(), eversion_t(), eversion_t(), nullptr, dpp
);
// Overload taking the missing set explicitly; a thin wrapper around
// _write_log_and_missing().
//
// Of the forwarded arguments, the visible ones are t, km, log, coll, log_oid,
// the literal flags `true` (before require_rollback) and `false` (after it),
// may_include_deletes_in_missing_dirty (passed through as an in/out pointer),
// nullptr for log_keys_debug, and dpp.
// NOTE(review): the remaining arguments and the `log` / `coll` parameter
// declarations are on lines not shown here.
691 void PGLog::write_log_and_missing(
692 ObjectStore::Transaction
& t
,
693 map
<string
,bufferlist
> *km
,
696 const ghobject_t
&log_oid
,
697 const pg_missing_tracker_t
&missing
,
698 bool require_rollback
,
699 bool *may_include_deletes_in_missing_dirty
,
700 const DoutPrefixProvider
*dpp
)
702 _write_log_and_missing(
703 t
, km
, log
, coll
, log_oid
,
710 true, require_rollback
, false,
714 may_include_deletes_in_missing_dirty
, nullptr, dpp
);
// Encodes the dirty parts of `log` (entries and dups) into the key/value map
// *km for the object log_oid in collection coll; divergent_priors is written
// instead of a missing set.
//
// Visible steps:
//   * t.touch(coll, log_oid);
//   * when dirty_to != eversion_t(): keys in [eversion_t(), dirty_to) key
//     names are cleared, and log_keys_debug is trimmed the same way via
//     clear_up_to();
//   * when neither dirty_to nor dirty_from is eversion_t::max(): keys from
//     dirty_from up to eversion_t::max() are cleared (clear_after() for
//     log_keys_debug);
//   * entries are encoded with checksum from the front of the log while
//     version <= dirty_to, and from the back while
//     (version >= dirty_from || version >= writeout_from) && version >= dirty_to;
//   * when log_keys_debug is given, keys in *km are recorded in it after
//     asserting they were not already present (keys whose first character is
//     '_' are special-cased);
//   * the same two-ended scheme is applied to log.dups with dirty_to_dups,
//     dirty_from_dups and write_from_dups -- deliberately after the
//     log_keys_debug step so dup keys never enter that set;
//   * divergent_priors is encoded under "divergent_priors" when
//     dirty_divergent_priors is set;
//   * when require_rollback is set, log.get_can_rollback_to() and
//     log.get_rollback_info_trimmed_to() are encoded under "can_rollback_to"
//     and "rollback_info_trimmed_to".
// NOTE(review): the calls that perform the key-range removals, and the
// `log` / `dirty_to` parameter declarations, are on lines not shown here.
718 void PGLog::_write_log_and_missing_wo_missing(
719 ObjectStore::Transaction
& t
,
720 map
<string
,bufferlist
> *km
,
722 const coll_t
& coll
, const ghobject_t
&log_oid
,
723 map
<eversion_t
, hobject_t
> &divergent_priors
,
725 eversion_t dirty_from
,
726 eversion_t writeout_from
,
727 bool dirty_divergent_priors
,
729 bool require_rollback
,
730 eversion_t dirty_to_dups
,
731 eversion_t dirty_from_dups
,
732 eversion_t write_from_dups
,
733 set
<string
> *log_keys_debug
,
734 const DoutPrefixProvider
*dpp
737 ldpp_dout(dpp
, 10) << "_write_log_and_missing_wo_missing, clearing up to " << dirty_to
738 << " dirty_to_dups=" << dirty_to_dups
739 << " dirty_from_dups=" << dirty_from_dups
740 << " write_from_dups=" << write_from_dups
<< dendl
;
742 t
.touch(coll
, log_oid
);
743 if (dirty_to
!= eversion_t()) {
746 eversion_t().get_key_name(), dirty_to
.get_key_name());
747 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
749 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
750 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
753 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
754 clear_after(log_keys_debug
, dirty_from
.get_key_name());
757 for (auto p
= log
.log
.begin();
758 p
!= log
.log
.end() && p
->version
<= dirty_to
;
760 bufferlist
bl(sizeof(*p
) * 2);
761 p
->encode_with_checksum(bl
);
762 (*km
)[p
->get_key_name()] = std::move(bl
);
765 for (auto p
= log
.log
.rbegin();
766 p
!= log
.log
.rend() &&
767 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
768 p
->version
>= dirty_to
;
770 bufferlist
bl(sizeof(*p
) * 2);
771 p
->encode_with_checksum(bl
);
772 (*km
)[p
->get_key_name()] = std::move(bl
);
775 if (log_keys_debug
) {
776 for (auto i
= (*km
).begin();
779 if (i
->first
[0] == '_')
781 ceph_assert(!log_keys_debug
->count(i
->first
));
782 log_keys_debug
->insert(i
->first
);
786 // process dups after log_keys_debug is filled, so dups do not
787 // end up in that set
788 if (dirty_to_dups
!= eversion_t()) {
789 pg_log_dup_t min
, dirty_to_dup
;
790 dirty_to_dup
.version
= dirty_to_dups
;
791 ldpp_dout(dpp
, 10) << __func__
<< " remove dups min=" << min
.get_key_name()
792 << " to dirty_to_dup=" << dirty_to_dup
.get_key_name() << dendl
;
795 min
.get_key_name(), dirty_to_dup
.get_key_name());
797 if (dirty_to_dups
!= eversion_t::max() && dirty_from_dups
!= eversion_t::max()) {
798 pg_log_dup_t max
, dirty_from_dup
;
799 max
.version
= eversion_t::max();
800 dirty_from_dup
.version
= dirty_from_dups
;
801 ldpp_dout(dpp
, 10) << __func__
<< " remove dups dirty_from_dup="
802 << dirty_from_dup
.get_key_name()
803 << " to max=" << max
.get_key_name() << dendl
;
806 dirty_from_dup
.get_key_name(), max
.get_key_name());
809 ldpp_dout(dpp
, 10) << __func__
<< " going to encode log.dups.size()="
810 << log
.dups
.size() << dendl
;
811 for (const auto& entry
: log
.dups
) {
812 if (entry
.version
> dirty_to_dups
)
816 (*km
)[entry
.get_key_name()] = std::move(bl
);
818 ldpp_dout(dpp
, 10) << __func__
<< " 1st round encoded log.dups.size()="
819 << log
.dups
.size() << dendl
;
820 for (auto p
= log
.dups
.rbegin();
821 p
!= log
.dups
.rend() &&
822 (p
->version
>= dirty_from_dups
|| p
->version
>= write_from_dups
) &&
823 p
->version
>= dirty_to_dups
;
827 (*km
)[p
->get_key_name()] = std::move(bl
);
// NOTE(review): the message below reads "2st round"; presumably "2nd round"
// was intended.
829 ldpp_dout(dpp
, 10) << __func__
<< " 2st round encoded log.dups.size()="
830 << log
.dups
.size() << dendl
;
832 if (dirty_divergent_priors
) {
833 ldpp_dout(dpp
, 10) << "write_log_and_missing: writing divergent_priors"
835 encode(divergent_priors
, (*km
)["divergent_priors"]);
837 if (require_rollback
) {
839 log
.get_can_rollback_to(),
840 (*km
)["can_rollback_to"]);
842 log
.get_rollback_info_trimmed_to(),
843 (*km
)["rollback_info_trimmed_to"]);
845 ldpp_dout(dpp
, 10) << "end of " << __func__
<< dendl
;
// Encodes the dirty parts of `log` (entries and dups) plus the changed items
// of `missing` into the key/value map *km, and removes obsolete omap keys of
// log_oid through transaction `t`.
//
// Visible steps:
//   * to_remove starts as the contents of trimmed_dups (taken by swap); the
//     key name of every version in `trimmed` is added to it and, when
//     log_keys_debug is given, erased from that set after asserting it was
//     present;
//   * t.touch(coll, log_oid);
//   * log entries and dups are cleared/encoded with the same two-ended scheme
//     as in _write_log_and_missing_wo_missing(): from the front while
//     version <= dirty_to(_dups), from the back while
//     (version >= dirty_from(_dups) || version >= writeout_from /
//     write_from_dups) && version >= dirty_to(_dups);
//   * when clear_divergent_priors is set, "divergent_priors" is added to
//     to_remove;
//   * when *may_include_deletes_in_missing_dirty is set, an empty value is
//     stored under "may_include_deletes_in_missing" and the flag is cleared;
//   * a per-object callback handles changed missing items: the key is
//     "missing/" + obj.to_str(); it is queued for removal when obj is no
//     longer missing, otherwise (obj, item) is encoded with
//     CEPH_FEATUREMASK_SERVER_OCTOPUS;
//   * when require_rollback is set, can_rollback_to and
//     rollback_info_trimmed_to are encoded under their own keys;
//   * finally, when to_remove is not empty, t.omap_rmkeys(coll, log_oid, to_remove).
//
// may_include_deletes_in_missing_dirty  in/out; dereferenced unconditionally
// NOTE(review): the calls performing the key-range removals, the call that
// drives the per-object callback, and the `log` / `dirty_to` parameter
// declarations are on lines not shown here.
849 void PGLog::_write_log_and_missing(
850 ObjectStore::Transaction
& t
,
851 map
<string
,bufferlist
>* km
,
853 const coll_t
& coll
, const ghobject_t
&log_oid
,
855 eversion_t dirty_from
,
856 eversion_t writeout_from
,
857 set
<eversion_t
> &&trimmed
,
858 set
<string
> &&trimmed_dups
,
859 const pg_missing_tracker_t
&missing
,
861 bool require_rollback
,
862 bool clear_divergent_priors
,
863 eversion_t dirty_to_dups
,
864 eversion_t dirty_from_dups
,
865 eversion_t write_from_dups
,
866 bool *may_include_deletes_in_missing_dirty
, // in/out param
867 set
<string
> *log_keys_debug
,
868 const DoutPrefixProvider
*dpp
870 ldpp_dout(dpp
, 10) << __func__
<< " clearing up to " << dirty_to
871 << " dirty_to_dups=" << dirty_to_dups
872 << " dirty_from_dups=" << dirty_from_dups
873 << " write_from_dups=" << write_from_dups
874 << " trimmed_dups.size()=" << trimmed_dups
.size() << dendl
;
875 set
<string
> to_remove
;
876 to_remove
.swap(trimmed_dups
);
// NOTE(review): the loop variable `t` below shadows the Transaction
// parameter `t` for the duration of the loop body.
877 for (auto& t
: trimmed
) {
878 string key
= t
.get_key_name();
879 if (log_keys_debug
) {
880 auto it
= log_keys_debug
->find(key
);
881 ceph_assert(it
!= log_keys_debug
->end());
882 log_keys_debug
->erase(it
);
884 to_remove
.emplace(std::move(key
));
889 t
.touch(coll
, log_oid
);
890 if (dirty_to
!= eversion_t()) {
893 eversion_t().get_key_name(), dirty_to
.get_key_name());
894 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
896 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
897 ldpp_dout(dpp
, 10) << "write_log_and_missing, clearing from "
898 << dirty_from
<< dendl
;
901 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
902 clear_after(log_keys_debug
, dirty_from
.get_key_name());
905 for (auto p
= log
.log
.begin();
906 p
!= log
.log
.end() && p
->version
<= dirty_to
;
908 bufferlist
bl(sizeof(*p
) * 2);
909 p
->encode_with_checksum(bl
);
910 (*km
)[p
->get_key_name()] = std::move(bl
);
913 for (auto p
= log
.log
.rbegin();
914 p
!= log
.log
.rend() &&
915 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
916 p
->version
>= dirty_to
;
918 bufferlist
bl(sizeof(*p
) * 2);
919 p
->encode_with_checksum(bl
);
920 (*km
)[p
->get_key_name()] = std::move(bl
);
923 if (log_keys_debug
) {
924 for (auto i
= (*km
).begin();
927 if (i
->first
[0] == '_')
929 ceph_assert(!log_keys_debug
->count(i
->first
));
930 log_keys_debug
->insert(i
->first
);
934 // process dups after log_keys_debug is filled, so dups do not
935 // end up in that set
936 if (dirty_to_dups
!= eversion_t()) {
937 pg_log_dup_t min
, dirty_to_dup
;
938 dirty_to_dup
.version
= dirty_to_dups
;
939 ldpp_dout(dpp
, 10) << __func__
<< " remove dups min=" << min
.get_key_name()
940 << " to dirty_to_dup=" << dirty_to_dup
.get_key_name() << dendl
;
943 min
.get_key_name(), dirty_to_dup
.get_key_name());
945 if (dirty_to_dups
!= eversion_t::max() && dirty_from_dups
!= eversion_t::max()) {
946 pg_log_dup_t max
, dirty_from_dup
;
947 max
.version
= eversion_t::max();
948 dirty_from_dup
.version
= dirty_from_dups
;
949 ldpp_dout(dpp
, 10) << __func__
<< " remove dups dirty_from_dup="
950 << dirty_from_dup
.get_key_name()
951 << " to max=" << max
.get_key_name() << dendl
;
954 dirty_from_dup
.get_key_name(), max
.get_key_name());
957 ldpp_dout(dpp
, 10) << __func__
<< " going to encode log.dups.size()="
958 << log
.dups
.size() << dendl
;
959 for (const auto& entry
: log
.dups
) {
960 if (entry
.version
> dirty_to_dups
)
964 (*km
)[entry
.get_key_name()] = std::move(bl
);
966 ldpp_dout(dpp
, 10) << __func__
<< " 1st round encoded log.dups.size()="
967 << log
.dups
.size() << dendl
;
969 for (auto p
= log
.dups
.rbegin();
970 p
!= log
.dups
.rend() &&
971 (p
->version
>= dirty_from_dups
|| p
->version
>= write_from_dups
) &&
972 p
->version
>= dirty_to_dups
;
976 (*km
)[p
->get_key_name()] = std::move(bl
);
// NOTE(review): the message below reads "2st round"; presumably "2nd round"
// was intended.
978 ldpp_dout(dpp
, 10) << __func__
<< " 2st round encoded log.dups.size()="
979 << log
.dups
.size() << dendl
;
// NOTE(review): this branch queues the "divergent_priors" key for removal,
// yet the message it logs says "writing divergent_priors" -- presumably a
// copy of the message used by the _wo_missing variant; confirm.
981 if (clear_divergent_priors
) {
982 ldpp_dout(dpp
, 10) << "write_log_and_missing: writing divergent_priors"
984 to_remove
.insert("divergent_priors");
986 // since we encode individual missing items instead of a whole
987 // missing set, we need another key to store this bit of state
988 if (*may_include_deletes_in_missing_dirty
) {
989 (*km
)["may_include_deletes_in_missing"] = bufferlist();
990 *may_include_deletes_in_missing_dirty
= false;
993 [&](const hobject_t
&obj
) {
994 string key
= string("missing/") + obj
.to_str();
995 pg_missing_item item
;
996 if (!missing
.is_missing(obj
, &item
)) {
997 to_remove
.insert(key
);
999 encode(make_pair(obj
, item
), (*km
)[key
], CEPH_FEATUREMASK_SERVER_OCTOPUS
);
1002 if (require_rollback
) {
1004 log
.get_can_rollback_to(),
1005 (*km
)["can_rollback_to"]);
1007 log
.get_rollback_info_trimmed_to(),
1008 (*km
)["rollback_info_trimmed_to"]);
1011 if (!to_remove
.empty())
1012 t
.omap_rmkeys(coll
, log_oid
, to_remove
);
1013 ldpp_dout(dpp
, 10) << "end of " << __func__
<< dendl
;
// Rebuilds the missing set from the log and the on-disk object versions, so
// that delete entries are represented, then flags the missing set as possibly
// containing deletes.
//
// Visible steps:
//   * entries of `missing` whose object is not in the log are saved in
//     extra_missing (they did not originate from the current log);
//   * the log is walked from newest to oldest; entries are tested against
//     info.last_complete, and objects beyond info.last_backfill or already in
//     the `did` set are filtered; each remaining object is added to `did`;
//   * an attribute of the object is read from the store (getattr); when the
//     stored object_info_t version is older than the log entry the object is
//     added to `missing` with (need = entry version, have = stored version);
//     on the other path it is added with have = eversion_t(); in both cases
//     the entry's is_delete() flag is passed along;
//   * the saved extra_missing entries are re-added unchanged;
//   * set_missing_may_contain_deletes() is called.
// NOTE(review): `store`, `did` and `bv` are declared on lines not shown here,
// as are the statements executed when the filters above match and the
// condition separating the two missing.add() paths.
1016 void PGLog::rebuild_missing_set_with_deletes(
1018 ObjectStore::CollectionHandle
& ch
,
1019 const pg_info_t
&info
)
1021 // save entries not generated from the current log (e.g. added due
1022 // to repair, EIO handling, or divergent_priors).
1023 map
<hobject_t
, pg_missing_item
> extra_missing
;
1024 for (const auto& p
: missing
.get_items()) {
1025 if (!log
.logged_object(p
.first
)) {
1026 dout(20) << __func__
<< " extra missing entry: " << p
.first
1027 << " " << p
.second
<< dendl
;
1028 extra_missing
[p
.first
] = p
.second
;
1033 // go through the log and add items that are not present or older
1034 // versions on disk, just as if we were reading the log + metadata
1035 // off disk originally
1037 for (auto i
= log
.log
.rbegin();
1038 i
!= log
.log
.rend();
1040 if (i
->version
<= info
.last_complete
)
1042 if (i
->soid
> info
.last_backfill
||
1044 did
.find(i
->soid
) != did
.end())
1046 did
.insert(i
->soid
);
1049 int r
= store
->getattr(
1051 ghobject_t(i
->soid
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
1054 dout(20) << __func__
<< " check for log entry: " << *i
<< " = " << r
<< dendl
;
1057 object_info_t
oi(bv
);
1058 dout(20) << __func__
<< " store version = " << oi
.version
<< dendl
;
1059 if (oi
.version
< i
->version
) {
1060 missing
.add(i
->soid
, i
->version
, oi
.version
, i
->is_delete());
1063 missing
.add(i
->soid
, i
->version
, eversion_t(), i
->is_delete());
1067 for (const auto& p
: extra_missing
) {
1068 missing
.add(p
.first
, p
.second
.need
, p
.second
.have
, p
.second
.is_delete());
1071 set_missing_may_contain_deletes();
// Helper that loads a PG's log and missing set from the omap of its pgmeta
// object in a crimson FuturizedStore shard.
//
// The reference members (store, info, log, missing) and the pointers
// (log_keys_debug, dpp) refer to caller-owned objects; the remaining members
// are scratch state filled while reading.
//
// process_entry(key, value) dispatches on the omap key:
//   "divergent_priors"               decoded, then a hard assert (crimson is
//                                    not expected to have this key)
//   "can_rollback_to"                -> on_disk_can_rollback_to
//   "rollback_info_trimmed_to"       -> on_disk_rollback_info_trimmed_to
//   "may_include_deletes_in_missing" sets missing.may_include_deletes
//   keys starting with "missing"     a missing item, added to `missing`
//                                    (delete items require may_include_deletes)
//   keys starting with "dup_"        a dup, appended to `dups` after asserting
//                                    strictly increasing versions
//   anything else                    a checksummed pg_log_entry_t, appended to
//                                    `entries` after asserting that
//                                    version.version strictly increases and
//                                    version.epoch does not decrease; its key
//                                    name is recorded in log_keys_debug
//
// read(ch, pgmeta_oid) first sets defaults (on_disk_can_rollback_to =
// info.last_update, may_include_deletes = false), then repeatedly calls
// store.omap_get_values() inside seastar::repeat, feeding every returned
// key/value pair to process_entry() until the store reports `done`. After
// that, for a PG without a shard id, on_disk_rollback_info_trimmed_to is
// asserted to be unset and set to info.last_update, and `log` is replaced by
// a new PGLog::IndexedLog built from the collected state.
//
// NOTE(review): the check in read() for the no-shard case uses plain assert()
// whereas the rest of this struct uses ceph_assert(); log_keys_debug is
// default-initialised with NULL rather than nullptr.
// NOTE(review): the declarations of `bl`, `oid`, `dup` and `e`, any null guard
// around the log_keys_debug insert, and the update of `start` between
// omap_get_values() calls are on lines not shown here.
1077 struct FuturizedShardStoreLogReader
{
1078 crimson::os::FuturizedStore::Shard
&store
;
1079 const pg_info_t
&info
;
1080 PGLog::IndexedLog
&log
;
1081 std::set
<std::string
>* log_keys_debug
= NULL
;
1082 pg_missing_tracker_t
&missing
;
1083 const DoutPrefixProvider
*dpp
;
1085 eversion_t on_disk_can_rollback_to
;
1086 eversion_t on_disk_rollback_info_trimmed_to
;
1088 std::map
<eversion_t
, hobject_t
> divergent_priors
;
1089 bool must_rebuild
= false;
1090 std::list
<pg_log_entry_t
> entries
;
1091 std::list
<pg_log_dup_t
> dups
;
1093 std::optional
<std::string
> next
;
1095 void process_entry(const auto& key
, const auto& value
) {
1098 //Copy ceph::buffer::list before creating iterator
1100 auto bp
= bl
.cbegin();
1101 if (key
== "divergent_priors") {
1102 decode(divergent_priors
, bp
);
1103 ldpp_dout(dpp
, 20) << "read_log_and_missing " << divergent_priors
.size()
1104 << " divergent_priors" << dendl
;
1105 ceph_assert("crimson shouldn't have had divergent_priors" == 0);
1106 } else if (key
== "can_rollback_to") {
1107 decode(on_disk_can_rollback_to
, bp
);
1108 } else if (key
== "rollback_info_trimmed_to") {
1109 decode(on_disk_rollback_info_trimmed_to
, bp
);
1110 } else if (key
== "may_include_deletes_in_missing") {
1111 missing
.may_include_deletes
= true;
1112 } else if (key
.substr(0, 7) == std::string("missing")) {
1114 pg_missing_item item
;
1117 if (item
.is_delete()) {
1118 ceph_assert(missing
.may_include_deletes
);
1120 missing
.add(oid
, std::move(item
));
1121 } else if (key
.substr(0, 4) == std::string("dup_")) {
1124 if (!dups
.empty()) {
1125 ceph_assert(dups
.back().version
< dup
.version
);
1127 dups
.push_back(dup
);
1130 e
.decode_with_checksum(bp
);
1131 ldpp_dout(dpp
, 20) << "read_log_and_missing " << e
<< dendl
;
1132 if (!entries
.empty()) {
1133 pg_log_entry_t
last_e(entries
.back());
1134 ceph_assert(last_e
.version
.version
< e
.version
.version
);
1135 ceph_assert(last_e
.version
.epoch
<= e
.version
.epoch
);
1137 entries
.push_back(e
);
1139 log_keys_debug
->insert(e
.get_key_name());
1143 seastar::future
<> read(crimson::os::CollectionRef ch
,
1144 ghobject_t pgmeta_oid
) {
1145 // will get overridden if recorded
1146 on_disk_can_rollback_to
= info
.last_update
;
1147 missing
.may_include_deletes
= false;
1149 return seastar::do_with(
1151 std::move(pgmeta_oid
),
1152 std::make_optional
<std::string
>(),
1153 [this](crimson::os::CollectionRef
&ch
,
1154 ghobject_t
&pgmeta_oid
,
1155 std::optional
<std::string
> &start
) {
1156 return seastar::repeat([this, &ch
, &pgmeta_oid
, &start
]() {
1157 return store
.omap_get_values(
1158 ch
, pgmeta_oid
, start
1159 ).safe_then([this, &start
](const auto& ret
) {
1160 const auto& [done
, kvs
] = ret
;
1161 for (const auto& [key
, value
] : kvs
) {
1162 process_entry(key
, value
);
1165 return seastar::make_ready_future
<seastar::stop_iteration
>(
1166 done
? seastar::stop_iteration::yes
: seastar::stop_iteration::no
1168 }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all
{});
1170 if (info
.pgid
.is_no_shard()) {
1171 // replicated pool pg does not persist this key
1172 assert(on_disk_rollback_info_trimmed_to
== eversion_t());
1173 on_disk_rollback_info_trimmed_to
= info
.last_update
;
1175 log
= PGLog::IndexedLog(
1178 on_disk_can_rollback_to
,
1179 on_disk_rollback_info_trimmed_to
,
1188 seastar::future
<> PGLog::read_log_and_missing_crimson(
1189 crimson::os::FuturizedStore::Shard
&store
,
1190 crimson::os::CollectionRef ch
,
1191 const pg_info_t
&info
,
1193 std::set
<std::string
>* log_keys_debug
,
1194 pg_missing_tracker_t
&missing
,
1195 ghobject_t pgmeta_oid
,
1196 const DoutPrefixProvider
*dpp
)
1198 ldpp_dout(dpp
, 20) << "read_log_and_missing coll "
1200 << " " << pgmeta_oid
<< dendl
;
1201 return seastar::do_with(FuturizedShardStoreLogReader
{
1202 store
, info
, log
, log_keys_debug
,
1204 [ch
, pgmeta_oid
](FuturizedShardStoreLogReader
& reader
) {
1205 return reader
.read(ch
, pgmeta_oid
);