]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/extent_reader.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / crimson / os / seastore / extent_reader.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
3
4 #include "crimson/os/seastore/segment_manager.h"
5 #include "crimson/os/seastore/extent_reader.h"
6 #include "crimson/common/log.h"
7
8 namespace {
9 seastar::logger& logger() {
10 return crimson::get_logger(ceph_subsys_seastore_tm);
11 }
12 }
13
14 namespace crimson::os::seastore {
15
16 ExtentReader::read_segment_header_ret
17 ExtentReader::read_segment_header(segment_id_t segment)
18 {
19 auto& segment_manager = *segment_managers[segment.device_id()];
20 return segment_manager.read(
21 paddr_t::make_seg_paddr(segment, 0),
22 segment_manager.get_block_size()
23 ).handle_error(
24 read_segment_header_ertr::pass_further{},
25 crimson::ct_error::assert_all{
26 "Invalid error in ExtentReader::read_segment_header"
27 }
28 ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
29 logger().debug("segment {} bptr size {}", segment, bptr.length());
30
31 segment_header_t header;
32 bufferlist bl;
33 bl.push_back(bptr);
34
35 logger().debug(
36 "ExtentReader::read_segment_header: segment {} block crc {}",
37 segment,
38 bl.begin().crc32c(segment_manager.get_block_size(), 0));
39
40 auto bp = bl.cbegin();
41 try {
42 decode(header, bp);
43 } catch (ceph::buffer::error &e) {
44 logger().debug(
45 "ExtentReader::read_segment_header: segment {} unable to decode "
46 "header, skipping -- {}",
47 segment, e);
48 return crimson::ct_error::enodata::make();
49 }
50 logger().debug(
51 "ExtentReader::read_segment_header: segment {} header {}",
52 segment,
53 header);
54 return read_segment_header_ret(
55 read_segment_header_ertr::ready_future_marker{},
56 header);
57 });
58 }
59
60 ExtentReader::scan_extents_ret ExtentReader::scan_extents(
61 scan_extents_cursor &cursor,
62 extent_len_t bytes_to_read)
63 {
64 auto ret = std::make_unique<scan_extents_ret_bare>();
65 auto* extents = ret.get();
66 return read_segment_header(cursor.get_segment_id()
67 ).handle_error(
68 scan_extents_ertr::pass_further{},
69 crimson::ct_error::assert_all{
70 "Invalid error in ExtentReader::scan_extents"
71 }
72 ).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
73 auto segment_nonce = segment_header.segment_nonce;
74 return seastar::do_with(
75 found_record_handler_t([extents](
76 record_locator_t locator,
77 const record_group_header_t& header,
78 const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
79 {
80 logger().debug("ExtentReader::scan_extents: decoding {} records",
81 header.records);
82 auto maybe_record_extent_infos = try_decode_extent_infos(header, mdbuf);
83 if (!maybe_record_extent_infos) {
84 // This should be impossible, we did check the crc on the mdbuf
85 logger().error(
86 "ExtentReader::scan_extents: unable to decode extents for record {}",
87 locator.record_block_base);
88 return crimson::ct_error::input_output_error::make();
89 }
90
91 paddr_t extent_offset = locator.record_block_base;
92 for (auto& r: *maybe_record_extent_infos) {
93 logger().debug("ExtentReader::scan_extents: decoded {} extents",
94 r.extent_infos.size());
95 for (const auto &i : r.extent_infos) {
96 extents->emplace_back(extent_offset, i);
97 auto& seg_addr = extent_offset.as_seg_paddr();
98 seg_addr.set_segment_off(
99 seg_addr.get_segment_off() + i.len);
100 }
101 }
102 return scan_extents_ertr::now();
103 }),
104 [bytes_to_read, segment_nonce, &cursor, this](auto &dhandler) {
105 return scan_valid_records(
106 cursor,
107 segment_nonce,
108 bytes_to_read,
109 dhandler
110 ).discard_result();
111 }
112 );
113 }).safe_then([ret=std::move(ret)] {
114 return std::move(*ret);
115 });
116 }
117
118 ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
119 scan_valid_records_cursor &cursor,
120 segment_nonce_t nonce,
121 size_t budget,
122 found_record_handler_t &handler)
123 {
124 auto& segment_manager =
125 *segment_managers[cursor.get_segment_id().device_id()];
126 if (cursor.get_segment_offset() == 0) {
127 cursor.increment(segment_manager.get_block_size());
128 }
129 auto retref = std::make_unique<size_t>(0);
130 auto &budget_used = *retref;
131 return crimson::repeat(
132 [=, &cursor, &budget_used, &handler]() mutable
133 -> scan_valid_records_ertr::future<seastar::stop_iteration> {
134 return [=, &handler, &cursor, &budget_used] {
135 if (!cursor.last_valid_header_found) {
136 return read_validate_record_metadata(cursor.seq.offset, nonce
137 ).safe_then([=, &cursor](auto md) {
138 logger().debug(
139 "ExtentReader::scan_valid_records: read complete {}",
140 cursor.seq);
141 if (!md) {
142 logger().debug(
143 "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
144 cursor.seq);
145 cursor.last_valid_header_found = true;
146 return scan_valid_records_ertr::now();
147 } else {
148 auto& [header, md_bl] = *md;
149 auto new_committed_to = header.committed_to;
150 logger().debug(
151 "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
152 cursor.seq,
153 new_committed_to);
154 ceph_assert(cursor.last_committed == journal_seq_t() ||
155 cursor.last_committed <= new_committed_to);
156 cursor.last_committed = new_committed_to;
157 cursor.pending_record_groups.emplace_back(
158 cursor.seq.offset,
159 header,
160 std::move(md_bl));
161 cursor.increment(header.dlength + header.mdlength);
162 ceph_assert(new_committed_to == journal_seq_t() ||
163 new_committed_to < cursor.seq);
164 return scan_valid_records_ertr::now();
165 }
166 }).safe_then([=, &cursor, &budget_used, &handler] {
167 return crimson::repeat(
168 [=, &budget_used, &cursor, &handler] {
169 logger().debug(
170 "ExtentReader::scan_valid_records: valid record read, processing queue");
171 if (cursor.pending_record_groups.empty()) {
172 /* This is only possible if the segment is empty.
173 * A record's last_commited must be prior to its own
174 * location since it itself cannot yet have been committed
175 * at its own time of submission. Thus, the most recently
176 * read record must always fall after cursor.last_committed */
177 return scan_valid_records_ertr::make_ready_future<
178 seastar::stop_iteration>(seastar::stop_iteration::yes);
179 }
180 auto &next = cursor.pending_record_groups.front();
181 journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
182 if (cursor.last_committed == journal_seq_t() ||
183 next_seq > cursor.last_committed) {
184 return scan_valid_records_ertr::make_ready_future<
185 seastar::stop_iteration>(seastar::stop_iteration::yes);
186 }
187 return consume_next_records(cursor, handler, budget_used
188 ).safe_then([] {
189 return scan_valid_records_ertr::make_ready_future<
190 seastar::stop_iteration>(seastar::stop_iteration::no);
191 });
192 });
193 });
194 } else {
195 assert(!cursor.pending_record_groups.empty());
196 auto &next = cursor.pending_record_groups.front();
197 return read_validate_data(next.offset, next.header
198 ).safe_then([this, &budget_used, &cursor, &handler](auto valid) {
199 if (!valid) {
200 cursor.pending_record_groups.clear();
201 return scan_valid_records_ertr::now();
202 }
203 return consume_next_records(cursor, handler, budget_used);
204 });
205 }
206 }().safe_then([=, &budget_used, &cursor] {
207 if (cursor.is_complete() || budget_used >= budget) {
208 return seastar::stop_iteration::yes;
209 } else {
210 return seastar::stop_iteration::no;
211 }
212 });
213 }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
214 return scan_valid_records_ret(
215 scan_valid_records_ertr::ready_future_marker{},
216 std::move(*retref));
217 });
218 }
219
220 ExtentReader::read_validate_record_metadata_ret
221 ExtentReader::read_validate_record_metadata(
222 paddr_t start,
223 segment_nonce_t nonce)
224 {
225 auto& seg_addr = start.as_seg_paddr();
226 auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
227 auto block_size = segment_manager.get_block_size();
228 if (seg_addr.get_segment_off() + block_size >
229 (int64_t)segment_manager.get_segment_size()) {
230 logger().debug("read_validate_record_metadata: failed, reach segment end");
231 return read_validate_record_metadata_ret(
232 read_validate_record_metadata_ertr::ready_future_marker{},
233 std::nullopt);
234 }
235 logger().debug("read_validate_record_metadata: reading header block {}...",
236 start);
237 return segment_manager.read(start, block_size
238 ).safe_then([=, &segment_manager](bufferptr bptr) mutable
239 -> read_validate_record_metadata_ret {
240 auto block_size = static_cast<extent_len_t>(
241 segment_manager.get_block_size());
242 bufferlist bl;
243 bl.append(bptr);
244 auto maybe_header = try_decode_records_header(bl, nonce);
245 if (!maybe_header.has_value()) {
246 return read_validate_record_metadata_ret(
247 read_validate_record_metadata_ertr::ready_future_marker{},
248 std::nullopt);
249 }
250 auto& seg_addr = start.as_seg_paddr();
251 auto& header = *maybe_header;
252 if (header.mdlength < block_size ||
253 header.mdlength % block_size != 0 ||
254 header.dlength % block_size != 0 ||
255 (header.committed_to != journal_seq_t() &&
256 header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
257 (seg_addr.get_segment_off() + header.mdlength + header.dlength >
258 (int64_t)segment_manager.get_segment_size())) {
259 logger().error("read_validate_record_metadata: failed, invalid header");
260 return crimson::ct_error::input_output_error::make();
261 }
262 if (header.mdlength == block_size) {
263 return read_validate_record_metadata_ret(
264 read_validate_record_metadata_ertr::ready_future_marker{},
265 std::make_pair(std::move(header), std::move(bl))
266 );
267 }
268 return segment_manager.read(
269 paddr_t::make_seg_paddr(
270 seg_addr.get_segment_id(),
271 seg_addr.get_segment_off() + (segment_off_t)block_size
272 ),
273 header.mdlength - block_size
274 ).safe_then([header=std::move(header), bl=std::move(bl)
275 ](auto&& bptail) mutable {
276 bl.push_back(bptail);
277 return read_validate_record_metadata_ret(
278 read_validate_record_metadata_ertr::ready_future_marker{},
279 std::make_pair(std::move(header), std::move(bl)));
280 });
281 }).safe_then([](auto p) {
282 if (p && validate_records_metadata(p->second)) {
283 return read_validate_record_metadata_ret(
284 read_validate_record_metadata_ertr::ready_future_marker{},
285 std::move(*p)
286 );
287 } else {
288 return read_validate_record_metadata_ret(
289 read_validate_record_metadata_ertr::ready_future_marker{},
290 std::nullopt);
291 }
292 });
293 }
294
295 ExtentReader::read_validate_data_ret
296 ExtentReader::read_validate_data(
297 paddr_t record_base,
298 const record_group_header_t &header)
299 {
300 auto& segment_manager = *segment_managers[record_base.get_device_id()];
301 auto data_addr = record_base.add_offset(header.mdlength);
302 logger().debug("read_validate_data: reading data blocks {}+{}...",
303 data_addr, header.dlength);
304 return segment_manager.read(
305 data_addr,
306 header.dlength
307 ).safe_then([=, &header](auto bptr) {
308 bufferlist bl;
309 bl.append(bptr);
310 return validate_records_data(header, bl);
311 });
312 }
313
314 ExtentReader::consume_record_group_ertr::future<>
315 ExtentReader::consume_next_records(
316 scan_valid_records_cursor& cursor,
317 found_record_handler_t& handler,
318 std::size_t& budget_used)
319 {
320 auto& next = cursor.pending_record_groups.front();
321 auto total_length = next.header.dlength + next.header.mdlength;
322 budget_used += total_length;
323 auto locator = record_locator_t{
324 next.offset.add_offset(next.header.mdlength),
325 write_result_t{
326 journal_seq_t{
327 cursor.seq.segment_seq,
328 next.offset
329 },
330 static_cast<segment_off_t>(total_length)
331 }
332 };
333 return handler(
334 locator,
335 next.header,
336 next.mdbuffer
337 ).safe_then([&cursor] {
338 cursor.pending_record_groups.pop_front();
339 });
340 }
341
342 } // namespace crimson::os::seastore