]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
3 | ||
4 | #include "record_submitter.h" | |
5 | ||
6 | #include <fmt/format.h> | |
7 | #include <fmt/os.h> | |
8 | ||
9 | #include "crimson/os/seastore/logging.h" | |
10 | #include "crimson/os/seastore/async_cleaner.h" | |
11 | ||
12 | SET_SUBSYS(seastore_journal); | |
13 | ||
14 | namespace crimson::os::seastore::journal { | |
15 | ||
RecordBatch::add_pending_ret
RecordBatch::add_pending(
  const std::string& name,
  record_t&& record,
  extent_len_t block_size)
{
  // Queue a record into this (EMPTY or PENDING) batch and return a future
  // that resolves to the record's locator once the whole batch has been
  // written and set_result() resolves the shared io_promise.
  LOG_PREFIX(RecordBatch::add_pending);
  auto new_size = get_encoded_length_after(record, block_size);
  // Offset of this record's data within the batch's data section, captured
  // before push_back(); used below to locate the record inside the group.
  auto dlength_offset = pending.size.dlength;
  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
        name,
        pending.get_size() + 1,
        new_size.get_encoded_length(),
        dlength_offset);
  assert(state != state_t::SUBMITTING);
  assert(evaluate_submit(record.size, block_size).submit_size == new_size);

  pending.push_back(
    std::move(record), block_size);
  assert(pending.size == new_size);
  if (state == state_t::EMPTY) {
    // First record of a fresh batch: create the shared promise that all
    // records queued into this batch will wait on.
    assert(!io_promise.has_value());
    io_promise = seastar::shared_promise<maybe_promise_result_t>();
  } else {
    assert(io_promise.has_value());
  }
  state = state_t::PENDING;

  // NOTE(review): `name` is captured by reference into the continuation;
  // assumes the referenced string outlives the returned future -- confirm
  // against the caller (RecordSubmitter::submit() passes its own name).
  return io_promise->get_shared_future(
  ).then([dlength_offset, FNAME, &name
         ](auto maybe_promise_result) -> add_pending_ret {
    if (!maybe_promise_result.has_value()) {
      // set_result() was called with std::nullopt: the batch write failed.
      ERROR("{} write failed", name);
      return crimson::ct_error::input_output_error::make();
    }
    auto write_result = maybe_promise_result->write_result;
    // The record starts after the group metadata (mdlength) plus the data
    // of the records queued before this one (dlength_offset).
    auto submit_result = record_locator_t{
      write_result.start_seq.offset.add_offset(
        maybe_promise_result->mdlength + dlength_offset),
      write_result
    };
    TRACE("{} write finish with {}", name, submit_result);
    return add_pending_ret(
      add_pending_ertr::ready_future_marker{},
      submit_result);
  });
}
63 | ||
64 | std::pair<ceph::bufferlist, record_group_size_t> | |
65 | RecordBatch::encode_batch( | |
66 | const journal_seq_t& committed_to, | |
67 | segment_nonce_t segment_nonce) | |
68 | { | |
69 | assert(state == state_t::PENDING); | |
70 | assert(pending.get_size() > 0); | |
71 | assert(io_promise.has_value()); | |
72 | ||
73 | state = state_t::SUBMITTING; | |
74 | submitting_size = pending.get_size(); | |
75 | auto gsize = pending.size; | |
76 | submitting_length = gsize.get_encoded_length(); | |
77 | submitting_mdlength = gsize.get_mdlength(); | |
78 | auto bl = encode_records(pending, committed_to, segment_nonce); | |
79 | // Note: pending is cleared here | |
80 | assert(bl.length() == submitting_length); | |
81 | return std::make_pair(bl, gsize); | |
82 | } | |
83 | ||
84 | void RecordBatch::set_result( | |
85 | maybe_result_t maybe_write_result) | |
86 | { | |
87 | maybe_promise_result_t result; | |
88 | if (maybe_write_result.has_value()) { | |
89 | assert(maybe_write_result->length == submitting_length); | |
90 | result = promise_result_t{ | |
91 | *maybe_write_result, | |
92 | submitting_mdlength | |
93 | }; | |
94 | } | |
95 | assert(state == state_t::SUBMITTING); | |
96 | assert(io_promise.has_value()); | |
97 | ||
98 | state = state_t::EMPTY; | |
99 | submitting_size = 0; | |
100 | submitting_length = 0; | |
101 | submitting_mdlength = 0; | |
102 | io_promise->set_value(result); | |
103 | io_promise.reset(); | |
104 | } | |
105 | ||
106 | std::pair<ceph::bufferlist, record_group_size_t> | |
107 | RecordBatch::submit_pending_fast( | |
108 | record_t&& record, | |
109 | extent_len_t block_size, | |
110 | const journal_seq_t& committed_to, | |
111 | segment_nonce_t segment_nonce) | |
112 | { | |
113 | auto new_size = get_encoded_length_after(record, block_size); | |
114 | std::ignore = new_size; | |
115 | assert(state == state_t::EMPTY); | |
116 | assert(evaluate_submit(record.size, block_size).submit_size == new_size); | |
117 | ||
118 | auto group = record_group_t(std::move(record), block_size); | |
119 | auto size = group.size; | |
120 | assert(size == new_size); | |
121 | auto bl = encode_records(group, committed_to, segment_nonce); | |
122 | assert(bl.length() == size.get_encoded_length()); | |
123 | return std::make_pair(std::move(bl), size); | |
124 | } | |
125 | ||
126 | RecordSubmitter::RecordSubmitter( | |
127 | std::size_t io_depth, | |
128 | std::size_t batch_capacity, | |
129 | std::size_t batch_flush_size, | |
130 | double preferred_fullness, | |
131 | JournalAllocator& ja) | |
132 | : io_depth_limit{io_depth}, | |
133 | preferred_fullness{preferred_fullness}, | |
134 | journal_allocator{ja}, | |
135 | batches(new RecordBatch[io_depth + 1]) | |
136 | { | |
137 | LOG_PREFIX(RecordSubmitter); | |
138 | INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, " | |
139 | "preferred_fullness={}", | |
140 | get_name(), io_depth, batch_capacity, | |
141 | batch_flush_size, preferred_fullness); | |
142 | ceph_assert(io_depth > 0); | |
143 | ceph_assert(batch_capacity > 0); | |
144 | ceph_assert(preferred_fullness >= 0 && | |
145 | preferred_fullness <= 1); | |
146 | free_batch_ptrs.reserve(io_depth + 1); | |
147 | for (std::size_t i = 0; i <= io_depth; ++i) { | |
148 | batches[i].initialize(i, batch_capacity, batch_flush_size); | |
149 | free_batch_ptrs.push_back(&batches[i]); | |
150 | } | |
151 | pop_free_batch(); | |
152 | } | |
153 | ||
bool RecordSubmitter::is_available() const
{
  // Available means a new submission is accepted right now: nobody holds
  // wait_available_promise (no roll/flush in progress) and no IO error
  // has been observed.
  auto ret = !wait_available_promise.has_value() &&
             !has_io_error;
#ifndef NDEBUG
  if (ret) {
    // unconditional invariants
    ceph_assert(journal_allocator.can_write());
    ceph_assert(p_current_batch != nullptr);
    ceph_assert(!p_current_batch->is_submitting());
    // the current batch accepts a further write
    ceph_assert(!p_current_batch->needs_flush());
    if (!p_current_batch->is_empty()) {
      // A non-empty current batch must still fit into the current segment.
      auto submit_length =
        p_current_batch->get_submit_size().get_encoded_length();
      ceph_assert(!journal_allocator.needs_roll(submit_length));
    }
    // I'm not rolling
  }
#endif
  return ret;
}
176 | ||
RecordSubmitter::wa_ertr::future<>
RecordSubmitter::wait_available()
{
  // Wait until the submitter becomes available again (rolling/flush done).
  // Resolves with input_output_error if an IO error is observed either
  // before or after the wait.
  LOG_PREFIX(RecordSubmitter::wait_available);
  assert(!is_available());
  if (has_io_error) {
    ERROR("{} I/O is failed before wait", get_name());
    return crimson::ct_error::input_output_error::make();
  }
  return wait_available_promise->get_shared_future(
  ).then([FNAME, this]() -> wa_ertr::future<> {
    // Re-check: the promise is also resolved on failure paths, which set
    // has_io_error before waking waiters.
    if (has_io_error) {
      ERROR("{} I/O is failed after wait", get_name());
      return crimson::ct_error::input_output_error::make();
    }
    return wa_ertr::now();
  });
}
195 | ||
196 | RecordSubmitter::action_t | |
197 | RecordSubmitter::check_action( | |
198 | const record_size_t& rsize) const | |
199 | { | |
200 | assert(is_available()); | |
201 | auto eval = p_current_batch->evaluate_submit( | |
202 | rsize, journal_allocator.get_block_size()); | |
203 | if (journal_allocator.needs_roll(eval.submit_size.get_encoded_length())) { | |
204 | return action_t::ROLL; | |
205 | } else if (eval.is_full) { | |
206 | return action_t::SUBMIT_FULL; | |
207 | } else { | |
208 | return action_t::SUBMIT_NOT_FULL; | |
209 | } | |
210 | } | |
211 | ||
RecordSubmitter::roll_segment_ertr::future<>
RecordSubmitter::roll_segment()
{
  // Roll the journal to a new segment.  Concurrent submissions are blocked
  // for the duration by holding wait_available_promise; the current batch
  // is flushed (or waited upon) first so it lands in the old segment.
  LOG_PREFIX(RecordSubmitter::roll_segment);
  ceph_assert(p_current_batch->needs_flush() ||
              is_available());
  // #1 block concurrent submissions due to rolling
  wait_available_promise = seastar::shared_promise<>();
  ceph_assert(!wait_unfull_flush_promise.has_value());
  return [FNAME, this] {
    if (p_current_batch->is_pending()) {
      if (state == state_t::FULL) {
        // io-depth exhausted: ask decrement_io_with_flush() to flush the
        // current batch once an in-flight IO completes, and wait for it.
        DEBUG("{} wait flush ...", get_name());
        wait_unfull_flush_promise = seastar::promise<>();
        return wait_unfull_flush_promise->get_future();
      } else { // IDLE/PENDING
        DEBUG("{} flush", get_name());
        flush_current_batch();
        return seastar::now();
      }
    } else {
      // Nothing queued; nothing to flush before rolling.
      assert(p_current_batch->is_empty());
      return seastar::now();
    }
  }().then_wrapped([FNAME, this](auto fut) {
    if (fut.failed()) {
      // The flush-wait future is not expected to fail; record it as an IO
      // error and wake waiters so they observe has_io_error.
      ERROR("{} rolling is skipped unexpectedly, available", get_name());
      has_io_error = true;
      wait_available_promise->set_value();
      wait_available_promise.reset();
      return roll_segment_ertr::now();
    } else {
      // start rolling in background
      std::ignore = journal_allocator.roll(
      ).safe_then([FNAME, this] {
        // good
        DEBUG("{} rolling done, available", get_name());
        assert(!has_io_error);
        wait_available_promise->set_value();
        wait_available_promise.reset();
      }).handle_error(
        crimson::ct_error::all_same_way([FNAME, this](auto e) {
          // Roll failed with a known error: mark the submitter failed but
          // still wake waiters (they will surface input_output_error).
          ERROR("{} got error {}, available", get_name(), e);
          has_io_error = true;
          wait_available_promise->set_value();
          wait_available_promise.reset();
        })
      ).handle_exception([FNAME, this](auto e) {
        ERROR("{} got exception {}, available", get_name(), e);
        has_io_error = true;
        wait_available_promise->set_value();
        wait_available_promise.reset();
      });
      // wait for background rolling
      return wait_available();
    }
  });
}
270 | ||
RecordSubmitter::submit_ret
RecordSubmitter::submit(
  record_t&& record,
  bool with_atomic_roll_segment)
{
  // Submit one record: either encode and write it directly (fast path,
  // when the current batch is empty and a flush would happen anyway), or
  // queue it into the current batch and flush per policy.
  LOG_PREFIX(RecordSubmitter::submit);
  ceph_assert(is_available());
  assert(check_action(record.size) != action_t::ROLL);
  journal_allocator.update_modify_time(record);
  auto eval = p_current_batch->evaluate_submit(
    record.size, journal_allocator.get_block_size());
  // Flush now if: the submitter is IDLE (no in-flight IO to wait behind),
  // the batch would exceed the preferred fullness, or the batch would hit
  // its size/record-count limits (mirrors RecordBatch::needs_flush()).
  bool needs_flush = (
    state == state_t::IDLE ||
    eval.submit_size.get_fullness() > preferred_fullness ||
    // RecordBatch::needs_flush()
    eval.is_full ||
    p_current_batch->get_num_records() + 1 >=
      p_current_batch->get_batch_capacity());
  if (p_current_batch->is_empty() &&
      needs_flush &&
      state != state_t::FULL) {
    // fast path with direct write
    increment_io();
    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
      std::move(record),
      journal_allocator.get_block_size(),
      get_committed_to(),
      journal_allocator.get_nonce());
    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
          get_name(), sizes, get_committed_to(), num_outstanding_io);
    account_submission(1, sizes);
    return journal_allocator.write(std::move(to_write)
    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
      // The record's data starts right after the group metadata.
      return record_locator_t{
        write_result.start_seq.offset.add_offset(mdlength),
        write_result
      };
    }).finally([this] {
      decrement_io_with_flush();
    });
  }
  // indirect batched write
  auto write_fut = p_current_batch->add_pending(
    get_name(),
    std::move(record),
    journal_allocator.get_block_size());
  if (needs_flush) {
    if (state == state_t::FULL) {
      // #2 block concurrent submissions due to lack of resource
      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
            get_name(),
            p_current_batch->get_num_records(),
            num_outstanding_io);
      if (with_atomic_roll_segment) {
        // wait_available_promise and wait_unfull_flush_promise
        // need to be delegated to the follow-up atomic roll_segment();
        assert(p_current_batch->is_pending());
      } else {
        wait_available_promise = seastar::shared_promise<>();
        ceph_assert(!wait_unfull_flush_promise.has_value());
        wait_unfull_flush_promise = seastar::promise<>();
        // flush and mark available in background
        // (decrement_io_with_flush() resolves wait_unfull_flush_promise
        // once the io-depth drops below the limit)
        std::ignore = wait_unfull_flush_promise->get_future(
        ).finally([FNAME, this] {
          DEBUG("{} flush done, available", get_name());
          wait_available_promise->set_value();
          wait_available_promise.reset();
        });
      }
    } else {
      DEBUG("{} added pending, flush", get_name());
      flush_current_batch();
    }
  } else {
    // will flush later
    DEBUG("{} added with {} pending, outstanding_io={}",
          get_name(),
          p_current_batch->get_num_records(),
          num_outstanding_io);
    assert(!p_current_batch->needs_flush());
  }
  return write_fut;
}
354 | ||
RecordSubmitter::open_ret
RecordSubmitter::open(bool is_mkfs)
{
  // Open the underlying journal allocator, then reset the stats and
  // register the per-submitter "journal" metrics group, labeled by name.
  return journal_allocator.open(is_mkfs
  ).safe_then([this](journal_seq_t ret) {
    LOG_PREFIX(RecordSubmitter::open);
    DEBUG("{} register metrics", get_name());
    stats = {};
    namespace sm = seastar::metrics;
    std::vector<sm::label_instance> label_instances;
    label_instances.push_back(sm::label_instance("submitter", get_name()));
    metrics.add_group(
      "journal",
      {
        sm::make_counter(
          "record_num",
          stats.record_batch_stats.num_io,
          sm::description("total number of records submitted"),
          label_instances
        ),
        sm::make_counter(
          "record_batch_num",
          stats.record_batch_stats.num_io_grouped,
          sm::description("total number of records batched"),
          label_instances
        ),
        sm::make_counter(
          "io_num",
          stats.io_depth_stats.num_io,
          sm::description("total number of io submitted"),
          label_instances
        ),
        sm::make_counter(
          "io_depth_num",
          stats.io_depth_stats.num_io_grouped,
          sm::description("total number of io depth"),
          label_instances
        ),
        sm::make_counter(
          "record_group_padding_bytes",
          stats.record_group_padding_bytes,
          sm::description("bytes of metadata padding when write record groups"),
          label_instances
        ),
        sm::make_counter(
          "record_group_metadata_bytes",
          stats.record_group_metadata_bytes,
          sm::description("bytes of raw metadata when write record groups"),
          label_instances
        ),
        sm::make_counter(
          "record_group_data_bytes",
          stats.record_group_data_bytes,
          sm::description("bytes of data when write record groups"),
          label_instances
        ),
      }
    );
    // Propagate the allocator's starting sequence to the caller.
    return ret;
  });
}
416 | ||
417 | RecordSubmitter::close_ertr::future<> | |
418 | RecordSubmitter::close() | |
419 | { | |
420 | committed_to = JOURNAL_SEQ_NULL; | |
421 | ceph_assert(state == state_t::IDLE); | |
422 | ceph_assert(num_outstanding_io == 0); | |
423 | ceph_assert(p_current_batch != nullptr); | |
424 | ceph_assert(p_current_batch->is_empty()); | |
425 | ceph_assert(!wait_available_promise.has_value()); | |
426 | has_io_error = false; | |
427 | ceph_assert(!wait_unfull_flush_promise.has_value()); | |
428 | metrics.clear(); | |
429 | return journal_allocator.close(); | |
430 | } | |
431 | ||
432 | void RecordSubmitter::update_state() | |
433 | { | |
434 | if (num_outstanding_io == 0) { | |
435 | state = state_t::IDLE; | |
436 | } else if (num_outstanding_io < io_depth_limit) { | |
437 | state = state_t::PENDING; | |
438 | } else if (num_outstanding_io == io_depth_limit) { | |
439 | state = state_t::FULL; | |
440 | } else { | |
441 | ceph_abort("fatal error: io-depth overflow"); | |
442 | } | |
443 | } | |
444 | ||
void RecordSubmitter::decrement_io_with_flush()
{
  // One in-flight IO completed: update the io-depth state, then flush the
  // current batch if a waiter queued a flush, or if the flush policy says
  // it is due.
  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
  assert(num_outstanding_io > 0);
  auto prv_state = state;
  --num_outstanding_io;
  update_state();

  if (prv_state == state_t::FULL) {
    if (wait_unfull_flush_promise.has_value()) {
      // submit()/roll_segment() queued a flush for when the io-depth drops
      // below the limit; perform it now and wake the waiter.
      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
      assert(!p_current_batch->is_empty());
      assert(wait_available_promise.has_value());
      flush_current_batch();
      wait_unfull_flush_promise->set_value();
      wait_unfull_flush_promise.reset();
      return;
    }
  } else {
    // The flush-wait promise is only armed while FULL.
    ceph_assert(!wait_unfull_flush_promise.has_value());
  }

  // Opportunistic flush, same policy as submit(): flush when IDLE, when
  // the batch is fuller than preferred, or when the batch asks for it.
  auto needs_flush = (
    !p_current_batch->is_empty() && (
      state == state_t::IDLE ||
      p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
      p_current_batch->needs_flush()
    ));
  if (needs_flush) {
    DEBUG("{} flush", get_name());
    flush_current_batch();
  }
}
478 | ||
479 | void RecordSubmitter::account_submission( | |
480 | std::size_t num, | |
481 | const record_group_size_t& size) | |
482 | { | |
483 | stats.record_group_padding_bytes += | |
484 | (size.get_mdlength() - size.get_raw_mdlength()); | |
485 | stats.record_group_metadata_bytes += size.get_raw_mdlength(); | |
486 | stats.record_group_data_bytes += size.dlength; | |
487 | stats.record_batch_stats.increment(num); | |
488 | } | |
489 | ||
void RecordSubmitter::finish_submit_batch(
  RecordBatch* p_batch,
  maybe_result_t maybe_result)
{
  // Completion path of a batched write (success or failure): resolve the
  // batch's waiters, recycle the batch, and release the io-depth slot.
  assert(p_batch->is_submitting());
  p_batch->set_result(maybe_result);
  // Return the batch to the free list before decrement_io_with_flush(),
  // which may trigger flush_current_batch() and pop a free batch.
  free_batch_ptrs.push_back(p_batch);
  decrement_io_with_flush();
}
499 | ||
void RecordSubmitter::flush_current_batch()
{
  // Encode the current batch, submit it as one IO in the background, and
  // install a fresh batch from the free list as the new current batch.
  LOG_PREFIX(RecordSubmitter::flush_current_batch);
  RecordBatch* p_batch = p_current_batch;
  assert(p_batch->is_pending());
  p_current_batch = nullptr;
  pop_free_batch();

  increment_io();
  auto num = p_batch->get_num_records();
  auto [to_write, sizes] = p_batch->encode_batch(
    get_committed_to(), journal_allocator.get_nonce());
  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
        get_name(), num, sizes, get_committed_to(), num_outstanding_io);
  account_submission(num, sizes);
  // Background write; completion (or failure) resolves the batch's
  // io_promise via finish_submit_batch().
  std::ignore = journal_allocator.write(std::move(to_write)
  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
    TRACE("{} {} records, {}, write done with {}",
          get_name(), num, sizes, write_result);
    finish_submit_batch(p_batch, write_result);
  }).handle_error(
    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
      // A failed batch is completed with std::nullopt so waiters observe
      // input_output_error.
      ERROR("{} {} records, {}, got error {}",
            get_name(), num, sizes, e);
      finish_submit_batch(p_batch, std::nullopt);
    })
  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
    ERROR("{} {} records, {}, got exception {}",
          get_name(), num, sizes, e);
    finish_submit_batch(p_batch, std::nullopt);
  });
}
532 | ||
533 | } |