1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
4 #include "crimson/os/seastore/segment_manager_group.h"
6 #include "crimson/os/seastore/logging.h"
8 SET_SUBSYS(seastore_journal
);
10 namespace crimson::os::seastore
{
12 SegmentManagerGroup::read_segment_tail_ret
13 SegmentManagerGroup::read_segment_tail(segment_id_t segment
)
15 assert(has_device(segment
.device_id()));
16 auto& segment_manager
= *segment_managers
[segment
.device_id()];
17 return segment_manager
.read(
18 paddr_t::make_seg_paddr(
20 segment_manager
.get_segment_size() - get_rounded_tail_length()),
21 get_rounded_tail_length()
23 read_segment_header_ertr::pass_further
{},
24 crimson::ct_error::assert_all
{
25 "Invalid error in SegmentManagerGroup::read_segment_tail"
27 ).safe_then([=, &segment_manager
](bufferptr bptr
) -> read_segment_tail_ret
{
28 LOG_PREFIX(SegmentManagerGroup::read_segment_tail
);
29 DEBUG("segment {} bptr size {}", segment
, bptr
.length());
35 DEBUG("segment {} block crc {}",
37 bl
.begin().crc32c(segment_manager
.get_block_size(), 0));
39 auto bp
= bl
.cbegin();
42 } catch (ceph::buffer::error
&e
) {
43 DEBUG("segment {} unable to decode tail, skipping -- {}",
45 return crimson::ct_error::enodata::make();
47 DEBUG("segment {} tail {}", segment
, tail
);
48 return read_segment_tail_ret(
49 read_segment_tail_ertr::ready_future_marker
{},
54 SegmentManagerGroup::read_segment_header_ret
55 SegmentManagerGroup::read_segment_header(segment_id_t segment
)
57 assert(has_device(segment
.device_id()));
58 auto& segment_manager
= *segment_managers
[segment
.device_id()];
59 return segment_manager
.read(
60 paddr_t::make_seg_paddr(segment
, 0),
61 get_rounded_header_length()
63 read_segment_header_ertr::pass_further
{},
64 crimson::ct_error::assert_all
{
65 "Invalid error in SegmentManagerGroup::read_segment_header"
67 ).safe_then([=, &segment_manager
](bufferptr bptr
) -> read_segment_header_ret
{
68 LOG_PREFIX(SegmentManagerGroup::read_segment_header
);
69 DEBUG("segment {} bptr size {}", segment
, bptr
.length());
71 segment_header_t header
;
75 DEBUG("segment {} block crc {}",
77 bl
.begin().crc32c(segment_manager
.get_block_size(), 0));
79 auto bp
= bl
.cbegin();
82 } catch (ceph::buffer::error
&e
) {
83 DEBUG("segment {} unable to decode header, skipping -- {}",
85 return crimson::ct_error::enodata::make();
87 DEBUG("segment {} header {}", segment
, header
);
88 return read_segment_header_ret(
89 read_segment_header_ertr::ready_future_marker
{},
94 SegmentManagerGroup::scan_valid_records_ret
95 SegmentManagerGroup::scan_valid_records(
96 scan_valid_records_cursor
&cursor
,
97 segment_nonce_t nonce
,
99 found_record_handler_t
&handler
)
101 LOG_PREFIX(SegmentManagerGroup::scan_valid_records
);
102 assert(has_device(cursor
.get_segment_id().device_id()));
103 auto& segment_manager
=
104 *segment_managers
[cursor
.get_segment_id().device_id()];
105 if (cursor
.get_segment_offset() == 0) {
106 INFO("start to scan segment {}", cursor
.get_segment_id());
107 cursor
.increment_seq(segment_manager
.get_block_size());
109 DEBUG("starting at {}, budget={}", cursor
, budget
);
110 auto retref
= std::make_unique
<size_t>(0);
111 auto &budget_used
= *retref
;
112 return crimson::repeat(
113 [=, &cursor
, &budget_used
, &handler
, this]() mutable
114 -> scan_valid_records_ertr::future
<seastar::stop_iteration
> {
115 return [=, &handler
, &cursor
, &budget_used
, this] {
116 if (!cursor
.last_valid_header_found
) {
117 return read_validate_record_metadata(cursor
.seq
.offset
, nonce
118 ).safe_then([=, &cursor
](auto md
) {
120 cursor
.last_valid_header_found
= true;
121 if (cursor
.is_complete()) {
122 INFO("complete at {}, invalid record group metadata",
125 DEBUG("found invalid record group metadata at {}, "
126 "processing {} pending record groups",
128 cursor
.pending_record_groups
.size());
130 return scan_valid_records_ertr::now();
132 auto& [header
, md_bl
] = *md
;
133 DEBUG("found valid {} at {}", header
, cursor
.seq
);
134 cursor
.emplace_record_group(header
, std::move(md_bl
));
135 return scan_valid_records_ertr::now();
137 }).safe_then([=, &cursor
, &budget_used
, &handler
, this] {
138 DEBUG("processing committed record groups until {}, {} pending",
139 cursor
.last_committed
,
140 cursor
.pending_record_groups
.size());
141 return crimson::repeat(
142 [=, &budget_used
, &cursor
, &handler
, this] {
143 if (cursor
.pending_record_groups
.empty()) {
144 /* This is only possible if the segment is empty.
145 * A record's last_commited must be prior to its own
146 * location since it itself cannot yet have been committed
147 * at its own time of submission. Thus, the most recently
148 * read record must always fall after cursor.last_committed */
149 return scan_valid_records_ertr::make_ready_future
<
150 seastar::stop_iteration
>(seastar::stop_iteration::yes
);
152 auto &next
= cursor
.pending_record_groups
.front();
153 journal_seq_t next_seq
= {cursor
.seq
.segment_seq
, next
.offset
};
154 if (cursor
.last_committed
== JOURNAL_SEQ_NULL
||
155 next_seq
> cursor
.last_committed
) {
156 return scan_valid_records_ertr::make_ready_future
<
157 seastar::stop_iteration
>(seastar::stop_iteration::yes
);
159 return consume_next_records(cursor
, handler
, budget_used
161 return scan_valid_records_ertr::make_ready_future
<
162 seastar::stop_iteration
>(seastar::stop_iteration::no
);
167 assert(!cursor
.pending_record_groups
.empty());
168 auto &next
= cursor
.pending_record_groups
.front();
169 return read_validate_data(next
.offset
, next
.header
170 ).safe_then([this, FNAME
, &budget_used
, &cursor
, &handler
, &next
](auto valid
) {
172 INFO("complete at {}, invalid record group data at {}, {}",
173 cursor
, next
.offset
, next
.header
);
174 cursor
.pending_record_groups
.clear();
175 return scan_valid_records_ertr::now();
177 return consume_next_records(cursor
, handler
, budget_used
);
180 }().safe_then([=, &budget_used
, &cursor
] {
181 if (cursor
.is_complete() || budget_used
>= budget
) {
182 DEBUG("finish at {}, budget_used={}, budget={}",
183 cursor
, budget_used
, budget
);
184 return seastar::stop_iteration::yes
;
186 return seastar::stop_iteration::no
;
189 }).safe_then([retref
=std::move(retref
)]() mutable -> scan_valid_records_ret
{
190 return scan_valid_records_ret(
191 scan_valid_records_ertr::ready_future_marker
{},
196 SegmentManagerGroup::read_validate_record_metadata_ret
197 SegmentManagerGroup::read_validate_record_metadata(
199 segment_nonce_t nonce
)
201 LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata
);
202 auto& seg_addr
= start
.as_seg_paddr();
203 assert(has_device(seg_addr
.get_segment_id().device_id()));
204 auto& segment_manager
= *segment_managers
[seg_addr
.get_segment_id().device_id()];
205 auto block_size
= segment_manager
.get_block_size();
206 auto segment_size
= static_cast<int64_t>(segment_manager
.get_segment_size());
207 if (seg_addr
.get_segment_off() + block_size
> segment_size
) {
208 DEBUG("failed -- record group header block {}~4096 > segment_size {}", start
, segment_size
);
209 return read_validate_record_metadata_ret(
210 read_validate_record_metadata_ertr::ready_future_marker
{},
213 TRACE("reading record group header block {}~4096", start
);
214 return segment_manager
.read(start
, block_size
215 ).safe_then([=, &segment_manager
](bufferptr bptr
) mutable
216 -> read_validate_record_metadata_ret
{
217 auto block_size
= segment_manager
.get_block_size();
220 auto maybe_header
= try_decode_records_header(bl
, nonce
);
221 if (!maybe_header
.has_value()) {
222 return read_validate_record_metadata_ret(
223 read_validate_record_metadata_ertr::ready_future_marker
{},
226 auto& seg_addr
= start
.as_seg_paddr();
227 auto& header
= *maybe_header
;
228 if (header
.mdlength
< block_size
||
229 header
.mdlength
% block_size
!= 0 ||
230 header
.dlength
% block_size
!= 0 ||
231 (header
.committed_to
!= JOURNAL_SEQ_NULL
&&
232 header
.committed_to
.offset
.as_seg_paddr().get_segment_off() % block_size
!= 0) ||
233 (seg_addr
.get_segment_off() + header
.mdlength
+ header
.dlength
> segment_size
)) {
234 ERROR("failed, invalid record group header {}", start
);
235 return crimson::ct_error::input_output_error::make();
237 if (header
.mdlength
== block_size
) {
238 return read_validate_record_metadata_ret(
239 read_validate_record_metadata_ertr::ready_future_marker
{},
240 std::make_pair(std::move(header
), std::move(bl
))
244 auto rest_start
= paddr_t::make_seg_paddr(
245 seg_addr
.get_segment_id(),
246 seg_addr
.get_segment_off() + block_size
248 auto rest_len
= header
.mdlength
- block_size
;
249 TRACE("reading record group header rest {}~{}", rest_start
, rest_len
);
250 return segment_manager
.read(rest_start
, rest_len
251 ).safe_then([header
=std::move(header
), bl
=std::move(bl
)
252 ](auto&& bptail
) mutable {
253 bl
.push_back(bptail
);
254 return read_validate_record_metadata_ret(
255 read_validate_record_metadata_ertr::ready_future_marker
{},
256 std::make_pair(std::move(header
), std::move(bl
)));
258 }).safe_then([](auto p
) {
259 if (p
&& validate_records_metadata(p
->second
)) {
260 return read_validate_record_metadata_ret(
261 read_validate_record_metadata_ertr::ready_future_marker
{},
265 return read_validate_record_metadata_ret(
266 read_validate_record_metadata_ertr::ready_future_marker
{},
272 SegmentManagerGroup::read_validate_data_ret
273 SegmentManagerGroup::read_validate_data(
275 const record_group_header_t
&header
)
277 LOG_PREFIX(SegmentManagerGroup::read_validate_data
);
278 assert(has_device(record_base
.get_device_id()));
279 auto& segment_manager
= *segment_managers
[record_base
.get_device_id()];
280 auto data_addr
= record_base
.add_offset(header
.mdlength
);
281 TRACE("reading record group data blocks {}~{}", data_addr
, header
.dlength
);
282 return segment_manager
.read(
285 ).safe_then([=, &header
](auto bptr
) {
288 return validate_records_data(header
, bl
);
292 SegmentManagerGroup::consume_record_group_ertr::future
<>
293 SegmentManagerGroup::consume_next_records(
294 scan_valid_records_cursor
& cursor
,
295 found_record_handler_t
& handler
,
296 std::size_t& budget_used
)
298 LOG_PREFIX(SegmentManagerGroup::consume_next_records
);
299 auto& next
= cursor
.pending_record_groups
.front();
300 auto total_length
= next
.header
.dlength
+ next
.header
.mdlength
;
301 budget_used
+= total_length
;
302 auto locator
= record_locator_t
{
303 next
.offset
.add_offset(next
.header
.mdlength
),
306 cursor
.seq
.segment_seq
,
312 DEBUG("processing {} at {}, budget_used={}",
313 next
.header
, locator
, budget_used
);
318 ).safe_then([FNAME
, &cursor
] {
319 cursor
.pop_record_group();
320 if (cursor
.is_complete()) {
321 INFO("complete at {}, no more record group", cursor
);
326 SegmentManagerGroup::find_journal_segment_headers_ret
327 SegmentManagerGroup::find_journal_segment_headers()
329 return seastar::do_with(
330 get_segment_managers(),
331 find_journal_segment_headers_ret_bare
{},
332 [this](auto &sms
, auto& ret
) -> find_journal_segment_headers_ret
334 return crimson::do_for_each(sms
,
335 [this, &ret
](SegmentManager
*sm
)
337 LOG_PREFIX(SegmentManagerGroup::find_journal_segment_headers
);
338 auto device_id
= sm
->get_device_id();
339 auto num_segments
= sm
->get_num_segments();
340 DEBUG("processing {} with {} segments",
341 device_id_printer_t
{device_id
}, num_segments
);
342 return crimson::do_for_each(
343 boost::counting_iterator
<device_segment_id_t
>(0),
344 boost::counting_iterator
<device_segment_id_t
>(num_segments
),
345 [this, &ret
, device_id
](device_segment_id_t d_segment_id
)
347 segment_id_t segment_id
{device_id
, d_segment_id
};
348 return read_segment_header(segment_id
349 ).safe_then([segment_id
, &ret
](auto &&header
) {
350 if (header
.get_type() == segment_type_t::JOURNAL
) {
351 ret
.emplace_back(std::make_pair(segment_id
, std::move(header
)));
354 crimson::ct_error::enoent::handle([](auto) {
355 return find_journal_segment_headers_ertr::now();
357 crimson::ct_error::enodata::handle([](auto) {
358 return find_journal_segment_headers_ertr::now();
360 crimson::ct_error::input_output_error::pass_further
{}
363 }).safe_then([&ret
]() mutable {
364 return find_journal_segment_headers_ret
{
365 find_journal_segment_headers_ertr::ready_future_marker
{},
371 } // namespace crimson::os::seastore