]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
3 | ||
4 | #include "crimson/os/seastore/segment_manager_group.h" | |
5 | ||
6 | #include "crimson/os/seastore/logging.h" | |
7 | ||
8 | SET_SUBSYS(seastore_journal); | |
9 | ||
10 | namespace crimson::os::seastore { | |
11 | ||
12 | SegmentManagerGroup::read_segment_tail_ret | |
13 | SegmentManagerGroup::read_segment_tail(segment_id_t segment) | |
14 | { | |
15 | assert(has_device(segment.device_id())); | |
16 | auto& segment_manager = *segment_managers[segment.device_id()]; | |
17 | return segment_manager.read( | |
18 | paddr_t::make_seg_paddr( | |
19 | segment, | |
20 | segment_manager.get_segment_size() - get_rounded_tail_length()), | |
21 | get_rounded_tail_length() | |
22 | ).handle_error( | |
23 | read_segment_header_ertr::pass_further{}, | |
24 | crimson::ct_error::assert_all{ | |
25 | "Invalid error in SegmentManagerGroup::read_segment_tail" | |
26 | } | |
27 | ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_tail_ret { | |
28 | LOG_PREFIX(SegmentManagerGroup::read_segment_tail); | |
29 | DEBUG("segment {} bptr size {}", segment, bptr.length()); | |
30 | ||
31 | segment_tail_t tail; | |
32 | bufferlist bl; | |
33 | bl.push_back(bptr); | |
34 | ||
35 | DEBUG("segment {} block crc {}", | |
36 | segment, | |
37 | bl.begin().crc32c(segment_manager.get_block_size(), 0)); | |
38 | ||
39 | auto bp = bl.cbegin(); | |
40 | try { | |
41 | decode(tail, bp); | |
42 | } catch (ceph::buffer::error &e) { | |
43 | DEBUG("segment {} unable to decode tail, skipping -- {}", | |
44 | segment, e.what()); | |
45 | return crimson::ct_error::enodata::make(); | |
46 | } | |
47 | DEBUG("segment {} tail {}", segment, tail); | |
48 | return read_segment_tail_ret( | |
49 | read_segment_tail_ertr::ready_future_marker{}, | |
50 | tail); | |
51 | }); | |
52 | } | |
53 | ||
54 | SegmentManagerGroup::read_segment_header_ret | |
55 | SegmentManagerGroup::read_segment_header(segment_id_t segment) | |
56 | { | |
57 | assert(has_device(segment.device_id())); | |
58 | auto& segment_manager = *segment_managers[segment.device_id()]; | |
59 | return segment_manager.read( | |
60 | paddr_t::make_seg_paddr(segment, 0), | |
61 | get_rounded_header_length() | |
62 | ).handle_error( | |
63 | read_segment_header_ertr::pass_further{}, | |
64 | crimson::ct_error::assert_all{ | |
65 | "Invalid error in SegmentManagerGroup::read_segment_header" | |
66 | } | |
67 | ).safe_then([=, &segment_manager](bufferptr bptr) -> read_segment_header_ret { | |
68 | LOG_PREFIX(SegmentManagerGroup::read_segment_header); | |
69 | DEBUG("segment {} bptr size {}", segment, bptr.length()); | |
70 | ||
71 | segment_header_t header; | |
72 | bufferlist bl; | |
73 | bl.push_back(bptr); | |
74 | ||
75 | DEBUG("segment {} block crc {}", | |
76 | segment, | |
77 | bl.begin().crc32c(segment_manager.get_block_size(), 0)); | |
78 | ||
79 | auto bp = bl.cbegin(); | |
80 | try { | |
81 | decode(header, bp); | |
82 | } catch (ceph::buffer::error &e) { | |
83 | DEBUG("segment {} unable to decode header, skipping -- {}", | |
84 | segment, e.what()); | |
85 | return crimson::ct_error::enodata::make(); | |
86 | } | |
87 | DEBUG("segment {} header {}", segment, header); | |
88 | return read_segment_header_ret( | |
89 | read_segment_header_ertr::ready_future_marker{}, | |
90 | header); | |
91 | }); | |
92 | } | |
93 | ||
94 | SegmentManagerGroup::scan_valid_records_ret | |
95 | SegmentManagerGroup::scan_valid_records( | |
96 | scan_valid_records_cursor &cursor, | |
97 | segment_nonce_t nonce, | |
98 | size_t budget, | |
99 | found_record_handler_t &handler) | |
100 | { | |
101 | LOG_PREFIX(SegmentManagerGroup::scan_valid_records); | |
102 | assert(has_device(cursor.get_segment_id().device_id())); | |
103 | auto& segment_manager = | |
104 | *segment_managers[cursor.get_segment_id().device_id()]; | |
105 | if (cursor.get_segment_offset() == 0) { | |
106 | INFO("start to scan segment {}", cursor.get_segment_id()); | |
107 | cursor.increment_seq(segment_manager.get_block_size()); | |
108 | } | |
109 | DEBUG("starting at {}, budget={}", cursor, budget); | |
110 | auto retref = std::make_unique<size_t>(0); | |
111 | auto &budget_used = *retref; | |
112 | return crimson::repeat( | |
113 | [=, &cursor, &budget_used, &handler, this]() mutable | |
114 | -> scan_valid_records_ertr::future<seastar::stop_iteration> { | |
115 | return [=, &handler, &cursor, &budget_used, this] { | |
116 | if (!cursor.last_valid_header_found) { | |
117 | return read_validate_record_metadata(cursor.seq.offset, nonce | |
118 | ).safe_then([=, &cursor](auto md) { | |
119 | if (!md) { | |
120 | cursor.last_valid_header_found = true; | |
121 | if (cursor.is_complete()) { | |
122 | INFO("complete at {}, invalid record group metadata", | |
123 | cursor); | |
124 | } else { | |
125 | DEBUG("found invalid record group metadata at {}, " | |
126 | "processing {} pending record groups", | |
127 | cursor.seq, | |
128 | cursor.pending_record_groups.size()); | |
129 | } | |
130 | return scan_valid_records_ertr::now(); | |
131 | } else { | |
132 | auto& [header, md_bl] = *md; | |
133 | DEBUG("found valid {} at {}", header, cursor.seq); | |
134 | cursor.emplace_record_group(header, std::move(md_bl)); | |
135 | return scan_valid_records_ertr::now(); | |
136 | } | |
137 | }).safe_then([=, &cursor, &budget_used, &handler, this] { | |
138 | DEBUG("processing committed record groups until {}, {} pending", | |
139 | cursor.last_committed, | |
140 | cursor.pending_record_groups.size()); | |
141 | return crimson::repeat( | |
142 | [=, &budget_used, &cursor, &handler, this] { | |
143 | if (cursor.pending_record_groups.empty()) { | |
144 | /* This is only possible if the segment is empty. | |
145 | * A record's last_commited must be prior to its own | |
146 | * location since it itself cannot yet have been committed | |
147 | * at its own time of submission. Thus, the most recently | |
148 | * read record must always fall after cursor.last_committed */ | |
149 | return scan_valid_records_ertr::make_ready_future< | |
150 | seastar::stop_iteration>(seastar::stop_iteration::yes); | |
151 | } | |
152 | auto &next = cursor.pending_record_groups.front(); | |
153 | journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset}; | |
154 | if (cursor.last_committed == JOURNAL_SEQ_NULL || | |
155 | next_seq > cursor.last_committed) { | |
156 | return scan_valid_records_ertr::make_ready_future< | |
157 | seastar::stop_iteration>(seastar::stop_iteration::yes); | |
158 | } | |
159 | return consume_next_records(cursor, handler, budget_used | |
160 | ).safe_then([] { | |
161 | return scan_valid_records_ertr::make_ready_future< | |
162 | seastar::stop_iteration>(seastar::stop_iteration::no); | |
163 | }); | |
164 | }); | |
165 | }); | |
166 | } else { | |
167 | assert(!cursor.pending_record_groups.empty()); | |
168 | auto &next = cursor.pending_record_groups.front(); | |
169 | return read_validate_data(next.offset, next.header | |
170 | ).safe_then([this, FNAME, &budget_used, &cursor, &handler, &next](auto valid) { | |
171 | if (!valid) { | |
172 | INFO("complete at {}, invalid record group data at {}, {}", | |
173 | cursor, next.offset, next.header); | |
174 | cursor.pending_record_groups.clear(); | |
175 | return scan_valid_records_ertr::now(); | |
176 | } | |
177 | return consume_next_records(cursor, handler, budget_used); | |
178 | }); | |
179 | } | |
180 | }().safe_then([=, &budget_used, &cursor] { | |
181 | if (cursor.is_complete() || budget_used >= budget) { | |
182 | DEBUG("finish at {}, budget_used={}, budget={}", | |
183 | cursor, budget_used, budget); | |
184 | return seastar::stop_iteration::yes; | |
185 | } else { | |
186 | return seastar::stop_iteration::no; | |
187 | } | |
188 | }); | |
189 | }).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret { | |
190 | return scan_valid_records_ret( | |
191 | scan_valid_records_ertr::ready_future_marker{}, | |
192 | std::move(*retref)); | |
193 | }); | |
194 | } | |
195 | ||
196 | SegmentManagerGroup::read_validate_record_metadata_ret | |
197 | SegmentManagerGroup::read_validate_record_metadata( | |
198 | paddr_t start, | |
199 | segment_nonce_t nonce) | |
200 | { | |
201 | LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata); | |
202 | auto& seg_addr = start.as_seg_paddr(); | |
203 | assert(has_device(seg_addr.get_segment_id().device_id())); | |
204 | auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()]; | |
205 | auto block_size = segment_manager.get_block_size(); | |
206 | auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size()); | |
207 | if (seg_addr.get_segment_off() + block_size > segment_size) { | |
208 | DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size); | |
209 | return read_validate_record_metadata_ret( | |
210 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
211 | std::nullopt); | |
212 | } | |
213 | TRACE("reading record group header block {}~4096", start); | |
214 | return segment_manager.read(start, block_size | |
215 | ).safe_then([=, &segment_manager](bufferptr bptr) mutable | |
216 | -> read_validate_record_metadata_ret { | |
217 | auto block_size = segment_manager.get_block_size(); | |
218 | bufferlist bl; | |
219 | bl.append(bptr); | |
220 | auto maybe_header = try_decode_records_header(bl, nonce); | |
221 | if (!maybe_header.has_value()) { | |
222 | return read_validate_record_metadata_ret( | |
223 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
224 | std::nullopt); | |
225 | } | |
226 | auto& seg_addr = start.as_seg_paddr(); | |
227 | auto& header = *maybe_header; | |
228 | if (header.mdlength < block_size || | |
229 | header.mdlength % block_size != 0 || | |
230 | header.dlength % block_size != 0 || | |
231 | (header.committed_to != JOURNAL_SEQ_NULL && | |
232 | header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) || | |
233 | (seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) { | |
234 | ERROR("failed, invalid record group header {}", start); | |
235 | return crimson::ct_error::input_output_error::make(); | |
236 | } | |
237 | if (header.mdlength == block_size) { | |
238 | return read_validate_record_metadata_ret( | |
239 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
240 | std::make_pair(std::move(header), std::move(bl)) | |
241 | ); | |
242 | } | |
243 | ||
244 | auto rest_start = paddr_t::make_seg_paddr( | |
245 | seg_addr.get_segment_id(), | |
246 | seg_addr.get_segment_off() + block_size | |
247 | ); | |
248 | auto rest_len = header.mdlength - block_size; | |
249 | TRACE("reading record group header rest {}~{}", rest_start, rest_len); | |
250 | return segment_manager.read(rest_start, rest_len | |
251 | ).safe_then([header=std::move(header), bl=std::move(bl) | |
252 | ](auto&& bptail) mutable { | |
253 | bl.push_back(bptail); | |
254 | return read_validate_record_metadata_ret( | |
255 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
256 | std::make_pair(std::move(header), std::move(bl))); | |
257 | }); | |
258 | }).safe_then([](auto p) { | |
259 | if (p && validate_records_metadata(p->second)) { | |
260 | return read_validate_record_metadata_ret( | |
261 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
262 | std::move(*p) | |
263 | ); | |
264 | } else { | |
265 | return read_validate_record_metadata_ret( | |
266 | read_validate_record_metadata_ertr::ready_future_marker{}, | |
267 | std::nullopt); | |
268 | } | |
269 | }); | |
270 | } | |
271 | ||
272 | SegmentManagerGroup::read_validate_data_ret | |
273 | SegmentManagerGroup::read_validate_data( | |
274 | paddr_t record_base, | |
275 | const record_group_header_t &header) | |
276 | { | |
277 | LOG_PREFIX(SegmentManagerGroup::read_validate_data); | |
278 | assert(has_device(record_base.get_device_id())); | |
279 | auto& segment_manager = *segment_managers[record_base.get_device_id()]; | |
280 | auto data_addr = record_base.add_offset(header.mdlength); | |
281 | TRACE("reading record group data blocks {}~{}", data_addr, header.dlength); | |
282 | return segment_manager.read( | |
283 | data_addr, | |
284 | header.dlength | |
285 | ).safe_then([=, &header](auto bptr) { | |
286 | bufferlist bl; | |
287 | bl.append(bptr); | |
288 | return validate_records_data(header, bl); | |
289 | }); | |
290 | } | |
291 | ||
292 | SegmentManagerGroup::consume_record_group_ertr::future<> | |
293 | SegmentManagerGroup::consume_next_records( | |
294 | scan_valid_records_cursor& cursor, | |
295 | found_record_handler_t& handler, | |
296 | std::size_t& budget_used) | |
297 | { | |
298 | LOG_PREFIX(SegmentManagerGroup::consume_next_records); | |
299 | auto& next = cursor.pending_record_groups.front(); | |
300 | auto total_length = next.header.dlength + next.header.mdlength; | |
301 | budget_used += total_length; | |
302 | auto locator = record_locator_t{ | |
303 | next.offset.add_offset(next.header.mdlength), | |
304 | write_result_t{ | |
305 | journal_seq_t{ | |
306 | cursor.seq.segment_seq, | |
307 | next.offset | |
308 | }, | |
309 | total_length | |
310 | } | |
311 | }; | |
312 | DEBUG("processing {} at {}, budget_used={}", | |
313 | next.header, locator, budget_used); | |
314 | return handler( | |
315 | locator, | |
316 | next.header, | |
317 | next.mdbuffer | |
318 | ).safe_then([FNAME, &cursor] { | |
319 | cursor.pop_record_group(); | |
320 | if (cursor.is_complete()) { | |
321 | INFO("complete at {}, no more record group", cursor); | |
322 | } | |
323 | }); | |
324 | } | |
325 | ||
326 | SegmentManagerGroup::find_journal_segment_headers_ret | |
327 | SegmentManagerGroup::find_journal_segment_headers() | |
328 | { | |
329 | return seastar::do_with( | |
330 | get_segment_managers(), | |
331 | find_journal_segment_headers_ret_bare{}, | |
332 | [this](auto &sms, auto& ret) -> find_journal_segment_headers_ret | |
333 | { | |
334 | return crimson::do_for_each(sms, | |
335 | [this, &ret](SegmentManager *sm) | |
336 | { | |
337 | LOG_PREFIX(SegmentManagerGroup::find_journal_segment_headers); | |
338 | auto device_id = sm->get_device_id(); | |
339 | auto num_segments = sm->get_num_segments(); | |
340 | DEBUG("processing {} with {} segments", | |
341 | device_id_printer_t{device_id}, num_segments); | |
342 | return crimson::do_for_each( | |
343 | boost::counting_iterator<device_segment_id_t>(0), | |
344 | boost::counting_iterator<device_segment_id_t>(num_segments), | |
345 | [this, &ret, device_id](device_segment_id_t d_segment_id) | |
346 | { | |
347 | segment_id_t segment_id{device_id, d_segment_id}; | |
348 | return read_segment_header(segment_id | |
349 | ).safe_then([segment_id, &ret](auto &&header) { | |
350 | if (header.get_type() == segment_type_t::JOURNAL) { | |
351 | ret.emplace_back(std::make_pair(segment_id, std::move(header))); | |
352 | } | |
353 | }).handle_error( | |
354 | crimson::ct_error::enoent::handle([](auto) { | |
355 | return find_journal_segment_headers_ertr::now(); | |
356 | }), | |
357 | crimson::ct_error::enodata::handle([](auto) { | |
358 | return find_journal_segment_headers_ertr::now(); | |
359 | }), | |
360 | crimson::ct_error::input_output_error::pass_further{} | |
361 | ); | |
362 | }); | |
363 | }).safe_then([&ret]() mutable { | |
364 | return find_journal_segment_headers_ret{ | |
365 | find_journal_segment_headers_ertr::ready_future_marker{}, | |
366 | std::move(ret)}; | |
367 | }); | |
368 | }); | |
369 | } | |
370 | ||
371 | } // namespace crimson::os::seastore |