// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <boost/intrusive_ptr.hpp>

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/shared_future.hh>

#include "include/ceph_assert.h"
#include "include/buffer.h"
#include "include/denc.h"

#include "crimson/common/log.h"
#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"

namespace crimson::os::seastore {

class SegmentProvider;
class SegmentedAllocator;

/**
 * Manages stream of atomically written records to a SegmentManager.
 */
class Journal {
public:
  Journal(SegmentManager &segment_manager, ExtentReader &scanner);
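
  //
  // A minimal lifecycle sketch (hypothetical caller code; `provider`,
  // `pipeline`, `record` and `handle` are assumptions, not declared in
  // this header):
  //
  //   Journal journal(segment_manager, scanner);
  //   journal.set_segment_provider(&provider);
  //   journal.set_write_pipeline(&pipeline);
  //   auto fut = journal.open_for_write(
  //   ).safe_then([&journal, &record, &handle](journal_seq_t) {
  //     return journal.submit_record(std::move(record), handle);
  //   });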

  /**
   * Gets the current journal segment sequence.
   */
  segment_seq_t get_segment_seq() const {
    return journal_segment_manager.get_segment_seq();
  }

  /**
   * Sets the SegmentProvider.
   *
   * Not provided in the constructor to allow the provider to not own
   * or construct the Journal (TransactionManager).
   *
   * Note: Journal does not own this pointer; the user must ensure that
   * *provider outlives the Journal.
   */
  void set_segment_provider(SegmentProvider *provider) {
    segment_provider = provider;
    journal_segment_manager.set_segment_provider(provider);
  }

  /**
   * Initializes journal for new writes -- must run prior to calls
   * to submit_record. Should be called after replay if not a new
   * Journal.
   */
  using open_for_write_ertr = crimson::errorator<
    crimson::ct_error::input_output_error
    >;
  using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
  open_for_write_ret open_for_write() {
    return journal_segment_manager.open();
  }

  /**
   * TODO: should probably flush and disallow further writes
   */
  using close_ertr = crimson::errorator<
    crimson::ct_error::input_output_error>;
  close_ertr::future<> close() {
    return journal_segment_manager.close();
  }

  /**
   * write record with the ordering handle
   */
  using submit_record_ertr = crimson::errorator<
    crimson::ct_error::erange,
    crimson::ct_error::input_output_error
    >;
  using submit_record_ret = submit_record_ertr::future<
    record_locator_t
    >;
  submit_record_ret submit_record(
    record_t &&record,
    OrderingHandle &handle
  ) {
    return record_submitter.submit(std::move(record), handle);
  }
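
  //
  // e.g. (sketch; `journal`, `record` and `handle` are hypothetical):
  //   journal.submit_record(std::move(record), handle
  //   ).safe_then([](record_locator_t locator) {
  //     // locator describes where the record group landed on disk
  //   });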

  /**
   * Read deltas and pass to delta_handler
   *
   * record_block_start (argument to delta_handler) is the start of
   * the first block in the record
   */
  using replay_ertr = SegmentManager::read_ertr;
  using replay_ret = replay_ertr::future<>;
  using delta_handler_t = std::function<
    replay_ret(const record_locator_t&,
               const delta_info_t&)>;
  replay_ret replay(
    std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
    delta_handler_t &&delta_handler);
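
  //
  // e.g. (sketch; the handler body is hypothetical):
  //   journal.replay(
  //     std::move(segment_headers),
  //     [](const record_locator_t& locator,
  //        const delta_info_t& delta) -> Journal::replay_ret {
  //       // locator.record_block_base is the start of the first block
  //       // of the containing record; apply delta here, in order
  //       return Journal::replay_ertr::now();
  //     });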

  void set_write_pipeline(WritePipeline *write_pipeline) {
    record_submitter.set_write_pipeline(write_pipeline);
  }

private:
  class JournalSegmentManager {
  public:
    JournalSegmentManager(SegmentManager&);

    using base_ertr = crimson::errorator<
        crimson::ct_error::input_output_error>;

    extent_len_t get_max_write_length() const {
      return segment_manager.get_segment_size() -
             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
                     size_t(segment_manager.get_block_size()));
    }
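
    // A worked example with illustrative numbers: if the segment size is
    // 64 MiB, the block size is 4096 and the bounded encoded size of
    // segment_header_t is 6000 bytes, then p2align(6000, 4096) == 4096,
    // so get_max_write_length() == 64 MiB - 4 KiB.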

    segment_off_t get_block_size() const {
      return segment_manager.get_block_size();
    }

    segment_nonce_t get_nonce() const {
      return current_segment_nonce;
    }

    journal_seq_t get_committed_to() const {
      return committed_to;
    }

    segment_seq_t get_segment_seq() const {
      return next_journal_segment_seq - 1;
    }

    void set_segment_provider(SegmentProvider* provider) {
      segment_provider = provider;
    }

    void set_segment_seq(segment_seq_t current_seq) {
      next_journal_segment_seq = (current_seq + 1);
    }

    using open_ertr = base_ertr;
    using open_ret = open_ertr::future<journal_seq_t>;
    open_ret open() {
      return roll().safe_then([this] {
        return get_current_write_seq();
      });
    }

    using close_ertr = base_ertr;
    close_ertr::future<> close();

    // returns true iff the current segment has insufficient space
    bool needs_roll(std::size_t length) const {
      auto write_capacity = current_journal_segment->get_write_capacity();
      return length + written_to > std::size_t(write_capacity);
    }
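
    // e.g. with write_capacity == 32 MiB and written_to == 30 MiB,
    // needs_roll(4 MiB) returns true and the caller should roll() before
    // issuing the write.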

    // close the current segment and initialize the next one
    using roll_ertr = base_ertr;
    roll_ertr::future<> roll();

    // write the buffer, return the write result
    //
    // May be called concurrently; writes may complete in any order.
    using write_ertr = base_ertr;
    using write_ret = write_ertr::future<write_result_t>;
    write_ret write(ceph::bufferlist to_write);

    // mark write committed in order
    void mark_committed(const journal_seq_t& new_committed_to);
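
    //
    // e.g. (sketch; `jsm`, `bl_a`, `bl_b`, `seq_a` and `seq_b` are
    // hypothetical):
    //   auto fut_a = jsm.write(bl_a);
    //   auto fut_b = jsm.write(bl_b);  // may complete before fut_a
    //   // once a write is known durable, commit in submission order:
    //   jsm.mark_committed(seq_a);
    //   jsm.mark_committed(seq_b);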

  private:
    journal_seq_t get_current_write_seq() const {
      assert(current_journal_segment);
      return journal_seq_t{
        get_segment_seq(),
        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
                                written_to)
      };
    }

    void reset() {
      next_journal_segment_seq = 0;
      current_segment_nonce = 0;
      current_journal_segment.reset();
      written_to = 0;
    }

    // prepare segment for writes, writes out segment header
    using initialize_segment_ertr = base_ertr;
    initialize_segment_ertr::future<> initialize_segment(Segment&);

    SegmentProvider* segment_provider;
    SegmentManager& segment_manager;

    segment_seq_t next_journal_segment_seq;
    segment_nonce_t current_segment_nonce;

    SegmentRef current_journal_segment;
    segment_off_t written_to;
    // committed_to may be in a previous journal segment
    journal_seq_t committed_to;
  };

  class RecordBatch {
    enum class state_t {
      EMPTY = 0,
      PENDING,
      SUBMITTING
    };

  public:
    RecordBatch() = default;
    RecordBatch(RecordBatch&&) = delete;
    RecordBatch(const RecordBatch&) = delete;
    RecordBatch& operator=(RecordBatch&&) = delete;
    RecordBatch& operator=(const RecordBatch&) = delete;

    bool is_empty() const {
      return state == state_t::EMPTY;
    }

    bool is_pending() const {
      return state == state_t::PENDING;
    }

    bool is_submitting() const {
      return state == state_t::SUBMITTING;
    }

    std::size_t get_index() const {
      return index;
    }

    std::size_t get_num_records() const {
      return pending.get_size();
    }

    // Returns the expected write sizes if the record can be batched;
    // otherwise, returns std::nullopt.
    std::optional<record_group_size_t> can_batch(
        const record_t& record,
        extent_len_t block_size) const {
      assert(state != state_t::SUBMITTING);
      if (pending.get_size() >= batch_capacity ||
          (pending.get_size() > 0 &&
           pending.size.get_encoded_length() > batch_flush_size)) {
        assert(state == state_t::PENDING);
        return std::nullopt;
      }
      return get_encoded_length_after(record, block_size);
    }
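
    // e.g. (sketch of the caller's decision; `batch` and `record` are
    // hypothetical):
    //   if (auto sizes = batch.can_batch(record, block_size)) {
    //     // fits: *sizes is the projected record_group_size_t
    //   } else {
    //     // at capacity or past the flush threshold; flush first
    //   }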

    void initialize(std::size_t i,
                    std::size_t _batch_capacity,
                    std::size_t _batch_flush_size) {
      ceph_assert(_batch_capacity > 0);
      index = i;
      batch_capacity = _batch_capacity;
      batch_flush_size = _batch_flush_size;
      pending.reserve(batch_capacity);
    }

    // Add a record to the batch; the returned future will be resolved
    // after the batch is written.
    //
    // Note: write_result_t::write_length is set to 0 if the record is
    // not the first one in the batch.
    using add_pending_ertr = JournalSegmentManager::write_ertr;
    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
    add_pending_ret add_pending(
        record_t&&,
        extent_len_t block_size);

    // Encode the batched records for write.
    std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
        const journal_seq_t& committed_to,
        segment_nonce_t segment_nonce);

    // Set the write result and reset the batch for reuse.
    using maybe_result_t = std::optional<write_result_t>;
    void set_result(maybe_result_t maybe_write_end_seq);

    // The fast path, equivalent to submitting a single record as a batch.
    //
    // Essentially the combined logic of add_pending(), encode_batch()
    // and set_result() above, without the intervention of the shared
    // io_promise.
    //
    // Note: the current RecordBatch can be reused afterwards.
    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
        record_t&&,
        extent_len_t block_size,
        const journal_seq_t& committed_to,
        segment_nonce_t segment_nonce);
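
    // e.g. (sketch; names are hypothetical): with an empty batch the
    // submitter can skip the io_promise round-trip entirely:
    //   auto [bl, sizes] = batch.submit_pending_fast(
    //       std::move(record), block_size, committed_to, nonce);
    //   // bl is then written directly via JournalSegmentManager::write()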

  private:
    record_group_size_t get_encoded_length_after(
        const record_t& record,
        extent_len_t block_size) const {
      return pending.size.get_encoded_length_after(
          record.size, block_size);
    }

    state_t state = state_t::EMPTY;
    std::size_t index = 0;
    std::size_t batch_capacity = 0;
    std::size_t batch_flush_size = 0;

    record_group_t pending;
    std::size_t submitting_size = 0;
    segment_off_t submitting_length = 0;
    segment_off_t submitting_mdlength = 0;

    struct promise_result_t {
      write_result_t write_result;
      segment_off_t mdlength;
    };
    using maybe_promise_result_t = std::optional<promise_result_t>;
    std::optional<seastar::shared_promise<maybe_promise_result_t>> io_promise;
  };

  class RecordSubmitter {
    enum class state_t {
      IDLE = 0,    // outstanding_io == 0
      PENDING,     // outstanding_io < io_depth_limit
      FULL         // outstanding_io == io_depth_limit
      // OVERFLOW: outstanding_io > io_depth_limit is impossible
    };
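
    // e.g. with io_depth_limit == 4, the state tracks outstanding_io as:
    //   0 -> IDLE, 1..3 -> PENDING, 4 -> FULL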

    struct grouped_io_stats {
      uint64_t num_io = 0;
      uint64_t num_io_grouped = 0;

      void increment(uint64_t num_grouped_io) {
        ++num_io;
        num_io_grouped += num_grouped_io;
      }
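
      // e.g. after increment(1) and increment(3): num_io == 2 and
      // num_io_grouped == 4, i.e. an average of 4 / 2 = 2 grouped per IO.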
    };

  public:
    RecordSubmitter(std::size_t io_depth,
                    std::size_t batch_capacity,
                    std::size_t batch_flush_size,
                    double preferred_fullness,
                    JournalSegmentManager&);

    grouped_io_stats get_record_batch_stats() const {
      return stats.record_batch_stats;
    }

    grouped_io_stats get_io_depth_stats() const {
      return stats.io_depth_stats;
    }

    uint64_t get_record_group_padding_bytes() const {
      return stats.record_group_padding_bytes;
    }

    uint64_t get_record_group_metadata_bytes() const {
      return stats.record_group_metadata_bytes;
    }

    uint64_t get_record_group_data_bytes() const {
      return stats.record_group_data_bytes;
    }

    void set_write_pipeline(WritePipeline *_write_pipeline) {
      write_pipeline = _write_pipeline;
    }

    using submit_ret = Journal::submit_record_ret;
    submit_ret submit(record_t&&, OrderingHandle&);

  private:
    // recompute state from num_outstanding_io (see state_t above)
    void update_state();

    void increment_io() {
      ++num_outstanding_io;
      stats.io_depth_stats.increment(num_outstanding_io);
      update_state();
    }

    void decrement_io_with_flush() {
      assert(num_outstanding_io > 0);
      --num_outstanding_io;

      auto prv_state = state;
      update_state();

      if (wait_submit_promise.has_value()) {
        assert(prv_state == state_t::FULL);
        wait_submit_promise->set_value();
        wait_submit_promise.reset();
      }

      if (!p_current_batch->is_empty()) {
        flush_current_batch();
      }
    }

    void pop_free_batch() {
      assert(p_current_batch == nullptr);
      assert(!free_batch_ptrs.empty());
      p_current_batch = free_batch_ptrs.front();
      assert(p_current_batch->is_empty());
      assert(p_current_batch == &batches[p_current_batch->get_index()]);
      free_batch_ptrs.pop_front();
    }

    void account_submission(std::size_t, const record_group_size_t&);

    using maybe_result_t = RecordBatch::maybe_result_t;
    void finish_submit_batch(RecordBatch*, maybe_result_t);

    void flush_current_batch();

    using submit_pending_ertr = JournalSegmentManager::write_ertr;
    using submit_pending_ret = submit_pending_ertr::future<
      record_locator_t
      >;
    submit_pending_ret submit_pending(
      record_t&&, OrderingHandle &handle, bool flush);

    using do_submit_ret = submit_pending_ret;
    do_submit_ret do_submit(
      record_t&&, OrderingHandle&);

    state_t state = state_t::IDLE;
    std::size_t num_outstanding_io = 0;
    std::size_t io_depth_limit;
    double preferred_fullness;

    WritePipeline* write_pipeline = nullptr;
    JournalSegmentManager& journal_segment_manager;
    std::unique_ptr<RecordBatch[]> batches;
    std::size_t current_batch_index;
    // should not be nullptr after construction
    RecordBatch* p_current_batch = nullptr;
    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
    std::optional<seastar::promise<>> wait_submit_promise;

    struct {
      grouped_io_stats record_batch_stats;
      grouped_io_stats io_depth_stats;
      uint64_t record_group_padding_bytes = 0;
      uint64_t record_group_metadata_bytes = 0;
      uint64_t record_group_data_bytes = 0;
    } stats;
  };

  SegmentProvider* segment_provider = nullptr;
  JournalSegmentManager journal_segment_manager;
  RecordSubmitter record_submitter;
  ExtentReader& scanner;
  seastar::metrics::metric_group metrics;

  /// return ordered vector of segments to replay
  using replay_segments_t = std::vector<
    std::pair<journal_seq_t, segment_header_t>>;
  using prep_replay_segments_ertr = crimson::errorator<
    crimson::ct_error::input_output_error
    >;
  using prep_replay_segments_fut = prep_replay_segments_ertr::future<
    replay_segments_t>;
  prep_replay_segments_fut prep_replay_segments(
    std::vector<std::pair<segment_id_t, segment_header_t>> segments);

  /// replays records starting at start through end of segment
  replay_ertr::future<>
  replay_segment(
    journal_seq_t start,             ///< [in] starting addr, seq
    segment_header_t header,         ///< [in] segment header
    delta_handler_t &delta_handler   ///< [in] processes deltas in order
  );

  void register_metrics();
};

using JournalRef = std::unique_ptr<Journal>;

} // namespace crimson::os::seastore