// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab

#include "record_submitter.h"

#include <fmt/format.h>
#include <fmt/os.h>

#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/async_cleaner.h"

SET_SUBSYS(seastore_journal);

namespace crimson::os::seastore::journal {

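// Queue a record into this in-memory batch. The first record creates the
// shared io_promise; every caller then waits on the same shared future,
// which resolves once the whole batch is written. The returned
// record_locator_t points at this record's data region, computed from the
// group's metadata length plus the record's dlength offset within the group.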
RecordBatch::add_pending_ret
RecordBatch::add_pending(
  const std::string& name,
  record_t&& record,
  extent_len_t block_size)
{
  LOG_PREFIX(RecordBatch::add_pending);
  auto new_size = get_encoded_length_after(record, block_size);
  auto dlength_offset = pending.size.dlength;
  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
        name,
        pending.get_size() + 1,
        new_size.get_encoded_length(),
        dlength_offset);
  assert(state != state_t::SUBMITTING);
  assert(evaluate_submit(record.size, block_size).submit_size == new_size);

  pending.push_back(
      std::move(record), block_size);
  assert(pending.size == new_size);
  if (state == state_t::EMPTY) {
    assert(!io_promise.has_value());
    io_promise = seastar::shared_promise<maybe_promise_result_t>();
  } else {
    assert(io_promise.has_value());
  }
  state = state_t::PENDING;

  return io_promise->get_shared_future(
  ).then([dlength_offset, FNAME, &name
         ](auto maybe_promise_result) -> add_pending_ret {
    if (!maybe_promise_result.has_value()) {
      ERROR("{} write failed", name);
      return crimson::ct_error::input_output_error::make();
    }
    auto write_result = maybe_promise_result->write_result;
    auto submit_result = record_locator_t{
      write_result.start_seq.offset.add_offset(
          maybe_promise_result->mdlength + dlength_offset),
      write_result
    };
    TRACE("{} write finish with {}", name, submit_result);
    return add_pending_ret(
      add_pending_ertr::ready_future_marker{},
      submit_result);
  });
}

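// Encode all pending records into a single bufferlist for one journal
// write. Transitions PENDING -> SUBMITTING and snapshots the group sizes so
// set_result() can validate the write and resolve the waiters later.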
std::pair<ceph::bufferlist, record_group_size_t>
RecordBatch::encode_batch(
  const journal_seq_t& committed_to,
  segment_nonce_t segment_nonce)
{
  assert(state == state_t::PENDING);
  assert(pending.get_size() > 0);
  assert(io_promise.has_value());

  state = state_t::SUBMITTING;
  submitting_size = pending.get_size();
  auto gsize = pending.size;
  submitting_length = gsize.get_encoded_length();
  submitting_mdlength = gsize.get_mdlength();
  auto bl = encode_records(pending, committed_to, segment_nonce);
  // Note: pending is cleared here
  assert(bl.length() == submitting_length);
  return std::make_pair(bl, gsize);
}

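// Deliver the write result (or failure, as std::nullopt) to every waiter
// from add_pending(), then reset the batch to EMPTY so it can be reused.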
void RecordBatch::set_result(
  maybe_result_t maybe_write_result)
{
  maybe_promise_result_t result;
  if (maybe_write_result.has_value()) {
    assert(maybe_write_result->length == submitting_length);
    result = promise_result_t{
      *maybe_write_result,
      submitting_mdlength
    };
  }
  assert(state == state_t::SUBMITTING);
  assert(io_promise.has_value());

  state = state_t::EMPTY;
  submitting_size = 0;
  submitting_length = 0;
  submitting_mdlength = 0;
  io_promise->set_value(result);
  io_promise.reset();
}

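// Fast path for an EMPTY batch: encode the single record directly and let
// the caller issue the write itself, bypassing the io_promise machinery.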
std::pair<ceph::bufferlist, record_group_size_t>
RecordBatch::submit_pending_fast(
  record_t&& record,
  extent_len_t block_size,
  const journal_seq_t& committed_to,
  segment_nonce_t segment_nonce)
{
  auto new_size = get_encoded_length_after(record, block_size);
  std::ignore = new_size;
  assert(state == state_t::EMPTY);
  assert(evaluate_submit(record.size, block_size).submit_size == new_size);

  auto group = record_group_t(std::move(record), block_size);
  auto size = group.size;
  assert(size == new_size);
  auto bl = encode_records(group, committed_to, segment_nonce);
  assert(bl.length() == size.get_encoded_length());
  return std::make_pair(std::move(bl), size);
}

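// io_depth + 1 batches are allocated so that a free batch is always
// available to accumulate new records while up to io_depth batches are
// in flight.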
RecordSubmitter::RecordSubmitter(
  std::size_t io_depth,
  std::size_t batch_capacity,
  std::size_t batch_flush_size,
  double preferred_fullness,
  JournalAllocator& ja)
  : io_depth_limit{io_depth},
    preferred_fullness{preferred_fullness},
    journal_allocator{ja},
    batches(new RecordBatch[io_depth + 1])
{
  LOG_PREFIX(RecordSubmitter);
  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
       "preferred_fullness={}",
       get_name(), io_depth, batch_capacity,
       batch_flush_size, preferred_fullness);
  ceph_assert(io_depth > 0);
  ceph_assert(batch_capacity > 0);
  ceph_assert(preferred_fullness >= 0 &&
              preferred_fullness <= 1);
  free_batch_ptrs.reserve(io_depth + 1);
  for (std::size_t i = 0; i <= io_depth; ++i) {
    batches[i].initialize(i, batch_capacity, batch_flush_size);
    free_batch_ptrs.push_back(&batches[i]);
  }
  pop_free_batch();
}

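// Available means a submit() or roll_segment() can proceed immediately:
// nothing is blocking on a roll/flush and no I/O error has been seen.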
bool RecordSubmitter::is_available() const
{
  auto ret = !wait_available_promise.has_value() &&
             !has_io_error;
#ifndef NDEBUG
  if (ret) {
    // unconditional invariants
    ceph_assert(journal_allocator.can_write());
    ceph_assert(p_current_batch != nullptr);
    ceph_assert(!p_current_batch->is_submitting());
    // the current batch accepts a further write
    ceph_assert(!p_current_batch->needs_flush());
    if (!p_current_batch->is_empty()) {
      auto submit_length =
        p_current_batch->get_submit_size().get_encoded_length();
      ceph_assert(!journal_allocator.needs_roll(submit_length));
    }
    // I'm not rolling
  }
#endif
  return ret;
}

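// Wait until the submitter becomes available again (see roll_segment() and
// submit() for the two places that block availability), propagating any
// I/O error observed before or during the wait.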
RecordSubmitter::wa_ertr::future<>
RecordSubmitter::wait_available()
{
  LOG_PREFIX(RecordSubmitter::wait_available);
  assert(!is_available());
  if (has_io_error) {
    ERROR("{} I/O failed before wait", get_name());
    return crimson::ct_error::input_output_error::make();
  }
  return wait_available_promise->get_shared_future(
  ).then([FNAME, this]() -> wa_ertr::future<> {
    if (has_io_error) {
      ERROR("{} I/O failed after wait", get_name());
      return crimson::ct_error::input_output_error::make();
    }
    return wa_ertr::now();
  });
}

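// Decide what submitting rsize would require: ROLL if the encoded group
// would no longer fit in the current segment, otherwise SUBMIT_FULL or
// SUBMIT_NOT_FULL depending on whether the batch would reach capacity.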
RecordSubmitter::action_t
RecordSubmitter::check_action(
  const record_size_t& rsize) const
{
  assert(is_available());
  auto eval = p_current_batch->evaluate_submit(
      rsize, journal_allocator.get_block_size());
  if (journal_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
    return action_t::ROLL;
  } else if (eval.is_full) {
    return action_t::SUBMIT_FULL;
  } else {
    return action_t::SUBMIT_NOT_FULL;
  }
}

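// Roll to a new segment: block concurrent submissions, flush (or wait for
// the flush of) any pending batch, then roll in the background and resolve
// wait_available_promise when done or on error.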
RecordSubmitter::roll_segment_ertr::future<>
RecordSubmitter::roll_segment()
{
  LOG_PREFIX(RecordSubmitter::roll_segment);
  ceph_assert(p_current_batch->needs_flush() ||
              is_available());
  // #1 block concurrent submissions due to rolling
  wait_available_promise = seastar::shared_promise<>();
  ceph_assert(!wait_unfull_flush_promise.has_value());
  return [FNAME, this] {
    if (p_current_batch->is_pending()) {
      if (state == state_t::FULL) {
        DEBUG("{} wait flush ...", get_name());
        wait_unfull_flush_promise = seastar::promise<>();
        return wait_unfull_flush_promise->get_future();
      } else { // IDLE/PENDING
        DEBUG("{} flush", get_name());
        flush_current_batch();
        return seastar::now();
      }
    } else {
      assert(p_current_batch->is_empty());
      return seastar::now();
    }
  }().then_wrapped([FNAME, this](auto fut) {
    if (fut.failed()) {
      ERROR("{} rolling is skipped unexpectedly, available", get_name());
      has_io_error = true;
      wait_available_promise->set_value();
      wait_available_promise.reset();
      return roll_segment_ertr::now();
    } else {
      // start rolling in background
      std::ignore = journal_allocator.roll(
      ).safe_then([FNAME, this] {
        // good
        DEBUG("{} rolling done, available", get_name());
        assert(!has_io_error);
        wait_available_promise->set_value();
        wait_available_promise.reset();
      }).handle_error(
        crimson::ct_error::all_same_way([FNAME, this](auto e) {
          ERROR("{} got error {}, available", get_name(), e);
          has_io_error = true;
          wait_available_promise->set_value();
          wait_available_promise.reset();
        })
      ).handle_exception([FNAME, this](auto e) {
        ERROR("{} got exception {}, available", get_name(), e);
        has_io_error = true;
        wait_available_promise->set_value();
        wait_available_promise.reset();
      });
      // wait for background rolling
      return wait_available();
    }
  });
}

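// Submit a record. Takes the direct fast path (one record, one write) when
// the current batch is empty and would flush right away; otherwise adds
// the record to the current batch and flushes it now, later, or once an
// in-flight write completes, depending on state and fullness.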
RecordSubmitter::submit_ret
RecordSubmitter::submit(
  record_t&& record,
  bool with_atomic_roll_segment)
{
  LOG_PREFIX(RecordSubmitter::submit);
  ceph_assert(is_available());
  assert(check_action(record.size) != action_t::ROLL);
  journal_allocator.update_modify_time(record);
  auto eval = p_current_batch->evaluate_submit(
      record.size, journal_allocator.get_block_size());
  bool needs_flush = (
      state == state_t::IDLE ||
      eval.submit_size.get_fullness() > preferred_fullness ||
      // RecordBatch::needs_flush()
      eval.is_full ||
      p_current_batch->get_num_records() + 1 >=
        p_current_batch->get_batch_capacity());
  if (p_current_batch->is_empty() &&
      needs_flush &&
      state != state_t::FULL) {
    // fast path with direct write
    increment_io();
    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
        std::move(record),
        journal_allocator.get_block_size(),
        get_committed_to(),
        journal_allocator.get_nonce());
    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
          get_name(), sizes, get_committed_to(), num_outstanding_io);
    account_submission(1, sizes);
    return journal_allocator.write(std::move(to_write)
    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
      return record_locator_t{
        write_result.start_seq.offset.add_offset(mdlength),
        write_result
      };
    }).finally([this] {
      decrement_io_with_flush();
    });
  }
  // indirect batched write
  auto write_fut = p_current_batch->add_pending(
      get_name(),
      std::move(record),
      journal_allocator.get_block_size());
  if (needs_flush) {
    if (state == state_t::FULL) {
      // #2 block concurrent submissions due to lack of resource
      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
            get_name(),
            p_current_batch->get_num_records(),
            num_outstanding_io);
      if (with_atomic_roll_segment) {
        // wait_available_promise and wait_unfull_flush_promise
        // need to be delegated to the follow-up atomic roll_segment();
        assert(p_current_batch->is_pending());
      } else {
        wait_available_promise = seastar::shared_promise<>();
        ceph_assert(!wait_unfull_flush_promise.has_value());
        wait_unfull_flush_promise = seastar::promise<>();
        // flush and mark available in background
        std::ignore = wait_unfull_flush_promise->get_future(
        ).finally([FNAME, this] {
          DEBUG("{} flush done, available", get_name());
          wait_available_promise->set_value();
          wait_available_promise.reset();
        });
      }
    } else {
      DEBUG("{} added pending, flush", get_name());
      flush_current_batch();
    }
  } else {
    // will flush later
    DEBUG("{} added with {} pending, outstanding_io={}",
          get_name(),
          p_current_batch->get_num_records(),
          num_outstanding_io);
    assert(!p_current_batch->needs_flush());
  }
  return write_fut;
}

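// Open the journal allocator and register per-submitter metrics counters.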
RecordSubmitter::open_ret
RecordSubmitter::open(bool is_mkfs)
{
  return journal_allocator.open(is_mkfs
  ).safe_then([this](journal_seq_t ret) {
    LOG_PREFIX(RecordSubmitter::open);
    DEBUG("{} register metrics", get_name());
    stats = {};
    namespace sm = seastar::metrics;
    std::vector<sm::label_instance> label_instances;
    label_instances.push_back(sm::label_instance("submitter", get_name()));
    metrics.add_group(
      "journal",
      {
        sm::make_counter(
          "record_num",
          stats.record_batch_stats.num_io,
          sm::description("total number of records submitted"),
          label_instances
        ),
        sm::make_counter(
          "record_batch_num",
          stats.record_batch_stats.num_io_grouped,
          sm::description("total number of records batched"),
          label_instances
        ),
        sm::make_counter(
          "io_num",
          stats.io_depth_stats.num_io,
          sm::description("total number of IOs submitted"),
          label_instances
        ),
        sm::make_counter(
          "io_depth_num",
          stats.io_depth_stats.num_io_grouped,
          sm::description("total io depth accumulated across submissions"),
          label_instances
        ),
        sm::make_counter(
          "record_group_padding_bytes",
          stats.record_group_padding_bytes,
          sm::description("bytes of metadata padding when writing record groups"),
          label_instances
        ),
        sm::make_counter(
          "record_group_metadata_bytes",
          stats.record_group_metadata_bytes,
          sm::description("bytes of raw metadata when writing record groups"),
          label_instances
        ),
        sm::make_counter(
          "record_group_data_bytes",
          stats.record_group_data_bytes,
          sm::description("bytes of data when writing record groups"),
          label_instances
        ),
      }
    );
    return ret;
  });
}

RecordSubmitter::close_ertr::future<>
RecordSubmitter::close()
{
  committed_to = JOURNAL_SEQ_NULL;
  ceph_assert(state == state_t::IDLE);
  ceph_assert(num_outstanding_io == 0);
  ceph_assert(p_current_batch != nullptr);
  ceph_assert(p_current_batch->is_empty());
  ceph_assert(!wait_available_promise.has_value());
  has_io_error = false;
  ceph_assert(!wait_unfull_flush_promise.has_value());
  metrics.clear();
  return journal_allocator.close();
}

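// Map the outstanding I/O count to the submitter state:
// 0 -> IDLE, (0, io_depth_limit) -> PENDING, io_depth_limit -> FULL.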
void RecordSubmitter::update_state()
{
  if (num_outstanding_io == 0) {
    state = state_t::IDLE;
  } else if (num_outstanding_io < io_depth_limit) {
    state = state_t::PENDING;
  } else if (num_outstanding_io == io_depth_limit) {
    state = state_t::FULL;
  } else {
    ceph_abort("fatal error: io-depth overflow");
  }
}

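// An in-flight write completed: release its I/O slot. If the submitter was
// FULL and a submission is waiting, use the freed slot to flush the current
// batch right away; otherwise flush opportunistically under the same policy
// as submit().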
void RecordSubmitter::decrement_io_with_flush()
{
  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
  assert(num_outstanding_io > 0);
  auto prv_state = state;
  --num_outstanding_io;
  update_state();

  if (prv_state == state_t::FULL) {
    if (wait_unfull_flush_promise.has_value()) {
      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
      assert(!p_current_batch->is_empty());
      assert(wait_available_promise.has_value());
      flush_current_batch();
      wait_unfull_flush_promise->set_value();
      wait_unfull_flush_promise.reset();
      return;
    }
  } else {
    ceph_assert(!wait_unfull_flush_promise.has_value());
  }

  auto needs_flush = (
      !p_current_batch->is_empty() && (
        state == state_t::IDLE ||
        p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
        p_current_batch->needs_flush()
      ));
  if (needs_flush) {
    DEBUG("{} flush", get_name());
    flush_current_batch();
  }
}

void RecordSubmitter::account_submission(
  std::size_t num,
  const record_group_size_t& size)
{
  stats.record_group_padding_bytes +=
    (size.get_mdlength() - size.get_raw_mdlength());
  stats.record_group_metadata_bytes += size.get_raw_mdlength();
  stats.record_group_data_bytes += size.dlength;
  stats.record_batch_stats.increment(num);
}

void RecordSubmitter::finish_submit_batch(
  RecordBatch* p_batch,
  maybe_result_t maybe_result)
{
  assert(p_batch->is_submitting());
  p_batch->set_result(maybe_result);
  free_batch_ptrs.push_back(p_batch);
  decrement_io_with_flush();
}

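// Detach the current batch, replace it with a free one, and write the
// encoded batch in the background; finish_submit_batch() resolves the
// batch's waiters when the write completes or fails.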
void RecordSubmitter::flush_current_batch()
{
  LOG_PREFIX(RecordSubmitter::flush_current_batch);
  RecordBatch* p_batch = p_current_batch;
  assert(p_batch->is_pending());
  p_current_batch = nullptr;
  pop_free_batch();

  increment_io();
  auto num = p_batch->get_num_records();
  auto [to_write, sizes] = p_batch->encode_batch(
      get_committed_to(), journal_allocator.get_nonce());
  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
        get_name(), num, sizes, get_committed_to(), num_outstanding_io);
  account_submission(num, sizes);
  std::ignore = journal_allocator.write(std::move(to_write)
  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
    TRACE("{} {} records, {}, write done with {}",
          get_name(), num, sizes, write_result);
    finish_submit_batch(p_batch, write_result);
  }).handle_error(
    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
      ERROR("{} {} records, {}, got error {}",
            get_name(), num, sizes, e);
      finish_submit_batch(p_batch, std::nullopt);
    })
  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
    ERROR("{} {} records, {}, got exception {}",
          get_name(), num, sizes, e);
    finish_submit_batch(p_batch, std::nullopt);
  });
}

}