// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/cache/pwl/ssd/WriteLog.h"

#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"

#undef dout_subsys
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
                           << this << " " << __func__ << ": "
namespace librbd {
namespace cache {
namespace pwl {
namespace ssd {

using namespace librbd::cache::pwl;
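/*
 * On-disk layout, as implied by the offsets used throughout this file
 * (illustrative summary; the exact constants live in the pwl headers):
 *
 *   [0, MIN_WRITE_ALLOC_SSD_SIZE)          superblock holding WriteLogPoolRoot
 *   [DATA_RING_BUFFER_OFFSET, pool_size)   ring buffer of log "spans"
 *
 * Each span is one control block (an encoded vector of WriteLogCacheEntry,
 * padded to MIN_WRITE_ALLOC_SSD_SIZE) followed by the block-aligned data
 * payloads of the write entries it describes.
 */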
static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
  return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_valid_entry < root.pool_size &&
         root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_free_entry < root.pool_size &&
         root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
}
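/*
 * Worked example, assuming MIN_WRITE_ALLOC_SSD_SIZE is 4 KiB: both ring
 * pointers start at DATA_RING_BUFFER_OFFSET, and initialize_pool() caps
 * allocation at pool_size - DATA_RING_BUFFER_OFFSET - 4 KiB. Keeping one
 * block permanently unallocated guarantees first_free_entry ==
 * first_valid_entry if and only if the ring is empty, so the full and
 * empty states remain distinguishable.
 */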
template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}
template <typename I>
WriteLog<I>::WriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    cache::ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
                        image_writeback, plugin_api)
{
}

template <typename I>
WriteLog<I>::~WriteLog() {
  delete m_builderobj;
}
template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    uint64_t entry_hit_length, Extent hit_extent,
    pwl::C_ReadRequest *read_ctx) {
  // Make a bl for this hit extent. This will add references to the
  // write_entry->cache_bl
  ldout(m_image_ctx.cct, 5) << dendl;
  auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
  buffer::list hit_bl;
  write_entry->copy_cache_bl(&hit_bl);
  bool writesame = write_entry->is_writesame_entry();
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
      hit_extent, hit_bl, true, read_buffer_offset, writesame);
  read_ctx->read_extents.push_back(hit_extent_buf);

  if (!hit_bl.length()) {
    ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
    auto read_extent = read_ctx->read_extents.back();
    write_entry->inc_bl_refs();
    log_entries_to_read.push_back(std::move(write_entry));
    bls_to_read.push_back(&read_extent->m_bl);
  }
}
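// When the cached bufferlist comes back empty the data is no longer in
// RAM; the entry and the extent's bufferlist are queued so complete_read()
// can fetch the payload from the SSD via aio_read_data_blocks() instead.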
template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    Context *ctx) {
  if (!log_entries_to_read.empty()) {
    aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
  } else {
    ctx->complete(0);
  }
}
template <typename I>
int WriteLog<I>::create_and_open_bdev() {
  CephContext *cct = m_image_ctx.cct;

  bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
                             nullptr, nullptr, nullptr);
  int r = bdev->open(this->m_log_pool_name);
  if (r < 0) {
    lderr(cct) << "failed to open bdev" << dendl;
    delete bdev;
    return r;
  }

  ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (bdev->get_size() != this->m_log_pool_size) {
    lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
               << " (block size " << bdev->get_block_size()
               << ") != pool size " << this->m_log_pool_size << dendl;
    bdev->close();
    delete bdev;
    return -EINVAL;
  }

  return 0;
}
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish,
                                  pwl::DeferredContexts &later) {
  int r;
  CephContext *cct = m_image_ctx.cct;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
    bool succeed = true;
    if (fd >= 0) {
      if (truncate(this->m_log_pool_name.c_str(),
                   this->m_log_pool_size) != 0) {
        succeed = false;
      }
      ::close(fd);
    } else {
      succeed = false;
    }
    if (!succeed) {
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }

    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    /* new pool, calculate and store metadata */

    /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
     * In this way, when all ring buffer spaces are allocated,
     * m_first_free_entry and m_first_valid_entry will not be equal.
     * Equal only means the cache is empty. */
    this->m_bytes_allocated_cap = this->m_log_pool_size -
        DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
    /* Log ring empty */
    m_first_free_entry = DATA_RING_BUFFER_OFFSET;
    m_first_valid_entry = DATA_RING_BUFFER_OFFSET;

    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->layout_version = SSD_LAYOUT_VERSION;
    new_root->pool_size = this->m_log_pool_size;
    new_root->flushed_sync_gen = this->m_flushed_sync_gen;
    new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
    new_root->first_free_entry = m_first_free_entry;
    new_root->first_valid_entry = m_first_valid_entry;
    new_root->num_log_entries = 0;
    pool_root = *new_root;

    r = update_pool_root_sync(new_root);
    if (r != 0) {
      lderr(cct) << "failed to initialize pool ("
                 << this->m_log_pool_name << ")" << dendl;
      bdev->close();
      delete bdev;
      on_finish->complete(r);
      return false;
    }
  } else {
    m_cache_state->present = true;
    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }

    bufferlist bl;
    SuperBlock superblock;
    ::IOContext ioctx(cct, nullptr);
    r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
    if (r < 0) {
      lderr(cct) << "read ssd cache superblock failed " << dendl;
      goto err_close_bdev;
    }
    auto p = bl.cbegin();
    decode(superblock, p);
    pool_root = superblock.root;
    ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
                  << " first_valid_entry=" << pool_root.first_valid_entry
                  << " first_free_entry=" << pool_root.first_free_entry
                  << " flushed_sync_gen=" << pool_root.flushed_sync_gen
                  << dendl;
    ceph_assert(is_valid_pool_root(pool_root));
    if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
      lderr(cct) << "pool layout version is "
                 << pool_root.layout_version
                 << " expected " << SSD_LAYOUT_VERSION << dendl;
      goto err_close_bdev;
    }
    if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
      lderr(cct) << "pool block size is " << pool_root.block_size
                 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE << dendl;
      goto err_close_bdev;
    }

    this->m_log_pool_size = pool_root.pool_size;
    this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
    this->m_first_valid_entry = pool_root.first_valid_entry;
    this->m_first_free_entry = pool_root.first_free_entry;
    this->m_bytes_allocated_cap = this->m_log_pool_size -
                                  DATA_RING_BUFFER_OFFSET -
                                  MIN_WRITE_ALLOC_SSD_SIZE;

    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_bdev:
  bdev->close();
  delete bdev;
  on_finish->complete(-EINVAL);
  return false;
}
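// The superblock is a straight encode/decode round trip: update_pool_root()
// and update_pool_root_sync() below encode a SuperBlock holding the current
// WriteLogPoolRoot into the first MIN_WRITE_ALLOC_SSD_SIZE bytes, and this
// function decodes the same block on every subsequent open.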
template <typename I>
void WriteLog<I>::remove_pool_file() {
  ceph_assert(bdev);
  bdev->close();
  delete bdev;
  bdev = nullptr;
  ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;

  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
                              << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \""
                             << this->m_log_pool_name << "\": " << dendl;
    } else {
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
                              << this->m_log_pool_name << dendl;
  }
}
template <typename I>
void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  std::map<uint64_t, bool> missing_sync_points;

  // Walk the log positions, decoding the entries stored at each control
  // block and appending them to the in-memory list. The position of the
  // next control block is derived from the write_bytes of every entry in
  // the current one.
  for (uint64_t next_log_pos = this->m_first_valid_entry;
       next_log_pos != this->m_first_free_entry; ) {
    // read the entries from SSD cache and decode
    bufferlist bl_entries;
    ::IOContext ioctx_entry(cct, nullptr);
    bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
               &ioctx_entry, false);
    std::vector<WriteLogCacheEntry> ssd_log_entries;
    auto pl = bl_entries.cbegin();
    decode(ssd_log_entries, pl);
    ldout(cct, 5) << "decoded ssd log entries" << dendl;
    uint64_t curr_log_pos = next_log_pos;
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;

    for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
      this->update_entries(&log_entry, &*it, missing_sync_points,
                           sync_point_entries, curr_log_pos);
      log_entry->ram_entry = *it;
      log_entry->log_entry_index = curr_log_pos;
      log_entry->completed = true;
      m_log_entries.push_back(log_entry);
      next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
    }

    // along with the write_bytes, add control block size too
    next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
    if (next_log_pos >= this->m_log_pool_size) {
      next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
    }
  }
  this->update_sync_points(missing_sync_points, sync_point_entries, later);
  if (m_first_valid_entry > m_first_free_entry) {
    m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
                        m_first_free_entry - DATA_RING_BUFFER_OFFSET;
  } else {
    m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
  }
}
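/*
 * Example of the scan arithmetic, assuming 4 KiB blocks: a control block
 * at position P describing two writes of 6000 and 100 bytes advances
 * next_log_pos by round_up(6000) + round_up(100) + one control block,
 * i.e. P + 8192 + 4096 + 4096, wrapping back past DATA_RING_BUFFER_OFFSET
 * at the end of the pool.
 */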
// For SSD, m_bytes_allocated is not accounted here; it is maintained at
// allocation and retirement time instead, so only m_bytes_cached is updated.
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_cached += log_entry->write_bytes();
  }
}
template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  // Set up the buffer and count the resources this request requires
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries,
                              &num_unpublished_reserves);

  ceph_assert(!num_lanes);
  if (num_log_entries) {
    bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
    num_log_entries = 0;
  }
  ceph_assert(!num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes,
                                          num_log_entries,
                                          num_unpublished_reserves);
  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}
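// Worst-case accounting: each log entry conservatively reserves a full
// control block (MIN_WRITE_ALLOC_SSD_SIZE) on top of its aligned data.
// When append_ops() later packs several entries behind one shared control
// block, the over-reservation is handed back via bytes_to_free.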
template <typename I>
bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
  for (auto &op : ops) {
    if (op->get_log_entry()->is_sync_point()) {
      return true;
    }
  }
  return false;
}
template <typename I>
void WriteLog<I>::enlist_op_appender() {
  this->m_async_append_ops++;
  this->m_async_op_tracker.start_op();
  Context *append_ctx = new LambdaContext([this](int r) {
    append_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}
/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops,
                                      C_BlockIORequestT *req) {
  bool need_finisher = false;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

    bool persist_on_flush = this->get_persist_on_flush();
    need_finisher = !this->m_appending &&
        ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
         !persist_on_flush);

    // Only flush logs into SSD when there is internal/external flush request
    if (!need_finisher) {
      need_finisher = has_sync_point_logs(ops);
    }
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);

    // To preserve the order of overlapping IOs, release_cell() may be
    // called only after the ops are added to m_ops_to_append.
    // As soon as m_lock is released, the appended ops can be picked up
    // by append_scheduled_ops() in another thread and req can be freed.
    if (req != nullptr) {
      if (persist_on_flush) {
        req->complete_user_request(0);
      }
      req->release_cell();
    }
  }

  if (need_finisher) {
    this->enlist_op_appender();
  }

  for (auto &op : appending) {
    op->appending();
  }
}
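// Batching policy: in persist-on-flush mode appends are deferred until a
// control block's worth of entries (CONTROL_BLOCK_MAX_LOG_ENTRIES) has
// accumulated or a sync point arrives; in persist-on-write mode every
// scheduled op kicks the appender, unless an append is already in flight.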
template <typename I>
void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
                                        bool do_early_flush,
                                        C_BlockIORequestT *req) {
  this->schedule_append(ops, req);
}
template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  ldout(m_image_ctx.cct, 20) << dendl;

  bool ops_remain = false;  // unused; the SSD variant ignores these flags
  bool appending = false;
  this->append_scheduled(ops, ops_remain, appending);

  if (ops.size()) {
    alloc_op_log_entries(ops);
    append_op_log_entries(ops);
  } else {
    this->m_async_append_ops--;
    this->m_async_op_tracker.finish_op();
  }
}
/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
  ceph_assert(!ops.empty());
  ldout(m_image_ctx.cct, 20) << dendl;
  Context *ctx = new LambdaContext([this, ops](int r) {
    ceph_assert(r == 0);
    ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;

    auto captured_ops = std::move(ops);
    this->complete_op_log_entries(std::move(captured_ops), r);

    bool need_finisher = false;
    {
      std::lock_guard locker1(m_lock);
      bool persist_on_flush = this->get_persist_on_flush();
      need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
                       !persist_on_flush);

      if (!need_finisher) {
        need_finisher = has_sync_point_logs(this->m_ops_to_append);
      }
    }
    if (need_finisher) {
      this->enlist_op_appender();
    }
    this->m_async_update_superblock--;
    this->m_async_op_tracker.finish_op();
  });
  uint64_t *new_first_free_entry = new uint64_t;
  Context *append_ctx = new LambdaContext(
    [this, new_first_free_entry, ops, ctx](int r) {
      std::shared_ptr<WriteLogPoolRoot> new_root;
      {
        ldout(m_image_ctx.cct, 20) << "Finished appending at "
                                   << *new_first_free_entry << dendl;
        utime_t now = ceph_clock_now();
        for (auto &operation : ops) {
          operation->log_append_comp_time = now;
        }

        std::lock_guard locker(this->m_log_append_lock);
        std::lock_guard locker1(m_lock);
        ceph_assert(this->m_appending);
        this->m_appending = false;
        new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
        pool_root.first_free_entry = *new_first_free_entry;
        new_root->first_free_entry = *new_first_free_entry;
        delete new_first_free_entry;
        schedule_update_root(new_root, ctx);
      }
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
  // Append logs and update first_free_entry
  append_ops(ops, append_ctx, new_first_free_entry);

  if (ops.size()) {
    this->dispatch_deferred_writes();
  }
}
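// Completion is two-staged: append_ctx fires when the span writes land and
// schedules the superblock root update; ctx fires after that root update
// commits the new first_free_entry, and only then are the ops completed
// back to their callers.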
template <typename I>
void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
  log_entry->remove_cache_bl();
}
template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
  std::lock_guard locker(m_lock);

  for (auto &operation : ops) {
    auto &log_entry = operation->get_log_entry();
    log_entry->ram_entry.entry_valid = 1;
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
}
template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  // snapshot so we behave consistently
  bool invalidating = this->m_invalidating;

  if (invalidating || !has_write_entry) {
    for (auto &log_entry : entries_to_flush) {
      GuardedRequestFunctionContext *guarded_ctx =
        new GuardedRequestFunctionContext([this, log_entry, invalidating]
          (GuardedRequestFunctionContext &guard_ctx) {
            log_entry->m_cell = guard_ctx.cell;
            Context *ctx = this->construct_flush_entry(log_entry, invalidating);

            if (!invalidating) {
              ctx = new LambdaContext([this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
            }
            ctx->complete(0);
          });
      this->detain_flush_guard_request(log_entry, guarded_ctx);
    }
  } else {
    int count = entries_to_flush.size();
    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
    std::vector<bufferlist *> read_bls;

    write_entries.reserve(count);
    read_bls.reserve(count);

    for (auto &log_entry : entries_to_flush) {
      if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
        write_entries.push_back(write_entry);
        read_bls.push_back(bl);
      }
    }

    Context *ctx = new LambdaContext(
      [this, entries_to_flush, read_bls](int r) {
        int i = 0;
        GuardedRequestFunctionContext *guarded_ctx = nullptr;

        for (auto &log_entry : entries_to_flush) {
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];

            guarded_ctx = new GuardedRequestFunctionContext(
              [this, log_entry, captured_entry_bl]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);

                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
                    auto captured_entry_bl = std::move(entry_bl);
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback_bl(this->m_image_writeback, ctx,
                                            std::move(captured_entry_bl));
                  }), 0);
              });
          } else {
            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }
          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
      });

    aio_read_data_blocks(write_entries, read_bls, ctx);
  }
}
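// Two flush paths: when invalidating, or when the batch holds no write
// entries, each entry is flushed directly. Otherwise the payloads are first
// read back from the SSD in one batch (aio_read_data_blocks) and handed to
// writeback_bl(), since the RAM copies may already have been released.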
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes =
      this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_shutting_down ||
        this->m_invalidating || m_bytes_allocated > high_water_bytes) {
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (m_bytes_allocated > high_water_bytes)
                                 << dendl;
      retire_entries((this->m_shutting_down || this->m_invalidating ||
                      m_bytes_allocated > aggressive_high_water_bytes)
                     ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();
    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    // Reschedule if it's still requested
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}
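// Retirement is watermark-driven: crossing RETIRE_HIGH_WATER of the
// allocation cap triggers retirement, and crossing
// AGGRESSIVE_RETIRE_HIGH_WATER (or shutting down/invalidating) switches to
// the larger MAX_ALLOC_PER_TRANSACTION batch size.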
/**
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint64_t initial_first_valid_entry;
  uint64_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    // Entry readers can't be added while we hold m_entry_reader_lock
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = m_first_valid_entry;
    first_valid_entry = m_first_valid_entry;
    while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
      GenericLogEntriesVector retiring_subentries;
      uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
      uint64_t data_length = 0;
      for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
        if (this->can_retire_entry(*it)) {
          // log_entry_index is valid after appending to SSD
          if ((*it)->log_entry_index != control_block_pos) {
            ldout(cct, 20) << "Old log_entry_index is " << control_block_pos
                           << ", new log_entry_index is "
                           << (*it)->log_entry_index
                           << ", data length is " << data_length << dendl;
            ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
            if ((*it)->log_entry_index < control_block_pos) {
              ceph_assert((*it)->log_entry_index ==
                  (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
                  this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
            } else {
              ceph_assert((*it)->log_entry_index == control_block_pos +
                  data_length + MIN_WRITE_ALLOC_SSD_SIZE);
            }
            break;
          } else {
            retiring_subentries.push_back(*it);
            if ((*it)->is_write_entry()) {
              data_length += (*it)->get_aligned_data_size();
            }
          }
        } else {
          retiring_subentries.clear();
          break;
        }
      }
      // SSD: retire only whole spans (all entries sharing a control block)
      if (!retiring_subentries.empty()) {
        for (auto it = retiring_subentries.begin();
             it != retiring_subentries.end(); it++) {
          ceph_assert(m_log_entries.front() == *it);
          m_log_entries.pop_front();
          if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
            auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
            if (gen_write_entry) {
              this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
            }
          }
        }

        ldout(cct, 20) << "span with " << retiring_subentries.size()
                       << " entries: control_block_pos=" << control_block_pos
                       << " data_length=" << data_length << dendl;
        retiring_entries.insert(
            retiring_entries.end(), retiring_subentries.begin(),
            retiring_subentries.end());

        first_valid_entry = control_block_pos + data_length +
            MIN_WRITE_ALLOC_SSD_SIZE;
        if (first_valid_entry >= this->m_log_pool_size) {
          first_valid_entry = first_valid_entry % this->m_log_pool_size +
              DATA_RING_BUFFER_OFFSET;
        }
      } else {
        break;
      }
    }
  }
  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
                   << dendl;

    // Advance first valid entry and release buffers
    uint64_t flushed_sync_gen;
    std::lock_guard append_locker(this->m_log_append_lock);
    {
      std::lock_guard locker(m_lock);
      flushed_sync_gen = this->m_flushed_sync_gen;
    }

    ceph_assert(first_valid_entry != initial_first_valid_entry);
    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->flushed_sync_gen = flushed_sync_gen;
    new_root->first_valid_entry = first_valid_entry;
    pool_root.flushed_sync_gen = flushed_sync_gen;
    pool_root.first_valid_entry = first_valid_entry;

    Context *ctx = new LambdaContext(
      [this, first_valid_entry, initial_first_valid_entry,
       retiring_entries](int r) {
        uint64_t allocated_bytes = 0;
        uint64_t cached_bytes = 0;
        uint64_t former_log_pos = 0;
        for (auto &entry : retiring_entries) {
          ceph_assert(entry->log_entry_index != 0);
          if (entry->log_entry_index != former_log_pos) {
            // Space for control blocks
            allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
            former_log_pos = entry->log_entry_index;
          }
          if (entry->is_write_entry()) {
            cached_bytes += entry->write_bytes();
            // space for userdata
            allocated_bytes += entry->get_aligned_data_size();
          }
        }
        {
          std::lock_guard locker(m_lock);
          m_first_valid_entry = first_valid_entry;
          ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
          ceph_assert(this->m_bytes_allocated >= allocated_bytes);
          this->m_bytes_allocated -= allocated_bytes;
          ceph_assert(this->m_bytes_cached >= cached_bytes);
          this->m_bytes_cached -= cached_bytes;

          ldout(m_image_ctx.cct, 20)
            << "Finished root update: initial_first_valid_entry="
            << initial_first_valid_entry << ", m_first_valid_entry="
            << m_first_valid_entry << ", release space = "
            << allocated_bytes << ", m_bytes_allocated="
            << m_bytes_allocated << ", release cached space="
            << cached_bytes << ", m_bytes_cached="
            << this->m_bytes_cached << dendl;

          this->m_alloc_failed_since_retire = false;
          this->wake_up();
        }

        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
        m_async_update_superblock--;
        this->m_async_op_tracker.finish_op();
      });

    std::lock_guard locker(m_lock);
    schedule_update_root(new_root, ctx);
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}
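// Entries are only retired a whole span at a time: every entry sharing a
// control block must be eligible before the span is reclaimed, and
// first_valid_entry then jumps past the control block plus its aligned
// data, wrapping to DATA_RING_BUFFER_OFFSET at the end of the pool.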
template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
                             uint64_t* new_first_free_entry) {
  GenericLogEntriesVector log_entries;
  CephContext *cct = m_image_ctx.cct;
  uint64_t span_payload_len = 0;
  uint64_t bytes_to_free = 0;
  ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

  *new_first_free_entry = pool_root.first_free_entry;
  AioTransContext* aio = new AioTransContext(cct, ctx);

  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    operation->log_append_start_time = now;
    auto log_entry = operation->get_log_entry();

    if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
        span_payload_len >= SPAN_MAX_DATA_LEN) {
      if (log_entries.size() > 1) {
        bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
      }
      write_log_entries(log_entries, aio, new_first_free_entry);
      log_entries.clear();
      span_payload_len = 0;
    }
    log_entries.push_back(log_entry);
    span_payload_len += log_entry->write_bytes();
  }
  if (!span_payload_len || !log_entries.empty()) {
    if (log_entries.size() > 1) {
      bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
    }
    write_log_entries(log_entries, aio, new_first_free_entry);
  }

  {
    std::lock_guard locker1(m_lock);
    m_first_free_entry = *new_first_free_entry;
    m_bytes_allocated -= bytes_to_free;
  }

  bdev->aio_submit(&aio->ioc);
}
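// Packing example: alloc_resources() reserved one control block
// (MIN_WRITE_ALLOC_SSD_SIZE) per entry, but a span of, say, 5 entries
// shares a single control block, so (5 - 1) * MIN_WRITE_ALLOC_SSD_SIZE is
// handed back via bytes_to_free once the span is written.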
template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
                                    AioTransContext *aio, uint64_t *pos) {
  CephContext *cct = m_image_ctx.cct;
  ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
  ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
              *pos < this->m_log_pool_size &&
              *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

  // The first block is for log entries
  uint64_t control_block_pos = *pos;
  *pos += MIN_WRITE_ALLOC_SSD_SIZE;
  if (*pos == this->m_log_pool_size) {
    *pos = DATA_RING_BUFFER_OFFSET;
  }

  std::vector<WriteLogCacheEntry> persist_log_entries;
  bufferlist data_bl;
  for (auto &log_entry : log_entries) {
    log_entry->log_entry_index = control_block_pos;
    // Append data buffer for write operations
    if (log_entry->is_write_entry()) {
      auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
      auto cache_bl = write_entry->get_cache_bl();
      auto align_size = write_entry->get_aligned_data_size();
      data_bl.append(cache_bl);
      data_bl.append_zero(align_size - cache_bl.length());

      write_entry->ram_entry.write_data_pos = *pos;
      *pos += align_size;
      if (*pos >= this->m_log_pool_size) {
        *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
      }
    }
    // push_back _after_ setting write_data_pos
    persist_log_entries.push_back(log_entry->ram_entry);
  }

  bufferlist bl;
  encode(persist_log_entries, bl);
  ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  bl.claim_append(data_bl);
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (control_block_pos + bl.length() > this->m_log_pool_size) {
    // exceeds border, need to split
    uint64_t size = bl.length();
    bufferlist bl1;
    bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
    ceph_assert(bl.length() == (size - bl1.length()));

    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << size << " spans boundary, split into "
                   << control_block_pos << "~" << bl1.length()
                   << " and " << DATA_RING_BUFFER_OFFSET << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
    bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  } else {
    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  }
}
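// Resulting span layout on disk: [control block | entry0 data | entry1 data
// | ...], each piece padded to MIN_WRITE_ALLOC_SSD_SIZE. A span that would
// run past pool_size is split, and the tail lands back at
// DATA_RING_BUFFER_OFFSET, mirroring the wrap logic used by the readers.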
template <typename I>
void WriteLog<I>::schedule_update_root(
    std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "New root: pool_size=" << root->pool_size
                 << " first_valid_entry=" << root->first_valid_entry
                 << " first_free_entry=" << root->first_free_entry
                 << " flushed_sync_gen=" << root->flushed_sync_gen
                 << dendl;
  ceph_assert(is_valid_pool_root(*root));

  bool need_finisher;
  {
    ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
    need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
    std::shared_ptr<WriteLogPoolRootUpdate> entry =
      std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
    this->m_async_update_superblock++;
    this->m_async_op_tracker.start_op();
    m_poolroot_to_update.emplace_back(entry);
  }
  if (need_finisher) {
    enlist_op_update_root();
  }
}
template <typename I>
void WriteLog<I>::enlist_op_update_root() {
  Context *append_ctx = new LambdaContext([this](int r) {
    update_root_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}
template <typename I>
void WriteLog<I>::update_root_scheduled_ops() {
  ldout(m_image_ctx.cct, 20) << dendl;

  std::shared_ptr<WriteLogPoolRoot> root;
  WriteLogPoolRootUpdateList root_updates;
  Context *ctx = nullptr;
  {
    std::lock_guard locker(m_lock);
    if (m_updating_pool_root) {
      /* Another thread is appending */
      ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
                                 << dendl;
      return;
    }
    if (m_poolroot_to_update.size()) {
      m_updating_pool_root = true;
      root_updates.swap(m_poolroot_to_update);
    }
  }
  ceph_assert(!root_updates.empty());
  ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
                             << dendl;
  // We just update the last one, and call all the completions.
  auto entry = root_updates.back();
  root = entry->root;

  ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
    ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
    for (auto it = updates.begin(); it != updates.end(); it++) {
      Context *it_ctx = (*it)->ctx;
      it_ctx->complete(r);
    }
  });
  Context *append_ctx = new LambdaContext([this, ctx](int r) {
    ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
    bool need_finisher = false;
    {
      std::lock_guard locker(m_lock);
      m_updating_pool_root = false;
      need_finisher = !m_poolroot_to_update.empty();
    }
    if (need_finisher) {
      enlist_op_update_root();
    }
    ctx->complete(r);
  });
  AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
  update_pool_root(root, aio);
}
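// Root updates are coalesced: only the newest queued WriteLogPoolRoot is
// actually written to the superblock, after which the completions of every
// update it superseded fire with the same result.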
template <typename I>
void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
                                   AioTransContext *aio) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
  bdev->aio_submit(&aio->ioc);
}
template <typename I>
int WriteLog<I>::update_pool_root_sync(
    std::shared_ptr<WriteLogPoolRoot> root) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  return bdev->write(0, bl, false);
}
template <typename I>
void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
                                      bufferlist *bl, Context *ctx) {
  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
  std::vector<bufferlist *> bls {bl};
  aio_read_data_blocks(log_entries, bls, ctx);
}
template <typename I>
void WriteLog<I>::aio_read_data_blocks(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
    std::vector<bufferlist *> &bls, Context *ctx) {
  ceph_assert(log_entries.size() == bls.size());

  // get the valid part
  Context *read_ctx = new LambdaContext(
    [log_entries, bls, ctx](int r) {
      for (unsigned int i = 0; i < log_entries.size(); i++) {
        bufferlist valid_data_bl;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
        auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
                                                        : write_entry->ram_entry.ws_datalen;

        valid_data_bl.substr_of(*bls[i], 0, length);
        bls[i]->clear();
        bls[i]->append(valid_data_bl);
        write_entry->dec_bl_refs();
      }
      ctx->complete(r);
    });

  CephContext *cct = m_image_ctx.cct;
  AioTransContext *aio = new AioTransContext(cct, read_ctx);
  for (unsigned int i = 0; i < log_entries.size(); i++) {
    WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;

    ceph_assert(log_entry->is_write() || log_entry->is_writesame());
    uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
                                           log_entry->ws_datalen;
    uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);

    ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
                   << "~" << len << dendl;
    ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
                log_entry->write_data_pos < pool_root.pool_size);
    ceph_assert(align_len);
    if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
      // spans boundary, need to split
      uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
      uint64_t len2 = align_len - len1;

      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << " spans boundary, split into "
                     << log_entry->write_data_pos << "~" << len1
                     << " and " << DATA_RING_BUFFER_OFFSET << "~"
                     << len2 << dendl;
      bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
      bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
    } else {
      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << dendl;
      bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
    }
  }
  bdev->aio_submit(&aio->ioc);
}
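// The reads above are block-aligned, so each bufferlist comes back padded;
// the completion lambda trims it back to the entry's logical length
// (write_bytes, or ws_datalen for writesame) before handing it out.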
template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  m_image_ctx.op_work_queue->queue(user_req, r);
}
} // namespace ssd
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;