1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/iterator/counting_iterator.hpp>
8 #include "include/intarith.h"
10 #include "segmented_journal.h"
12 #include "crimson/common/config_proxy.h"
13 #include "crimson/os/seastore/logging.h"
// Register this translation unit's log subsystem for the LOG_PREFIX /
// INFO / DEBUG / ERROR macros used throughout the file.
// NOTE(review): this whole chunk is a garbled extraction — the leading
// numbers ("15", "19", ...) embedded in each line are original-file line
// numbers, statements are split mid-token, and some lines are missing.
// Code tokens are preserved byte-for-byte; only comments are added.
15 SET_SUBSYS(seastore_journal
);
// Fragment of the file's logging-convention comment (its "/*" opener is
// on lines missing from this extraction):
19 * - H<handle-addr> information
22 * - INFO: major initiation, closing, rolling and replay operations
23 * - DEBUG: INFO details, major submit operations
24 * - TRACE: DEBUG details
27 namespace crimson::os::seastore::journal
{
// Constructor: wires together the journal's collaborators —
//  * segment_seq_allocator: issues sequence numbers for JOURNAL-type
//    segments (heap-allocated here, shared via the member smart pointer);
//  * journal_segment_allocator: allocates METADATA-category segments,
//    driven by the trimmer and the shared seq allocator;
//  * record_submitter: batching/queueing knobs all come from the
//    seastore_journal_* config options;
//  * sm_group: the segment-manager group obtained from the provider.
// NOTE(review): member-initializer lines at original 36-37 and the tail
// (original 49-52, incl. the closing brace) are missing from this
// extraction; remaining tokens are preserved byte-for-byte.
29 SegmentedJournal::SegmentedJournal(
30 SegmentProvider
&segment_provider
,
31 JournalTrimmer
&trimmer
)
32 : segment_seq_allocator(
// Dedicated sequence-number source for journal segments.
33 new SegmentSeqAllocator(segment_type_t::JOURNAL
)),
34 journal_segment_allocator(&trimmer
,
35 data_category_t::METADATA
,
38 *segment_seq_allocator
),
// RecordSubmitter tuning is entirely config-driven:
39 record_submitter(crimson::common::get_conf
<uint64_t>(
40 "seastore_journal_iodepth_limit"),
41 crimson::common::get_conf
<uint64_t>(
42 "seastore_journal_batch_capacity"),
43 crimson::common::get_conf
<Option::size_t>(
44 "seastore_journal_batch_flush_size"),
45 crimson::common::get_conf
<double>(
46 "seastore_journal_batch_preferred_fullness"),
47 journal_segment_allocator
),
48 sm_group(*segment_provider
.get_segment_manager_group()),
53 SegmentedJournal::open_for_mkfs_ret
54 SegmentedJournal::open_for_mkfs()
56 return record_submitter
.open(true);
59 SegmentedJournal::open_for_mount_ret
60 SegmentedJournal::open_for_mount()
62 return record_submitter
.open(false);
65 SegmentedJournal::close_ertr::future
<> SegmentedJournal::close()
67 LOG_PREFIX(Journal::close
);
68 INFO("closing, committed_to={}",
69 record_submitter
.get_committed_to());
70 return record_submitter
.close();
// Build the ordered list of segments to replay from the on-disk headers.
// Visible flow: fail on empty input; order headers by segment_seq (the
// comparator below — the sort call itself is on lines missing from this
// extraction); advance the seq allocator past the newest replayed seq;
// validate each header; scan the last segment for tail deltas; locate the
// segment containing the journal tail and emit (journal_seq, header)
// replay entries from there on, pinning entry [0] to the exact tail paddr.
// NOTE(review): many original lines are missing (loop headers, braces,
// the sort invocation); remaining tokens preserved byte-for-byte.
73 SegmentedJournal::prep_replay_segments_fut
74 SegmentedJournal::prep_replay_segments(
75 std::vector
<std::pair
<segment_id_t
, segment_header_t
>> segments
)
77 LOG_PREFIX(Journal::prep_replay_segments
);
// Nothing to replay is a hard error: a mounted journal must have segments.
78 if (segments
.empty()) {
79 ERROR("no journal segments for replay");
80 return crimson::ct_error::input_output_error::make();
// Comparator ordering headers by ascending segment_seq.
// NOTE(review): "const auto <" below is a mangled "const auto &lt" —
// HTML-unescaping artifact of the extraction.
85 [](const auto <
, const auto &rt
) {
86 return lt
.second
.segment_seq
<
87 rt
.second
.segment_seq
;
// Future journal segments continue numbering after the newest replayed one.
90 segment_seq_allocator
->set_next_segment_seq(
91 segments
.rbegin()->second
.segment_seq
+ 1);
// Sanity-check each header: id must match its recorded physical segment
// and the segment must actually be of JOURNAL type.
97 if (seg
.first
!= seg
.second
.physical_segment_id
||
98 seg
.second
.get_type() != segment_type_t::JOURNAL
) {
99 ERROR("illegal journal segment for replay -- {}", seg
.second
);
// The newest segment may hold tail deltas written after its header.
104 auto last_segment_id
= segments
.rbegin()->first
;
105 auto last_header
= segments
.rbegin()->second
;
106 return scan_last_segment(last_segment_id
, last_header
107 ).safe_then([this, FNAME
, segments
=std::move(segments
)] {
108 INFO("dirty_tail={}, alloc_tail={}",
109 trimmer
.get_dirty_tail(),
110 trimmer
.get_alloc_tail());
// Replay starts at the trimmer's journal tail, which must be valid here.
111 auto journal_tail
= trimmer
.get_journal_tail();
112 auto journal_tail_paddr
= journal_tail
.offset
;
113 ceph_assert(journal_tail
!= JOURNAL_SEQ_NULL
);
114 ceph_assert(journal_tail_paddr
!= P_ADDR_NULL
);
// Find the segment that contains the tail's physical address.
115 auto from
= std::find_if(
118 [&journal_tail_paddr
](const auto &seg
) -> bool {
119 auto& seg_addr
= journal_tail_paddr
.as_seg_paddr();
120 return seg
.first
== seg_addr
.get_segment_id();
// The located segment's seq must agree with the tail's seq.
122 if (from
->second
.segment_seq
!= journal_tail
.segment_seq
) {
123 ERROR("journal_tail {} does not match {}",
124 journal_tail
, from
->second
);
128 auto num_segments
= segments
.end() - from
;
129 INFO("{} segments to replay", num_segments
);
130 auto ret
= replay_segments_t(num_segments
);
// Transform each remaining (id, header) into (start journal_seq, header);
// the seq's paddr is built from the segment id and the group block size.
132 from
, segments
.end(), ret
.begin(),
133 [this](const auto &p
) {
134 auto ret
= journal_seq_t
{
135 p
.second
.segment_seq
,
136 paddr_t::make_seg_paddr(
138 sm_group
.get_block_size())
140 return std::make_pair(ret
, p
.second
);
// The first replay entry begins exactly at the journal tail, not at the
// containing segment's start.
142 ret
[0].first
.offset
= journal_tail_paddr
;
143 return prep_replay_segments_fut(
144 replay_ertr::ready_future_marker
{},
// Scan the newest journal segment for JOURNAL_TAIL deltas so the trimmer's
// dirty/alloc tails reflect updates written after the segment header.
// Visible flow: seed the tails from the header itself, then scan all valid
// records in the segment; for each record group that contains a background
// transaction, decode its deltas and apply any journal_tail_delta_t found.
// NOTE(review): several original lines (braces, early returns, the
// scan_valid_records argument list) are missing from this extraction;
// remaining tokens preserved byte-for-byte.
149 SegmentedJournal::scan_last_segment_ertr::future
<>
150 SegmentedJournal::scan_last_segment(
151 const segment_id_t
&segment_id
,
152 const segment_header_t
&segment_header
)
154 LOG_PREFIX(SegmentedJournal::scan_last_segment
);
155 assert(segment_id
== segment_header
.physical_segment_id
);
// Baseline: the tails recorded in the segment header.
156 trimmer
.update_journal_tails(
157 segment_header
.dirty_tail
, segment_header
.alloc_tail
);
// Scan cursor starts at offset 0 of this segment.
158 auto seq
= journal_seq_t
{
159 segment_header
.segment_seq
,
160 paddr_t::make_seg_paddr(segment_id
, 0)
162 INFO("scanning journal tail deltas -- {}", segment_header
);
163 return seastar::do_with(
164 scan_valid_records_cursor(seq
),
// Per-record-group callback: decode headers, and only if a background
// transaction is present, decode deltas looking for JOURNAL_TAIL updates.
165 SegmentManagerGroup::found_record_handler_t(
167 record_locator_t locator
,
168 const record_group_header_t
& record_group_header
,
169 const bufferlist
& mdbuf
170 ) -> SegmentManagerGroup::scan_valid_records_ertr::future
<>
172 DEBUG("decoding {} at {}", record_group_header
, locator
);
173 bool has_tail_delta
= false;
174 auto maybe_headers
= try_decode_record_headers(
175 record_group_header
, mdbuf
);
176 if (!maybe_headers
) {
177 // This should be impossible, we did check the crc on the mdbuf
178 ERROR("unable to decode headers from {} at {}",
179 record_group_header
, locator
);
// Tail deltas are only emitted by background transactions.
182 for (auto &record_header
: *maybe_headers
) {
183 ceph_assert(is_valid_transaction(record_header
.type
));
184 if (is_background_transaction(record_header
.type
)) {
185 has_tail_delta
= true;
188 if (has_tail_delta
) {
189 bool found_delta
= false;
190 auto maybe_record_deltas_list
= try_decode_deltas(
191 record_group_header
, mdbuf
, locator
.record_block_base
);
192 if (!maybe_record_deltas_list
) {
193 ERROR("unable to decode deltas from {} at {}",
194 record_group_header
, locator
);
197 for (auto &record_deltas
: *maybe_record_deltas_list
) {
198 for (auto &[ctime
, delta
] : record_deltas
.deltas
) {
199 if (delta
.type
== extent_types_t::JOURNAL_TAIL
) {
// Decode the tail delta and push the (non-null) tails to the trimmer.
201 journal_tail_delta_t tail_delta
;
202 decode(tail_delta
, delta
.bl
);
203 auto start_seq
= locator
.write_result
.start_seq
;
204 DEBUG("got {}, at {}", tail_delta
, start_seq
);
205 ceph_assert(tail_delta
.dirty_tail
!= JOURNAL_SEQ_NULL
);
206 ceph_assert(tail_delta
.alloc_tail
!= JOURNAL_SEQ_NULL
);
207 trimmer
.update_journal_tails(
208 tail_delta
.dirty_tail
, tail_delta
.alloc_tail
);
// A background transaction group must contain at least one tail delta.
212 ceph_assert(found_delta
);
214 return seastar::now();
// Drive the scan over the whole segment, capped by numeric_limits so the
// cursor (not a byte budget) terminates it; the result is discarded —
// only the trimmer side effects matter.
216 [this, nonce
=segment_header
.segment_nonce
](auto &cursor
, auto &handler
)
218 return sm_group
.scan_valid_records(
221 std::numeric_limits
<std::size_t>::max(),
222 handler
).discard_result();
// Replay one journal segment: scan its valid records, decode each record
// group's deltas, and feed every delta to the caller-supplied handler,
// maintaining replay_stats_t counters along the way.
// NOTE(review): the first parameter line (the journal_seq_t `seq` used
// below), several brace/argument lines, and the handler invocation lines
// are missing from this extraction; remaining tokens preserved
// byte-for-byte.
226 SegmentedJournal::replay_ertr::future
<>
227 SegmentedJournal::replay_segment(
229 segment_header_t header
,
230 delta_handler_t
&handler
,
231 replay_stats_t
&stats
)
233 LOG_PREFIX(Journal::replay_segment
);
234 INFO("starting at {} -- {}", seq
, header
);
235 return seastar::do_with(
236 scan_valid_records_cursor(seq
),
// Per-record-group callback: count the group, decode its deltas, then
// iterate them through the delta handler.
237 SegmentManagerGroup::found_record_handler_t(
238 [&handler
, this, &stats
](
239 record_locator_t locator
,
240 const record_group_header_t
& header
,
241 const bufferlist
& mdbuf
)
242 -> SegmentManagerGroup::scan_valid_records_ertr::future
<>
244 LOG_PREFIX(Journal::replay_segment
);
245 ++stats
.num_record_groups
;
246 auto maybe_record_deltas_list
= try_decode_deltas(
247 header
, mdbuf
, locator
.record_block_base
);
248 if (!maybe_record_deltas_list
) {
249 // This should be impossible, we did check the crc on the mdbuf
250 ERROR("unable to decode deltas for record {} at {}",
252 return crimson::ct_error::input_output_error::make();
255 return seastar::do_with(
256 std::move(*maybe_record_deltas_list
),
257 [write_result
=locator
.write_result
,
261 &stats
](auto& record_deltas_list
)
// Outer loop: one record_deltas_t per record in the group.
263 return crimson::do_for_each(
269 &stats
](record_deltas_t
& record_deltas
)
272 auto locator
= record_locator_t
{
273 record_deltas
.record_block_base
,
276 DEBUG("processing {} deltas at block_base {}",
277 record_deltas
.deltas
.size(),
// Inner loop: apply each (modify_time, delta) pair via the handler.
279 return crimson::do_for_each(
280 record_deltas
.deltas
,
286 auto& modify_time
= p
.first
;
287 auto& delta
= p
.second
;
// The handler invocation (missing lines) is passed the current trimmer
// tails; its result tells us whether the delta was applied.
291 trimmer
.get_dirty_tail(),
292 trimmer
.get_alloc_tail(),
294 ).safe_then([&stats
, delta_type
=delta
.type
](bool is_applied
) {
296 // see Cache::replay_delta()
297 assert(delta_type
!= extent_types_t::JOURNAL_TAIL
);
// Bucket applied deltas into alloc vs dirty counters.
298 if (delta_type
== extent_types_t::ALLOC_INFO
) {
299 ++stats
.num_alloc_deltas
;
301 ++stats
.num_dirty_deltas
;
// Drive the scan with the segment's nonce; discard the per-scan result and
// map any unexpected error to an assertion.
309 [=, this](auto &cursor
, auto &dhandler
) {
310 return sm_group
.scan_valid_records(
312 header
.segment_nonce
,
313 std::numeric_limits
<size_t>::max(),
314 dhandler
).safe_then([](auto){}
316 replay_ertr::pass_further
{},
317 crimson::ct_error::assert_all
{
318 "shouldn't meet with any other error other replay_ertr"
// Top-level replay: discover journal segment headers on disk, prepare the
// ordered replay list, replay each segment through the supplied delta
// handler, and log aggregate statistics when done.
// NOTE(review): the function tail (remaining stats fields and closing
// braces, original lines ~349 and 352+) is missing from this extraction;
// remaining tokens preserved byte-for-byte.
325 SegmentedJournal::replay_ret
SegmentedJournal::replay(
326 delta_handler_t
&&delta_handler
)
328 LOG_PREFIX(Journal::replay
);
329 return sm_group
.find_journal_segment_headers(
330 ).safe_then([this, FNAME
, delta_handler
=std::move(delta_handler
)]
331 (auto &&segment_headers
) mutable -> replay_ret
{
332 INFO("got {} segments", segment_headers
.size());
// Keep handler/segments/stats alive across the whole async replay.
333 return seastar::do_with(
334 std::move(delta_handler
),
337 [this, segment_headers
=std::move(segment_headers
), FNAME
]
338 (auto &handler
, auto &segments
, auto &stats
) mutable -> replay_ret
{
339 return prep_replay_segments(std::move(segment_headers
)
340 ).safe_then([this, &handler
, &segments
, &stats
](auto replay_segs
) mutable {
341 segments
= std::move(replay_segs
);
// Replay segments strictly in order, one at a time.
342 return crimson::do_for_each(segments
,[this, &handler
, &stats
](auto i
) mutable {
343 return replay_segment(i
.first
, i
.second
, handler
, stats
);
345 }).safe_then([&stats
, FNAME
] {
346 INFO("replay done, record_groups={}, records={}, "
347 "alloc_deltas={}, dirty_deltas={}",
348 stats
.num_record_groups
,
350 stats
.num_alloc_deltas
,
351 stats
.num_dirty_deltas
);
357 seastar::future
<> SegmentedJournal::flush(OrderingHandle
&handle
)
359 LOG_PREFIX(SegmentedJournal::flush
);
360 DEBUG("H{} flush ...", (void*)&handle
);
361 assert(write_pipeline
);
362 return handle
.enter(write_pipeline
->device_submission
363 ).then([this, &handle
] {
364 return handle
.enter(write_pipeline
->finalize
);
365 }).then([FNAME
, &handle
] {
366 DEBUG("H{} flush done", (void*)&handle
);
// Submit one record to the record submitter, recursing until it can be
// accepted: wait while the submitter is unavailable, roll to a fresh
// segment when check_action says ROLL, otherwise submit and thread the
// ordering handle through the device_submission and finalize pipeline
// stages, finally advancing committed_to past the record's end sequence.
// NOTE(review): the `record_t &&record` parameter line and the function
// tail (returning `result`, original lines ~405-410) are missing from this
// extraction; remaining tokens preserved byte-for-byte.
370 SegmentedJournal::submit_record_ret
371 SegmentedJournal::do_submit_record(
373 OrderingHandle
&handle
)
375 LOG_PREFIX(SegmentedJournal::do_submit_record
);
// Back off until the submitter has capacity, then retry recursively.
376 if (!record_submitter
.is_available()) {
377 DEBUG("H{} wait ...", (void*)&handle
);
378 return record_submitter
.wait_available(
379 ).safe_then([this, record
=std::move(record
), &handle
]() mutable {
380 return do_submit_record(std::move(record
), handle
);
// Ask the submitter what to do with a record of this size.
383 auto action
= record_submitter
.check_action(record
.size
);
384 if (action
== RecordSubmitter::action_t::ROLL
) {
385 DEBUG("H{} roll, unavailable ...", (void*)&handle
);
386 return record_submitter
.roll_segment(
387 ).safe_then([this, record
=std::move(record
), &handle
]() mutable {
388 return do_submit_record(std::move(record
), handle
);
390 } else { // SUBMIT_FULL/NOT_FULL
391 DEBUG("H{} submit {} ...",
393 action
== RecordSubmitter::action_t::SUBMIT_FULL
?
394 "FULL" : "NOT_FULL");
// Kick off the submit, but only resolve it inside the device_submission
// pipeline stage so writes stay ordered per handle.
395 auto submit_fut
= record_submitter
.submit(std::move(record
));
396 return handle
.enter(write_pipeline
->device_submission
397 ).then([submit_fut
=std::move(submit_fut
)]() mutable {
398 return std::move(submit_fut
);
399 }).safe_then([FNAME
, this, &handle
](record_locator_t result
) {
400 return handle
.enter(write_pipeline
->finalize
401 ).then([FNAME
, this, result
, &handle
] {
402 DEBUG("H{} finish with {}", (void*)&handle
, result
);
// committed_to advances to the end sequence of this record's write.
403 auto new_committed_to
= result
.write_result
.get_end_seq();
404 record_submitter
.update_committed_to(new_committed_to
);
411 SegmentedJournal::submit_record_ret
412 SegmentedJournal::submit_record(
414 OrderingHandle
&handle
)
416 LOG_PREFIX(SegmentedJournal::submit_record
);
417 DEBUG("H{} {} start ...", (void*)&handle
, record
);
418 assert(write_pipeline
);
419 auto expected_size
= record_group_size_t(
421 journal_segment_allocator
.get_block_size()
422 ).get_encoded_length();
423 auto max_record_length
= journal_segment_allocator
.get_max_write_length();
424 if (expected_size
> max_record_length
) {
425 ERROR("H{} {} exceeds max record size {}",
426 (void*)&handle
, record
, max_record_length
);
427 return crimson::ct_error::erange::make();
430 return do_submit_record(std::move(record
), handle
);