1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
4 #include "crimson/os/seastore/segment_manager.h"
5 #include "crimson/os/seastore/extent_reader.h"
6 #include "crimson/common/log.h"
// File-local logging helper: returns the seastar logger that
// crimson::get_logger() hands out for the ceph_subsys_seastore_tm subsystem.
// All debug/error output in this file goes through it.
9 seastar::logger
& logger() {
10 return crimson::get_logger(ceph_subsys_seastore_tm
);
14 namespace crimson::os::seastore
{
// Read and decode the header stored at the start of `segment`.
//
// The segment manager is looked up in segment_managers by segment.device_id(),
// and get_block_size() bytes are read from segment offset 0.  Read errors that
// belong to read_segment_header_ertr are passed further; any other error hits
// assert_all with the message below.
//
// If decoding raises ceph::buffer::error, the failure is logged ("unable to
// decode header, skipping") and ct_error::enodata is returned.  Otherwise a
// ready read_segment_header_ret is returned
// (NOTE(review): presumably carrying the decoded segment_header_t -- confirm).
16 ExtentReader::read_segment_header_ret
17 ExtentReader::read_segment_header(segment_id_t segment
)
19 auto& segment_manager
= *segment_managers
[segment
.device_id()];
20 return segment_manager
.read(
21 paddr_t::make_seg_paddr(segment
, 0),
22 segment_manager
.get_block_size()
24 read_segment_header_ertr::pass_further
{},
25 crimson::ct_error::assert_all
{
26 "Invalid error in ExtentReader::read_segment_header"
28 ).safe_then([=, &segment_manager
](bufferptr bptr
) -> read_segment_header_ret
{
29 logger().debug("segment {} bptr size {}", segment
, bptr
.length());
31 segment_header_t header
;
// Log the crc32c of the first block (block-size bytes, seed 0) for debugging.
36 "ExtentReader::read_segment_header: segment {} block crc {}",
38 bl
.begin().crc32c(segment_manager
.get_block_size(), 0));
40 auto bp
= bl
.cbegin();
// A decode failure is reported as enodata rather than treated as fatal.
43 } catch (ceph::buffer::error
&e
) {
45 "ExtentReader::read_segment_header: segment {} unable to decode "
46 "header, skipping -- {}",
48 return crimson::ct_error::enodata::make();
51 "ExtentReader::read_segment_header: segment {} header {}",
54 return read_segment_header_ret(
55 read_segment_header_ertr::ready_future_marker
{},
// Scan records starting at `cursor` and collect the extents they describe.
//
// The result container (scan_extents_ret_bare) is heap-allocated through a
// unique_ptr.  The record handler gets the raw pointer `extents`, while
// ownership of `ret` is moved into the final continuation, which returns
// std::move(*ret).  This keeps the container alive for the whole chain.
//
// Steps visible here:
//  1. read_segment_header(cursor.get_segment_id()) supplies segment_nonce;
//     errors in scan_extents_ertr are passed further, others hit assert_all.
//  2. A found_record_handler_t is kept alive with seastar::do_with and given
//     to scan_valid_records().
//  3. For each record group the handler calls try_decode_extent_infos(header,
//     mdbuf).  A decode failure is logged and becomes input_output_error.
//     Otherwise it walks every extent_info, starting at
//     locator.record_block_base, appends (extent_offset, info) to `extents`
//     and advances extent_offset's segment offset by info.len.
//
// `cursor` is captured by reference, so it must outlive the returned future.
// NOTE(review): bytes_to_read is captured for the scan_valid_records() call;
// presumably it is the scan budget -- confirm.
60 ExtentReader::scan_extents_ret
ExtentReader::scan_extents(
61 scan_extents_cursor
&cursor
,
62 extent_len_t bytes_to_read
)
64 auto ret
= std::make_unique
<scan_extents_ret_bare
>();
65 auto* extents
= ret
.get();
66 return read_segment_header(cursor
.get_segment_id()
68 scan_extents_ertr::pass_further
{},
69 crimson::ct_error::assert_all
{
70 "Invalid error in ExtentReader::scan_extents"
72 ).safe_then([bytes_to_read
, extents
, &cursor
, this](auto segment_header
) {
73 auto segment_nonce
= segment_header
.segment_nonce
;
74 return seastar::do_with(
75 found_record_handler_t([extents
](
76 record_locator_t locator
,
77 const record_group_header_t
& header
,
78 const bufferlist
& mdbuf
) mutable -> scan_valid_records_ertr::future
<>
80 logger().debug("ExtentReader::scan_extents: decoding {} records",
82 auto maybe_record_extent_infos
= try_decode_extent_infos(header
, mdbuf
);
83 if (!maybe_record_extent_infos
) {
84 // This should be impossible, we did check the crc on the mdbuf
86 "ExtentReader::scan_extents: unable to decode extents for record {}",
87 locator
.record_block_base
);
88 return crimson::ct_error::input_output_error::make();
// Extents are laid out back to back starting at the record's block base;
// extent_offset is advanced by each extent's length as they are recorded.
91 paddr_t extent_offset
= locator
.record_block_base
;
92 for (auto& r
: *maybe_record_extent_infos
) {
93 logger().debug("ExtentReader::scan_extents: decoded {} extents",
94 r
.extent_infos
.size());
95 for (const auto &i
: r
.extent_infos
) {
96 extents
->emplace_back(extent_offset
, i
);
97 auto& seg_addr
= extent_offset
.as_seg_paddr();
98 seg_addr
.set_segment_off(
99 seg_addr
.get_segment_off() + i
.len
);
102 return scan_extents_ertr::now();
104 [bytes_to_read
, segment_nonce
, &cursor
, this](auto &dhandler
) {
105 return scan_valid_records(
// Ownership of the result container moves into this last continuation.
113 }).safe_then([ret
=std::move(ret
)] {
114 return std::move(*ret
);
// Iterate over the record groups of the segment addressed by `cursor`,
// passing committed groups to `handler` until the cursor is complete or the
// consumed byte count reaches the budget.
//
// If the cursor is at segment offset 0 it is first advanced by one block
// (NOTE(review): presumably to skip the segment header block -- confirm).
//
// Each pass of the outer crimson::repeat takes one of two paths:
//  * !cursor.last_valid_header_found: read and validate the next record
//    group header at cursor.seq.offset.  On the invalid-header path the end
//    is assumed and last_valid_header_found is set.  Otherwise the group is
//    queued in cursor.pending_record_groups, cursor.last_committed takes the
//    header's committed_to, and the cursor moves forward by
//    dlength + mdlength.  An inner repeat then calls consume_next_records()
//    while the front pending group is not past cursor.last_committed.
//  * otherwise: pending groups may be uncommitted, so the front group's data
//    is checked with read_validate_data() first.  On one path the whole
//    pending queue is cleared, on the other the group is consumed
//    (NOTE(review): presumably selected by `valid` -- confirm).
//
// `budget_used` lives in a unique_ptr (`retref`) that is moved into the final
// continuation, so the reference captured by the lambdas stays valid.
// `cursor` and `handler` are captured by reference and must outlive the
// returned future.
118 ExtentReader::scan_valid_records_ret
ExtentReader::scan_valid_records(
119 scan_valid_records_cursor
&cursor
,
120 segment_nonce_t nonce
,
122 found_record_handler_t
&handler
)
124 auto& segment_manager
=
125 *segment_managers
[cursor
.get_segment_id().device_id()];
126 if (cursor
.get_segment_offset() == 0) {
127 cursor
.increment(segment_manager
.get_block_size());
129 auto retref
= std::make_unique
<size_t>(0);
130 auto &budget_used
= *retref
;
131 return crimson::repeat(
132 [=, &cursor
, &budget_used
, &handler
]() mutable
133 -> scan_valid_records_ertr::future
<seastar::stop_iteration
> {
134 return [=, &handler
, &cursor
, &budget_used
] {
135 if (!cursor
.last_valid_header_found
) {
136 return read_validate_record_metadata(cursor
.seq
.offset
, nonce
137 ).safe_then([=, &cursor
](auto md
) {
139 "ExtentReader::scan_valid_records: read complete {}",
143 "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
145 cursor
.last_valid_header_found
= true;
146 return scan_valid_records_ertr::now();
148 auto& [header
, md_bl
] = *md
;
149 auto new_committed_to
= header
.committed_to
;
151 "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
// committed_to must never move backwards from one record group to the next.
154 ceph_assert(cursor
.last_committed
== journal_seq_t() ||
155 cursor
.last_committed
<= new_committed_to
);
156 cursor
.last_committed
= new_committed_to
;
157 cursor
.pending_record_groups
.emplace_back(
161 cursor
.increment(header
.dlength
+ header
.mdlength
);
// A record group cannot report a commit point at or beyond the cursor
// position reached after it.
162 ceph_assert(new_committed_to
== journal_seq_t() ||
163 new_committed_to
< cursor
.seq
);
164 return scan_valid_records_ertr::now();
166 }).safe_then([=, &cursor
, &budget_used
, &handler
] {
167 return crimson::repeat(
168 [=, &budget_used
, &cursor
, &handler
] {
170 "ExtentReader::scan_valid_records: valid record read, processing queue");
171 if (cursor
.pending_record_groups
.empty()) {
172 /* This is only possible if the segment is empty.
173 * A record's last_committed must be prior to its own
174 * location since it itself cannot yet have been committed
175 * at its own time of submission. Thus, the most recently
176 * read record must always fall after cursor.last_committed */
177 return scan_valid_records_ertr::make_ready_future
<
178 seastar::stop_iteration
>(seastar::stop_iteration::yes
);
180 auto &next
= cursor
.pending_record_groups
.front();
181 journal_seq_t next_seq
= {cursor
.seq
.segment_seq
, next
.offset
};
// Stop draining once the front group lies beyond the known commit point
// (or when no commit point is known yet).
182 if (cursor
.last_committed
== journal_seq_t() ||
183 next_seq
> cursor
.last_committed
) {
184 return scan_valid_records_ertr::make_ready_future
<
185 seastar::stop_iteration
>(seastar::stop_iteration::yes
);
187 return consume_next_records(cursor
, handler
, budget_used
189 return scan_valid_records_ertr::make_ready_future
<
190 seastar::stop_iteration
>(seastar::stop_iteration::no
);
// No more valid headers: the remaining pending groups need their data
// validated before they can be consumed.
195 assert(!cursor
.pending_record_groups
.empty());
196 auto &next
= cursor
.pending_record_groups
.front();
197 return read_validate_data(next
.offset
, next
.header
198 ).safe_then([this, &budget_used
, &cursor
, &handler
](auto valid
) {
200 cursor
.pending_record_groups
.clear();
201 return scan_valid_records_ertr::now();
203 return consume_next_records(cursor
, handler
, budget_used
);
206 }().safe_then([=, &budget_used
, &cursor
] {
207 if (cursor
.is_complete() || budget_used
>= budget
) {
208 return seastar::stop_iteration::yes
;
210 return seastar::stop_iteration::no
;
// retref (owner of budget_used) is moved here so it outlives the loop above.
213 }).safe_then([retref
=std::move(retref
)]() mutable -> scan_valid_records_ret
{
214 return scan_valid_records_ret(
215 scan_valid_records_ertr::ready_future_marker
{},
// Read the record group header found at `start`, sanity-check it and return
// it together with the full metadata buffer.
//
// Flow visible here:
//  * If a whole block starting at `start` would run past the end of the
//    segment, log "reach segment end" and return a ready result
//    (NOTE(review): presumably an empty optional -- confirm).
//  * Read one block and call try_decode_records_header(bl, nonce).  If it
//    yields no value, return a ready result (again presumably empty).
//  * Reject the header with input_output_error unless all of these hold:
//      - mdlength is at least one block and a multiple of the block size,
//      - dlength is a multiple of the block size,
//      - committed_to is the default journal_seq_t or block-aligned,
//      - metadata + data fit inside the segment.
//  * If mdlength is exactly one block the metadata is already complete;
//    otherwise the remaining mdlength - block_size bytes are read from the
//    following offset and appended to `bl`.
//  * The last continuation picks between two ready results depending on
//    whether a value is present and validate_records_metadata(p->second)
//    passes.
220 ExtentReader::read_validate_record_metadata_ret
221 ExtentReader::read_validate_record_metadata(
223 segment_nonce_t nonce
)
225 auto& seg_addr
= start
.as_seg_paddr();
226 auto& segment_manager
= *segment_managers
[seg_addr
.get_segment_id().device_id()];
227 auto block_size
= segment_manager
.get_block_size();
228 if (seg_addr
.get_segment_off() + block_size
>
229 (int64_t)segment_manager
.get_segment_size()) {
230 logger().debug("read_validate_record_metadata: failed, reach segment end");
231 return read_validate_record_metadata_ret(
232 read_validate_record_metadata_ertr::ready_future_marker
{},
235 logger().debug("read_validate_record_metadata: reading header block {}...",
237 return segment_manager
.read(start
, block_size
238 ).safe_then([=, &segment_manager
](bufferptr bptr
) mutable
239 -> read_validate_record_metadata_ret
{
240 auto block_size
= static_cast<extent_len_t
>(
241 segment_manager
.get_block_size());
244 auto maybe_header
= try_decode_records_header(bl
, nonce
);
245 if (!maybe_header
.has_value()) {
246 return read_validate_record_metadata_ret(
247 read_validate_record_metadata_ertr::ready_future_marker
{},
250 auto& seg_addr
= start
.as_seg_paddr();
251 auto& header
= *maybe_header
;
// Structural sanity checks on the decoded header; any violation is an I/O
// error rather than an "end of records" indication.
252 if (header
.mdlength
< block_size
||
253 header
.mdlength
% block_size
!= 0 ||
254 header
.dlength
% block_size
!= 0 ||
255 (header
.committed_to
!= journal_seq_t() &&
256 header
.committed_to
.offset
.as_seg_paddr().get_segment_off() % block_size
!= 0) ||
257 (seg_addr
.get_segment_off() + header
.mdlength
+ header
.dlength
>
258 (int64_t)segment_manager
.get_segment_size())) {
259 logger().error("read_validate_record_metadata: failed, invalid header");
260 return crimson::ct_error::input_output_error::make();
// Metadata fits in the block already read: no second read needed.
262 if (header
.mdlength
== block_size
) {
263 return read_validate_record_metadata_ret(
264 read_validate_record_metadata_ertr::ready_future_marker
{},
265 std::make_pair(std::move(header
), std::move(bl
))
// Otherwise fetch the rest of the metadata that follows the first block.
268 return segment_manager
.read(
269 paddr_t::make_seg_paddr(
270 seg_addr
.get_segment_id(),
271 seg_addr
.get_segment_off() + (segment_off_t
)block_size
273 header
.mdlength
- block_size
274 ).safe_then([header
=std::move(header
), bl
=std::move(bl
)
275 ](auto&& bptail
) mutable {
276 bl
.push_back(bptail
);
277 return read_validate_record_metadata_ret(
278 read_validate_record_metadata_ertr::ready_future_marker
{},
279 std::make_pair(std::move(header
), std::move(bl
)));
281 }).safe_then([](auto p
) {
282 if (p
&& validate_records_metadata(p
->second
)) {
283 return read_validate_record_metadata_ret(
284 read_validate_record_metadata_ertr::ready_future_marker
{},
288 return read_validate_record_metadata_ret(
289 read_validate_record_metadata_ertr::ready_future_marker
{},
// Read the data portion of the record group at `record_base` and validate it
// against `header`.
//
// The data starts header.mdlength bytes after record_base (data_addr).  The
// segment manager is chosen by record_base.get_device_id().  The buffer that
// was read is checked with validate_records_data(header, bl), whose result is
// what the continuation returns.
//
// `header` is captured by reference in the continuation, so the caller must
// keep it alive until the returned future resolves.
// NOTE(review): the read presumably covers header.dlength bytes at data_addr,
// as the debug message suggests -- confirm.
295 ExtentReader::read_validate_data_ret
296 ExtentReader::read_validate_data(
298 const record_group_header_t
&header
)
300 auto& segment_manager
= *segment_managers
[record_base
.get_device_id()];
301 auto data_addr
= record_base
.add_offset(header
.mdlength
);
302 logger().debug("read_validate_data: reading data blocks {}+{}...",
303 data_addr
, header
.dlength
);
304 return segment_manager
.read(
307 ).safe_then([=, &header
](auto bptr
) {
310 return validate_records_data(header
, bl
);
// Consume the record group at the front of cursor.pending_record_groups.
//
// Adds the group's total length (dlength + mdlength) to `budget_used`, builds
// a record_locator_t for it, and once the asynchronous step has succeeded
// pops the group off the pending queue.  The locator's fields visible here
// are the address just past the metadata
// (next.offset.add_offset(mdlength)), cursor.seq.segment_seq and the total
// length cast to segment_off_t.
//
// The function reads front() without a check, so callers must make sure the
// pending queue is not empty.  `cursor` is captured by reference and must
// outlive the returned future.
// NOTE(review): the step before the final safe_then presumably invokes
// `handler` with the locator, header and metadata buffer -- confirm.
314 ExtentReader::consume_record_group_ertr::future
<>
315 ExtentReader::consume_next_records(
316 scan_valid_records_cursor
& cursor
,
317 found_record_handler_t
& handler
,
318 std::size_t& budget_used
)
320 auto& next
= cursor
.pending_record_groups
.front();
321 auto total_length
= next
.header
.dlength
+ next
.header
.mdlength
;
322 budget_used
+= total_length
;
323 auto locator
= record_locator_t
{
324 next
.offset
.add_offset(next
.header
.mdlength
),
327 cursor
.seq
.segment_seq
,
330 static_cast<segment_off_t
>(total_length
)
// Only drop the group from the queue after it has been handled successfully.
337 ).safe_then([&cursor
] {
338 cursor
.pending_record_groups
.pop_front();
342 } // namespace crimson::os::seastore