// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <iostream>

#include <boost/iterator/counting_iterator.hpp>

#include "include/intarith.h"

#include "segmented_journal.h"

#include "crimson/common/config_proxy.h"
#include "crimson/os/seastore/logging.h"

SET_SUBSYS(seastore_journal);

/*
 * format:
 * - H<handle-addr> information
 *
 * levels:
 * - INFO:  major initiation, closing, rolling and replay operations
 * - DEBUG: INFO details, major submit operations
 * - TRACE: DEBUG details
 */

namespace crimson::os::seastore::journal {

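// Ties the journal to its segment provider and trimmer: journal
// segments are allocated as inline METADATA with a dedicated JOURNAL
// sequence allocator, and the record submitter takes its iodepth,
// batch and flush thresholds from the seastore_journal_* config
// options.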
SegmentedJournal::SegmentedJournal(
  SegmentProvider &segment_provider,
  JournalTrimmer &trimmer)
  : segment_seq_allocator(
      new SegmentSeqAllocator(segment_type_t::JOURNAL)),
    journal_segment_allocator(&trimmer,
                              data_category_t::METADATA,
                              INLINE_GENERATION,
                              segment_provider,
                              *segment_seq_allocator),
    record_submitter(crimson::common::get_conf<uint64_t>(
                       "seastore_journal_iodepth_limit"),
                     crimson::common::get_conf<uint64_t>(
                       "seastore_journal_batch_capacity"),
                     crimson::common::get_conf<Option::size_t>(
                       "seastore_journal_batch_flush_size"),
                     crimson::common::get_conf<double>(
                       "seastore_journal_batch_preferred_fullness"),
                     journal_segment_allocator),
    sm_group(*segment_provider.get_segment_manager_group()),
    trimmer{trimmer}
{
}

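// Both open paths delegate to the record submitter; the boolean
// presumably distinguishes a first-time open at mkfs from a regular
// open at mount.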
SegmentedJournal::open_for_mkfs_ret
SegmentedJournal::open_for_mkfs()
{
  return record_submitter.open(true);
}

SegmentedJournal::open_for_mount_ret
SegmentedJournal::open_for_mount()
{
  return record_submitter.open(false);
}

SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
{
  LOG_PREFIX(Journal::close);
  INFO("closing, committed_to={}",
       record_submitter.get_committed_to());
  return record_submitter.close();
}

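// Sorts the discovered segments by sequence number, validates them,
// and keeps only the suffix starting at the segment that holds the
// journal tail; the first replay position is then adjusted from the
// segment start to the exact tail paddr.  scan_last_segment() runs
// first so the trimmer's tails are up to date before the tail segment
// is located.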
SegmentedJournal::prep_replay_segments_fut
SegmentedJournal::prep_replay_segments(
  std::vector<std::pair<segment_id_t, segment_header_t>> segments)
{
  LOG_PREFIX(Journal::prep_replay_segments);
  if (segments.empty()) {
    ERROR("no journal segments for replay");
    return crimson::ct_error::input_output_error::make();
  }
  std::sort(
    segments.begin(),
    segments.end(),
    [](const auto &lt, const auto &rt) {
      return lt.second.segment_seq <
             rt.second.segment_seq;
    });

  segment_seq_allocator->set_next_segment_seq(
    segments.rbegin()->second.segment_seq + 1);
  std::for_each(
    segments.begin(),
    segments.end(),
    [FNAME](auto &seg)
  {
    if (seg.first != seg.second.physical_segment_id ||
        seg.second.get_type() != segment_type_t::JOURNAL) {
      ERROR("illegal journal segment for replay -- {}", seg.second);
      ceph_abort();
    }
  });

  auto last_segment_id = segments.rbegin()->first;
  auto last_header = segments.rbegin()->second;
  return scan_last_segment(last_segment_id, last_header
  ).safe_then([this, FNAME, segments=std::move(segments)] {
    INFO("dirty_tail={}, alloc_tail={}",
         trimmer.get_dirty_tail(),
         trimmer.get_alloc_tail());
    auto journal_tail = trimmer.get_journal_tail();
    auto journal_tail_paddr = journal_tail.offset;
    ceph_assert(journal_tail != JOURNAL_SEQ_NULL);
    ceph_assert(journal_tail_paddr != P_ADDR_NULL);
    auto from = std::find_if(
      segments.begin(),
      segments.end(),
      [&journal_tail_paddr](const auto &seg) -> bool {
        auto& seg_addr = journal_tail_paddr.as_seg_paddr();
        return seg.first == seg_addr.get_segment_id();
      });
    if (from->second.segment_seq != journal_tail.segment_seq) {
      ERROR("journal_tail {} does not match {}",
            journal_tail, from->second);
      ceph_abort();
    }

    auto num_segments = segments.end() - from;
    INFO("{} segments to replay", num_segments);
    auto ret = replay_segments_t(num_segments);
    std::transform(
      from, segments.end(), ret.begin(),
      [this](const auto &p) {
        auto ret = journal_seq_t{
          p.second.segment_seq,
          paddr_t::make_seg_paddr(
            p.first,
            sm_group.get_block_size())
        };
        return std::make_pair(ret, p.second);
      });
    ret[0].first.offset = journal_tail_paddr;
    return prep_replay_segments_fut(
      replay_ertr::ready_future_marker{},
      std::move(ret));
  });
}

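// The newest segment may contain JOURNAL_TAIL deltas written by
// background transactions after the segment header was sealed, so it
// is scanned in full and the trimmer's tails are re-applied for every
// tail delta found; otherwise replay would start from the older tails
// recorded in the segment header.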
SegmentedJournal::scan_last_segment_ertr::future<>
SegmentedJournal::scan_last_segment(
  const segment_id_t &segment_id,
  const segment_header_t &segment_header)
{
  LOG_PREFIX(SegmentedJournal::scan_last_segment);
  assert(segment_id == segment_header.physical_segment_id);
  trimmer.update_journal_tails(
    segment_header.dirty_tail, segment_header.alloc_tail);
  auto seq = journal_seq_t{
    segment_header.segment_seq,
    paddr_t::make_seg_paddr(segment_id, 0)
  };
  INFO("scanning journal tail deltas -- {}", segment_header);
  return seastar::do_with(
    scan_valid_records_cursor(seq),
    SegmentManagerGroup::found_record_handler_t(
      [FNAME, this](
        record_locator_t locator,
        const record_group_header_t& record_group_header,
        const bufferlist& mdbuf
      ) -> SegmentManagerGroup::scan_valid_records_ertr::future<>
    {
      DEBUG("decoding {} at {}", record_group_header, locator);
      bool has_tail_delta = false;
      auto maybe_headers = try_decode_record_headers(
        record_group_header, mdbuf);
      if (!maybe_headers) {
        // This should be impossible; we already checked the crc on the mdbuf
        ERROR("unable to decode headers from {} at {}",
              record_group_header, locator);
        ceph_abort();
      }
      for (auto &record_header : *maybe_headers) {
        ceph_assert(is_valid_transaction(record_header.type));
        if (is_background_transaction(record_header.type)) {
          has_tail_delta = true;
        }
      }
      if (has_tail_delta) {
        bool found_delta = false;
        auto maybe_record_deltas_list = try_decode_deltas(
          record_group_header, mdbuf, locator.record_block_base);
        if (!maybe_record_deltas_list) {
          ERROR("unable to decode deltas from {} at {}",
                record_group_header, locator);
          ceph_abort();
        }
        for (auto &record_deltas : *maybe_record_deltas_list) {
          for (auto &[ctime, delta] : record_deltas.deltas) {
            if (delta.type == extent_types_t::JOURNAL_TAIL) {
              found_delta = true;
              journal_tail_delta_t tail_delta;
              decode(tail_delta, delta.bl);
              auto start_seq = locator.write_result.start_seq;
              DEBUG("got {}, at {}", tail_delta, start_seq);
              ceph_assert(tail_delta.dirty_tail != JOURNAL_SEQ_NULL);
              ceph_assert(tail_delta.alloc_tail != JOURNAL_SEQ_NULL);
              trimmer.update_journal_tails(
                tail_delta.dirty_tail, tail_delta.alloc_tail);
            }
          }
        }
        ceph_assert(found_delta);
      }
      return seastar::now();
    }),
    [this, nonce=segment_header.segment_nonce](auto &cursor, auto &handler)
  {
    return sm_group.scan_valid_records(
      cursor,
      nonce,
      std::numeric_limits<std::size_t>::max(),
      handler).discard_result();
  });
}

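// Replays one segment: each valid record group is decoded into
// per-record delta lists, and every delta is fed to the caller's
// handler along with the current dirty/alloc tails and the record's
// modify time.  The handler reports whether the delta was actually
// applied, which drives the replay statistics.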
SegmentedJournal::replay_ertr::future<>
SegmentedJournal::replay_segment(
  journal_seq_t seq,
  segment_header_t header,
  delta_handler_t &handler,
  replay_stats_t &stats)
{
  LOG_PREFIX(Journal::replay_segment);
  INFO("starting at {} -- {}", seq, header);
  return seastar::do_with(
    scan_valid_records_cursor(seq),
    SegmentManagerGroup::found_record_handler_t(
      [&handler, this, &stats](
        record_locator_t locator,
        const record_group_header_t& header,
        const bufferlist& mdbuf)
      -> SegmentManagerGroup::scan_valid_records_ertr::future<>
    {
      LOG_PREFIX(Journal::replay_segment);
      ++stats.num_record_groups;
      auto maybe_record_deltas_list = try_decode_deltas(
        header, mdbuf, locator.record_block_base);
      if (!maybe_record_deltas_list) {
        // This should be impossible; we already checked the crc on the mdbuf
        ERROR("unable to decode deltas for record {} at {}",
              header, locator);
        return crimson::ct_error::input_output_error::make();
      }

      return seastar::do_with(
        std::move(*maybe_record_deltas_list),
        [write_result=locator.write_result,
         this,
         FNAME,
         &handler,
         &stats](auto& record_deltas_list)
      {
        return crimson::do_for_each(
          record_deltas_list,
          [write_result,
           this,
           FNAME,
           &handler,
           &stats](record_deltas_t& record_deltas)
        {
          ++stats.num_records;
          auto locator = record_locator_t{
            record_deltas.record_block_base,
            write_result
          };
          DEBUG("processing {} deltas at block_base {}",
                record_deltas.deltas.size(),
                locator);
          return crimson::do_for_each(
            record_deltas.deltas,
            [locator,
             this,
             &handler,
             &stats](auto &p)
          {
            auto& modify_time = p.first;
            auto& delta = p.second;
            return handler(
              locator,
              delta,
              trimmer.get_dirty_tail(),
              trimmer.get_alloc_tail(),
              modify_time
            ).safe_then([&stats, delta_type=delta.type](bool is_applied) {
              if (is_applied) {
                // see Cache::replay_delta()
                assert(delta_type != extent_types_t::JOURNAL_TAIL);
                if (delta_type == extent_types_t::ALLOC_INFO) {
                  ++stats.num_alloc_deltas;
                } else {
                  ++stats.num_dirty_deltas;
                }
              }
            });
          });
        });
      });
    }),
    [=, this](auto &cursor, auto &dhandler) {
      return sm_group.scan_valid_records(
        cursor,
        header.segment_nonce,
        std::numeric_limits<size_t>::max(),
        dhandler).safe_then([](auto){}
      ).handle_error(
        replay_ertr::pass_further{},
        crimson::ct_error::assert_all{
          "shouldn't meet with any error other than replay_ertr"
        }
      );
    }
  );
}

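// Top-level replay: find all journal segment headers on disk, work
// out which suffix needs replaying, then replay those segments in
// sequence order while accumulating statistics.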
SegmentedJournal::replay_ret SegmentedJournal::replay(
  delta_handler_t &&delta_handler)
{
  LOG_PREFIX(Journal::replay);
  return sm_group.find_journal_segment_headers(
  ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)]
    (auto &&segment_headers) mutable -> replay_ret {
    INFO("got {} segments", segment_headers.size());
    return seastar::do_with(
      std::move(delta_handler),
      replay_segments_t(),
      replay_stats_t(),
      [this, segment_headers=std::move(segment_headers), FNAME]
      (auto &handler, auto &segments, auto &stats) mutable -> replay_ret {
      return prep_replay_segments(std::move(segment_headers)
      ).safe_then([this, &handler, &segments, &stats](auto replay_segs) mutable {
        segments = std::move(replay_segs);
        return crimson::do_for_each(segments, [this, &handler, &stats](auto i) mutable {
          return replay_segment(i.first, i.second, handler, stats);
        });
      }).safe_then([&stats, FNAME] {
        INFO("replay done, record_groups={}, records={}, "
             "alloc_deltas={}, dirty_deltas={}",
             stats.num_record_groups,
             stats.num_records,
             stats.num_alloc_deltas,
             stats.num_dirty_deltas);
      });
    });
  });
}

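// Flushes by walking the handle through the same pipeline stages used
// for record submission (device_submission, then finalize), so that
// work already submitted on the pipeline is ordered ahead of it.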
seastar::future<> SegmentedJournal::flush(OrderingHandle &handle)
{
  LOG_PREFIX(SegmentedJournal::flush);
  DEBUG("H{} flush ...", (void*)&handle);
  assert(write_pipeline);
  return handle.enter(write_pipeline->device_submission
  ).then([this, &handle] {
    return handle.enter(write_pipeline->finalize);
  }).then([FNAME, &handle] {
    DEBUG("H{} flush done", (void*)&handle);
  });
}

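// Submission state machine: retry once the submitter has iodepth
// available, roll to a fresh segment when the record does not fit the
// current one, otherwise submit.  committed_to is only advanced after
// the record passes the finalize stage of the write pipeline.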
SegmentedJournal::submit_record_ret
SegmentedJournal::do_submit_record(
  record_t &&record,
  OrderingHandle &handle)
{
  LOG_PREFIX(SegmentedJournal::do_submit_record);
  if (!record_submitter.is_available()) {
    DEBUG("H{} wait ...", (void*)&handle);
    return record_submitter.wait_available(
    ).safe_then([this, record=std::move(record), &handle]() mutable {
      return do_submit_record(std::move(record), handle);
    });
  }
  auto action = record_submitter.check_action(record.size);
  if (action == RecordSubmitter::action_t::ROLL) {
    DEBUG("H{} roll, unavailable ...", (void*)&handle);
    return record_submitter.roll_segment(
    ).safe_then([this, record=std::move(record), &handle]() mutable {
      return do_submit_record(std::move(record), handle);
    });
  } else { // SUBMIT_FULL/NOT_FULL
    DEBUG("H{} submit {} ...",
          (void*)&handle,
          action == RecordSubmitter::action_t::SUBMIT_FULL ?
          "FULL" : "NOT_FULL");
    auto submit_fut = record_submitter.submit(std::move(record));
    return handle.enter(write_pipeline->device_submission
    ).then([submit_fut=std::move(submit_fut)]() mutable {
      return std::move(submit_fut);
    }).safe_then([FNAME, this, &handle](record_locator_t result) {
      return handle.enter(write_pipeline->finalize
      ).then([FNAME, this, result, &handle] {
        DEBUG("H{} finish with {}", (void*)&handle, result);
        auto new_committed_to = result.write_result.get_end_seq();
        record_submitter.update_committed_to(new_committed_to);
        return result;
      });
    });
  }
}

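// Public entry point: rejects records whose encoded size would exceed
// the maximum write length of a journal segment before handing off to
// the retry loop in do_submit_record().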
SegmentedJournal::submit_record_ret
SegmentedJournal::submit_record(
  record_t &&record,
  OrderingHandle &handle)
{
  LOG_PREFIX(SegmentedJournal::submit_record);
  DEBUG("H{} {} start ...", (void*)&handle, record);
  assert(write_pipeline);
  auto expected_size = record_group_size_t(
    record.size,
    journal_segment_allocator.get_block_size()
  ).get_encoded_length();
  auto max_record_length = journal_segment_allocator.get_max_write_length();
  if (expected_size > max_record_length) {
    ERROR("H{} {} exceeds max record size {}",
          (void*)&handle, record, max_record_length);
    return crimson::ct_error::erange::make();
  }

  return do_submit_record(std::move(record), handle);
}

}