]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/PGLog.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / osd / PGLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
8 *
9 * Author: Loic Dachary <loic@dachary.org>
10 *
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
15 *
16 */
17
18 #include "PGLog.h"
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
21
22 using std::make_pair;
23 using std::map;
24 using std::ostream;
25 using std::set;
26 using std::string;
27
28 using ceph::bufferlist;
29 using ceph::decode;
30 using ceph::encode;
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_osd
34 #undef dout_prefix
35 #define dout_prefix _prefix(_dout, this)
36
// Prefix generator used by the dout_prefix macro above: delegates to the
// PGLog instance so every debug line carries its PG-specific prefix.
static ostream& _prefix(std::ostream *_dout, const PGLog *pglog)
{
  return pglog->gen_prefix(*_dout);
}
41
42 //////////////////// PGLog::IndexedLog ////////////////////
43
// Split the entries belonging to a child PG (selected by split_bits) out of
// this log into *target, then rebuild the lookup indexes on both logs.
void PGLog::IndexedLog::split_out_child(
  pg_t child_pgid,
  unsigned split_bits,
  PGLog::IndexedLog *target)
{
  // Drop our indexes first: pg_log_t::split_out_child mutates the
  // underlying entry list, which would leave stale index pointers.
  unindex();
  *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits));
  index();          // re-index what remains in this (parent) log
  target->index();  // index the entries moved to the child
  // the riter may have been invalidated by the list surgery above
  reset_rollback_info_trimmed_to_riter();
}
55
56 void PGLog::IndexedLog::trim(
57 CephContext* cct,
58 eversion_t s,
59 set<eversion_t> *trimmed,
60 set<string>* trimmed_dups,
61 eversion_t *write_from_dups)
62 {
63 ceph_assert(s <= can_rollback_to);
64 if (complete_to != log.end())
65 lgeneric_subdout(cct, osd, 20) << " complete_to " << complete_to->version << dendl;
66
67 auto earliest_dup_version =
68 log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked
69 ? 0u
70 : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked + 1;
71
72 lgeneric_subdout(cct, osd, 20) << "earliest_dup_version = " << earliest_dup_version << dendl;
73 while (!log.empty()) {
74 const pg_log_entry_t &e = *log.begin();
75 if (e.version > s)
76 break;
77 lgeneric_subdout(cct, osd, 20) << "trim " << e << dendl;
78 if (trimmed)
79 trimmed->emplace(e.version);
80
81 unindex(e); // remove from index,
82
83 // add to dup list
84 if (e.version.version >= earliest_dup_version) {
85 if (write_from_dups != nullptr && *write_from_dups > e.version) {
86 lgeneric_subdout(cct, osd, 20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl;
87 *write_from_dups = e.version;
88 }
89 dups.push_back(pg_log_dup_t(e));
90 index(dups.back());
91 uint32_t idx = 0;
92 for (const auto& extra : e.extra_reqids) {
93 int return_code = e.return_code;
94 if (return_code >= 0) {
95 auto it = e.extra_reqid_return_codes.find(idx);
96 if (it != e.extra_reqid_return_codes.end()) {
97 return_code = it->second;
98 // FIXME: we aren't setting op_returns for these extra_reqids
99 }
100 }
101 ++idx;
102
103 // note: extras have the same version as outer op
104 dups.push_back(pg_log_dup_t(e.version, extra.second,
105 extra.first, return_code));
106 index(dups.back());
107 }
108 }
109
110 bool reset_complete_to = false;
111 // we are trimming past complete_to, so reset complete_to
112 if (complete_to != log.end() && e.version >= complete_to->version)
113 reset_complete_to = true;
114 if (rollback_info_trimmed_to_riter == log.rend() ||
115 e.version == rollback_info_trimmed_to_riter->version) {
116 log.pop_front();
117 rollback_info_trimmed_to_riter = log.rend();
118 } else {
119 log.pop_front();
120 }
121
122 // reset complete_to to the beginning of the log
123 if (reset_complete_to) {
124 complete_to = log.begin();
125 if (complete_to != log.end()) {
126 lgeneric_subdout(cct, osd, 20) << " moving complete_to to "
127 << log.begin()->version << dendl;
128 } else {
129 lgeneric_subdout(cct, osd, 20) << " log is now empty" << dendl;
130 }
131 }
132 }
133
134 while (!dups.empty()) {
135 const auto& e = *dups.begin();
136 if (e.version.version >= earliest_dup_version)
137 break;
138 lgeneric_subdout(cct, osd, 20) << "trim dup " << e << dendl;
139 if (trimmed_dups)
140 trimmed_dups->insert(e.get_key_name());
141 unindex(e);
142 dups.pop_front();
143 }
144
145 // raise tail?
146 if (tail < s)
147 tail = s;
148 }
149
150 ostream& PGLog::IndexedLog::print(ostream& out) const
151 {
152 out << *this << std::endl;
153 for (auto p = log.begin(); p != log.end(); ++p) {
154 out << *p << " " <<
155 (logged_object(p->soid) ? "indexed" : "NOT INDEXED") <<
156 std::endl;
157 ceph_assert(!p->reqid_is_indexed() || logged_req(p->reqid));
158 }
159
160 for (auto p = dups.begin(); p != dups.end(); ++p) {
161 out << *p << std::endl;
162 }
163
164 return out;
165 }
166
167 //////////////////// PGLog ////////////////////
168
// Forget everything we thought was missing; backfill repopulates the
// missing set from scratch.
void PGLog::reset_backfill()
{
  missing.clear();
}
173
// Reset all in-memory log state (missing set, log, debug key set) and
// clear the dirty markers.
void PGLog::clear() {
  missing.clear();
  log.clear();
  log_keys_debug.clear();
  undirty();
}
180
// Queue removal of the pgmeta object (which holds the persisted info/log
// omap) for the given PG in transaction t.
void PGLog::clear_info_log(
  spg_t pgid,
  ObjectStore::Transaction *t) {
  coll_t coll(pgid);
  t->remove(coll, pgid.make_pgmeta_oid());
}
187
188 void PGLog::trim(
189 eversion_t trim_to,
190 pg_info_t &info,
191 bool transaction_applied,
192 bool async)
193 {
194 dout(10) << __func__ << " proposed trim_to = " << trim_to << dendl;
195 // trim?
196 if (trim_to > log.tail) {
197 dout(10) << __func__ << " missing = " << missing.num_missing() << dendl;
198 // Don't assert for async_recovery_targets or backfill_targets
199 // or whenever there are missing items
200 if (transaction_applied && !async && (missing.num_missing() == 0))
201 ceph_assert(trim_to <= info.last_complete);
202
203 dout(10) << "trim " << log << " to " << trim_to << dendl;
204 log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
205 info.log_tail = log.tail;
206 if (log.complete_to != log.log.end())
207 dout(10) << " after trim complete_to " << log.complete_to->version << dendl;
208 }
209 }
210
// Process a replica's log against our (authoritative) log: rewind the
// replica's log back to the last entry it shares with ours, merge the
// resulting divergent entries into its missing set, and adjust its
// last_update/last_complete so that the missing set is accurate.
void PGLog::proc_replica_log(
  pg_info_t &oinfo,
  const pg_log_t &olog,
  pg_missing_t& omissing,
  pg_shard_t from) const
{
  dout(10) << "proc_replica_log for osd." << from << ": "
	   << oinfo << " " << olog << " " << omissing << dendl;

  if (olog.head < log.tail) {
    // no shared history to compare against
    dout(10) << __func__ << ": osd." << from << " does not overlap, not looking "
	     << "for divergent objects" << dendl;
    return;
  }
  if (olog.head == log.head) {
    // identical heads: nothing can be divergent
    dout(10) << __func__ << ": osd." << from << " same log head, not looking "
	     << "for divergent objects" << dendl;
    return;
  }

  /*
    basically what we're doing here is rewinding the remote log,
    dropping divergent entries, until we find something that matches
    our master log.  we then reset last_update to reflect the new
    point up to which missing is accurate.

    later, in activate(), missing will get wound forward again and
    we will send the peer enough log to arrive at the same state.
  */

  for (auto i = omissing.get_items().begin();
       i != omissing.get_items().end();
       ++i) {
    dout(20) << " before missing " << i->first << " need " << i->second.need
	     << " have " << i->second.have << dendl;
  }

  // walk our log backwards to the newest entry the replica also has
  auto first_non_divergent = log.log.rbegin();
  while (1) {
    if (first_non_divergent == log.log.rend())
      break;
    if (first_non_divergent->version <= olog.head) {
      dout(20) << "merge_log point (usually last shared) is "
	       << *first_non_divergent << dendl;
      break;
    }
    ++first_non_divergent;
  }

  /* Because olog.head >= log.tail, we know that both pgs must at least have
   * the event represented by log.tail.  Similarly, because log.head >= olog.tail,
   * we know that the event represented by olog.tail must be common to both logs.
   * Furthermore, the event represented by a log tail was necessarily trimmed,
   * thus neither olog.tail nor log.tail can be divergent. It's
   * possible that olog/log contain no actual events between olog.head and
   * max(log.tail, olog.tail), however, since they might have been split out.
   * Thus, if we cannot find an event e such that
   * log.tail <= e.version <= log.head, the last_update must actually be
   * max(log.tail, olog.tail).
   */
  eversion_t limit = std::max(olog.tail, log.tail);
  eversion_t lu =
    (first_non_divergent == log.log.rend() ||
     first_non_divergent->version < limit) ?
    limit :
    first_non_divergent->version;

  // we merge and adjust the replica's log, rollback the rollbackable divergent entry,
  // remove the unrollbackable divergent entry and mark the according object as missing.
  // the rollback boundary must choose crt of the olog which going to be merged.
  // The replica log's(olog) crt will not be modified, so it could get passed
  // to _merge_divergent_entries() directly.
  IndexedLog folog(olog);
  auto divergent = folog.rewind_from_head(lu);
  _merge_divergent_entries(
    folog,
    divergent,
    oinfo,
    olog.get_can_rollback_to(),
    omissing,
    0,
    this);

  if (lu < oinfo.last_update) {
    dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
    oinfo.last_update = lu;
  }

  if (omissing.have_missing()) {
    // last_complete is the newest version strictly older than the
    // earliest missing object
    eversion_t first_missing =
      omissing.get_items().at(omissing.get_rmissing().begin()->second).need;
    oinfo.last_complete = eversion_t();
    for (auto i = olog.log.begin(); i != olog.log.end(); ++i) {
      if (i->version < first_missing)
	oinfo.last_complete = i->version;
      else
	break;
    }
  } else {
    oinfo.last_complete = oinfo.last_update;
  }
} // proc_replica_log
313
314 /**
315 * rewind divergent entries at the head of the log
316 *
317 * This rewinds entries off the head of our log that are divergent.
318 * This is used by replicas during activation.
319 *
320 * @param newhead new head to rewind to
321 */
void PGLog::rewind_divergent_log(eversion_t newhead,
				 pg_info_t &info, LogEntryHandler *rollbacker,
				 bool &dirty_info, bool &dirty_big_info)
{
  dout(10) << "rewind_divergent_log truncate divergent future " <<
    newhead << dendl;

  // We need to preserve the original crt before it gets updated in rewind_from_head().
  // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
  // a divergent entry or not.
  eversion_t original_crt = log.get_can_rollback_to();
  dout(20) << __func__ << " original_crt = " << original_crt << dendl;
  if (info.last_complete > newhead)
    info.last_complete = newhead;

  auto divergent = log.rewind_from_head(newhead);
  if (!divergent.empty()) {
    // everything from the first divergent entry on must be rewritten on disk
    mark_dirty_from(divergent.front().version);
  }
  for (auto &&entry: divergent) {
    dout(10) << "rewind_divergent_log future divergent " << entry << dendl;
  }
  info.last_update = newhead;

  // roll back or mark missing each divergent object as appropriate
  _merge_divergent_entries(
    log,
    divergent,
    info,
    original_crt,
    missing,
    rollbacker,
    this);

  dirty_info = true;
  dirty_big_info = true;
}
358
// Merge an authoritative log (olog, from fromosd) into our own:
//  1. extend our tail backwards with older shared history,
//  2. rewind any entries of ours that diverge from olog's head,
//  3. extend our head forwards with olog's newer entries (updating the
//     missing set), and
//  4. merge the dup entries.
// Sets dirty_info/dirty_big_info when anything changed.
void PGLog::merge_log(pg_info_t &oinfo, pg_log_t&& olog, pg_shard_t fromosd,
		      pg_info_t &info, LogEntryHandler *rollbacker,
		      bool &dirty_info, bool &dirty_big_info)
{
  dout(10) << "merge_log " << olog << " from osd." << fromosd
	   << " into " << log << dendl;

  // Check preconditions

  // If our log is empty, the incoming log needs to have not been trimmed.
  ceph_assert(!log.null() || olog.tail == eversion_t());
  // The logs must overlap.
  ceph_assert(log.head >= olog.tail && olog.head >= log.tail);

  for (auto i = missing.get_items().begin();
       i != missing.get_items().end();
       ++i) {
    dout(20) << "pg_missing_t sobject: " << i->first << dendl;
  }

  bool changed = false;

  // extend on tail?
  //  this is just filling in history.  it does not affect our
  //  missing set, as that should already be consistent with our
  //  current log.
  eversion_t orig_tail = log.tail;
  if (olog.tail < log.tail) {
    dout(10) << "merge_log extending tail to " << olog.tail << dendl;
    auto from = olog.log.begin();
    auto to = from;
    eversion_t last;
    for (; to != olog.log.end(); ++to) {
      if (to->version > log.tail)
	break;
      log.index(*to);
      dout(15) << *to << dendl;
      last = to->version;
    }
    mark_dirty_to(last);

    // splice into our log.
    log.log.splice(log.log.begin(),
		   std::move(olog.log), from, to);

    info.log_tail = log.tail = olog.tail;
    changed = true;
  }

  // make sure reported always increases
  if (oinfo.stats.reported_seq < info.stats.reported_seq ||
      oinfo.stats.reported_epoch < info.stats.reported_epoch) {
    oinfo.stats.reported_seq = info.stats.reported_seq;
    oinfo.stats.reported_epoch = info.stats.reported_epoch;
  }
  if (info.last_backfill.is_max())
    info.stats = oinfo.stats;
  info.hit_set = oinfo.hit_set;

  // do we have divergent entries to throw out?
  if (olog.head < log.head) {
    rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info);
    changed = true;
  }

  // extend on head?
  if (olog.head > log.head) {
    dout(10) << "merge_log extending head to " << olog.head << dendl;

    // find start point in olog: scan backwards until we pass our head
    auto to = olog.log.end();
    auto from = olog.log.end();
    eversion_t lower_bound = std::max(olog.tail, orig_tail);
    while (1) {
      if (from == olog.log.begin())
	break;
      --from;
      dout(20) << "  ? " << *from << dendl;
      if (from->version <= log.head) {
	lower_bound = std::max(lower_bound, from->version);
	++from;
	break;
      }
    }
    dout(20) << "merge_log cut point (usually last shared) is "
	     << lower_bound << dendl;
    mark_dirty_from(lower_bound);

    // We need to preserve the original crt before it gets updated in rewind_from_head().
    // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
    // a divergent entry or not.
    eversion_t original_crt = log.get_can_rollback_to();
    dout(20) << __func__ << " original_crt = " << original_crt << dendl;
    auto divergent = log.rewind_from_head(lower_bound);
    // move aside divergent items
    for (auto &&oe: divergent) {
      dout(10) << "merge_log divergent " << oe << dendl;
    }
    log.roll_forward_to(log.head, rollbacker);

    // append olog's new entries and fold them into the missing set
    mempool::osd_pglog::list<pg_log_entry_t> new_entries;
    new_entries.splice(new_entries.end(), olog.log, from, to);
    append_log_entries_update_missing(
      info.last_backfill,
      new_entries,
      false,
      &log,
      missing,
      rollbacker,
      this);

    _merge_divergent_entries(
      log,
      divergent,
      info,
      original_crt,
      missing,
      rollbacker,
      this);

    info.last_update = log.head = olog.head;

    // We cannot rollback into the new log entries
    log.skip_can_rollback_to_to_head();

    info.last_user_version = oinfo.last_user_version;
    info.purged_snaps = oinfo.purged_snaps;
    // update num_missing too
    // we might have appended some more missing objects above
    info.stats.stats.sum.num_objects_missing = missing.num_missing();

    changed = true;
  }

  // now handle dups
  if (merge_log_dups(olog)) {
    changed = true;
  }

  dout(10) << "merge_log result " << log << " " << missing <<
    " changed=" << changed << dendl;

  if (changed) {
    dirty_info = true;
    dirty_big_info = true;
  }
}
505
506
507 // returns true if any changes were made to log.dups
// Merge olog's dup entries into ours: copy wholesale if we have none,
// otherwise extend our dup list on both ends, then drop any dups that
// overlap the live log range.  Returns true if log.dups was modified.
bool PGLog::merge_log_dups(const pg_log_t& olog) {
  bool changed = false;

  if (!olog.dups.empty()) {
    if (log.dups.empty()) {
      dout(10) << "merge_log copying olog dups to log " <<
	olog.dups.front().version << " to " <<
	olog.dups.back().version << dendl;
      changed = true;
      // the full dup range must be (re)written to disk
      dirty_from_dups = eversion_t();
      dirty_to_dups = eversion_t::max();
      // since our log.dups is empty just copy them
      for (const auto& i : olog.dups) {
	log.dups.push_back(i);
	log.index(log.dups.back());
      }
    } else {
      // since our log.dups is not empty try to extend on each end

      if (olog.dups.back().version > log.dups.back().version) {
	// extend the dups's tail (i.e., newer dups)
	dout(10) << "merge_log extending dups tail to " <<
	  olog.dups.back().version << dendl;
	changed = true;

	auto log_tail_version = log.dups.back().version;

	// walk olog's dups newest-to-oldest, inserting before the cursor
	// and moving the cursor back so overall order is preserved
	auto insert_cursor = log.dups.end();
	eversion_t last_shared = eversion_t::max();
	for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
	  if (i->version <= log_tail_version) break;
	  log.dups.insert(insert_cursor, *i);
	  last_shared = i->version;

	  auto prev = insert_cursor;
	  --prev;
	  // be sure to pass reference of copy in log.dups
	  log.index(*prev);

	  --insert_cursor; // make sure we insert in reverse order
	}
	mark_dirty_from_dups(last_shared);
      }

      if (olog.dups.front().version < log.dups.front().version) {
	// extend the dups's head (i.e., older dups)
	dout(10) << "merge_log extending dups head to " <<
	  olog.dups.front().version << dendl;
	changed = true;

	eversion_t last;
	auto insert_cursor = log.dups.begin();
	for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
	  if (i->version >= insert_cursor->version) break;
	  log.dups.insert(insert_cursor, *i);
	  last = i->version;
	  auto prev = insert_cursor;
	  --prev;
	  // be sure to pass address of copy in log.dups
	  log.index(*prev);
	}
	mark_dirty_to_dups(last);
      }
    }
  }

  // remove any dup entries that overlap with pglog
  if (!log.dups.empty() && log.dups.back().version > log.tail) {
    dout(10) << "merge_log removed dups overlapping log entries (" <<
      log.tail << "," << log.dups.back().version << "]" << dendl;
    changed = true;

    while (!log.dups.empty() && log.dups.back().version > log.tail) {
      log.unindex(log.dups.back());
      mark_dirty_from_dups(log.dups.back().version);
      log.dups.pop_back();
    }
  }

  return changed;
}
589
// Debug-only consistency check: the in-memory log must agree one-to-one
// with the set of omap keys we believe are on disk (log_keys_debug).
// No-op unless pg_log_debug is enabled.
void PGLog::check() {
  if (!pg_log_debug)
    return;
  if (log.log.size() != log_keys_debug.size()) {
    // dump both sides before asserting so the mismatch is diagnosable
    derr << "log.log.size() != log_keys_debug.size()" << dendl;
    derr << "actual log:" << dendl;
    for (auto i = log.log.begin(); i != log.log.end(); ++i) {
      derr << "   " << *i << dendl;
    }
    derr << "log_keys_debug:" << dendl;
    for (auto i = log_keys_debug.begin();
	 i != log_keys_debug.end();
	 ++i) {
      derr << "   " << *i << dendl;
    }
  }
  ceph_assert(log.log.size() == log_keys_debug.size());
  for (auto i = log.log.begin(); i != log.log.end(); ++i) {
    ceph_assert(log_keys_debug.count(i->get_key_name()));
  }
}
611
612 // non-static
// non-static
// Persist the dirty portions of this PGLog (log entries, dups, missing
// set) into transaction t / key map km, then clear the dirty markers.
// No-op when nothing is dirty.
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  const coll_t& coll,
  const ghobject_t &log_oid,
  bool require_rollback)
{
  if (needs_write()) {
    dout(6) << "write_log_and_missing with: "
	    << "dirty_to: " << dirty_to
	    << ", dirty_from: " << dirty_from
	    << ", writeout_from: " << writeout_from
	    << ", trimmed: " << trimmed
	    << ", trimmed_dups: " << trimmed_dups
	    << ", clear_divergent_priors: " << clear_divergent_priors
	    << dendl;
    // trimmed/trimmed_dups are moved out: consumed by this write
    _write_log_and_missing(
      t, km, log, coll, log_oid,
      dirty_to,
      dirty_from,
      writeout_from,
      std::move(trimmed),
      std::move(trimmed_dups),
      missing,
      !touched_log,
      require_rollback,
      clear_divergent_priors,
      dirty_to_dups,
      dirty_from_dups,
      write_from_dups,
      &may_include_deletes_in_missing_dirty,
      (pg_log_debug ? &log_keys_debug : nullptr));
    undirty();
  } else {
    dout(10) << "log is not dirty" << dendl;
  }
}
650
651 // static
// static
// Full (non-incremental) rewrite of a log without a missing set: marks the
// whole key range dirty (eversion_t::max() / eversion_t()) so every entry
// and dup is written out.
void PGLog::write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  bool require_rollback
  )
{
  _write_log_and_missing_wo_missing(
    t, km, log, coll, log_oid,
    divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
    true, true, require_rollback,
    eversion_t::max(), eversion_t(), eversion_t(), nullptr);
}
667
668 // static
// static
// Full (non-incremental) rewrite of a log plus missing set: marks the
// whole key range dirty so every entry, dup, and missing item is written.
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll,
  const ghobject_t &log_oid,
  const pg_missing_tracker_t &missing,
  bool require_rollback,
  bool *may_include_deletes_in_missing_dirty)
{
  _write_log_and_missing(
    t, km, log, coll, log_oid,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    set<eversion_t>(),
    set<string>(),
    missing,
    true, require_rollback, false,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    may_include_deletes_in_missing_dirty, nullptr);
}
693
694 // static
// static
// Serialize a log (without a missing set) into omap: remove the stale key
// ranges [0, dirty_to) and [dirty_from, max), then re-encode the entries
// and dups inside those dirty windows into *km.  divergent_priors are
// written when dirty_divergent_priors is set; can_rollback_to /
// rollback_info_trimmed_to are written when require_rollback is set.
void PGLog::_write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  eversion_t dirty_to,
  eversion_t dirty_from,
  eversion_t writeout_from,
  bool dirty_divergent_priors,
  bool touch_log,
  bool require_rollback,
  eversion_t dirty_to_dups,
  eversion_t dirty_from_dups,
  eversion_t write_from_dups,
  set<string> *log_keys_debug
  )
{
  // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
  if (touch_log)
    t.touch(coll, log_oid);
  if (dirty_to != eversion_t()) {
    t.omap_rmkeyrange(
      coll, log_oid,
      eversion_t().get_key_name(), dirty_to.get_key_name());
    clear_up_to(log_keys_debug, dirty_to.get_key_name());
  }
  if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
    // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from.get_key_name(), eversion_t::max().get_key_name());
    clear_after(log_keys_debug, dirty_from.get_key_name());
  }

  // re-encode entries in the low dirty window [tail, dirty_to]
  for (auto p = log.log.begin();
       p != log.log.end() && p->version <= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  // re-encode entries in the high dirty window
  // [min(dirty_from, writeout_from), head], stopping at dirty_to
  for (auto p = log.log.rbegin();
       p != log.log.rend() &&
	 (p->version >= dirty_from || p->version >= writeout_from) &&
	 p->version >= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (log_keys_debug) {
    for (auto i = (*km).begin();
	 i != (*km).end();
	 ++i) {
      // keys beginning with '_' are metadata, not log entries
      if (i->first[0] == '_')
	continue;
      ceph_assert(!log_keys_debug->count(i->first));
      log_keys_debug->insert(i->first);
    }
  }

  // process dups after log_keys_debug is filled, so dups do not
  // end up in that set
  if (dirty_to_dups != eversion_t()) {
    pg_log_dup_t min, dirty_to_dup;
    dirty_to_dup.version = dirty_to_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      min.get_key_name(), dirty_to_dup.get_key_name());
  }
  if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
    pg_log_dup_t max, dirty_from_dup;
    max.version = eversion_t::max();
    dirty_from_dup.version = dirty_from_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from_dup.get_key_name(), max.get_key_name());
  }

  // re-encode dups inside the dirty windows, mirroring the entry loops
  for (const auto& entry : log.dups) {
    if (entry.version > dirty_to_dups)
      break;
    bufferlist bl;
    encode(entry, bl);
    (*km)[entry.get_key_name()] = std::move(bl);
  }

  for (auto p = log.dups.rbegin();
       p != log.dups.rend() &&
	 (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
	 p->version >= dirty_to_dups;
       ++p) {
    bufferlist bl;
    encode(*p, bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (dirty_divergent_priors) {
    //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
    encode(divergent_priors, (*km)["divergent_priors"]);
  }
  if (require_rollback) {
    encode(
      log.get_can_rollback_to(),
      (*km)["can_rollback_to"]);
    encode(
      log.get_rollback_info_trimmed_to(),
      (*km)["rollback_info_trimmed_to"]);
  }
}
808
809 // static
// static
// Serialize a log plus its missing-set deltas into omap: collect trimmed
// entry/dup keys for removal, clear the dirty key ranges, re-encode log
// entries and dups in the dirty windows, and write out changed missing
// items (removing keys for objects no longer missing).
void PGLog::_write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist>* km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  eversion_t dirty_to,
  eversion_t dirty_from,
  eversion_t writeout_from,
  set<eversion_t> &&trimmed,
  set<string> &&trimmed_dups,
  const pg_missing_tracker_t &missing,
  bool touch_log,
  bool require_rollback,
  bool clear_divergent_priors,
  eversion_t dirty_to_dups,
  eversion_t dirty_from_dups,
  eversion_t write_from_dups,
  bool *may_include_deletes_in_missing_dirty, // in/out param
  set<string> *log_keys_debug
  ) {
  set<string> to_remove;
  to_remove.swap(trimmed_dups);
  // NOTE(review): the loop variable `t` shadows the transaction parameter
  // `t` for the body of this loop.
  for (auto& t : trimmed) {
    string key = t.get_key_name();
    if (log_keys_debug) {
      auto it = log_keys_debug->find(key);
      ceph_assert(it != log_keys_debug->end());
      log_keys_debug->erase(it);
    }
    to_remove.emplace(std::move(key));
  }
  trimmed.clear();

  if (touch_log)
    t.touch(coll, log_oid);
  if (dirty_to != eversion_t()) {
    t.omap_rmkeyrange(
      coll, log_oid,
      eversion_t().get_key_name(), dirty_to.get_key_name());
    clear_up_to(log_keys_debug, dirty_to.get_key_name());
  }
  if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
    // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from.get_key_name(), eversion_t::max().get_key_name());
    clear_after(log_keys_debug, dirty_from.get_key_name());
  }

  // re-encode entries in the low dirty window [tail, dirty_to]
  for (auto p = log.log.begin();
       p != log.log.end() && p->version <= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  // re-encode entries in the high dirty window
  // [min(dirty_from, writeout_from), head], stopping at dirty_to
  for (auto p = log.log.rbegin();
       p != log.log.rend() &&
	 (p->version >= dirty_from || p->version >= writeout_from) &&
	 p->version >= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (log_keys_debug) {
    for (auto i = (*km).begin();
	 i != (*km).end();
	 ++i) {
      // keys beginning with '_' are metadata, not log entries
      if (i->first[0] == '_')
	continue;
      ceph_assert(!log_keys_debug->count(i->first));
      log_keys_debug->insert(i->first);
    }
  }

  // process dups after log_keys_debug is filled, so dups do not
  // end up in that set
  if (dirty_to_dups != eversion_t()) {
    pg_log_dup_t min, dirty_to_dup;
    dirty_to_dup.version = dirty_to_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      min.get_key_name(), dirty_to_dup.get_key_name());
  }
  if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
    pg_log_dup_t max, dirty_from_dup;
    max.version = eversion_t::max();
    dirty_from_dup.version = dirty_from_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from_dup.get_key_name(), max.get_key_name());
  }

  // re-encode dups inside the dirty windows, mirroring the entry loops
  for (const auto& entry : log.dups) {
    if (entry.version > dirty_to_dups)
      break;
    bufferlist bl;
    encode(entry, bl);
    (*km)[entry.get_key_name()] = std::move(bl);
  }

  for (auto p = log.dups.rbegin();
       p != log.dups.rend() &&
	 (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
	 p->version >= dirty_to_dups;
       ++p) {
    bufferlist bl;
    encode(*p, bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (clear_divergent_priors) {
    //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
    to_remove.insert("divergent_priors");
  }
  // since we encode individual missing items instead of a whole
  // missing set, we need another key to store this bit of state
  if (*may_include_deletes_in_missing_dirty) {
    (*km)["may_include_deletes_in_missing"] = bufferlist();
    *may_include_deletes_in_missing_dirty = false;
  }
  // write out only the missing items that changed; absent => remove key
  missing.get_changed(
    [&](const hobject_t &obj) {
      string key = string("missing/") + obj.to_str();
      pg_missing_item item;
      if (!missing.is_missing(obj, &item)) {
	to_remove.insert(key);
      } else {
	encode(make_pair(obj, item), (*km)[key], CEPH_FEATUREMASK_SERVER_OCTOPUS);
      }
    });
  if (require_rollback) {
    encode(
      log.get_can_rollback_to(),
      (*km)["can_rollback_to"]);
    encode(
      log.get_rollback_info_trimmed_to(),
      (*km)["rollback_info_trimmed_to"]);
  }

  if (!to_remove.empty())
    t.omap_rmkeys(coll, log_oid, to_remove);
}
956
// Rebuild the missing set from the log plus on-disk object versions,
// preserving entries that did not come from the log (repair/EIO/
// divergent_priors), and enable delete-tracking in the missing set.
void PGLog::rebuild_missing_set_with_deletes(
  ObjectStore *store,
  ObjectStore::CollectionHandle& ch,
  const pg_info_t &info)
{
  // save entries not generated from the current log (e.g. added due
  // to repair, EIO handling, or divergent_priors).
  map<hobject_t, pg_missing_item> extra_missing;
  for (const auto& p : missing.get_items()) {
    if (!log.logged_object(p.first)) {
      dout(20) << __func__ << " extra missing entry: " << p.first
	       << " " << p.second << dendl;
      extra_missing[p.first] = p.second;
    }
  }
  missing.clear();

  // go through the log and add items that are not present or older
  // versions on disk, just as if we were reading the log + metadata
  // off disk originally
  set<hobject_t> did;
  for (auto i = log.log.rbegin();
       i != log.log.rend();
       ++i) {
    if (i->version <= info.last_complete)
      break;
    // skip objects past backfill, error entries, and objects already seen
    if (i->soid > info.last_backfill ||
	i->is_error() ||
	did.find(i->soid) != did.end())
      continue;
    did.insert(i->soid);

    // compare the log's version against the object's on-disk version
    bufferlist bv;
    int r = store->getattr(
      ch,
      ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
      OI_ATTR,
      bv);
    dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl;

    if (r >= 0) {
      object_info_t oi(bv);
      dout(20) << __func__ << " store version = " << oi.version << dendl;
      if (oi.version < i->version) {
	missing.add(i->soid, i->version, oi.version, i->is_delete());
      }
    } else {
      // object not on disk at all
      missing.add(i->soid, i->version, eversion_t(), i->is_delete());
    }
  }

  for (const auto& p : extra_missing) {
    missing.add(p.first, p.second.need, p.second.have, p.second.is_delete());
  }

  set_missing_may_contain_deletes();
}
1014
1015 #ifdef WITH_SEASTAR
1016
namespace {
// Replays the pgmeta omap entries (log entries, dups, missing items, and
// bookkeeping keys) from a crimson FuturizedStore into an IndexedLog and
// missing set.  Used only by read_log_and_missing_crimson() below.
struct FuturizedStoreLogReader {
  crimson::os::FuturizedStore &store;
  const pg_info_t &info;
  PGLog::IndexedLog &log;
  std::set<std::string>* log_keys_debug = NULL;
  pg_missing_tracker_t &missing;
  const DoutPrefixProvider *dpp;

  eversion_t on_disk_can_rollback_to;
  eversion_t on_disk_rollback_info_trimmed_to;

  std::map<eversion_t, hobject_t> divergent_priors;
  bool must_rebuild = false;
  std::list<pg_log_entry_t> entries;
  std::list<pg_log_dup_t> dups;

  std::optional<std::string> next;

  // Decode a single omap key/value: metadata keys are dispatched by name,
  // "missing/..." keys feed the missing set, "dup_..." keys the dup list,
  // and anything else is a log entry.
  void process_entry(crimson::os::FuturizedStore::OmapIteratorRef &p) {
    if (p->key()[0] == '_')
      return;
    //Copy ceph::buffer::list before creating iterator
    auto bl = p->value();
    auto bp = bl.cbegin();
    if (p->key() == "divergent_priors") {
      decode(divergent_priors, bp);
      ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
			 << " divergent_priors" << dendl;
      ceph_assert("crimson shouldn't have had divergent_priors" == 0);
    } else if (p->key() == "can_rollback_to") {
      decode(on_disk_can_rollback_to, bp);
    } else if (p->key() == "rollback_info_trimmed_to") {
      decode(on_disk_rollback_info_trimmed_to, bp);
    } else if (p->key() == "may_include_deletes_in_missing") {
      missing.may_include_deletes = true;
    } else if (p->key().substr(0, 7) == std::string("missing")) {
      hobject_t oid;
      pg_missing_item item;
      decode(oid, bp);
      decode(item, bp);
      if (item.is_delete()) {
	ceph_assert(missing.may_include_deletes);
      }
      missing.add(oid, std::move(item));
    } else if (p->key().substr(0, 4) == std::string("dup_")) {
      pg_log_dup_t dup;
      decode(dup, bp);
      // dups must arrive in strictly increasing version order
      if (!dups.empty()) {
	ceph_assert(dups.back().version < dup.version);
      }
      dups.push_back(dup);
    } else {
      pg_log_entry_t e;
      e.decode_with_checksum(bp);
      ldpp_dout(dpp, 20) << "read_log_and_missing " << e << dendl;
      // entries must arrive in increasing version/epoch order
      if (!entries.empty()) {
	pg_log_entry_t last_e(entries.back());
	ceph_assert(last_e.version.version < e.version.version);
	ceph_assert(last_e.version.epoch <= e.version.epoch);
      }
      entries.push_back(e);
      if (log_keys_debug)
	log_keys_debug->insert(e.get_key_name());
    }
  }

  // Iterate the whole pgmeta omap, then assemble the IndexedLog from the
  // accumulated entries/dups and the recorded rollback bounds.
  seastar::future<> read(crimson::os::CollectionRef ch,
			 ghobject_t pgmeta_oid) {
    // will get overridden if recorded
    on_disk_can_rollback_to = info.last_update;
    missing.may_include_deletes = false;

    return store.get_omap_iterator(ch, pgmeta_oid).then([this](auto iter) {
      return seastar::do_until([iter] { return !iter->valid(); },
			       [iter, this]() mutable {
	process_entry(iter);
	return iter->next();
      });
    }).then([this] {
      if (info.pgid.is_no_shard()) {
	// replicated pool pg does not persist this key
	assert(on_disk_rollback_info_trimmed_to == eversion_t());
	on_disk_rollback_info_trimmed_to = info.last_update;
      }
      log = PGLog::IndexedLog(
	info.last_update,
	info.log_tail,
	on_disk_can_rollback_to,
	on_disk_rollback_info_trimmed_to,
	std::move(entries),
	std::move(dups));
    });
  }
};
}
1113
// Asynchronously load the PG log and missing set from the pgmeta object
// via a crimson FuturizedStore, populating `log` and `missing`.
seastar::future<> PGLog::read_log_and_missing_crimson(
  crimson::os::FuturizedStore &store,
  crimson::os::CollectionRef ch,
  const pg_info_t &info,
  IndexedLog &log,
  std::set<std::string>* log_keys_debug,
  pg_missing_tracker_t &missing,
  ghobject_t pgmeta_oid,
  const DoutPrefixProvider *dpp)
{
  ldpp_dout(dpp, 20) << "read_log_and_missing coll "
		     << ch->get_cid()
		     << " " << pgmeta_oid << dendl;
  // do_with keeps the reader alive for the duration of the async chain
  return seastar::do_with(FuturizedStoreLogReader{
      store, info, log, log_keys_debug,
      missing, dpp},
    [ch, pgmeta_oid](FuturizedStoreLogReader& reader) {
    return reader.read(ch, pgmeta_oid);
  });
}
1134
1135 #endif