// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <boost/intrusive_ptr.hpp>
#include <optional>

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/shared_future.hh>

#include "include/ceph_assert.h"
#include "include/buffer.h"
#include "include/denc.h"

#include "crimson/common/log.h"
#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"

namespace crimson::os::seastore {

class SegmentProvider;
class SegmentedAllocator;

/**
 * Manages stream of atomically written records to a SegmentManager.
 */
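//
// Illustrative lifecycle sketch (not part of the original header; names such
// as `provider`, `segment_headers` and `delta_handler` are placeholders):
//
//   Journal journal{segment_manager, extent_reader};
//   journal.set_segment_provider(&provider);  // provider must outlive journal
//   // replay existing segments, then open for new writes:
//   //   journal.replay(std::move(segment_headers), std::move(delta_handler))
//   //   journal.open_for_write()
//   // then records are appended via submit_record(), and the journal is
//   // eventually close()d.
//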
class Journal {
public:
  Journal(SegmentManager &segment_manager, ExtentReader& scanner);

  /**
   * Gets the current journal segment sequence.
   */
  segment_seq_t get_segment_seq() const {
    return journal_segment_manager.get_segment_seq();
  }

  /**
   * Sets the SegmentProvider.
   *
   * Not provided in constructor to allow the provider to not own
   * or construct the Journal (TransactionManager).
   *
   * Note, Journal does not own this ptr, user must ensure that
   * *provider outlives Journal.
   */
  void set_segment_provider(SegmentProvider *provider) {
    segment_provider = provider;
    journal_segment_manager.set_segment_provider(provider);
  }

  /**
   * initializes journal for new writes -- must run prior to calls
   * to submit_record. Should be called after replay if not a new
   * Journal.
   */
  using open_for_write_ertr = crimson::errorator<
    crimson::ct_error::input_output_error
    >;
  using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
  open_for_write_ret open_for_write() {
    return journal_segment_manager.open();
  }

  /**
   * close journal
   *
   * TODO: should probably flush and disallow further writes
   */
  using close_ertr = crimson::errorator<
    crimson::ct_error::input_output_error>;
  close_ertr::future<> close() {
    metrics.clear();
    return journal_segment_manager.close();
  }

  /**
   * submit_record
   *
   * write record with the ordering handle
   */
  using submit_record_ertr = crimson::errorator<
    crimson::ct_error::erange,
    crimson::ct_error::input_output_error
    >;
  using submit_record_ret = submit_record_ertr::future<
    record_locator_t
    >;
  submit_record_ret submit_record(
    record_t &&record,
    OrderingHandle &handle
  ) {
    return record_submitter.submit(std::move(record), handle);
  }
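
  // Hedged example (not from the original source): submitting a record and
  // consuming the returned locator. `record` and `handle` are hypothetical;
  // record_t is assumed to carry the extents/deltas to be written.
  //
  //   return journal.submit_record(std::move(record), handle
  //   ).safe_then([](record_locator_t locator) {
  //     // `locator` identifies where the record's first block was written,
  //     // which the caller can use to fix up extent addresses.
  //   });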

  /**
   * Read deltas and pass to delta_handler
   *
   * record_block_start (argument to delta_handler) is the start of the
   * first block in the record
   */
  using replay_ertr = SegmentManager::read_ertr;
  using replay_ret = replay_ertr::future<>;
  using delta_handler_t = std::function<
    replay_ret(const record_locator_t&,
               const delta_info_t&)>;
  replay_ret replay(
    std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
    delta_handler_t &&delta_handler);
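
  // Hypothetical delta_handler sketch (not from the original source);
  // `cache.apply_delta` is a placeholder for whatever component consumes the
  // deltas during replay:
  //
  //   auto handler = [&cache](const record_locator_t& locator,
  //                           const delta_info_t& delta) -> replay_ret {
  //     return cache.apply_delta(locator, delta);
  //   };
  //   return journal.replay(std::move(segment_headers), std::move(handler));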

  void set_write_pipeline(WritePipeline* write_pipeline) {
    record_submitter.set_write_pipeline(write_pipeline);
  }

private:
  class JournalSegmentManager {
  public:
    JournalSegmentManager(SegmentManager&);

    using base_ertr = crimson::errorator<
      crimson::ct_error::input_output_error>;
    extent_len_t get_max_write_length() const {
      return segment_manager.get_segment_size() -
             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
                     size_t(segment_manager.get_block_size()));
    }

    segment_off_t get_block_size() const {
      return segment_manager.get_block_size();
    }

    segment_nonce_t get_nonce() const {
      return current_segment_nonce;
    }

    journal_seq_t get_committed_to() const {
      return committed_to;
    }

    segment_seq_t get_segment_seq() const {
      return next_journal_segment_seq - 1;
    }

    void set_segment_provider(SegmentProvider* provider) {
      segment_provider = provider;
    }

    void set_segment_seq(segment_seq_t current_seq) {
      next_journal_segment_seq = (current_seq + 1);
    }

    using open_ertr = base_ertr;
    using open_ret = open_ertr::future<journal_seq_t>;
    open_ret open() {
      return roll().safe_then([this] {
        return get_current_write_seq();
      });
    }

    using close_ertr = base_ertr;
    close_ertr::future<> close();

    // returns true iff the current segment has insufficient space
    bool needs_roll(std::size_t length) const {
      auto write_capacity = current_journal_segment->get_write_capacity();
      return length + written_to > std::size_t(write_capacity);
    }

    // close the current segment and initialize next one
    using roll_ertr = base_ertr;
    roll_ertr::future<> roll();

    // write the buffer, return the write result
    // May be called concurrently, writes may complete in any order.
    using write_ertr = base_ertr;
    using write_ret = write_ertr::future<write_result_t>;
    write_ret write(ceph::bufferlist to_write);

    // mark write committed in order
    void mark_committed(const journal_seq_t& new_committed_to);
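
    // Hedged illustration (not from the original source) of the intended
    // write/commit pattern: write() calls may be issued concurrently and
    // complete out of order, while mark_committed() advances the committed
    // boundary in submission order, e.g.
    //
    //   return write(std::move(bl)
    //   ).safe_then([this, committed_to](write_result_t result) {
    //     // `committed_to` here is a placeholder for the journal_seq_t the
    //     // caller derives for this record group.
    //     mark_committed(committed_to);
    //   });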

  private:
    journal_seq_t get_current_write_seq() const {
      assert(current_journal_segment);
      return journal_seq_t{
        get_segment_seq(),
        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
                                written_to)
      };
    }

    void reset() {
      next_journal_segment_seq = 0;
      current_segment_nonce = 0;
      current_journal_segment.reset();
      written_to = 0;
      committed_to = {};
    }

    // prepare segment for writes, writes out segment header
    using initialize_segment_ertr = base_ertr;
    initialize_segment_ertr::future<> initialize_segment(Segment&);

    SegmentProvider* segment_provider;
    SegmentManager& segment_manager;

    segment_seq_t next_journal_segment_seq;
    segment_nonce_t current_segment_nonce;

    SegmentRef current_journal_segment;
    segment_off_t written_to;
    // committed_to may be in a previous journal segment
    journal_seq_t committed_to;
  };

  class RecordBatch {
    enum class state_t {
      EMPTY = 0,
      PENDING,
      SUBMITTING
    };

  public:
    RecordBatch() = default;
    RecordBatch(RecordBatch&&) = delete;
    RecordBatch(const RecordBatch&) = delete;
    RecordBatch& operator=(RecordBatch&&) = delete;
    RecordBatch& operator=(const RecordBatch&) = delete;

    bool is_empty() const {
      return state == state_t::EMPTY;
    }

    bool is_pending() const {
      return state == state_t::PENDING;
    }

    bool is_submitting() const {
      return state == state_t::SUBMITTING;
    }

    std::size_t get_index() const {
      return index;
    }

    std::size_t get_num_records() const {
      return pending.get_size();
    }

    // Returns the expected write sizes if the record can be batched;
    // otherwise, returns std::nullopt.
    std::optional<record_group_size_t> can_batch(
        const record_t& record,
        extent_len_t block_size) const {
      assert(state != state_t::SUBMITTING);
      if (pending.get_size() >= batch_capacity ||
          (pending.get_size() > 0 &&
           pending.size.get_encoded_length() > batch_flush_size)) {
        assert(state == state_t::PENDING);
        return std::nullopt;
      }
      return get_encoded_length_after(record, block_size);
    }
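
    // Hedged sketch (not from the original source) of how a caller might use
    // can_batch(): a returned size means the record may join this batch via
    // add_pending(); nullopt means the batch is full (or large enough) and
    // should be flushed first. The flush policy itself lives in
    // RecordSubmitter; this is only a placeholder outline:
    //
    //   if (auto sizes = batch.can_batch(record, block_size); sizes) {
    //     return batch.add_pending(std::move(record), block_size);
    //   } else {
    //     // flush `batch`, then retry with a fresh batch
    //   }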

    void initialize(std::size_t i,
                    std::size_t _batch_capacity,
                    std::size_t _batch_flush_size) {
      ceph_assert(_batch_capacity > 0);
      index = i;
      batch_capacity = _batch_capacity;
      batch_flush_size = _batch_flush_size;
      pending.reserve(batch_capacity);
    }

    // Add to the batch; the future will be resolved after the batch is
    // written.
    //
    // Set write_result_t::write_length to 0 if the record is not the first one
    // in the batch.
    using add_pending_ertr = JournalSegmentManager::write_ertr;
    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
    add_pending_ret add_pending(
      record_t&&,
      extent_len_t block_size);

    // Encode the batched records for write.
    std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
      const journal_seq_t& committed_to,
      segment_nonce_t segment_nonce);

    // Set the write result and reset for reuse
    using maybe_result_t = std::optional<write_result_t>;
    void set_result(maybe_result_t maybe_write_end_seq);

    // The fast path that is equivalent to submitting a single record as a
    // batch.
    //
    // Essentially, equivalent to the combined logic of:
    // add_pending(), encode_batch() and set_result() above without
    // the intervention of the shared io_promise.
    //
    // Note the current RecordBatch can be reused afterwards.
    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
      record_t&&,
      extent_len_t block_size,
      const journal_seq_t& committed_to,
      segment_nonce_t segment_nonce);
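
    // Hedged illustration (not from the original source): on the fast path
    // the caller encodes the single record and writes it directly, e.g.
    //
    //   auto [bl, sizes] = batch.submit_pending_fast(
    //       std::move(record), block_size, committed_to, nonce);
    //   return journal_segment_manager.write(std::move(bl)
    //   ).safe_then([](write_result_t result) {
    //     // translate `result` into a record_locator_t (placeholder; the
    //     // real translation lives in RecordSubmitter).
    //   });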

  private:
    record_group_size_t get_encoded_length_after(
        const record_t& record,
        extent_len_t block_size) const {
      return pending.size.get_encoded_length_after(
          record.size, block_size);
    }

    state_t state = state_t::EMPTY;
    std::size_t index = 0;
    std::size_t batch_capacity = 0;
    std::size_t batch_flush_size = 0;

    record_group_t pending;
    std::size_t submitting_size = 0;
    segment_off_t submitting_length = 0;
    segment_off_t submitting_mdlength = 0;

    struct promise_result_t {
      write_result_t write_result;
      segment_off_t mdlength;
    };
    using maybe_promise_result_t = std::optional<promise_result_t>;
    std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
  };

  class RecordSubmitter {
    enum class state_t {
      IDLE = 0, // outstanding_io == 0
      PENDING,  // outstanding_io < io_depth_limit
      FULL      // outstanding_io == io_depth_limit
      // OVERFLOW: outstanding_io > io_depth_limit is impossible
    };

    struct grouped_io_stats {
      uint64_t num_io = 0;
      uint64_t num_io_grouped = 0;

      void increment(uint64_t num_grouped_io) {
        ++num_io;
        num_io_grouped += num_grouped_io;
      }
    };

  public:
    RecordSubmitter(std::size_t io_depth,
                    std::size_t batch_capacity,
                    std::size_t batch_flush_size,
                    double preferred_fullness,
                    JournalSegmentManager&);

    grouped_io_stats get_record_batch_stats() const {
      return stats.record_batch_stats;
    }

    grouped_io_stats get_io_depth_stats() const {
      return stats.io_depth_stats;
    }

    uint64_t get_record_group_padding_bytes() const {
      return stats.record_group_padding_bytes;
    }

    uint64_t get_record_group_metadata_bytes() const {
      return stats.record_group_metadata_bytes;
    }

    uint64_t get_record_group_data_bytes() const {
      return stats.record_group_data_bytes;
    }

    void reset_stats() {
      stats = {};
    }

    void set_write_pipeline(WritePipeline *_write_pipeline) {
      write_pipeline = _write_pipeline;
    }

    using submit_ret = Journal::submit_record_ret;
    submit_ret submit(record_t&&, OrderingHandle&);

  private:
    void update_state();

    void increment_io() {
      ++num_outstanding_io;
      stats.io_depth_stats.increment(num_outstanding_io);
      update_state();
    }

    void decrement_io_with_flush() {
      assert(num_outstanding_io > 0);
      --num_outstanding_io;
#ifndef NDEBUG
      auto prv_state = state;
#endif
      update_state();

      if (wait_submit_promise.has_value()) {
        assert(prv_state == state_t::FULL);
        wait_submit_promise->set_value();
        wait_submit_promise.reset();
      }

      if (!p_current_batch->is_empty()) {
        flush_current_batch();
      }
    }

    void pop_free_batch() {
      assert(p_current_batch == nullptr);
      assert(!free_batch_ptrs.empty());
      p_current_batch = free_batch_ptrs.front();
      assert(p_current_batch->is_empty());
      assert(p_current_batch == &batches[p_current_batch->get_index()]);
      free_batch_ptrs.pop_front();
    }

    void account_submission(std::size_t, const record_group_size_t&);

    using maybe_result_t = RecordBatch::maybe_result_t;
    void finish_submit_batch(RecordBatch*, maybe_result_t);

    void flush_current_batch();

    using submit_pending_ertr = JournalSegmentManager::write_ertr;
    using submit_pending_ret = submit_pending_ertr::future<
      record_locator_t>;
    submit_pending_ret submit_pending(
      record_t&&, OrderingHandle &handle, bool flush);

    using do_submit_ret = submit_pending_ret;
    do_submit_ret do_submit(
      record_t&&, OrderingHandle&);

    state_t state = state_t::IDLE;
    std::size_t num_outstanding_io = 0;
    std::size_t io_depth_limit;
    double preferred_fullness;

    WritePipeline* write_pipeline = nullptr;
    JournalSegmentManager& journal_segment_manager;
    std::unique_ptr<RecordBatch[]> batches;
    std::size_t current_batch_index;
    // should not be nullptr after construction
    RecordBatch* p_current_batch = nullptr;
    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
    std::optional<seastar::promise<> > wait_submit_promise;

    struct {
      grouped_io_stats record_batch_stats;
      grouped_io_stats io_depth_stats;
      uint64_t record_group_padding_bytes = 0;
      uint64_t record_group_metadata_bytes = 0;
      uint64_t record_group_data_bytes = 0;
    } stats;
  };

  SegmentProvider* segment_provider = nullptr;
  JournalSegmentManager journal_segment_manager;
  RecordSubmitter record_submitter;
  ExtentReader& scanner;
  seastar::metrics::metric_group metrics;

  /// return ordered vector of segments to replay
  using replay_segments_t = std::vector<
    std::pair<journal_seq_t, segment_header_t>>;
  using prep_replay_segments_ertr = crimson::errorator<
    crimson::ct_error::input_output_error
    >;
  using prep_replay_segments_fut = prep_replay_segments_ertr::future<
    replay_segments_t>;
  prep_replay_segments_fut prep_replay_segments(
    std::vector<std::pair<segment_id_t, segment_header_t>> segments);

  /// replays records starting at start through end of segment
  replay_ertr::future<>
  replay_segment(
    journal_seq_t start,           ///< [in] starting addr, seq
    segment_header_t header,       ///< [in] segment header
    delta_handler_t &delta_handler ///< [in] processes deltas in order
  );

  void register_metrics();
};
using JournalRef = std::unique_ptr<Journal>;

}