]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/segment_manager_group.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / os / seastore / segment_manager_group.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
3
4 #include "crimson/os/seastore/segment_manager_group.h"
5
6 #include "crimson/os/seastore/logging.h"
7
8 SET_SUBSYS(seastore_journal);
9
10 namespace crimson::os::seastore {
11
12 SegmentManagerGroup::read_segment_tail_ret
13 SegmentManagerGroup::read_segment_tail(segment_id_t segment)
14 {
15 assert(has_device(segment.device_id()));
16 auto& segment_manager = *segment_managers[segment.device_id()];
17 return segment_manager.read(
18 paddr_t::make_seg_paddr(
19 segment,
20 segment_manager.get_segment_size() - get_rounded_tail_length()),
21 get_rounded_tail_length()
22 ).handle_error(
23 read_segment_header_ertr::pass_further{},
24 crimson::ct_error::assert_all{
25 "Invalid error in SegmentManagerGroup::read_segment_tail"
26 }
27 ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret {
28 LOG_PREFIX(SegmentManagerGroup::read_segment_tail);
29 DEBUG("segment {} bptr size {}", segment, bptr.length());
30
31 segment_tail_t tail;
32 bufferlist bl;
33 bl.push_back(bptr);
34
35 DEBUG("segment {} block crc {}",
36 segment,
37 bl.begin().crc32c(segment_manager.get_block_size(), 0));
38
39 auto bp = bl.cbegin();
40 try {
41 decode(tail, bp);
42 } catch (ceph::buffer::error &e) {
43 DEBUG("segment {} unable to decode tail, skipping -- {}",
44 segment, e.what());
45 return crimson::ct_error::enodata::make();
46 }
47 DEBUG("segment {} tail {}", segment, tail);
48 return read_segment_tail_ret(
49 read_segment_tail_ertr::ready_future_marker{},
50 tail);
51 });
52 }
53
54 SegmentManagerGroup::read_segment_header_ret
55 SegmentManagerGroup::read_segment_header(segment_id_t segment)
56 {
57 assert(has_device(segment.device_id()));
58 auto& segment_manager = *segment_managers[segment.device_id()];
59 return segment_manager.read(
60 paddr_t::make_seg_paddr(segment, 0),
61 get_rounded_header_length()
62 ).handle_error(
63 read_segment_header_ertr::pass_further{},
64 crimson::ct_error::assert_all{
65 "Invalid error in SegmentManagerGroup::read_segment_header"
66 }
67 ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret {
68 LOG_PREFIX(SegmentManagerGroup::read_segment_header);
69 DEBUG("segment {} bptr size {}", segment, bptr.length());
70
71 segment_header_t header;
72 bufferlist bl;
73 bl.push_back(bptr);
74
75 DEBUG("segment {} block crc {}",
76 segment,
77 bl.begin().crc32c(segment_manager.get_block_size(), 0));
78
79 auto bp = bl.cbegin();
80 try {
81 decode(header, bp);
82 } catch (ceph::buffer::error &e) {
83 DEBUG("segment {} unable to decode header, skipping -- {}",
84 segment, e.what());
85 return crimson::ct_error::enodata::make();
86 }
87 DEBUG("segment {} header {}", segment, header);
88 return read_segment_header_ret(
89 read_segment_header_ertr::ready_future_marker{},
90 header);
91 });
92 }
93
94 SegmentManagerGroup::scan_valid_records_ret
95 SegmentManagerGroup::scan_valid_records(
96 scan_valid_records_cursor &cursor,
97 segment_nonce_t nonce,
98 size_t budget,
99 found_record_handler_t &handler)
100 {
101 LOG_PREFIX(SegmentManagerGroup::scan_valid_records);
102 assert(has_device(cursor.get_segment_id().device_id()));
103 auto& segment_manager =
104 *segment_managers[cursor.get_segment_id().device_id()];
105 if (cursor.get_segment_offset() == 0) {
106 INFO("start to scan segment {}", cursor.get_segment_id());
107 cursor.increment_seq(segment_manager.get_block_size());
108 }
109 DEBUG("starting at {}, budget={}", cursor, budget);
110 auto retref = std::make_unique<size_t>(0);
111 auto &budget_used = *retref;
112 return crimson::repeat(
113 [=, &cursor, &budget_used, &handler, this]() mutable
114 -> scan_valid_records_ertr::future<seastar::stop_iteration> {
115 return [=, &handler, &cursor, &budget_used, this] {
116 if (!cursor.last_valid_header_found) {
117 return read_validate_record_metadata(cursor.seq.offset, nonce
118 ).safe_then([=, &cursor](auto md) {
119 if (!md) {
120 cursor.last_valid_header_found = true;
121 if (cursor.is_complete()) {
122 INFO("complete at {}, invalid record group metadata",
123 cursor);
124 } else {
125 DEBUG("found invalid record group metadata at {}, "
126 "processing {} pending record groups",
127 cursor.seq,
128 cursor.pending_record_groups.size());
129 }
130 return scan_valid_records_ertr::now();
131 } else {
132 auto& [header, md_bl] = *md;
133 DEBUG("found valid {} at {}", header, cursor.seq);
134 cursor.emplace_record_group(header, std::move(md_bl));
135 return scan_valid_records_ertr::now();
136 }
137 }).safe_then([=, &cursor, &budget_used, &handler, this] {
138 DEBUG("processing committed record groups until {}, {} pending",
139 cursor.last_committed,
140 cursor.pending_record_groups.size());
141 return crimson::repeat(
142 [=, &budget_used, &cursor, &handler, this] {
143 if (cursor.pending_record_groups.empty()) {
144 /* This is only possible if the segment is empty.
145 * A record's last_commited must be prior to its own
146 * location since it itself cannot yet have been committed
147 * at its own time of submission. Thus, the most recently
148 * read record must always fall after cursor.last_committed */
149 return scan_valid_records_ertr::make_ready_future<
150 seastar::stop_iteration>(seastar::stop_iteration::yes);
151 }
152 auto &next = cursor.pending_record_groups.front();
153 journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
154 if (cursor.last_committed == JOURNAL_SEQ_NULL ||
155 next_seq > cursor.last_committed) {
156 return scan_valid_records_ertr::make_ready_future<
157 seastar::stop_iteration>(seastar::stop_iteration::yes);
158 }
159 return consume_next_records(cursor, handler, budget_used
160 ).safe_then([] {
161 return scan_valid_records_ertr::make_ready_future<
162 seastar::stop_iteration>(seastar::stop_iteration::no);
163 });
164 });
165 });
166 } else {
167 assert(!cursor.pending_record_groups.empty());
168 auto &next = cursor.pending_record_groups.front();
169 return read_validate_data(next.offset, next.header
170 ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) {
171 if (!valid) {
172 INFO("complete at {}, invalid record group data at {}, {}",
173 cursor, next.offset, next.header);
174 cursor.pending_record_groups.clear();
175 return scan_valid_records_ertr::now();
176 }
177 return consume_next_records(cursor, handler, budget_used);
178 });
179 }
180 }().safe_then([=, &budget_used, &cursor] {
181 if (cursor.is_complete() || budget_used >= budget) {
182 DEBUG("finish at {}, budget_used={}, budget={}",
183 cursor, budget_used, budget);
184 return seastar::stop_iteration::yes;
185 } else {
186 return seastar::stop_iteration::no;
187 }
188 });
189 }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
190 return scan_valid_records_ret(
191 scan_valid_records_ertr::ready_future_marker{},
192 std::move(*retref));
193 });
194 }
195
196 SegmentManagerGroup::read_validate_record_metadata_ret
197 SegmentManagerGroup::read_validate_record_metadata(
198 paddr_t start,
199 segment_nonce_t nonce)
200 {
201 LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
202 auto& seg_addr = start.as_seg_paddr();
203 assert(has_device(seg_addr.get_segment_id().device_id()));
204 auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
205 auto block_size = segment_manager.get_block_size();
206 auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
207 if (seg_addr.get_segment_off() + block_size > segment_size) {
208 DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
209 return read_validate_record_metadata_ret(
210 read_validate_record_metadata_ertr::ready_future_marker{},
211 std::nullopt);
212 }
213 TRACE("reading record group header block {}~4096", start);
214 return segment_manager.read(start, block_size
215 ).safe_then([=, &segment_manager](bufferptr bptr) mutable
216 -> read_validate_record_metadata_ret {
217 auto block_size = segment_manager.get_block_size();
218 bufferlist bl;
219 bl.append(bptr);
220 auto maybe_header = try_decode_records_header(bl, nonce);
221 if (!maybe_header.has_value()) {
222 return read_validate_record_metadata_ret(
223 read_validate_record_metadata_ertr::ready_future_marker{},
224 std::nullopt);
225 }
226 auto& seg_addr = start.as_seg_paddr();
227 auto& header = *maybe_header;
228 if (header.mdlength < block_size ||
229 header.mdlength % block_size != 0 ||
230 header.dlength % block_size != 0 ||
231 (header.committed_to != JOURNAL_SEQ_NULL &&
232 header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
233 (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
234 ERROR("failed, invalid record group header {}", start);
235 return crimson::ct_error::input_output_error::make();
236 }
237 if (header.mdlength == block_size) {
238 return read_validate_record_metadata_ret(
239 read_validate_record_metadata_ertr::ready_future_marker{},
240 std::make_pair(std::move(header), std::move(bl))
241 );
242 }
243
244 auto rest_start = paddr_t::make_seg_paddr(
245 seg_addr.get_segment_id(),
246 seg_addr.get_segment_off() + block_size
247 );
248 auto rest_len = header.mdlength - block_size;
249 TRACE("reading record group header rest {}~{}", rest_start, rest_len);
250 return segment_manager.read(rest_start, rest_len
251 ).safe_then([header=std::move(header), bl=std::move(bl)
252 ](auto&& bptail) mutable {
253 bl.push_back(bptail);
254 return read_validate_record_metadata_ret(
255 read_validate_record_metadata_ertr::ready_future_marker{},
256 std::make_pair(std::move(header), std::move(bl)));
257 });
258 }).safe_then([](auto p) {
259 if (p && validate_records_metadata(p->second)) {
260 return read_validate_record_metadata_ret(
261 read_validate_record_metadata_ertr::ready_future_marker{},
262 std::move(*p)
263 );
264 } else {
265 return read_validate_record_metadata_ret(
266 read_validate_record_metadata_ertr::ready_future_marker{},
267 std::nullopt);
268 }
269 });
270 }
271
272 SegmentManagerGroup::read_validate_data_ret
273 SegmentManagerGroup::read_validate_data(
274 paddr_t record_base,
275 const record_group_header_t &header)
276 {
277 LOG_PREFIX(SegmentManagerGroup::read_validate_data);
278 assert(has_device(record_base.get_device_id()));
279 auto& segment_manager = *segment_managers[record_base.get_device_id()];
280 auto data_addr = record_base.add_offset(header.mdlength);
281 TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
282 return segment_manager.read(
283 data_addr,
284 header.dlength
285 ).safe_then([=, &header](auto bptr) {
286 bufferlist bl;
287 bl.append(bptr);
288 return validate_records_data(header, bl);
289 });
290 }
291
292 SegmentManagerGroup::consume_record_group_ertr::future<>
293 SegmentManagerGroup::consume_next_records(
294 scan_valid_records_cursor& cursor,
295 found_record_handler_t& handler,
296 std::size_t& budget_used)
297 {
298 LOG_PREFIX(SegmentManagerGroup::consume_next_records);
299 auto& next = cursor.pending_record_groups.front();
300 auto total_length = next.header.dlength + next.header.mdlength;
301 budget_used += total_length;
302 auto locator = record_locator_t{
303 next.offset.add_offset(next.header.mdlength),
304 write_result_t{
305 journal_seq_t{
306 cursor.seq.segment_seq,
307 next.offset
308 },
309 total_length
310 }
311 };
312 DEBUG("processing {} at {}, budget_used={}",
313 next.header, locator, budget_used);
314 return handler(
315 locator,
316 next.header,
317 next.mdbuffer
318 ).safe_then([FNAME, &cursor] {
319 cursor.pop_record_group();
320 if (cursor.is_complete()) {
321 INFO("complete at {}, no more record group", cursor);
322 }
323 });
324 }
325
326 SegmentManagerGroup::find_journal_segment_headers_ret
327 SegmentManagerGroup::find_journal_segment_headers()
328 {
329 return seastar::do_with(
330 get_segment_managers(),
331 find_journal_segment_headers_ret_bare{},
332 [this](auto &sms, auto& ret) -> find_journal_segment_headers_ret
333 {
334 return crimson::do_for_each(sms,
335 [this, &ret](SegmentManager *sm)
336 {
337 LOG_PREFIX(SegmentManagerGroup::find_journal_segment_headers);
338 auto device_id = sm->get_device_id();
339 auto num_segments = sm->get_num_segments();
340 DEBUG("processing {} with {} segments",
341 device_id_printer_t{device_id}, num_segments);
342 return crimson::do_for_each(
343 boost::counting_iterator<device_segment_id_t>(0),
344 boost::counting_iterator<device_segment_id_t>(num_segments),
345 [this, &ret, device_id](device_segment_id_t d_segment_id)
346 {
347 segment_id_t segment_id{device_id, d_segment_id};
348 return read_segment_header(segment_id
349 ).safe_then([segment_id, &ret](auto &&header) {
350 if (header.get_type() == segment_type_t::JOURNAL) {
351 ret.emplace_back(std::make_pair(segment_id, std::move(header)));
352 }
353 }).handle_error(
354 crimson::ct_error::enoent::handle([](auto) {
355 return find_journal_segment_headers_ertr::now();
356 }),
357 crimson::ct_error::enodata::handle([](auto) {
358 return find_journal_segment_headers_ertr::now();
359 }),
360 crimson::ct_error::input_output_error::pass_further{}
361 );
362 });
363 }).safe_then([&ret]() mutable {
364 return find_journal_segment_headers_ret{
365 find_journal_segment_headers_ertr::ready_future_marker{},
366 std::move(ret)};
367 });
368 });
369 }
370
371 } // namespace crimson::os::seastore