// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
 *
 * Author: Loic Dachary <loic@dachary.org>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "PGLog.h"
#include "include/unordered_map.h"
#include "common/ceph_context.h"

using std::make_pair;
using std::map;
using std::ostream;
using std::set;
using std::string;

using ceph::bufferlist;
using ceph::decode;
using ceph::encode;

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

static ostream& _prefix(std::ostream *_dout, const PGLog *pglog)
{
  return pglog->gen_prefix(*_dout);
}

//////////////////// PGLog::IndexedLog ////////////////////

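// Move the entries that belong to a child PG (selected by child_pgid and
// split_bits) out of this log into *target, then rebuild the indexes of
// both logs.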
void PGLog::IndexedLog::split_out_child(
  pg_t child_pgid,
  unsigned split_bits,
  PGLog::IndexedLog *target)
{
  unindex();
  *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits));
  index();
  target->index();
  reset_rollback_info_trimmed_to_riter();
}

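// Trim entries with version <= s off the front of the log.  Trimmed
// versions are recorded in *trimmed, recent entries are preserved in the
// dup list (keys of dups trimmed in turn go to *trimmed_dups), and
// *write_from_dups is lowered to the first dup that must be rewritten.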
void PGLog::IndexedLog::trim(
  CephContext* cct,
  eversion_t s,
  set<eversion_t> *trimmed,
  set<string>* trimmed_dups,
  eversion_t *write_from_dups)
{
  lgeneric_subdout(cct, osd, 10) << "IndexedLog::trim s=" << s << dendl;
  ceph_assert(s <= can_rollback_to);
  if (complete_to != log.end())
    lgeneric_subdout(cct, osd, 20) << " complete_to " << complete_to->version << dendl;

  auto earliest_dup_version =
    log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked
    ? 0u
    : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked + 1;

  lgeneric_subdout(cct, osd, 20) << "earliest_dup_version = " << earliest_dup_version << dendl;
  while (!log.empty()) {
    const pg_log_entry_t &e = *log.begin();
    if (e.version > s)
      break;
    lgeneric_subdout(cct, osd, 20) << "trim " << e << dendl;
    if (trimmed)
      trimmed->emplace(e.version);

    unindex(e); // remove from index

    // add to dup list
    if (e.version.version >= earliest_dup_version) {
      if (write_from_dups != nullptr && *write_from_dups > e.version) {
        lgeneric_subdout(cct, osd, 20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl;
        *write_from_dups = e.version;
      }
      dups.push_back(pg_log_dup_t(e));
      index(dups.back());
      uint32_t idx = 0;
      for (const auto& extra : e.extra_reqids) {
        int return_code = e.return_code;
        if (return_code >= 0) {
          auto it = e.extra_reqid_return_codes.find(idx);
          if (it != e.extra_reqid_return_codes.end()) {
            return_code = it->second;
            // FIXME: we aren't setting op_returns for these extra_reqids
          }
        }
        ++idx;

        // note: extras have the same version as the outer op
        dups.push_back(pg_log_dup_t(e.version, extra.second,
                                    extra.first, return_code));
        index(dups.back());
      }
    }

    bool reset_complete_to = false;
    // we are trimming past complete_to, so reset complete_to
    if (complete_to != log.end() && e.version >= complete_to->version)
      reset_complete_to = true;
    if (rollback_info_trimmed_to_riter == log.rend() ||
        e.version == rollback_info_trimmed_to_riter->version) {
      log.pop_front();
      rollback_info_trimmed_to_riter = log.rend();
    } else {
      log.pop_front();
    }

    // reset complete_to to the beginning of the log
    if (reset_complete_to) {
      complete_to = log.begin();
      if (complete_to != log.end()) {
        lgeneric_subdout(cct, osd, 20) << " moving complete_to to "
                                       << log.begin()->version << dendl;
      } else {
        lgeneric_subdout(cct, osd, 20) << " log is now empty" << dendl;
      }
    }
  }

  // we can hit an inflated `dups` because of https://tracker.ceph.com/issues/53729.
  // the idea is to trim them slowly over a prolonged period of time and to mix
  // omap deletes with writes (if we're here, a new log entry just got added), so
  // that we neither 1) blow up the size of a single Transaction nor 2) generate
  // and accumulate a large number of tombstones in BlueStore's RocksDB.
  // if trimming immediately is a must, then ceph-objectstore-tool is the way to go.
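  // e.g., with osd_pg_log_trim_max = 10, each call removes at most 10
  // excess dups, so a backlog of 10000 is worked off over the next ~1000
  // log writes.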
  const size_t max_dups = cct->_conf->osd_pg_log_dups_tracked;
  for (size_t max_dups_to_trim = cct->_conf->osd_pg_log_trim_max;
       max_dups_to_trim > 0 && dups.size() > max_dups;
       max_dups_to_trim--) {
    const auto& e = *dups.begin();
    lgeneric_subdout(cct, osd, 20) << "trim dup " << e << dendl;
    if (trimmed_dups)
      trimmed_dups->insert(e.get_key_name());
    unindex(e);
    dups.pop_front();
  }

  // raise tail?
  if (tail < s)
    tail = s;
  lgeneric_subdout(cct, osd, 20) << "IndexedLog::trim after trim"
                                 << " dups.size()=" << dups.size()
                                 << " tail=" << tail
                                 << " s=" << s << dendl;
}

ostream& PGLog::IndexedLog::print(ostream& out) const
{
  out << *this << std::endl;
  for (auto p = log.begin(); p != log.end(); ++p) {
    out << *p << " " <<
      (logged_object(p->soid) ? "indexed" : "NOT INDEXED") <<
      std::endl;
    ceph_assert(!p->reqid_is_indexed() || logged_req(p->reqid));
  }

  for (auto p = dups.begin(); p != dups.end(); ++p) {
    out << *p << std::endl;
  }

  return out;
}

//////////////////// PGLog ////////////////////

void PGLog::reset_backfill()
{
  missing.clear();
}

void PGLog::clear() {
  missing.clear();
  log.clear();
  log_keys_debug.clear();
  undirty();
}

void PGLog::clear_info_log(
  spg_t pgid,
  ObjectStore::Transaction *t) {
  coll_t coll(pgid);
  t->remove(coll, pgid.make_pgmeta_oid());
}

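// Trim log entries with version <= trim_to and advance info.log_tail to
// match; a no-op unless trim_to is past the current tail.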
void PGLog::trim(
  eversion_t trim_to,
  pg_info_t &info,
  bool transaction_applied,
  bool async)
{
  dout(10) << __func__ << " proposed trim_to = " << trim_to << dendl;
  // trim?
  if (trim_to > log.tail) {
    dout(10) << __func__ << " missing = " << missing.num_missing() << dendl;
    // Don't assert for async_recovery_targets or backfill_targets
    // or whenever there are missing items
    if (transaction_applied && !async && (missing.num_missing() == 0))
      ceph_assert(trim_to <= info.last_complete);

    dout(10) << "trim " << log << " to " << trim_to << dendl;
    log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
    info.log_tail = log.tail;
    if (log.complete_to != log.log.end())
      dout(10) << " after trim complete_to " << log.complete_to->version << dendl;
  }
}

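/**
 * examine a replica's log and missing set
 *
 * Rewind divergent entries off the head of the replica's log (olog) so
 * that oinfo.last_update and omissing reflect a point our own log agrees
 * with; the long comment in the body walks through the details.
 */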
void PGLog::proc_replica_log(
  pg_info_t &oinfo,
  const pg_log_t &olog,
  pg_missing_t& omissing,
  pg_shard_t from) const
{
  dout(10) << "proc_replica_log for osd." << from << ": "
           << oinfo << " " << olog << " " << omissing << dendl;

  if (olog.head < log.tail) {
    dout(10) << __func__ << ": osd." << from << " does not overlap, not looking "
             << "for divergent objects" << dendl;
    return;
  }
  if (olog.head == log.head) {
    dout(10) << __func__ << ": osd." << from << " same log head, not looking "
             << "for divergent objects" << dendl;
    return;
  }

  /*
    basically what we're doing here is rewinding the remote log,
    dropping divergent entries, until we find something that matches
    our master log.  we then reset last_update to reflect the new
    point up to which missing is accurate.

    later, in activate(), missing will get wound forward again and
    we will send the peer enough log to arrive at the same state.
  */

  for (auto i = omissing.get_items().begin();
       i != omissing.get_items().end();
       ++i) {
    dout(20) << " before missing " << i->first << " need " << i->second.need
             << " have " << i->second.have << dendl;
  }

  auto first_non_divergent = log.log.rbegin();
  while (1) {
    if (first_non_divergent == log.log.rend())
      break;
    if (first_non_divergent->version <= olog.head) {
      dout(20) << "merge_log point (usually last shared) is "
               << *first_non_divergent << dendl;
      break;
    }
    ++first_non_divergent;
  }

  /* Because olog.head >= log.tail, we know that both pgs must at least have
   * the event represented by log.tail.  Similarly, because log.head >= olog.tail,
   * we know that the event represented by olog.tail must be common to both logs.
   * Furthermore, the event represented by a log tail was necessarily trimmed,
   * thus neither olog.tail nor log.tail can be divergent.  It's
   * possible that olog/log contain no actual events between olog.head and
   * max(log.tail, olog.tail), however, since they might have been split out.
   * Thus, if we cannot find an event e such that
   * log.tail <= e.version <= log.head, the last_update must actually be
   * max(log.tail, olog.tail).
   */
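  // lu is the newest version both logs agree on: the last of our entries
  // with version <= olog.head, or max(log.tail, olog.tail) if no such
  // entry survives in our log.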
  eversion_t limit = std::max(olog.tail, log.tail);
  eversion_t lu =
    (first_non_divergent == log.log.rend() ||
     first_non_divergent->version < limit) ?
    limit :
    first_non_divergent->version;

  // we merge and adjust the replica's log: rollbackable divergent entries
  // are rolled back, unrollbackable divergent entries are removed and the
  // corresponding objects are marked missing.
  // the rollback boundary must be the crt of the olog that is being merged.
  // The replica log's (olog) crt will not be modified, so it can be passed
  // to _merge_divergent_entries() directly.
  IndexedLog folog(olog);
  auto divergent = folog.rewind_from_head(lu);
  _merge_divergent_entries(
    folog,
    divergent,
    oinfo,
    olog.get_can_rollback_to(),
    omissing,
    0,
    this);

  if (lu < oinfo.last_update) {
    dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
    oinfo.last_update = lu;
  }

  if (omissing.have_missing()) {
    eversion_t first_missing =
      omissing.get_items().at(omissing.get_rmissing().begin()->second).need;
    oinfo.last_complete = eversion_t();
    for (auto i = olog.log.begin(); i != olog.log.end(); ++i) {
      if (i->version < first_missing)
        oinfo.last_complete = i->version;
      else
        break;
    }
  } else {
    oinfo.last_complete = oinfo.last_update;
  }
} // proc_replica_log

/**
 * rewind divergent entries at the head of the log
 *
 * This rewinds entries off the head of our log that are divergent.
 * This is used by replicas during activation.
 *
 * @param newhead new head to rewind to
 */
void PGLog::rewind_divergent_log(eversion_t newhead,
                                 pg_info_t &info, LogEntryHandler *rollbacker,
                                 bool &dirty_info, bool &dirty_big_info)
{
  dout(10) << "rewind_divergent_log truncate divergent future " <<
    newhead << dendl;

  // We need to preserve the original crt before it gets updated in rewind_from_head().
  // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
  // a divergent entry or not.
  eversion_t original_crt = log.get_can_rollback_to();
  dout(20) << __func__ << " original_crt = " << original_crt << dendl;
  if (info.last_complete > newhead)
    info.last_complete = newhead;

  auto divergent = log.rewind_from_head(newhead);
  if (!divergent.empty()) {
    mark_dirty_from(divergent.front().version);
  }
  for (auto &&entry: divergent) {
    dout(10) << "rewind_divergent_log future divergent " << entry << dendl;
  }
  info.last_update = newhead;

  _merge_divergent_entries(
    log,
    divergent,
    info,
    original_crt,
    missing,
    rollbacker,
    this);

  dirty_info = true;
  dirty_big_info = true;
}

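/**
 * merge olog into our log
 *
 * Extend our tail backwards with older history, rewind or roll back
 * divergent entries on our head, splice in the newer entries, and
 * reconcile the dup lists.  Sets dirty_info / dirty_big_info if anything
 * changed.
 */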
void PGLog::merge_log(pg_info_t &oinfo, pg_log_t&& olog, pg_shard_t fromosd,
                      pg_info_t &info, LogEntryHandler *rollbacker,
                      bool &dirty_info, bool &dirty_big_info)
{
  dout(10) << "merge_log " << olog << " from osd." << fromosd
           << " into " << log << dendl;

  // Check preconditions

  // If our log is empty, the incoming log must not have been trimmed.
  ceph_assert(!log.null() || olog.tail == eversion_t());
  // The logs must overlap.
  ceph_assert(log.head >= olog.tail && olog.head >= log.tail);

  for (auto i = missing.get_items().begin();
       i != missing.get_items().end();
       ++i) {
    dout(20) << "pg_missing_t sobject: " << i->first << dendl;
  }

  bool changed = false;

  // extend on tail?
  // this is just filling in history.  it does not affect our
  // missing set, as that should already be consistent with our
  // current log.
  eversion_t orig_tail = log.tail;
  if (olog.tail < log.tail) {
    dout(10) << "merge_log extending tail to " << olog.tail << dendl;
    auto from = olog.log.begin();
    auto to = from;
    eversion_t last;
    for (; to != olog.log.end(); ++to) {
      if (to->version > log.tail)
        break;
      log.index(*to);
      dout(15) << *to << dendl;
      last = to->version;
    }
    mark_dirty_to(last);

    // splice into our log.
    log.log.splice(log.log.begin(),
                   std::move(olog.log), from, to);

    info.log_tail = log.tail = olog.tail;
    changed = true;
  }

  if (oinfo.stats.reported_seq < info.stats.reported_seq ||  // make sure reported always increases
      oinfo.stats.reported_epoch < info.stats.reported_epoch) {
    oinfo.stats.reported_seq = info.stats.reported_seq;
    oinfo.stats.reported_epoch = info.stats.reported_epoch;
  }
  if (info.last_backfill.is_max())
    info.stats = oinfo.stats;
  info.hit_set = oinfo.hit_set;

  // do we have divergent entries to throw out?
  if (olog.head < log.head) {
    rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info);
    changed = true;
  }

  // extend on head?
  if (olog.head > log.head) {
    dout(10) << "merge_log extending head to " << olog.head << dendl;

    // find start point in olog
    auto to = olog.log.end();
    auto from = olog.log.end();
    eversion_t lower_bound = std::max(olog.tail, orig_tail);
    while (1) {
      if (from == olog.log.begin())
        break;
      --from;
      dout(20) << " ? " << *from << dendl;
      if (from->version <= log.head) {
        lower_bound = std::max(lower_bound, from->version);
        ++from;
        break;
      }
    }
    dout(20) << "merge_log cut point (usually last shared) is "
             << lower_bound << dendl;
    mark_dirty_from(lower_bound);

    // We need to preserve the original crt before it gets updated in rewind_from_head().
    // Later, in merge_object_divergent_entries(), we use it to check whether we can rollback
    // a divergent entry or not.
    eversion_t original_crt = log.get_can_rollback_to();
    dout(20) << __func__ << " original_crt = " << original_crt << dendl;
    auto divergent = log.rewind_from_head(lower_bound);
    // move aside divergent items
    for (auto &&oe: divergent) {
      dout(10) << "merge_log divergent " << oe << dendl;
    }
    log.roll_forward_to(log.head, rollbacker);

    mempool::osd_pglog::list<pg_log_entry_t> new_entries;
    new_entries.splice(new_entries.end(), olog.log, from, to);
    append_log_entries_update_missing(
      info.last_backfill,
      new_entries,
      false,
      &log,
      missing,
      rollbacker,
      this);

    _merge_divergent_entries(
      log,
      divergent,
      info,
      original_crt,
      missing,
      rollbacker,
      this);

    info.last_update = log.head = olog.head;

    // We cannot rollback into the new log entries
    log.skip_can_rollback_to_to_head();

    info.last_user_version = oinfo.last_user_version;
    info.purged_snaps = oinfo.purged_snaps;
    // update num_missing too
    // we might have appended some more missing objects above
    info.stats.stats.sum.num_objects_missing = missing.num_missing();

    changed = true;
  }

  // now handle dups
  if (merge_log_dups(olog)) {
    changed = true;
  }

  dout(10) << "merge_log result " << log << " " << missing <<
    " changed=" << changed << dendl;

  if (changed) {
    dirty_info = true;
    dirty_big_info = true;
  }
}


// returns true if any changes were made to log.dups
bool PGLog::merge_log_dups(const pg_log_t& olog) {
  dout(5) << __func__
          << " log.dups.size()=" << log.dups.size()
          << " olog.dups.size()=" << olog.dups.size() << dendl;
  bool changed = false;

  if (!olog.dups.empty()) {
    if (log.dups.empty()) {
      dout(10) << "merge_log copying olog dups to log " <<
        olog.dups.front().version << " to " <<
        olog.dups.back().version << dendl;
      changed = true;
      dirty_from_dups = eversion_t();
      dirty_to_dups = eversion_t::max();
      // since our log.dups is empty just copy them
      for (const auto& i : olog.dups) {
        log.dups.push_back(i);
        log.index(log.dups.back());
      }
    } else {
      // since our log.dups is not empty try to extend on each end

      if (olog.dups.back().version > log.dups.back().version) {
        // extend the dups' tail (i.e., newer dups)
        dout(10) << "merge_log extending dups tail to " <<
          olog.dups.back().version << dendl;
        changed = true;

        auto log_tail_version = log.dups.back().version;

        auto insert_cursor = log.dups.end();
        eversion_t last_shared = eversion_t::max();
        for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
          if (i->version <= log_tail_version) break;
          log.dups.insert(insert_cursor, *i);
          last_shared = i->version;

          auto prev = insert_cursor;
          --prev;
          // be sure to pass a reference to the copy in log.dups
          log.index(*prev);

          --insert_cursor; // make sure we insert in reverse order
        }
        mark_dirty_from_dups(last_shared);
      }

      if (olog.dups.front().version < log.dups.front().version) {
        // extend the dups' head (i.e., older dups)
        dout(10) << "merge_log extending dups head to " <<
          olog.dups.front().version << dendl;
        changed = true;

        eversion_t last;
        auto insert_cursor = log.dups.begin();
        for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
          if (i->version >= insert_cursor->version) break;
          log.dups.insert(insert_cursor, *i);
          last = i->version;
          auto prev = insert_cursor;
          --prev;
          // be sure to pass a reference to the copy in log.dups
          log.index(*prev);
        }
        mark_dirty_to_dups(last);
      }
    }
  }

  // remove any dup entries that overlap with pglog
  if (!log.dups.empty() && log.dups.back().version > log.tail) {
    dout(10) << "merge_log removed dups overlapping log entries (" <<
      log.tail << "," << log.dups.back().version << "]" << dendl;
    changed = true;

    while (!log.dups.empty() && log.dups.back().version > log.tail) {
      log.unindex(log.dups.back());
      mark_dirty_from_dups(log.dups.back().version);
      log.dups.pop_back();
    }
  }

  dout(5) << "end of " << __func__ << " changed=" << changed
          << " log.dups.size()=" << log.dups.size()
          << " olog.dups.size()=" << olog.dups.size() << dendl;

  return changed;
}

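// Debug-only consistency check (gated on pg_log_debug): verify that
// log_keys_debug mirrors the set of keys in the in-memory log.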
void PGLog::check() {
  if (!pg_log_debug)
    return;
  if (log.log.size() != log_keys_debug.size()) {
    derr << "log.log.size() != log_keys_debug.size()" << dendl;
    derr << "actual log:" << dendl;
    for (auto i = log.log.begin(); i != log.log.end(); ++i) {
      derr << " " << *i << dendl;
    }
    derr << "log_keys_debug:" << dendl;
    for (auto i = log_keys_debug.begin();
         i != log_keys_debug.end();
         ++i) {
      derr << " " << *i << dendl;
    }
  }
  ceph_assert(log.log.size() == log_keys_debug.size());
  for (auto i = log.log.begin(); i != log.log.end(); ++i) {
    ceph_assert(log_keys_debug.count(i->get_key_name()));
  }
}

// non-static
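// Flush this PGLog's dirty state (log entries, dups, trimmed keys, missing
// set) into the transaction and omap key map, then mark the log clean.
// A no-op when nothing is dirty.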
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  const coll_t& coll,
  const ghobject_t &log_oid,
  bool require_rollback)
{
  if (needs_write()) {
    dout(6) << "write_log_and_missing with: "
            << "dirty_to: " << dirty_to
            << ", dirty_from: " << dirty_from
            << ", writeout_from: " << writeout_from
            << ", trimmed: " << trimmed
            << ", trimmed_dups: " << trimmed_dups
            << ", clear_divergent_priors: " << clear_divergent_priors
            << dendl;
    _write_log_and_missing(
      t, km, log, coll, log_oid,
      dirty_to,
      dirty_from,
      writeout_from,
      std::move(trimmed),
      std::move(trimmed_dups),
      missing,
      !touched_log,
      require_rollback,
      clear_divergent_priors,
      dirty_to_dups,
      dirty_from_dups,
      write_from_dups,
      &may_include_deletes_in_missing_dirty,
      (pg_log_debug ? &log_keys_debug : nullptr),
      this);
    undirty();
  } else {
    dout(10) << "log is not dirty" << dendl;
  }
}

// static
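// Convenience wrapper around _write_log_and_missing_wo_missing that
// rewrites the whole log and divergent_priors from scratch, with no
// missing set.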
void PGLog::write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  bool require_rollback,
  const DoutPrefixProvider *dpp
  )
{
  _write_log_and_missing_wo_missing(
    t, km, log, coll, log_oid,
    divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
    true, true, require_rollback,
    eversion_t::max(), eversion_t(), eversion_t(), nullptr, dpp);
}

// static
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll,
  const ghobject_t &log_oid,
  const pg_missing_tracker_t &missing,
  bool require_rollback,
  bool *may_include_deletes_in_missing_dirty,
  const DoutPrefixProvider *dpp)
{
  _write_log_and_missing(
    t, km, log, coll, log_oid,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    set<eversion_t>(),
    set<string>(),
    missing,
    true, require_rollback, false,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    may_include_deletes_in_missing_dirty, nullptr, dpp);
}

// static
void PGLog::_write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  eversion_t dirty_to,
  eversion_t dirty_from,
  eversion_t writeout_from,
  bool dirty_divergent_priors,
  bool touch_log,
  bool require_rollback,
  eversion_t dirty_to_dups,
  eversion_t dirty_from_dups,
  eversion_t write_from_dups,
  set<string> *log_keys_debug,
  const DoutPrefixProvider *dpp
  )
{
  ldpp_dout(dpp, 10) << "_write_log_and_missing_wo_missing, clearing up to " << dirty_to
                     << " dirty_to_dups=" << dirty_to_dups
                     << " dirty_from_dups=" << dirty_from_dups
                     << " write_from_dups=" << write_from_dups << dendl;
  if (touch_log)
    t.touch(coll, log_oid);
  if (dirty_to != eversion_t()) {
    t.omap_rmkeyrange(
      coll, log_oid,
      eversion_t().get_key_name(), dirty_to.get_key_name());
    clear_up_to(log_keys_debug, dirty_to.get_key_name());
  }
  if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
    // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from.get_key_name(), eversion_t::max().get_key_name());
    clear_after(log_keys_debug, dirty_from.get_key_name());
  }

  for (auto p = log.log.begin();
       p != log.log.end() && p->version <= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  for (auto p = log.log.rbegin();
       p != log.log.rend() &&
         (p->version >= dirty_from || p->version >= writeout_from) &&
         p->version >= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (log_keys_debug) {
    for (auto i = (*km).begin();
         i != (*km).end();
         ++i) {
      if (i->first[0] == '_')
        continue;
      ceph_assert(!log_keys_debug->count(i->first));
      log_keys_debug->insert(i->first);
    }
  }

  // process dups after log_keys_debug is filled, so dups do not
  // end up in that set
  if (dirty_to_dups != eversion_t()) {
    pg_log_dup_t min, dirty_to_dup;
    dirty_to_dup.version = dirty_to_dups;
    ldpp_dout(dpp, 10) << __func__ << " remove dups min=" << min.get_key_name()
                       << " to dirty_to_dup=" << dirty_to_dup.get_key_name() << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      min.get_key_name(), dirty_to_dup.get_key_name());
  }
  if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
    pg_log_dup_t max, dirty_from_dup;
    max.version = eversion_t::max();
    dirty_from_dup.version = dirty_from_dups;
    ldpp_dout(dpp, 10) << __func__ << " remove dups dirty_from_dup="
                       << dirty_from_dup.get_key_name()
                       << " to max=" << max.get_key_name() << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from_dup.get_key_name(), max.get_key_name());
  }

  ldpp_dout(dpp, 10) << __func__ << " going to encode log.dups.size()="
                     << log.dups.size() << dendl;
  for (const auto& entry : log.dups) {
    if (entry.version > dirty_to_dups)
      break;
    bufferlist bl;
    encode(entry, bl);
    (*km)[entry.get_key_name()] = std::move(bl);
  }
  ldpp_dout(dpp, 10) << __func__ << " 1st round encoded log.dups.size()="
                     << log.dups.size() << dendl;
  for (auto p = log.dups.rbegin();
       p != log.dups.rend() &&
         (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
         p->version >= dirty_to_dups;
       ++p) {
    bufferlist bl;
    encode(*p, bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }
  ldpp_dout(dpp, 10) << __func__ << " 2nd round encoded log.dups.size()="
                     << log.dups.size() << dendl;

  if (dirty_divergent_priors) {
    ldpp_dout(dpp, 10) << "write_log_and_missing: writing divergent_priors"
                       << dendl;
    encode(divergent_priors, (*km)["divergent_priors"]);
  }
  if (require_rollback) {
    encode(
      log.get_can_rollback_to(),
      (*km)["can_rollback_to"]);
    encode(
      log.get_rollback_info_trimmed_to(),
      (*km)["rollback_info_trimmed_to"]);
  }
  ldpp_dout(dpp, 10) << "end of " << __func__ << dendl;
}

// static
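// Core serializer: stage omap key removals on the transaction and encode
// dirty log, dup, and missing entries into *km; trimmed and trimmed_dups
// supply keys to delete outright.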
void PGLog::_write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist>* km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  eversion_t dirty_to,
  eversion_t dirty_from,
  eversion_t writeout_from,
  set<eversion_t> &&trimmed,
  set<string> &&trimmed_dups,
  const pg_missing_tracker_t &missing,
  bool touch_log,
  bool require_rollback,
  bool clear_divergent_priors,
  eversion_t dirty_to_dups,
  eversion_t dirty_from_dups,
  eversion_t write_from_dups,
  bool *may_include_deletes_in_missing_dirty, // in/out param
  set<string> *log_keys_debug,
  const DoutPrefixProvider *dpp
  ) {
  ldpp_dout(dpp, 10) << __func__ << " clearing up to " << dirty_to
                     << " dirty_to_dups=" << dirty_to_dups
                     << " dirty_from_dups=" << dirty_from_dups
                     << " write_from_dups=" << write_from_dups
                     << " trimmed_dups.size()=" << trimmed_dups.size() << dendl;
  set<string> to_remove;
  to_remove.swap(trimmed_dups);
  for (auto& t : trimmed) {
    string key = t.get_key_name();
    if (log_keys_debug) {
      auto it = log_keys_debug->find(key);
      ceph_assert(it != log_keys_debug->end());
      log_keys_debug->erase(it);
    }
    to_remove.emplace(std::move(key));
  }
  trimmed.clear();

  if (touch_log)
    t.touch(coll, log_oid);
  if (dirty_to != eversion_t()) {
    t.omap_rmkeyrange(
      coll, log_oid,
      eversion_t().get_key_name(), dirty_to.get_key_name());
    clear_up_to(log_keys_debug, dirty_to.get_key_name());
  }
  if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
    ldpp_dout(dpp, 10) << "write_log_and_missing, clearing from "
                       << dirty_from << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from.get_key_name(), eversion_t::max().get_key_name());
    clear_after(log_keys_debug, dirty_from.get_key_name());
  }

  for (auto p = log.log.begin();
       p != log.log.end() && p->version <= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  for (auto p = log.log.rbegin();
       p != log.log.rend() &&
         (p->version >= dirty_from || p->version >= writeout_from) &&
         p->version >= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }

  if (log_keys_debug) {
    for (auto i = (*km).begin();
         i != (*km).end();
         ++i) {
      if (i->first[0] == '_')
        continue;
      ceph_assert(!log_keys_debug->count(i->first));
      log_keys_debug->insert(i->first);
    }
  }

  // process dups after log_keys_debug is filled, so dups do not
  // end up in that set
  if (dirty_to_dups != eversion_t()) {
    pg_log_dup_t min, dirty_to_dup;
    dirty_to_dup.version = dirty_to_dups;
    ldpp_dout(dpp, 10) << __func__ << " remove dups min=" << min.get_key_name()
                       << " to dirty_to_dup=" << dirty_to_dup.get_key_name() << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      min.get_key_name(), dirty_to_dup.get_key_name());
  }
  if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
    pg_log_dup_t max, dirty_from_dup;
    max.version = eversion_t::max();
    dirty_from_dup.version = dirty_from_dups;
    ldpp_dout(dpp, 10) << __func__ << " remove dups dirty_from_dup="
                       << dirty_from_dup.get_key_name()
                       << " to max=" << max.get_key_name() << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from_dup.get_key_name(), max.get_key_name());
  }

  ldpp_dout(dpp, 10) << __func__ << " going to encode log.dups.size()="
                     << log.dups.size() << dendl;
  for (const auto& entry : log.dups) {
    if (entry.version > dirty_to_dups)
      break;
    bufferlist bl;
    encode(entry, bl);
    (*km)[entry.get_key_name()] = std::move(bl);
  }
  ldpp_dout(dpp, 10) << __func__ << " 1st round encoded log.dups.size()="
                     << log.dups.size() << dendl;

  for (auto p = log.dups.rbegin();
       p != log.dups.rend() &&
         (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
         p->version >= dirty_to_dups;
       ++p) {
    bufferlist bl;
    encode(*p, bl);
    (*km)[p->get_key_name()] = std::move(bl);
  }
  ldpp_dout(dpp, 10) << __func__ << " 2nd round encoded log.dups.size()="
                     << log.dups.size() << dendl;

  if (clear_divergent_priors) {
    ldpp_dout(dpp, 10) << "write_log_and_missing: clearing divergent_priors"
                       << dendl;
    to_remove.insert("divergent_priors");
  }
  // since we encode individual missing items instead of a whole
  // missing set, we need another key to store this bit of state
  if (*may_include_deletes_in_missing_dirty) {
    (*km)["may_include_deletes_in_missing"] = bufferlist();
    *may_include_deletes_in_missing_dirty = false;
  }
  missing.get_changed(
    [&](const hobject_t &obj) {
      string key = string("missing/") + obj.to_str();
      pg_missing_item item;
      if (!missing.is_missing(obj, &item)) {
        to_remove.insert(key);
      } else {
        encode(make_pair(obj, item), (*km)[key], CEPH_FEATUREMASK_SERVER_OCTOPUS);
      }
    });
  if (require_rollback) {
    encode(
      log.get_can_rollback_to(),
      (*km)["can_rollback_to"]);
    encode(
      log.get_rollback_info_trimmed_to(),
      (*km)["rollback_info_trimmed_to"]);
  }

  if (!to_remove.empty())
    t.omap_rmkeys(coll, log_oid, to_remove);
  ldpp_dout(dpp, 10) << "end of " << __func__ << dendl;
}

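// Rebuild the missing set by rescanning the log against on-disk
// object_info versions, this time keeping delete entries, then flag the
// missing set as possibly containing deletes.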
void PGLog::rebuild_missing_set_with_deletes(
  ObjectStore *store,
  ObjectStore::CollectionHandle& ch,
  const pg_info_t &info)
{
  // save entries not generated from the current log (e.g. added due
  // to repair, EIO handling, or divergent_priors).
  map<hobject_t, pg_missing_item> extra_missing;
  for (const auto& p : missing.get_items()) {
    if (!log.logged_object(p.first)) {
      dout(20) << __func__ << " extra missing entry: " << p.first
               << " " << p.second << dendl;
      extra_missing[p.first] = p.second;
    }
  }
  missing.clear();

  // go through the log and add items that are not present or older
  // versions on disk, just as if we were reading the log + metadata
  // off disk originally
  set<hobject_t> did;
  for (auto i = log.log.rbegin();
       i != log.log.rend();
       ++i) {
    if (i->version <= info.last_complete)
      break;
    if (i->soid > info.last_backfill ||
        i->is_error() ||
        did.find(i->soid) != did.end())
      continue;
    did.insert(i->soid);

    bufferlist bv;
    int r = store->getattr(
      ch,
      ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
      OI_ATTR,
      bv);
    dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl;

    if (r >= 0) {
      object_info_t oi(bv);
      dout(20) << __func__ << " store version = " << oi.version << dendl;
      if (oi.version < i->version) {
        missing.add(i->soid, i->version, oi.version, i->is_delete());
      }
    } else {
      missing.add(i->soid, i->version, eversion_t(), i->is_delete());
    }
  }

  for (const auto& p : extra_missing) {
    missing.add(p.first, p.second.need, p.second.have, p.second.is_delete());
  }

  set_missing_may_contain_deletes();
}

#ifdef WITH_SEASTAR

namespace {
  struct FuturizedShardStoreLogReader {
    crimson::os::FuturizedStore::Shard &store;
    const pg_info_t &info;
    PGLog::IndexedLog &log;
    std::set<std::string>* log_keys_debug = nullptr;
    pg_missing_tracker_t &missing;
    const DoutPrefixProvider *dpp;

    eversion_t on_disk_can_rollback_to;
    eversion_t on_disk_rollback_info_trimmed_to;

    std::map<eversion_t, hobject_t> divergent_priors;
    bool must_rebuild = false;
    std::list<pg_log_entry_t> entries;
    std::list<pg_log_dup_t> dups;

    std::optional<std::string> next;

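    // Dispatch one omap key/value pair: special keys ("divergent_priors",
    // "can_rollback_to", "rollback_info_trimmed_to",
    // "may_include_deletes_in_missing"), "missing/..." items, "dup_..."
    // entries, and plain version keys holding log entries; keys starting
    // with '_' are skipped.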
    void process_entry(const auto& key, const auto& value) {
      if (key[0] == '_')
        return;
      // copy the ceph::buffer::list before creating an iterator
      auto bl = value;
      auto bp = bl.cbegin();
      if (key == "divergent_priors") {
        decode(divergent_priors, bp);
        ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
                           << " divergent_priors" << dendl;
        ceph_assert("crimson shouldn't have had divergent_priors" == 0);
      } else if (key == "can_rollback_to") {
        decode(on_disk_can_rollback_to, bp);
      } else if (key == "rollback_info_trimmed_to") {
        decode(on_disk_rollback_info_trimmed_to, bp);
      } else if (key == "may_include_deletes_in_missing") {
        missing.may_include_deletes = true;
      } else if (key.substr(0, 7) == std::string("missing")) {
        hobject_t oid;
        pg_missing_item item;
        decode(oid, bp);
        decode(item, bp);
        if (item.is_delete()) {
          ceph_assert(missing.may_include_deletes);
        }
        missing.add(oid, std::move(item));
      } else if (key.substr(0, 4) == std::string("dup_")) {
        pg_log_dup_t dup;
        decode(dup, bp);
        if (!dups.empty()) {
          ceph_assert(dups.back().version < dup.version);
        }
        dups.push_back(dup);
      } else {
        pg_log_entry_t e;
        e.decode_with_checksum(bp);
        ldpp_dout(dpp, 20) << "read_log_and_missing " << e << dendl;
        if (!entries.empty()) {
          pg_log_entry_t last_e(entries.back());
          ceph_assert(last_e.version.version < e.version.version);
          ceph_assert(last_e.version.epoch <= e.version.epoch);
        }
        entries.push_back(e);
        if (log_keys_debug)
          log_keys_debug->insert(e.get_key_name());
      }
    }

    seastar::future<> read(crimson::os::CollectionRef ch,
                           ghobject_t pgmeta_oid) {
      // will get overridden if recorded
      on_disk_can_rollback_to = info.last_update;
      missing.may_include_deletes = false;

      return seastar::do_with(
        std::move(ch),
        std::move(pgmeta_oid),
        std::make_optional<std::string>(),
        [this](crimson::os::CollectionRef &ch,
               ghobject_t &pgmeta_oid,
               std::optional<std::string> &start) {
          return seastar::repeat([this, &ch, &pgmeta_oid, &start]() {
            return store.omap_get_values(
              ch, pgmeta_oid, start
            ).safe_then([this, &start](const auto& ret) {
              const auto& [done, kvs] = ret;
              for (const auto& [key, value] : kvs) {
                process_entry(key, value);
                start = key;
              }
              return seastar::make_ready_future<seastar::stop_iteration>(
                done ? seastar::stop_iteration::yes : seastar::stop_iteration::no
              );
            }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all{});
          }).then([this] {
            if (info.pgid.is_no_shard()) {
              // replicated pool pg does not persist this key
              assert(on_disk_rollback_info_trimmed_to == eversion_t());
              on_disk_rollback_info_trimmed_to = info.last_update;
            }
            log = PGLog::IndexedLog(
              info.last_update,
              info.log_tail,
              on_disk_can_rollback_to,
              on_disk_rollback_info_trimmed_to,
              std::move(entries),
              std::move(dups));
          });
        });
    }
  };
}

seastar::future<> PGLog::read_log_and_missing_crimson(
  crimson::os::FuturizedStore::Shard &store,
  crimson::os::CollectionRef ch,
  const pg_info_t &info,
  IndexedLog &log,
  std::set<std::string>* log_keys_debug,
  pg_missing_tracker_t &missing,
  ghobject_t pgmeta_oid,
  const DoutPrefixProvider *dpp)
{
  ldpp_dout(dpp, 20) << "read_log_and_missing coll "
                     << ch->get_cid()
                     << " " << pgmeta_oid << dendl;
  return seastar::do_with(FuturizedShardStoreLogReader{
      store, info, log, log_keys_debug,
      missing, dpp},
    [ch, pgmeta_oid](FuturizedShardStoreLogReader& reader) {
      return reader.read(ch, pgmeta_oid);
    });
}

#endif