// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
 *
 * Author: Loic Dachary <loic@dachary.org>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
28 using ceph::bufferlist
;
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_osd
35 #define dout_prefix _prefix(_dout, this)
37 static ostream
& _prefix(std::ostream
*_dout
, const PGLog
*pglog
)
39 return pglog
->gen_prefix(*_dout
);
42 //////////////////// PGLog::IndexedLog ////////////////////
44 void PGLog::IndexedLog::split_out_child(
47 PGLog::IndexedLog
*target
)
50 *target
= IndexedLog(pg_log_t::split_out_child(child_pgid
, split_bits
));
53 reset_rollback_info_trimmed_to_riter();
56 void PGLog::IndexedLog::trim(
59 set
<eversion_t
> *trimmed
,
60 set
<string
>* trimmed_dups
,
61 eversion_t
*write_from_dups
)
63 ceph_assert(s
<= can_rollback_to
);
64 if (complete_to
!= log
.end())
65 lgeneric_subdout(cct
, osd
, 20) << " complete_to " << complete_to
->version
<< dendl
;
67 auto earliest_dup_version
=
68 log
.rbegin()->version
.version
< cct
->_conf
->osd_pg_log_dups_tracked
70 : log
.rbegin()->version
.version
- cct
->_conf
->osd_pg_log_dups_tracked
+ 1;
72 lgeneric_subdout(cct
, osd
, 20) << "earliest_dup_version = " << earliest_dup_version
<< dendl
;
73 while (!log
.empty()) {
74 const pg_log_entry_t
&e
= *log
.begin();
77 lgeneric_subdout(cct
, osd
, 20) << "trim " << e
<< dendl
;
79 trimmed
->emplace(e
.version
);
81 unindex(e
); // remove from index,
84 if (e
.version
.version
>= earliest_dup_version
) {
85 if (write_from_dups
!= nullptr && *write_from_dups
> e
.version
) {
86 lgeneric_subdout(cct
, osd
, 20) << "updating write_from_dups from " << *write_from_dups
<< " to " << e
.version
<< dendl
;
87 *write_from_dups
= e
.version
;
89 dups
.push_back(pg_log_dup_t(e
));
92 for (const auto& extra
: e
.extra_reqids
) {
93 int return_code
= e
.return_code
;
94 if (return_code
>= 0) {
95 auto it
= e
.extra_reqid_return_codes
.find(idx
);
96 if (it
!= e
.extra_reqid_return_codes
.end()) {
97 return_code
= it
->second
;
98 // FIXME: we aren't setting op_returns for these extra_reqids
103 // note: extras have the same version as outer op
104 dups
.push_back(pg_log_dup_t(e
.version
, extra
.second
,
105 extra
.first
, return_code
));
110 bool reset_complete_to
= false;
111 // we are trimming past complete_to, so reset complete_to
112 if (complete_to
!= log
.end() && e
.version
>= complete_to
->version
)
113 reset_complete_to
= true;
114 if (rollback_info_trimmed_to_riter
== log
.rend() ||
115 e
.version
== rollback_info_trimmed_to_riter
->version
) {
117 rollback_info_trimmed_to_riter
= log
.rend();
122 // reset complete_to to the beginning of the log
123 if (reset_complete_to
) {
124 complete_to
= log
.begin();
125 if (complete_to
!= log
.end()) {
126 lgeneric_subdout(cct
, osd
, 20) << " moving complete_to to "
127 << log
.begin()->version
<< dendl
;
129 lgeneric_subdout(cct
, osd
, 20) << " log is now empty" << dendl
;
134 while (!dups
.empty()) {
135 const auto& e
= *dups
.begin();
136 if (e
.version
.version
>= earliest_dup_version
)
138 lgeneric_subdout(cct
, osd
, 20) << "trim dup " << e
<< dendl
;
140 trimmed_dups
->insert(e
.get_key_name());
150 ostream
& PGLog::IndexedLog::print(ostream
& out
) const
152 out
<< *this << std::endl
;
153 for (auto p
= log
.begin(); p
!= log
.end(); ++p
) {
155 (logged_object(p
->soid
) ? "indexed" : "NOT INDEXED") <<
157 ceph_assert(!p
->reqid_is_indexed() || logged_req(p
->reqid
));
160 for (auto p
= dups
.begin(); p
!= dups
.end(); ++p
) {
161 out
<< *p
<< std::endl
;
167 //////////////////// PGLog ////////////////////
169 void PGLog::reset_backfill()
174 void PGLog::clear() {
177 log_keys_debug
.clear();
181 void PGLog::clear_info_log(
183 ObjectStore::Transaction
*t
) {
185 t
->remove(coll
, pgid
.make_pgmeta_oid());
191 bool transaction_applied
,
194 dout(10) << __func__
<< " proposed trim_to = " << trim_to
<< dendl
;
196 if (trim_to
> log
.tail
) {
197 dout(10) << __func__
<< " missing = " << missing
.num_missing() << dendl
;
198 // Don't assert for async_recovery_targets or backfill_targets
199 // or whenever there are missing items
200 if (transaction_applied
&& !async
&& (missing
.num_missing() == 0))
201 ceph_assert(trim_to
<= info
.last_complete
);
203 dout(10) << "trim " << log
<< " to " << trim_to
<< dendl
;
204 log
.trim(cct
, trim_to
, &trimmed
, &trimmed_dups
, &write_from_dups
);
205 info
.log_tail
= log
.tail
;
206 if (log
.complete_to
!= log
.log
.end())
207 dout(10) << " after trim complete_to " << log
.complete_to
->version
<< dendl
;
211 void PGLog::proc_replica_log(
213 const pg_log_t
&olog
,
214 pg_missing_t
& omissing
,
215 pg_shard_t from
) const
217 dout(10) << "proc_replica_log for osd." << from
<< ": "
218 << oinfo
<< " " << olog
<< " " << omissing
<< dendl
;
220 if (olog
.head
< log
.tail
) {
221 dout(10) << __func__
<< ": osd." << from
<< " does not overlap, not looking "
222 << "for divergent objects" << dendl
;
225 if (olog
.head
== log
.head
) {
226 dout(10) << __func__
<< ": osd." << from
<< " same log head, not looking "
227 << "for divergent objects" << dendl
;
232 basically what we're doing here is rewinding the remote log,
233 dropping divergent entries, until we find something that matches
234 our master log. we then reset last_update to reflect the new
235 point up to which missing is accurate.
237 later, in activate(), missing will get wound forward again and
238 we will send the peer enough log to arrive at the same state.
241 for (auto i
= omissing
.get_items().begin();
242 i
!= omissing
.get_items().end();
244 dout(20) << " before missing " << i
->first
<< " need " << i
->second
.need
245 << " have " << i
->second
.have
<< dendl
;
248 auto first_non_divergent
= log
.log
.rbegin();
250 if (first_non_divergent
== log
.log
.rend())
252 if (first_non_divergent
->version
<= olog
.head
) {
253 dout(20) << "merge_log point (usually last shared) is "
254 << *first_non_divergent
<< dendl
;
257 ++first_non_divergent
;
260 /* Because olog.head >= log.tail, we know that both pgs must at least have
261 * the event represented by log.tail. Similarly, because log.head >= olog.tail,
262 * we know that the event represented by olog.tail must be common to both logs.
263 * Furthermore, the event represented by a log tail was necessarily trimmed,
264 * thus neither olog.tail nor log.tail can be divergent. It's
265 * possible that olog/log contain no actual events between olog.head and
266 * max(log.tail, olog.tail), however, since they might have been split out.
267 * Thus, if we cannot find an event e such that
268 * log.tail <= e.version <= log.head, the last_update must actually be
269 * max(log.tail, olog.tail).
271 eversion_t limit
= std::max(olog
.tail
, log
.tail
);
273 (first_non_divergent
== log
.log
.rend() ||
274 first_non_divergent
->version
< limit
) ?
276 first_non_divergent
->version
;
278 // we merge and adjust the replica's log, rollback the rollbackable divergent entry,
279 // remove the unrollbackable divergent entry and mark the according object as missing.
280 // the rollback boundary must choose crt of the olog which going to be merged.
281 // The replica log's(olog) crt will not be modified, so it could get passed
282 // to _merge_divergent_entries() directly.
283 IndexedLog
folog(olog
);
284 auto divergent
= folog
.rewind_from_head(lu
);
285 _merge_divergent_entries(
289 olog
.get_can_rollback_to(),
294 if (lu
< oinfo
.last_update
) {
295 dout(10) << " peer osd." << from
<< " last_update now " << lu
<< dendl
;
296 oinfo
.last_update
= lu
;
299 if (omissing
.have_missing()) {
300 eversion_t first_missing
=
301 omissing
.get_items().at(omissing
.get_rmissing().begin()->second
).need
;
302 oinfo
.last_complete
= eversion_t();
303 for (auto i
= olog
.log
.begin(); i
!= olog
.log
.end(); ++i
) {
304 if (i
->version
< first_missing
)
305 oinfo
.last_complete
= i
->version
;
310 oinfo
.last_complete
= oinfo
.last_update
;
312 } // proc_replica_log
/**
 * rewind divergent entries at the head of the log
 *
 * This rewinds entries off the head of our log that are divergent.
 * This is used by replicas during activation.
 *
 * @param newhead new head to rewind to
 */
322 void PGLog::rewind_divergent_log(eversion_t newhead
,
323 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
324 bool &dirty_info
, bool &dirty_big_info
)
326 dout(10) << "rewind_divergent_log truncate divergent future " <<
329 // We need to preserve the original crt before it gets updated in rewind_from_head().
330 // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
331 // a divergent entry or not.
332 eversion_t original_crt
= log
.get_can_rollback_to();
333 dout(20) << __func__
<< " original_crt = " << original_crt
<< dendl
;
334 if (info
.last_complete
> newhead
)
335 info
.last_complete
= newhead
;
337 auto divergent
= log
.rewind_from_head(newhead
);
338 if (!divergent
.empty()) {
339 mark_dirty_from(divergent
.front().version
);
341 for (auto &&entry
: divergent
) {
342 dout(10) << "rewind_divergent_log future divergent " << entry
<< dendl
;
344 info
.last_update
= newhead
;
346 _merge_divergent_entries(
356 dirty_big_info
= true;
359 void PGLog::merge_log(pg_info_t
&oinfo
, pg_log_t
&& olog
, pg_shard_t fromosd
,
360 pg_info_t
&info
, LogEntryHandler
*rollbacker
,
361 bool &dirty_info
, bool &dirty_big_info
)
363 dout(10) << "merge_log " << olog
<< " from osd." << fromosd
364 << " into " << log
<< dendl
;
366 // Check preconditions
368 // If our log is empty, the incoming log needs to have not been trimmed.
369 ceph_assert(!log
.null() || olog
.tail
== eversion_t());
370 // The logs must overlap.
371 ceph_assert(log
.head
>= olog
.tail
&& olog
.head
>= log
.tail
);
373 for (auto i
= missing
.get_items().begin();
374 i
!= missing
.get_items().end();
376 dout(20) << "pg_missing_t sobject: " << i
->first
<< dendl
;
379 bool changed
= false;
382 // this is just filling in history. it does not affect our
383 // missing set, as that should already be consistent with our
385 eversion_t orig_tail
= log
.tail
;
386 if (olog
.tail
< log
.tail
) {
387 dout(10) << "merge_log extending tail to " << olog
.tail
<< dendl
;
388 auto from
= olog
.log
.begin();
391 for (; to
!= olog
.log
.end(); ++to
) {
392 if (to
->version
> log
.tail
)
395 dout(15) << *to
<< dendl
;
400 // splice into our log.
401 log
.log
.splice(log
.log
.begin(),
402 std::move(olog
.log
), from
, to
);
404 info
.log_tail
= log
.tail
= olog
.tail
;
408 if (oinfo
.stats
.reported_seq
< info
.stats
.reported_seq
|| // make sure reported always increases
409 oinfo
.stats
.reported_epoch
< info
.stats
.reported_epoch
) {
410 oinfo
.stats
.reported_seq
= info
.stats
.reported_seq
;
411 oinfo
.stats
.reported_epoch
= info
.stats
.reported_epoch
;
413 if (info
.last_backfill
.is_max())
414 info
.stats
= oinfo
.stats
;
415 info
.hit_set
= oinfo
.hit_set
;
417 // do we have divergent entries to throw out?
418 if (olog
.head
< log
.head
) {
419 rewind_divergent_log(olog
.head
, info
, rollbacker
, dirty_info
, dirty_big_info
);
424 if (olog
.head
> log
.head
) {
425 dout(10) << "merge_log extending head to " << olog
.head
<< dendl
;
427 // find start point in olog
428 auto to
= olog
.log
.end();
429 auto from
= olog
.log
.end();
430 eversion_t lower_bound
= std::max(olog
.tail
, orig_tail
);
432 if (from
== olog
.log
.begin())
435 dout(20) << " ? " << *from
<< dendl
;
436 if (from
->version
<= log
.head
) {
437 lower_bound
= std::max(lower_bound
, from
->version
);
442 dout(20) << "merge_log cut point (usually last shared) is "
443 << lower_bound
<< dendl
;
444 mark_dirty_from(lower_bound
);
446 // We need to preserve the original crt before it gets updated in rewind_from_head().
447 // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
448 // a divergent entry or not.
449 eversion_t original_crt
= log
.get_can_rollback_to();
450 dout(20) << __func__
<< " original_crt = " << original_crt
<< dendl
;
451 auto divergent
= log
.rewind_from_head(lower_bound
);
452 // move aside divergent items
453 for (auto &&oe
: divergent
) {
454 dout(10) << "merge_log divergent " << oe
<< dendl
;
456 log
.roll_forward_to(log
.head
, rollbacker
);
458 mempool::osd_pglog::list
<pg_log_entry_t
> new_entries
;
459 new_entries
.splice(new_entries
.end(), olog
.log
, from
, to
);
460 append_log_entries_update_missing(
469 _merge_divergent_entries(
478 info
.last_update
= log
.head
= olog
.head
;
480 // We cannot rollback into the new log entries
481 log
.skip_can_rollback_to_to_head();
483 info
.last_user_version
= oinfo
.last_user_version
;
484 info
.purged_snaps
= oinfo
.purged_snaps
;
485 // update num_missing too
486 // we might have appended some more missing objects above
487 info
.stats
.stats
.sum
.num_objects_missing
= missing
.num_missing();
493 if (merge_log_dups(olog
)) {
497 dout(10) << "merge_log result " << log
<< " " << missing
<<
498 " changed=" << changed
<< dendl
;
502 dirty_big_info
= true;
507 // returns true if any changes were made to log.dups
508 bool PGLog::merge_log_dups(const pg_log_t
& olog
) {
509 bool changed
= false;
511 if (!olog
.dups
.empty()) {
512 if (log
.dups
.empty()) {
513 dout(10) << "merge_log copying olog dups to log " <<
514 olog
.dups
.front().version
<< " to " <<
515 olog
.dups
.back().version
<< dendl
;
517 dirty_from_dups
= eversion_t();
518 dirty_to_dups
= eversion_t::max();
519 // since our log.dups is empty just copy them
520 for (const auto& i
: olog
.dups
) {
521 log
.dups
.push_back(i
);
522 log
.index(log
.dups
.back());
525 // since our log.dups is not empty try to extend on each end
527 if (olog
.dups
.back().version
> log
.dups
.back().version
) {
528 // extend the dups's tail (i.e., newer dups)
529 dout(10) << "merge_log extending dups tail to " <<
530 olog
.dups
.back().version
<< dendl
;
533 auto log_tail_version
= log
.dups
.back().version
;
535 auto insert_cursor
= log
.dups
.end();
536 eversion_t last_shared
= eversion_t::max();
537 for (auto i
= olog
.dups
.crbegin(); i
!= olog
.dups
.crend(); ++i
) {
538 if (i
->version
<= log_tail_version
) break;
539 log
.dups
.insert(insert_cursor
, *i
);
540 last_shared
= i
->version
;
542 auto prev
= insert_cursor
;
544 // be sure to pass reference of copy in log.dups
547 --insert_cursor
; // make sure we insert in reverse order
549 mark_dirty_from_dups(last_shared
);
552 if (olog
.dups
.front().version
< log
.dups
.front().version
) {
553 // extend the dups's head (i.e., older dups)
554 dout(10) << "merge_log extending dups head to " <<
555 olog
.dups
.front().version
<< dendl
;
559 auto insert_cursor
= log
.dups
.begin();
560 for (auto i
= olog
.dups
.cbegin(); i
!= olog
.dups
.cend(); ++i
) {
561 if (i
->version
>= insert_cursor
->version
) break;
562 log
.dups
.insert(insert_cursor
, *i
);
564 auto prev
= insert_cursor
;
566 // be sure to pass address of copy in log.dups
569 mark_dirty_to_dups(last
);
574 // remove any dup entries that overlap with pglog
575 if (!log
.dups
.empty() && log
.dups
.back().version
> log
.tail
) {
576 dout(10) << "merge_log removed dups overlapping log entries (" <<
577 log
.tail
<< "," << log
.dups
.back().version
<< "]" << dendl
;
580 while (!log
.dups
.empty() && log
.dups
.back().version
> log
.tail
) {
581 log
.unindex(log
.dups
.back());
582 mark_dirty_from_dups(log
.dups
.back().version
);
590 void PGLog::check() {
593 if (log
.log
.size() != log_keys_debug
.size()) {
594 derr
<< "log.log.size() != log_keys_debug.size()" << dendl
;
595 derr
<< "actual log:" << dendl
;
596 for (auto i
= log
.log
.begin(); i
!= log
.log
.end(); ++i
) {
597 derr
<< " " << *i
<< dendl
;
599 derr
<< "log_keys_debug:" << dendl
;
600 for (auto i
= log_keys_debug
.begin();
601 i
!= log_keys_debug
.end();
603 derr
<< " " << *i
<< dendl
;
606 ceph_assert(log
.log
.size() == log_keys_debug
.size());
607 for (auto i
= log
.log
.begin(); i
!= log
.log
.end(); ++i
) {
608 ceph_assert(log_keys_debug
.count(i
->get_key_name()));
613 void PGLog::write_log_and_missing(
614 ObjectStore::Transaction
& t
,
615 map
<string
,bufferlist
> *km
,
617 const ghobject_t
&log_oid
,
618 bool require_rollback
)
621 dout(6) << "write_log_and_missing with: "
622 << "dirty_to: " << dirty_to
623 << ", dirty_from: " << dirty_from
624 << ", writeout_from: " << writeout_from
625 << ", trimmed: " << trimmed
626 << ", trimmed_dups: " << trimmed_dups
627 << ", clear_divergent_priors: " << clear_divergent_priors
629 _write_log_and_missing(
630 t
, km
, log
, coll
, log_oid
,
635 std::move(trimmed_dups
),
639 clear_divergent_priors
,
643 &may_include_deletes_in_missing_dirty
,
644 (pg_log_debug
? &log_keys_debug
: nullptr));
647 dout(10) << "log is not dirty" << dendl
;
652 void PGLog::write_log_and_missing_wo_missing(
653 ObjectStore::Transaction
& t
,
654 map
<string
,bufferlist
> *km
,
656 const coll_t
& coll
, const ghobject_t
&log_oid
,
657 map
<eversion_t
, hobject_t
> &divergent_priors
,
658 bool require_rollback
661 _write_log_and_missing_wo_missing(
662 t
, km
, log
, coll
, log_oid
,
663 divergent_priors
, eversion_t::max(), eversion_t(), eversion_t(),
664 true, true, require_rollback
,
665 eversion_t::max(), eversion_t(), eversion_t(), nullptr);
669 void PGLog::write_log_and_missing(
670 ObjectStore::Transaction
& t
,
671 map
<string
,bufferlist
> *km
,
674 const ghobject_t
&log_oid
,
675 const pg_missing_tracker_t
&missing
,
676 bool require_rollback
,
677 bool *may_include_deletes_in_missing_dirty
)
679 _write_log_and_missing(
680 t
, km
, log
, coll
, log_oid
,
687 true, require_rollback
, false,
691 may_include_deletes_in_missing_dirty
, nullptr);
695 void PGLog::_write_log_and_missing_wo_missing(
696 ObjectStore::Transaction
& t
,
697 map
<string
,bufferlist
> *km
,
699 const coll_t
& coll
, const ghobject_t
&log_oid
,
700 map
<eversion_t
, hobject_t
> &divergent_priors
,
702 eversion_t dirty_from
,
703 eversion_t writeout_from
,
704 bool dirty_divergent_priors
,
706 bool require_rollback
,
707 eversion_t dirty_to_dups
,
708 eversion_t dirty_from_dups
,
709 eversion_t write_from_dups
,
710 set
<string
> *log_keys_debug
713 // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
715 t
.touch(coll
, log_oid
);
716 if (dirty_to
!= eversion_t()) {
719 eversion_t().get_key_name(), dirty_to
.get_key_name());
720 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
722 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
723 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
726 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
727 clear_after(log_keys_debug
, dirty_from
.get_key_name());
730 for (auto p
= log
.log
.begin();
731 p
!= log
.log
.end() && p
->version
<= dirty_to
;
733 bufferlist
bl(sizeof(*p
) * 2);
734 p
->encode_with_checksum(bl
);
735 (*km
)[p
->get_key_name()] = std::move(bl
);
738 for (auto p
= log
.log
.rbegin();
739 p
!= log
.log
.rend() &&
740 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
741 p
->version
>= dirty_to
;
743 bufferlist
bl(sizeof(*p
) * 2);
744 p
->encode_with_checksum(bl
);
745 (*km
)[p
->get_key_name()] = std::move(bl
);
748 if (log_keys_debug
) {
749 for (auto i
= (*km
).begin();
752 if (i
->first
[0] == '_')
754 ceph_assert(!log_keys_debug
->count(i
->first
));
755 log_keys_debug
->insert(i
->first
);
759 // process dups after log_keys_debug is filled, so dups do not
760 // end up in that set
761 if (dirty_to_dups
!= eversion_t()) {
762 pg_log_dup_t min
, dirty_to_dup
;
763 dirty_to_dup
.version
= dirty_to_dups
;
766 min
.get_key_name(), dirty_to_dup
.get_key_name());
768 if (dirty_to_dups
!= eversion_t::max() && dirty_from_dups
!= eversion_t::max()) {
769 pg_log_dup_t max
, dirty_from_dup
;
770 max
.version
= eversion_t::max();
771 dirty_from_dup
.version
= dirty_from_dups
;
774 dirty_from_dup
.get_key_name(), max
.get_key_name());
777 for (const auto& entry
: log
.dups
) {
778 if (entry
.version
> dirty_to_dups
)
782 (*km
)[entry
.get_key_name()] = std::move(bl
);
785 for (auto p
= log
.dups
.rbegin();
786 p
!= log
.dups
.rend() &&
787 (p
->version
>= dirty_from_dups
|| p
->version
>= write_from_dups
) &&
788 p
->version
>= dirty_to_dups
;
792 (*km
)[p
->get_key_name()] = std::move(bl
);
795 if (dirty_divergent_priors
) {
796 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
797 encode(divergent_priors
, (*km
)["divergent_priors"]);
799 if (require_rollback
) {
801 log
.get_can_rollback_to(),
802 (*km
)["can_rollback_to"]);
804 log
.get_rollback_info_trimmed_to(),
805 (*km
)["rollback_info_trimmed_to"]);
810 void PGLog::_write_log_and_missing(
811 ObjectStore::Transaction
& t
,
812 map
<string
,bufferlist
>* km
,
814 const coll_t
& coll
, const ghobject_t
&log_oid
,
816 eversion_t dirty_from
,
817 eversion_t writeout_from
,
818 set
<eversion_t
> &&trimmed
,
819 set
<string
> &&trimmed_dups
,
820 const pg_missing_tracker_t
&missing
,
822 bool require_rollback
,
823 bool clear_divergent_priors
,
824 eversion_t dirty_to_dups
,
825 eversion_t dirty_from_dups
,
826 eversion_t write_from_dups
,
827 bool *may_include_deletes_in_missing_dirty
, // in/out param
828 set
<string
> *log_keys_debug
830 set
<string
> to_remove
;
831 to_remove
.swap(trimmed_dups
);
832 for (auto& t
: trimmed
) {
833 string key
= t
.get_key_name();
834 if (log_keys_debug
) {
835 auto it
= log_keys_debug
->find(key
);
836 ceph_assert(it
!= log_keys_debug
->end());
837 log_keys_debug
->erase(it
);
839 to_remove
.emplace(std::move(key
));
844 t
.touch(coll
, log_oid
);
845 if (dirty_to
!= eversion_t()) {
848 eversion_t().get_key_name(), dirty_to
.get_key_name());
849 clear_up_to(log_keys_debug
, dirty_to
.get_key_name());
851 if (dirty_to
!= eversion_t::max() && dirty_from
!= eversion_t::max()) {
852 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
855 dirty_from
.get_key_name(), eversion_t::max().get_key_name());
856 clear_after(log_keys_debug
, dirty_from
.get_key_name());
859 for (auto p
= log
.log
.begin();
860 p
!= log
.log
.end() && p
->version
<= dirty_to
;
862 bufferlist
bl(sizeof(*p
) * 2);
863 p
->encode_with_checksum(bl
);
864 (*km
)[p
->get_key_name()] = std::move(bl
);
867 for (auto p
= log
.log
.rbegin();
868 p
!= log
.log
.rend() &&
869 (p
->version
>= dirty_from
|| p
->version
>= writeout_from
) &&
870 p
->version
>= dirty_to
;
872 bufferlist
bl(sizeof(*p
) * 2);
873 p
->encode_with_checksum(bl
);
874 (*km
)[p
->get_key_name()] = std::move(bl
);
877 if (log_keys_debug
) {
878 for (auto i
= (*km
).begin();
881 if (i
->first
[0] == '_')
883 ceph_assert(!log_keys_debug
->count(i
->first
));
884 log_keys_debug
->insert(i
->first
);
888 // process dups after log_keys_debug is filled, so dups do not
889 // end up in that set
890 if (dirty_to_dups
!= eversion_t()) {
891 pg_log_dup_t min
, dirty_to_dup
;
892 dirty_to_dup
.version
= dirty_to_dups
;
895 min
.get_key_name(), dirty_to_dup
.get_key_name());
897 if (dirty_to_dups
!= eversion_t::max() && dirty_from_dups
!= eversion_t::max()) {
898 pg_log_dup_t max
, dirty_from_dup
;
899 max
.version
= eversion_t::max();
900 dirty_from_dup
.version
= dirty_from_dups
;
903 dirty_from_dup
.get_key_name(), max
.get_key_name());
906 for (const auto& entry
: log
.dups
) {
907 if (entry
.version
> dirty_to_dups
)
911 (*km
)[entry
.get_key_name()] = std::move(bl
);
914 for (auto p
= log
.dups
.rbegin();
915 p
!= log
.dups
.rend() &&
916 (p
->version
>= dirty_from_dups
|| p
->version
>= write_from_dups
) &&
917 p
->version
>= dirty_to_dups
;
921 (*km
)[p
->get_key_name()] = std::move(bl
);
924 if (clear_divergent_priors
) {
925 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
926 to_remove
.insert("divergent_priors");
928 // since we encode individual missing items instead of a whole
929 // missing set, we need another key to store this bit of state
930 if (*may_include_deletes_in_missing_dirty
) {
931 (*km
)["may_include_deletes_in_missing"] = bufferlist();
932 *may_include_deletes_in_missing_dirty
= false;
935 [&](const hobject_t
&obj
) {
936 string key
= string("missing/") + obj
.to_str();
937 pg_missing_item item
;
938 if (!missing
.is_missing(obj
, &item
)) {
939 to_remove
.insert(key
);
941 encode(make_pair(obj
, item
), (*km
)[key
], CEPH_FEATUREMASK_SERVER_OCTOPUS
);
944 if (require_rollback
) {
946 log
.get_can_rollback_to(),
947 (*km
)["can_rollback_to"]);
949 log
.get_rollback_info_trimmed_to(),
950 (*km
)["rollback_info_trimmed_to"]);
953 if (!to_remove
.empty())
954 t
.omap_rmkeys(coll
, log_oid
, to_remove
);
957 void PGLog::rebuild_missing_set_with_deletes(
959 ObjectStore::CollectionHandle
& ch
,
960 const pg_info_t
&info
)
962 // save entries not generated from the current log (e.g. added due
963 // to repair, EIO handling, or divergent_priors).
964 map
<hobject_t
, pg_missing_item
> extra_missing
;
965 for (const auto& p
: missing
.get_items()) {
966 if (!log
.logged_object(p
.first
)) {
967 dout(20) << __func__
<< " extra missing entry: " << p
.first
968 << " " << p
.second
<< dendl
;
969 extra_missing
[p
.first
] = p
.second
;
974 // go through the log and add items that are not present or older
975 // versions on disk, just as if we were reading the log + metadata
976 // off disk originally
978 for (auto i
= log
.log
.rbegin();
981 if (i
->version
<= info
.last_complete
)
983 if (i
->soid
> info
.last_backfill
||
985 did
.find(i
->soid
) != did
.end())
990 int r
= store
->getattr(
992 ghobject_t(i
->soid
, ghobject_t::NO_GEN
, info
.pgid
.shard
),
995 dout(20) << __func__
<< " check for log entry: " << *i
<< " = " << r
<< dendl
;
998 object_info_t
oi(bv
);
999 dout(20) << __func__
<< " store version = " << oi
.version
<< dendl
;
1000 if (oi
.version
< i
->version
) {
1001 missing
.add(i
->soid
, i
->version
, oi
.version
, i
->is_delete());
1004 missing
.add(i
->soid
, i
->version
, eversion_t(), i
->is_delete());
1008 for (const auto& p
: extra_missing
) {
1009 missing
.add(p
.first
, p
.second
.need
, p
.second
.have
, p
.second
.is_delete());
1012 set_missing_may_contain_deletes();
1018 struct FuturizedStoreLogReader
{
1019 crimson::os::FuturizedStore
&store
;
1020 const pg_info_t
&info
;
1021 PGLog::IndexedLog
&log
;
1022 std::set
<std::string
>* log_keys_debug
= NULL
;
1023 pg_missing_tracker_t
&missing
;
1024 const DoutPrefixProvider
*dpp
;
1026 eversion_t on_disk_can_rollback_to
;
1027 eversion_t on_disk_rollback_info_trimmed_to
;
1029 std::map
<eversion_t
, hobject_t
> divergent_priors
;
1030 bool must_rebuild
= false;
1031 std::list
<pg_log_entry_t
> entries
;
1032 std::list
<pg_log_dup_t
> dups
;
1034 std::optional
<std::string
> next
;
1036 void process_entry(crimson::os::FuturizedStore::OmapIteratorRef
&p
) {
1037 if (p
->key()[0] == '_')
1039 //Copy ceph::buffer::list before creating iterator
1040 auto bl
= p
->value();
1041 auto bp
= bl
.cbegin();
1042 if (p
->key() == "divergent_priors") {
1043 decode(divergent_priors
, bp
);
1044 ldpp_dout(dpp
, 20) << "read_log_and_missing " << divergent_priors
.size()
1045 << " divergent_priors" << dendl
;
1046 ceph_assert("crimson shouldn't have had divergent_priors" == 0);
1047 } else if (p
->key() == "can_rollback_to") {
1048 decode(on_disk_can_rollback_to
, bp
);
1049 } else if (p
->key() == "rollback_info_trimmed_to") {
1050 decode(on_disk_rollback_info_trimmed_to
, bp
);
1051 } else if (p
->key() == "may_include_deletes_in_missing") {
1052 missing
.may_include_deletes
= true;
1053 } else if (p
->key().substr(0, 7) == std::string("missing")) {
1055 pg_missing_item item
;
1058 if (item
.is_delete()) {
1059 ceph_assert(missing
.may_include_deletes
);
1061 missing
.add(oid
, std::move(item
));
1062 } else if (p
->key().substr(0, 4) == std::string("dup_")) {
1065 if (!dups
.empty()) {
1066 ceph_assert(dups
.back().version
< dup
.version
);
1068 dups
.push_back(dup
);
1071 e
.decode_with_checksum(bp
);
1072 ldpp_dout(dpp
, 20) << "read_log_and_missing " << e
<< dendl
;
1073 if (!entries
.empty()) {
1074 pg_log_entry_t
last_e(entries
.back());
1075 ceph_assert(last_e
.version
.version
< e
.version
.version
);
1076 ceph_assert(last_e
.version
.epoch
<= e
.version
.epoch
);
1078 entries
.push_back(e
);
1080 log_keys_debug
->insert(e
.get_key_name());
1084 seastar::future
<> read(crimson::os::CollectionRef ch
,
1085 ghobject_t pgmeta_oid
) {
1086 // will get overridden if recorded
1087 on_disk_can_rollback_to
= info
.last_update
;
1088 missing
.may_include_deletes
= false;
1090 return store
.get_omap_iterator(ch
, pgmeta_oid
).then([this](auto iter
) {
1091 return seastar::do_until([iter
] { return !iter
->valid(); },
1092 [iter
, this]() mutable {
1093 process_entry(iter
);
1094 return iter
->next();
1097 if (info
.pgid
.is_no_shard()) {
1098 // replicated pool pg does not persist this key
1099 assert(on_disk_rollback_info_trimmed_to
== eversion_t());
1100 on_disk_rollback_info_trimmed_to
= info
.last_update
;
1102 log
= PGLog::IndexedLog(
1105 on_disk_can_rollback_to
,
1106 on_disk_rollback_info_trimmed_to
,
1114 seastar::future
<> PGLog::read_log_and_missing_crimson(
1115 crimson::os::FuturizedStore
&store
,
1116 crimson::os::CollectionRef ch
,
1117 const pg_info_t
&info
,
1119 std::set
<std::string
>* log_keys_debug
,
1120 pg_missing_tracker_t
&missing
,
1121 ghobject_t pgmeta_oid
,
1122 const DoutPrefixProvider
*dpp
)
1124 ldpp_dout(dpp
, 20) << "read_log_and_missing coll "
1126 << " " << pgmeta_oid
<< dendl
;
1127 return seastar::do_with(FuturizedStoreLogReader
{
1128 store
, info
, log
, log_keys_debug
,
1130 [ch
, pgmeta_oid
](FuturizedStoreLogReader
& reader
) {
1131 return reader
.read(ch
, pgmeta_oid
);