1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "include/buffer.h"
6 #include "include/Context.h"
7 #include "include/ceph_assert.h"
8 #include "common/deleter.h"
9 #include "common/dout.h"
10 #include "common/environment.h"
11 #include "common/errno.h"
12 #include "common/WorkQueue.h"
13 #include "common/Timer.h"
14 #include "common/perf_counters.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/asio/ContextWQ.h"
17 #include "librbd/cache/pwl/ImageCacheState.h"
18 #include "librbd/cache/pwl/LogEntry.h"
23 #define dout_subsys ceph_subsys_rbd_pwl
25 #define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
26 << this << " " << __func__ << ": "
34 using namespace librbd::cache::pwl
;
36 static bool is_valid_pool_root(const WriteLogPoolRoot
& root
) {
37 return root
.pool_size
% MIN_WRITE_ALLOC_SSD_SIZE
== 0 &&
38 root
.first_valid_entry
>= DATA_RING_BUFFER_OFFSET
&&
39 root
.first_valid_entry
< root
.pool_size
&&
40 root
.first_valid_entry
% MIN_WRITE_ALLOC_SSD_SIZE
== 0 &&
41 root
.first_free_entry
>= DATA_RING_BUFFER_OFFSET
&&
42 root
.first_free_entry
< root
.pool_size
&&
43 root
.first_free_entry
% MIN_WRITE_ALLOC_SSD_SIZE
== 0;
47 Builder
<AbstractWriteLog
<I
>>* WriteLog
<I
>::create_builder() {
48 m_builderobj
= new Builder
<This
>();
53 WriteLog
<I
>::WriteLog(
54 I
&image_ctx
, librbd::cache::pwl::ImageCacheState
<I
>* cache_state
,
55 cache::ImageWritebackInterface
& image_writeback
,
56 plugin::Api
<I
>& plugin_api
)
57 : AbstractWriteLog
<I
>(image_ctx
, cache_state
, create_builder(),
58 image_writeback
, plugin_api
)
63 WriteLog
<I
>::~WriteLog() {
68 void WriteLog
<I
>::collect_read_extents(
69 uint64_t read_buffer_offset
, LogMapEntry
<GenericWriteLogEntry
> map_entry
,
70 std::vector
<std::shared_ptr
<GenericWriteLogEntry
>> &log_entries_to_read
,
71 std::vector
<bufferlist
*> &bls_to_read
,
72 uint64_t entry_hit_length
, Extent hit_extent
,
73 pwl::C_ReadRequest
*read_ctx
) {
74 // Make a bl for this hit extent. This will add references to the
75 // write_entry->cache_bl */
76 ldout(m_image_ctx
.cct
, 5) << dendl
;
77 auto write_entry
= std::static_pointer_cast
<WriteLogEntry
>(map_entry
.log_entry
);
79 write_entry
->copy_cache_bl(&hit_bl
);
80 bool writesame
= write_entry
->is_writesame_entry();
81 auto hit_extent_buf
= std::make_shared
<ImageExtentBuf
>(
82 hit_extent
, hit_bl
, true, read_buffer_offset
, writesame
);
83 read_ctx
->read_extents
.push_back(hit_extent_buf
);
85 if (!hit_bl
.length()) {
86 ldout(m_image_ctx
.cct
, 5) << "didn't hit RAM" << dendl
;
87 auto read_extent
= read_ctx
->read_extents
.back();
88 write_entry
->inc_bl_refs();
89 log_entries_to_read
.push_back(std::move(write_entry
));
90 bls_to_read
.push_back(&read_extent
->m_bl
);
95 void WriteLog
<I
>::complete_read(
96 std::vector
<std::shared_ptr
<GenericWriteLogEntry
>> &log_entries_to_read
,
97 std::vector
<bufferlist
*> &bls_to_read
,
99 if (!log_entries_to_read
.empty()) {
100 aio_read_data_blocks(log_entries_to_read
, bls_to_read
, ctx
);
106 template <typename I
>
107 int WriteLog
<I
>::create_and_open_bdev() {
108 CephContext
*cct
= m_image_ctx
.cct
;
110 bdev
= BlockDevice::create(cct
, this->m_log_pool_name
, aio_cache_cb
,
111 nullptr, nullptr, nullptr);
112 int r
= bdev
->open(this->m_log_pool_name
);
114 lderr(cct
) << "failed to open bdev" << dendl
;
119 ceph_assert(this->m_log_pool_size
% MIN_WRITE_ALLOC_SSD_SIZE
== 0);
120 if (bdev
->get_size() != this->m_log_pool_size
) {
121 lderr(cct
) << "size mismatch: bdev size " << bdev
->get_size()
122 << " (block size " << bdev
->get_block_size()
123 << ") != pool size " << this->m_log_pool_size
<< dendl
;
132 template <typename I
>
133 bool WriteLog
<I
>::initialize_pool(Context
*on_finish
,
134 pwl::DeferredContexts
&later
) {
136 CephContext
*cct
= m_image_ctx
.cct
;
138 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
139 if (access(this->m_log_pool_name
.c_str(), F_OK
) != 0) {
140 int fd
= ::open(this->m_log_pool_name
.c_str(), O_RDWR
|O_CREAT
, 0644);
143 if (truncate(this->m_log_pool_name
.c_str(),
144 this->m_log_pool_size
) != 0) {
152 m_cache_state
->present
= false;
153 m_cache_state
->clean
= true;
154 m_cache_state
->empty
= true;
155 /* TODO: filter/replace errnos that are meaningless to the caller */
156 on_finish
->complete(-errno
);
160 r
= create_and_open_bdev();
162 on_finish
->complete(r
);
165 m_cache_state
->present
= true;
166 m_cache_state
->clean
= true;
167 m_cache_state
->empty
= true;
168 /* new pool, calculate and store metadata */
170 /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
171 * In this way, when all ring buffer spaces are allocated,
172 * m_first_free_entry and m_first_valid_entry will not be equal.
173 * Equal only means the cache is empty. */
174 this->m_bytes_allocated_cap
= this->m_log_pool_size
-
175 DATA_RING_BUFFER_OFFSET
- MIN_WRITE_ALLOC_SSD_SIZE
;
177 m_first_free_entry
= DATA_RING_BUFFER_OFFSET
;
178 m_first_valid_entry
= DATA_RING_BUFFER_OFFSET
;
180 auto new_root
= std::make_shared
<WriteLogPoolRoot
>(pool_root
);
181 new_root
->layout_version
= SSD_LAYOUT_VERSION
;
182 new_root
->pool_size
= this->m_log_pool_size
;
183 new_root
->flushed_sync_gen
= this->m_flushed_sync_gen
;
184 new_root
->block_size
= MIN_WRITE_ALLOC_SSD_SIZE
;
185 new_root
->first_free_entry
= m_first_free_entry
;
186 new_root
->first_valid_entry
= m_first_valid_entry
;
187 new_root
->num_log_entries
= 0;
188 pool_root
= *new_root
;
190 r
= update_pool_root_sync(new_root
);
192 lderr(cct
) << "failed to initialize pool ("
193 << this->m_log_pool_name
<< ")" << dendl
;
196 on_finish
->complete(r
);
200 ceph_assert(m_cache_state
->present
);
201 r
= create_and_open_bdev();
203 on_finish
->complete(r
);
208 SuperBlock superblock
;
209 ::IOContext
ioctx(cct
, nullptr);
210 r
= bdev
->read(0, MIN_WRITE_ALLOC_SSD_SIZE
, &bl
, &ioctx
, false);
212 lderr(cct
) << "read ssd cache superblock failed " << dendl
;
215 auto p
= bl
.cbegin();
216 decode(superblock
, p
);
217 pool_root
= superblock
.root
;
218 ldout(cct
, 1) << "Decoded root: pool_size=" << pool_root
.pool_size
219 << " first_valid_entry=" << pool_root
.first_valid_entry
220 << " first_free_entry=" << pool_root
.first_free_entry
221 << " flushed_sync_gen=" << pool_root
.flushed_sync_gen
223 ceph_assert(is_valid_pool_root(pool_root
));
224 if (pool_root
.layout_version
!= SSD_LAYOUT_VERSION
) {
225 lderr(cct
) << "pool layout version is "
226 << pool_root
.layout_version
227 << " expected " << SSD_LAYOUT_VERSION
231 if (pool_root
.block_size
!= MIN_WRITE_ALLOC_SSD_SIZE
) {
232 lderr(cct
) << "pool block size is " << pool_root
.block_size
233 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
238 this->m_log_pool_size
= pool_root
.pool_size
;
239 this->m_flushed_sync_gen
= pool_root
.flushed_sync_gen
;
240 this->m_first_valid_entry
= pool_root
.first_valid_entry
;
241 this->m_first_free_entry
= pool_root
.first_free_entry
;
242 this->m_bytes_allocated_cap
= this->m_log_pool_size
-
243 DATA_RING_BUFFER_OFFSET
-
244 MIN_WRITE_ALLOC_SSD_SIZE
;
246 load_existing_entries(later
);
247 m_cache_state
->clean
= this->m_dirty_log_entries
.empty();
248 m_cache_state
->empty
= m_log_entries
.empty();
255 on_finish
->complete(-EINVAL
);
259 template <typename I
>
260 void WriteLog
<I
>::remove_pool_file() {
265 ldout(m_image_ctx
.cct
, 5) << "block device is closed" << dendl
;
267 if (m_cache_state
->clean
) {
268 ldout(m_image_ctx
.cct
, 5) << "Removing empty pool file: "
269 << this->m_log_pool_name
<< dendl
;
270 if (remove(this->m_log_pool_name
.c_str()) != 0) {
271 lderr(m_image_ctx
.cct
) << "failed to remove empty pool \""
272 << this->m_log_pool_name
<< "\": " << dendl
;
274 m_cache_state
->present
= false;
277 ldout(m_image_ctx
.cct
, 5) << "Not removing pool file: "
278 << this->m_log_pool_name
<< dendl
;
282 template <typename I
>
283 void WriteLog
<I
>::load_existing_entries(pwl::DeferredContexts
&later
) {
284 CephContext
*cct
= m_image_ctx
.cct
;
285 std::map
<uint64_t, std::shared_ptr
<SyncPointLogEntry
>> sync_point_entries
;
286 std::map
<uint64_t, bool> missing_sync_points
;
288 // Iterate through the log_entries and append all the write_bytes
289 // of each entry to fetch the pos of next 4k of log_entries. Iterate
290 // through the log entries and append them to the in-memory vector
291 for (uint64_t next_log_pos
= this->m_first_valid_entry
;
292 next_log_pos
!= this->m_first_free_entry
; ) {
293 // read the entries from SSD cache and decode
294 bufferlist bl_entries
;
295 ::IOContext
ioctx_entry(cct
, nullptr);
296 bdev
->read(next_log_pos
, MIN_WRITE_ALLOC_SSD_SIZE
, &bl_entries
,
297 &ioctx_entry
, false);
298 std::vector
<WriteLogCacheEntry
> ssd_log_entries
;
299 auto pl
= bl_entries
.cbegin();
300 decode(ssd_log_entries
, pl
);
301 ldout(cct
, 5) << "decoded ssd log entries" << dendl
;
302 uint64_t curr_log_pos
= next_log_pos
;
303 std::shared_ptr
<GenericLogEntry
> log_entry
= nullptr;
305 for (auto it
= ssd_log_entries
.begin(); it
!= ssd_log_entries
.end(); ++it
) {
306 this->update_entries(&log_entry
, &*it
, missing_sync_points
,
307 sync_point_entries
, curr_log_pos
);
308 log_entry
->ram_entry
= *it
;
309 log_entry
->log_entry_index
= curr_log_pos
;
310 log_entry
->completed
= true;
311 m_log_entries
.push_back(log_entry
);
312 next_log_pos
+= round_up_to(it
->write_bytes
, MIN_WRITE_ALLOC_SSD_SIZE
);
314 // along with the write_bytes, add control block size too
315 next_log_pos
+= MIN_WRITE_ALLOC_SSD_SIZE
;
316 if (next_log_pos
>= this->m_log_pool_size
) {
317 next_log_pos
= next_log_pos
% this->m_log_pool_size
+ DATA_RING_BUFFER_OFFSET
;
320 this->update_sync_points(missing_sync_points
, sync_point_entries
, later
);
321 if (m_first_valid_entry
> m_first_free_entry
) {
322 m_bytes_allocated
= this->m_log_pool_size
- m_first_valid_entry
+
323 m_first_free_entry
- DATA_RING_BUFFER_OFFSET
;
325 m_bytes_allocated
= m_first_free_entry
- m_first_valid_entry
;
329 // For SSD we don't calc m_bytes_allocated in this
330 template <typename I
>
331 void WriteLog
<I
>::inc_allocated_cached_bytes(
332 std::shared_ptr
<pwl::GenericLogEntry
> log_entry
) {
333 if (log_entry
->is_write_entry()) {
334 this->m_bytes_cached
+= log_entry
->write_bytes();
338 template <typename I
>
339 bool WriteLog
<I
>::alloc_resources(C_BlockIORequestT
*req
) {
340 bool alloc_succeeds
= true;
341 uint64_t bytes_allocated
= 0;
342 uint64_t bytes_cached
= 0;
343 uint64_t bytes_dirtied
= 0;
344 uint64_t num_lanes
= 0;
345 uint64_t num_unpublished_reserves
= 0;
346 uint64_t num_log_entries
= 0;
348 // Setup buffer, and get all the number of required resources
349 req
->setup_buffer_resources(&bytes_cached
, &bytes_dirtied
, &bytes_allocated
,
350 &num_lanes
, &num_log_entries
,
351 &num_unpublished_reserves
);
353 ceph_assert(!num_lanes
);
354 if (num_log_entries
) {
355 bytes_allocated
+= num_log_entries
* MIN_WRITE_ALLOC_SSD_SIZE
;
358 ceph_assert(!num_unpublished_reserves
);
360 alloc_succeeds
= this->check_allocation(req
, bytes_cached
, bytes_dirtied
,
361 bytes_allocated
, num_lanes
,
363 num_unpublished_reserves
);
364 req
->set_allocated(alloc_succeeds
);
365 return alloc_succeeds
;
368 template <typename I
>
369 bool WriteLog
<I
>::has_sync_point_logs(GenericLogOperations
&ops
) {
370 for (auto &op
: ops
) {
371 if (op
->get_log_entry()->is_sync_point()) {
380 void WriteLog
<I
>::enlist_op_appender() {
381 this->m_async_append_ops
++;
382 this->m_async_op_tracker
.start_op();
383 Context
*append_ctx
= new LambdaContext([this](int r
) {
384 append_scheduled_ops();
386 this->m_work_queue
.queue(append_ctx
);
390 * Takes custody of ops. They'll all get their log entries appended,
391 * and have their on_write_persist contexts completed once they and
392 * all prior log entries are persisted everywhere.
395 void WriteLog
<I
>::schedule_append_ops(GenericLogOperations
&ops
, C_BlockIORequestT
*req
) {
396 bool need_finisher
= false;
397 GenericLogOperationsVector appending
;
399 std::copy(std::begin(ops
), std::end(ops
), std::back_inserter(appending
));
401 std::lock_guard
locker(m_lock
);
403 bool persist_on_flush
= this->get_persist_on_flush();
404 need_finisher
= !this->m_appending
&&
405 ((this->m_ops_to_append
.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES
) ||
408 // Only flush logs into SSD when there is internal/external flush request
409 if (!need_finisher
) {
410 need_finisher
= has_sync_point_logs(ops
);
412 this->m_ops_to_append
.splice(this->m_ops_to_append
.end(), ops
);
414 // To preserve the order of overlapping IOs, release_cell() may be
415 // called only after the ops are added to m_ops_to_append.
416 // As soon as m_lock is released, the appended ops can be picked up
417 // by append_scheduled_ops() in another thread and req can be freed.
418 if (req
!= nullptr) {
419 if (persist_on_flush
) {
420 req
->complete_user_request(0);
427 this->enlist_op_appender();
430 for (auto &op
: appending
) {
435 template <typename I
>
436 void WriteLog
<I
>::setup_schedule_append(pwl::GenericLogOperationsVector
&ops
,
438 C_BlockIORequestT
*req
) {
439 this->schedule_append(ops
, req
);
442 template <typename I
>
443 void WriteLog
<I
>::append_scheduled_ops(void) {
444 GenericLogOperations ops
;
445 ldout(m_image_ctx
.cct
, 20) << dendl
;
447 bool ops_remain
= false; // unused, no-op variable for SSD
448 bool appending
= false; // unused, no-op variable for SSD
449 this->append_scheduled(ops
, ops_remain
, appending
);
452 alloc_op_log_entries(ops
);
453 append_op_log_entries(ops
);
455 this->m_async_append_ops
--;
456 this->m_async_op_tracker
.finish_op();
461 * Write and persist the (already allocated) write log entries and
462 * data buffer allocations for a set of ops. The data buffer for each
463 * of these must already have been persisted to its reserved area.
465 template <typename I
>
466 void WriteLog
<I
>::append_op_log_entries(GenericLogOperations
&ops
) {
467 ceph_assert(!ops
.empty());
468 ldout(m_image_ctx
.cct
, 20) << dendl
;
469 Context
*ctx
= new LambdaContext([this, ops
](int r
) {
471 ldout(m_image_ctx
.cct
, 20) << "Finished root update " << dendl
;
473 auto captured_ops
= std::move(ops
);
474 this->complete_op_log_entries(std::move(captured_ops
), r
);
476 bool need_finisher
= false;
478 std::lock_guard
locker1(m_lock
);
479 bool persist_on_flush
= this->get_persist_on_flush();
480 need_finisher
= ((this->m_ops_to_append
.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES
) ||
483 if (!need_finisher
) {
484 need_finisher
= has_sync_point_logs(this->m_ops_to_append
);
489 this->enlist_op_appender();
491 this->m_async_update_superblock
--;
492 this->m_async_op_tracker
.finish_op();
494 uint64_t *new_first_free_entry
= new(uint64_t);
495 Context
*append_ctx
= new LambdaContext(
496 [this, new_first_free_entry
, ops
, ctx
](int r
) {
497 std::shared_ptr
<WriteLogPoolRoot
> new_root
;
499 ldout(m_image_ctx
.cct
, 20) << "Finished appending at "
500 << *new_first_free_entry
<< dendl
;
501 utime_t now
= ceph_clock_now();
502 for (auto &operation
: ops
) {
503 operation
->log_append_comp_time
= now
;
506 std::lock_guard
locker(this->m_log_append_lock
);
507 std::lock_guard
locker1(m_lock
);
508 assert(this->m_appending
);
509 this->m_appending
= false;
510 new_root
= std::make_shared
<WriteLogPoolRoot
>(pool_root
);
511 pool_root
.first_free_entry
= *new_first_free_entry
;
512 new_root
->first_free_entry
= *new_first_free_entry
;
513 delete new_first_free_entry
;
514 schedule_update_root(new_root
, ctx
);
516 this->m_async_append_ops
--;
517 this->m_async_op_tracker
.finish_op();
519 // Append logs and update first_free_update
520 append_ops(ops
, append_ctx
, new_first_free_entry
);
523 this->dispatch_deferred_writes();
527 template <typename I
>
528 void WriteLog
<I
>::release_ram(std::shared_ptr
<GenericLogEntry
> log_entry
) {
529 log_entry
->remove_cache_bl();
532 template <typename I
>
533 void WriteLog
<I
>::alloc_op_log_entries(GenericLogOperations
&ops
) {
534 std::lock_guard
locker(m_lock
);
536 for (auto &operation
: ops
) {
537 auto &log_entry
= operation
->get_log_entry();
538 log_entry
->ram_entry
.set_entry_valid(true);
539 m_log_entries
.push_back(log_entry
);
540 ldout(m_image_ctx
.cct
, 20) << "operation=[" << *operation
<< "]" << dendl
;
542 if (m_cache_state
->empty
&& !m_log_entries
.empty()) {
543 m_cache_state
->empty
= false;
544 this->update_image_cache_state();
548 template <typename I
>
549 void WriteLog
<I
>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush
,
550 DeferredContexts
&post_unlock
,
551 bool has_write_entry
) {
552 // snapshot so we behave consistently
553 bool invalidating
= this->m_invalidating
;
555 if (invalidating
|| !has_write_entry
) {
556 for (auto &log_entry
: entries_to_flush
) {
557 GuardedRequestFunctionContext
*guarded_ctx
=
558 new GuardedRequestFunctionContext([this, log_entry
, invalidating
]
559 (GuardedRequestFunctionContext
&guard_ctx
) {
560 log_entry
->m_cell
= guard_ctx
.cell
;
561 Context
*ctx
= this->construct_flush_entry(log_entry
, invalidating
);
564 ctx
= new LambdaContext([this, log_entry
, ctx
](int r
) {
565 m_image_ctx
.op_work_queue
->queue(new LambdaContext(
566 [this, log_entry
, ctx
](int r
) {
567 ldout(m_image_ctx
.cct
, 15) << "flushing:" << log_entry
568 << " " << *log_entry
<< dendl
;
569 log_entry
->writeback(this->m_image_writeback
, ctx
);
575 this->detain_flush_guard_request(log_entry
, guarded_ctx
);
578 int count
= entries_to_flush
.size();
579 std::vector
<std::shared_ptr
<GenericWriteLogEntry
>> write_entries
;
580 std::vector
<bufferlist
*> read_bls
;
582 write_entries
.reserve(count
);
583 read_bls
.reserve(count
);
585 for (auto &log_entry
: entries_to_flush
) {
586 if (log_entry
->is_write_entry()) {
587 bufferlist
*bl
= new bufferlist
;
588 auto write_entry
= static_pointer_cast
<WriteLogEntry
>(log_entry
);
589 write_entry
->inc_bl_refs();
590 write_entries
.push_back(write_entry
);
591 read_bls
.push_back(bl
);
595 Context
*ctx
= new LambdaContext(
596 [this, entries_to_flush
, read_bls
](int r
) {
598 GuardedRequestFunctionContext
*guarded_ctx
= nullptr;
600 for (auto &log_entry
: entries_to_flush
) {
601 if (log_entry
->is_write_entry()) {
602 bufferlist captured_entry_bl
;
603 captured_entry_bl
.claim_append(*read_bls
[i
]);
604 delete read_bls
[i
++];
606 guarded_ctx
= new GuardedRequestFunctionContext([this, log_entry
, captured_entry_bl
]
607 (GuardedRequestFunctionContext
&guard_ctx
) {
608 log_entry
->m_cell
= guard_ctx
.cell
;
609 Context
*ctx
= this->construct_flush_entry(log_entry
, false);
611 m_image_ctx
.op_work_queue
->queue(new LambdaContext(
612 [this, log_entry
, entry_bl
=std::move(captured_entry_bl
), ctx
](int r
) {
613 auto captured_entry_bl
= std::move(entry_bl
);
614 ldout(m_image_ctx
.cct
, 15) << "flushing:" << log_entry
615 << " " << *log_entry
<< dendl
;
616 log_entry
->writeback_bl(this->m_image_writeback
, ctx
,
617 std::move(captured_entry_bl
));
621 guarded_ctx
= new GuardedRequestFunctionContext([this, log_entry
]
622 (GuardedRequestFunctionContext
&guard_ctx
) {
623 log_entry
->m_cell
= guard_ctx
.cell
;
624 Context
*ctx
= this->construct_flush_entry(log_entry
, false);
625 m_image_ctx
.op_work_queue
->queue(new LambdaContext(
626 [this, log_entry
, ctx
](int r
) {
627 ldout(m_image_ctx
.cct
, 15) << "flushing:" << log_entry
628 << " " << *log_entry
<< dendl
;
629 log_entry
->writeback(this->m_image_writeback
, ctx
);
633 this->detain_flush_guard_request(log_entry
, guarded_ctx
);
637 aio_read_data_blocks(write_entries
, read_bls
, ctx
);
641 template <typename I
>
642 void WriteLog
<I
>::process_work() {
643 CephContext
*cct
= m_image_ctx
.cct
;
644 int max_iterations
= 4;
645 bool wake_up_requested
= false;
646 uint64_t aggressive_high_water_bytes
=
647 this->m_bytes_allocated_cap
* AGGRESSIVE_RETIRE_HIGH_WATER
;
648 uint64_t high_water_bytes
= this->m_bytes_allocated_cap
* RETIRE_HIGH_WATER
;
650 ldout(cct
, 20) << dendl
;
654 std::lock_guard
locker(m_lock
);
655 this->m_wake_up_requested
= false;
657 if (this->m_alloc_failed_since_retire
|| (this->m_shutting_down
) ||
658 this->m_invalidating
|| m_bytes_allocated
> high_water_bytes
) {
659 ldout(m_image_ctx
.cct
, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
660 << ", allocated > high_water="
661 << (m_bytes_allocated
> high_water_bytes
)
663 retire_entries((this->m_shutting_down
|| this->m_invalidating
||
664 m_bytes_allocated
> aggressive_high_water_bytes
)
665 ? MAX_ALLOC_PER_TRANSACTION
: MAX_FREE_PER_TRANSACTION
);
667 this->dispatch_deferred_writes();
668 this->process_writeback_dirty_entries();
670 std::lock_guard
locker(m_lock
);
671 wake_up_requested
= this->m_wake_up_requested
;
673 } while (wake_up_requested
&& --max_iterations
> 0);
676 std::lock_guard
locker(m_lock
);
677 this->m_wake_up_scheduled
= false;
678 // Reschedule if it's still requested
679 if (this->m_wake_up_requested
) {
686 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
687 * that are eligible to be retired. Returns true if anything was
691 template <typename I
>
692 bool WriteLog
<I
>::retire_entries(const unsigned long int frees_per_tx
) {
693 CephContext
*cct
= m_image_ctx
.cct
;
694 GenericLogEntriesVector retiring_entries
;
695 uint64_t initial_first_valid_entry
;
696 uint64_t first_valid_entry
;
698 std::lock_guard
retire_locker(this->m_log_retire_lock
);
699 ldout(cct
, 20) << "Look for entries to retire" << dendl
;
701 // Entry readers can't be added while we hold m_entry_reader_lock
702 RWLock::WLocker
entry_reader_locker(this->m_entry_reader_lock
);
703 std::lock_guard
locker(m_lock
);
704 initial_first_valid_entry
= m_first_valid_entry
;
705 first_valid_entry
= m_first_valid_entry
;
706 while (retiring_entries
.size() < frees_per_tx
&& !m_log_entries
.empty()) {
707 GenericLogEntriesVector retiring_subentries
;
708 uint64_t control_block_pos
= m_log_entries
.front()->log_entry_index
;
709 uint64_t data_length
= 0;
710 for (auto it
= m_log_entries
.begin(); it
!= m_log_entries
.end(); ++it
) {
711 if (this->can_retire_entry(*it
)) {
712 // log_entry_index is valid after appending to SSD
713 if ((*it
)->log_entry_index
!= control_block_pos
) {
714 ldout(cct
, 20) << "Old log_entry_index is " << control_block_pos
715 << ",New log_entry_index is "
716 << (*it
)->log_entry_index
717 << ",data length is " << data_length
<< dendl
;
718 ldout(cct
, 20) << "The log entry is " << *(*it
) << dendl
;
719 if ((*it
)->log_entry_index
< control_block_pos
) {
720 ceph_assert((*it
)->log_entry_index
==
721 (control_block_pos
+ data_length
+ MIN_WRITE_ALLOC_SSD_SIZE
) %
722 this->m_log_pool_size
+ DATA_RING_BUFFER_OFFSET
);
724 ceph_assert((*it
)->log_entry_index
== control_block_pos
+
725 data_length
+ MIN_WRITE_ALLOC_SSD_SIZE
);
729 retiring_subentries
.push_back(*it
);
730 if ((*it
)->is_write_entry()) {
731 data_length
+= (*it
)->get_aligned_data_size();
735 retiring_subentries
.clear();
739 // SSD: retiring_subentries in a span
740 if (!retiring_subentries
.empty()) {
741 for (auto it
= retiring_subentries
.begin();
742 it
!= retiring_subentries
.end(); it
++) {
743 ceph_assert(m_log_entries
.front() == *it
);
744 m_log_entries
.pop_front();
745 if ((*it
)->write_bytes() > 0 || (*it
)->bytes_dirty() > 0) {
746 auto gen_write_entry
= static_pointer_cast
<GenericWriteLogEntry
>(*it
);
747 if (gen_write_entry
) {
748 this->m_blocks_to_log_entries
.remove_log_entry(gen_write_entry
);
753 ldout(cct
, 20) << "span with " << retiring_subentries
.size()
754 << " entries: control_block_pos=" << control_block_pos
755 << " data_length=" << data_length
757 retiring_entries
.insert(
758 retiring_entries
.end(), retiring_subentries
.begin(),
759 retiring_subentries
.end());
761 first_valid_entry
= control_block_pos
+ data_length
+
762 MIN_WRITE_ALLOC_SSD_SIZE
;
763 if (first_valid_entry
>= this->m_log_pool_size
) {
764 first_valid_entry
= first_valid_entry
% this->m_log_pool_size
+
765 DATA_RING_BUFFER_OFFSET
;
772 if (retiring_entries
.size()) {
773 ldout(cct
, 20) << "Retiring " << retiring_entries
.size() << " entries"
776 // Advance first valid entry and release buffers
777 uint64_t flushed_sync_gen
;
778 std::lock_guard
append_locker(this->m_log_append_lock
);
780 std::lock_guard
locker(m_lock
);
781 flushed_sync_gen
= this->m_flushed_sync_gen
;
784 ceph_assert(first_valid_entry
!= initial_first_valid_entry
);
785 auto new_root
= std::make_shared
<WriteLogPoolRoot
>(pool_root
);
786 new_root
->flushed_sync_gen
= flushed_sync_gen
;
787 new_root
->first_valid_entry
= first_valid_entry
;
788 pool_root
.flushed_sync_gen
= flushed_sync_gen
;
789 pool_root
.first_valid_entry
= first_valid_entry
;
791 Context
*ctx
= new LambdaContext(
792 [this, first_valid_entry
, initial_first_valid_entry
,
793 retiring_entries
](int r
) {
794 uint64_t allocated_bytes
= 0;
795 uint64_t cached_bytes
= 0;
796 uint64_t former_log_pos
= 0;
797 for (auto &entry
: retiring_entries
) {
798 ceph_assert(entry
->log_entry_index
!= 0);
799 if (entry
->log_entry_index
!= former_log_pos
) {
800 // Space for control blocks
801 allocated_bytes
+= MIN_WRITE_ALLOC_SSD_SIZE
;
802 former_log_pos
= entry
->log_entry_index
;
804 if (entry
->is_write_entry()) {
805 cached_bytes
+= entry
->write_bytes();
806 // space for userdata
807 allocated_bytes
+= entry
->get_aligned_data_size();
811 std::lock_guard
locker(m_lock
);
812 m_first_valid_entry
= first_valid_entry
;
813 ceph_assert(m_first_valid_entry
% MIN_WRITE_ALLOC_SSD_SIZE
== 0);
814 ceph_assert(this->m_bytes_allocated
>= allocated_bytes
);
815 this->m_bytes_allocated
-= allocated_bytes
;
816 ceph_assert(this->m_bytes_cached
>= cached_bytes
);
817 this->m_bytes_cached
-= cached_bytes
;
818 if (!m_cache_state
->empty
&& m_log_entries
.empty()) {
819 m_cache_state
->empty
= true;
820 this->update_image_cache_state();
823 ldout(m_image_ctx
.cct
, 20)
824 << "Finished root update: initial_first_valid_entry="
825 << initial_first_valid_entry
<< ", m_first_valid_entry="
826 << m_first_valid_entry
<< ", release space = "
827 << allocated_bytes
<< ", m_bytes_allocated="
828 << m_bytes_allocated
<< ", release cached space="
829 << cached_bytes
<< ", m_bytes_cached="
830 << this->m_bytes_cached
<< dendl
;
832 this->m_alloc_failed_since_retire
= false;
836 this->dispatch_deferred_writes();
837 this->process_writeback_dirty_entries();
838 m_async_update_superblock
--;
839 this->m_async_op_tracker
.finish_op();
842 std::lock_guard
locker(m_lock
);
843 schedule_update_root(new_root
, ctx
);
845 ldout(cct
, 20) << "Nothing to retire" << dendl
;
851 template <typename I
>
852 void WriteLog
<I
>::append_ops(GenericLogOperations
&ops
, Context
*ctx
,
853 uint64_t* new_first_free_entry
) {
854 GenericLogEntriesVector log_entries
;
855 CephContext
*cct
= m_image_ctx
.cct
;
856 uint64_t span_payload_len
= 0;
857 uint64_t bytes_to_free
= 0;
858 ldout(cct
, 20) << "Appending " << ops
.size() << " log entries." << dendl
;
860 *new_first_free_entry
= pool_root
.first_free_entry
;
861 AioTransContext
* aio
= new AioTransContext(cct
, ctx
);
863 utime_t now
= ceph_clock_now();
864 for (auto &operation
: ops
) {
865 operation
->log_append_start_time
= now
;
866 auto log_entry
= operation
->get_log_entry();
868 if (log_entries
.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES
||
869 span_payload_len
>= SPAN_MAX_DATA_LEN
) {
870 if (log_entries
.size() > 1) {
871 bytes_to_free
+= (log_entries
.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE
;
873 write_log_entries(log_entries
, aio
, new_first_free_entry
);
875 span_payload_len
= 0;
877 log_entries
.push_back(log_entry
);
878 span_payload_len
+= log_entry
->write_bytes();
880 if (!span_payload_len
|| !log_entries
.empty()) {
881 if (log_entries
.size() > 1) {
882 bytes_to_free
+= (log_entries
.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE
;
884 write_log_entries(log_entries
, aio
, new_first_free_entry
);
888 std::lock_guard
locker1(m_lock
);
889 m_first_free_entry
= *new_first_free_entry
;
890 m_bytes_allocated
-= bytes_to_free
;
893 bdev
->aio_submit(&aio
->ioc
);
896 template <typename I
>
897 void WriteLog
<I
>::write_log_entries(GenericLogEntriesVector log_entries
,
898 AioTransContext
*aio
, uint64_t *pos
) {
899 CephContext
*cct
= m_image_ctx
.cct
;
900 ldout(m_image_ctx
.cct
, 20) << "pos=" << *pos
<< dendl
;
901 ceph_assert(*pos
>= DATA_RING_BUFFER_OFFSET
&&
902 *pos
< this->m_log_pool_size
&&
903 *pos
% MIN_WRITE_ALLOC_SSD_SIZE
== 0);
905 // The first block is for log entries
906 uint64_t control_block_pos
= *pos
;
907 *pos
+= MIN_WRITE_ALLOC_SSD_SIZE
;
908 if (*pos
== this->m_log_pool_size
) {
909 *pos
= DATA_RING_BUFFER_OFFSET
;
912 std::vector
<WriteLogCacheEntry
> persist_log_entries
;
914 for (auto &log_entry
: log_entries
) {
915 log_entry
->log_entry_index
= control_block_pos
;
916 // Append data buffer for write operations
917 if (log_entry
->is_write_entry()) {
918 auto write_entry
= static_pointer_cast
<WriteLogEntry
>(log_entry
);
919 auto cache_bl
= write_entry
->get_cache_bl();
920 auto align_size
= write_entry
->get_aligned_data_size();
921 data_bl
.append(cache_bl
);
922 data_bl
.append_zero(align_size
- cache_bl
.length());
924 write_entry
->ram_entry
.write_data_pos
= *pos
;
926 if (*pos
>= this->m_log_pool_size
) {
927 *pos
= *pos
% this->m_log_pool_size
+ DATA_RING_BUFFER_OFFSET
;
930 // push_back _after_ setting write_data_pos
931 persist_log_entries
.push_back(log_entry
->ram_entry
);
936 encode(persist_log_entries
, bl
);
937 ceph_assert(bl
.length() <= MIN_WRITE_ALLOC_SSD_SIZE
);
938 bl
.append_zero(MIN_WRITE_ALLOC_SSD_SIZE
- bl
.length());
940 ceph_assert(bl
.length() % MIN_WRITE_ALLOC_SSD_SIZE
== 0);
941 if (control_block_pos
+ bl
.length() > this->m_log_pool_size
) {
942 //exceeds border, need to split
943 uint64_t size
= bl
.length();
945 bl
.splice(0, this->m_log_pool_size
- control_block_pos
, &bl1
);
946 ceph_assert(bl
.length() == (size
- bl1
.length()));
948 ldout(cct
, 20) << "write " << control_block_pos
<< "~"
949 << size
<< " spans boundary, split into "
950 << control_block_pos
<< "~" << bl1
.length()
951 << " and " << DATA_RING_BUFFER_OFFSET
<< "~"
952 << bl
.length() << dendl
;
953 bdev
->aio_write(control_block_pos
, bl1
, &aio
->ioc
, false,
955 bdev
->aio_write(DATA_RING_BUFFER_OFFSET
, bl
, &aio
->ioc
, false,
958 ldout(cct
, 20) << "write " << control_block_pos
<< "~"
959 << bl
.length() << dendl
;
960 bdev
->aio_write(control_block_pos
, bl
, &aio
->ioc
, false,
965 template <typename I
>
966 void WriteLog
<I
>::schedule_update_root(
967 std::shared_ptr
<WriteLogPoolRoot
> root
, Context
*ctx
) {
968 CephContext
*cct
= m_image_ctx
.cct
;
969 ldout(cct
, 15) << "New root: pool_size=" << root
->pool_size
970 << " first_valid_entry=" << root
->first_valid_entry
971 << " first_free_entry=" << root
->first_free_entry
972 << " flushed_sync_gen=" << root
->flushed_sync_gen
974 ceph_assert(is_valid_pool_root(*root
));
978 ceph_assert(ceph_mutex_is_locked_by_me(m_lock
));
979 need_finisher
= m_poolroot_to_update
.empty() && !m_updating_pool_root
;
980 std::shared_ptr
<WriteLogPoolRootUpdate
> entry
=
981 std::make_shared
<WriteLogPoolRootUpdate
>(root
, ctx
);
982 this->m_async_update_superblock
++;
983 this->m_async_op_tracker
.start_op();
984 m_poolroot_to_update
.emplace_back(entry
);
987 enlist_op_update_root();
991 template <typename I
>
992 void WriteLog
<I
>::enlist_op_update_root() {
993 Context
*append_ctx
= new LambdaContext([this](int r
) {
994 update_root_scheduled_ops();
996 this->m_work_queue
.queue(append_ctx
);
template <typename I>
void WriteLog<I>::update_root_scheduled_ops() {
  // Drain the queue of pending pool-root updates: persist only the newest
  // root (it supersedes the older ones) and complete every queued context.
  ldout(m_image_ctx.cct, 20) << dendl;

  std::shared_ptr<WriteLogPoolRoot> root;
  WriteLogPoolRootUpdateList root_updates;
  Context *ctx = nullptr;
  {
    std::lock_guard locker(m_lock);
    if (m_updating_pool_root) {
      /* Another thread is appending */
      ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
                                 << dendl;
      return;
    }
    if (m_poolroot_to_update.size()) {
      // Claim the whole batch under the lock; later arrivals queue behind
      // m_updating_pool_root and are picked up by the completion below.
      m_updating_pool_root = true;
      root_updates.swap(m_poolroot_to_update);
    }
  }
  ceph_assert(!root_updates.empty());
  ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
                             << dendl;
  // We just update the last one, and call all the completions.
  auto entry = root_updates.back();
  root = entry->root;

  // Completes every queued update's context with the write result.
  ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
    ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
    for (auto it = updates.begin(); it != updates.end(); it++) {
      Context *it_ctx = (*it)->ctx;
      it_ctx->complete(r);
    }
  });
  // Runs when the superblock aio finishes: clears the in-flight flag,
  // re-enlists if more updates arrived meanwhile, then fires the callbacks.
  Context *append_ctx = new LambdaContext([this, ctx](int r) {
    ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
    bool need_finisher = false;
    {
      std::lock_guard locker(m_lock);
      m_updating_pool_root = false;
      need_finisher = !m_poolroot_to_update.empty();
    }
    if (need_finisher) {
      enlist_op_update_root();
    }
    ctx->complete(r);
  });
  AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
  update_pool_root(root, aio);
}
template <typename I>
void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
                                   AioTransContext *aio) {
  // Asynchronously persist the pool root: serialize it into a superblock,
  // pad to one allocation unit, and write it at device offset 0.
  bufferlist bl;
  SuperBlock superblock;

  superblock.root = *root;
  encode(superblock, bl);
  // The superblock occupies exactly one MIN_WRITE_ALLOC_SSD_SIZE block.
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
  // aio->ioc's completion fires the caller-supplied context.
  bdev->aio_submit(&aio->ioc);
}
1064 template <typename I
>
1065 int WriteLog
<I
>::update_pool_root_sync(
1066 std::shared_ptr
<WriteLogPoolRoot
> root
) {
1068 SuperBlock superblock
;
1069 superblock
.root
= *root
;
1070 encode(superblock
, bl
);
1071 bl
.append_zero(MIN_WRITE_ALLOC_SSD_SIZE
- bl
.length());
1072 ceph_assert(bl
.length() % MIN_WRITE_ALLOC_SSD_SIZE
== 0);
1073 return bdev
->write(0, bl
, false);
1076 template <typename I
>
1077 void WriteLog
<I
>::aio_read_data_block(std::shared_ptr
<GenericWriteLogEntry
> log_entry
,
1078 bufferlist
*bl
, Context
*ctx
) {
1079 std::vector
<std::shared_ptr
<GenericWriteLogEntry
>> log_entries
= {std::move(log_entry
)};
1080 std::vector
<bufferlist
*> bls
{bl
};
1081 aio_read_data_blocks(log_entries
, bls
, ctx
);
1084 template <typename I
>
1085 void WriteLog
<I
>::aio_read_data_blocks(
1086 std::vector
<std::shared_ptr
<GenericWriteLogEntry
>> &log_entries
,
1087 std::vector
<bufferlist
*> &bls
, Context
*ctx
) {
1088 ceph_assert(log_entries
.size() == bls
.size());
1090 //get the valid part
1091 Context
*read_ctx
= new LambdaContext(
1092 [log_entries
, bls
, ctx
](int r
) {
1093 for (unsigned int i
= 0; i
< log_entries
.size(); i
++) {
1094 bufferlist valid_data_bl
;
1095 auto write_entry
= static_pointer_cast
<WriteLogEntry
>(log_entries
[i
]);
1096 auto length
= write_entry
->ram_entry
.is_write() ? write_entry
->ram_entry
.write_bytes
1097 : write_entry
->ram_entry
.ws_datalen
;
1099 valid_data_bl
.substr_of(*bls
[i
], 0, length
);
1101 bls
[i
]->append(valid_data_bl
);
1102 write_entry
->dec_bl_refs();
1107 CephContext
*cct
= m_image_ctx
.cct
;
1108 AioTransContext
*aio
= new AioTransContext(cct
, read_ctx
);
1109 for (unsigned int i
= 0; i
< log_entries
.size(); i
++) {
1110 WriteLogCacheEntry
*log_entry
= &log_entries
[i
]->ram_entry
;
1112 ceph_assert(log_entry
->is_write() || log_entry
->is_writesame());
1113 uint64_t len
= log_entry
->is_write() ? log_entry
->write_bytes
:
1114 log_entry
->ws_datalen
;
1115 uint64_t align_len
= round_up_to(len
, MIN_WRITE_ALLOC_SSD_SIZE
);
1117 ldout(cct
, 20) << "entry i=" << i
<< " " << log_entry
->write_data_pos
1118 << "~" << len
<< dendl
;
1119 ceph_assert(log_entry
->write_data_pos
>= DATA_RING_BUFFER_OFFSET
&&
1120 log_entry
->write_data_pos
< pool_root
.pool_size
);
1121 ceph_assert(align_len
);
1122 if (log_entry
->write_data_pos
+ align_len
> pool_root
.pool_size
) {
1123 // spans boundary, need to split
1124 uint64_t len1
= pool_root
.pool_size
- log_entry
->write_data_pos
;
1125 uint64_t len2
= align_len
- len1
;
1127 ldout(cct
, 20) << "read " << log_entry
->write_data_pos
<< "~"
1128 << align_len
<< " spans boundary, split into "
1129 << log_entry
->write_data_pos
<< "~" << len1
1130 << " and " << DATA_RING_BUFFER_OFFSET
<< "~"
1132 bdev
->aio_read(log_entry
->write_data_pos
, len1
, bls
[i
], &aio
->ioc
);
1133 bdev
->aio_read(DATA_RING_BUFFER_OFFSET
, len2
, bls
[i
], &aio
->ioc
);
1135 ldout(cct
, 20) << "read " << log_entry
->write_data_pos
<< "~"
1136 << align_len
<< dendl
;
1137 bdev
->aio_read(log_entry
->write_data_pos
, align_len
, bls
[i
], &aio
->ioc
);
1140 bdev
->aio_submit(&aio
->ioc
);
template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  // Complete the user's request asynchronously on the image op work queue
  // rather than inline, so completion never runs in the cache's own
  // I/O/lock context.
  m_image_ctx.op_work_queue->queue(user_req, r);
}
1150 } // namespace cache
1151 } // namespace librbd
1153 template class librbd::cache::pwl::ssd::WriteLog
<librbd::ImageCtx
>;