// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
                           << this << " " << __func__ << ": "
using namespace librbd::cache::pwl;
static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
  return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_valid_entry < root.pool_size &&
         root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_free_entry < root.pool_size &&
         root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
}
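
/* Sketch of the on-disk layout these invariants encode (inferred from
 * the checks themselves rather than from a format spec):
 *
 *   [0, DATA_RING_BUFFER_OFFSET)           superblock / pool root
 *   [DATA_RING_BUFFER_OFFSET, pool_size)   ring buffer of control blocks
 *                                          and data, in units of
 *                                          MIN_WRITE_ALLOC_SSD_SIZE
 *
 * first_valid_entry and first_free_entry are byte offsets into the ring,
 * so both must be block-aligned and must lie inside it. */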
template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}
template <typename I>
WriteLog<I>::WriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    cache::ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
                        image_writeback, plugin_api)
{
}

template <typename I>
WriteLog<I>::~WriteLog() {
  delete m_builderobj;
}
template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    uint64_t entry_hit_length, Extent hit_extent,
    pwl::C_ReadRequest *read_ctx) {
  // Make a bl for this hit extent. This will add references to the
  // write_entry->cache_bl
  ldout(m_image_ctx.cct, 5) << dendl;
  auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
  bufferlist hit_bl;
  write_entry->copy_cache_bl(&hit_bl);
  bool writesame = write_entry->is_writesame_entry();
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
      hit_extent, hit_bl, true, read_buffer_offset, writesame);
  read_ctx->read_extents.push_back(hit_extent_buf);

  /* The entry's data was not found in RAM; remember to read it back
   * from the SSD before completing the request. */
  if (!hit_bl.length()) {
    ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
    auto read_extent = read_ctx->read_extents.back();
    write_entry->inc_bl_refs();
    log_entries_to_read.push_back(std::move(write_entry));
    bls_to_read.push_back(&read_extent->m_bl);
  }
}
template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    Context *ctx) {
  if (!log_entries_to_read.empty()) {
    aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
  } else {
    ctx->complete(0);
  }
}
template <typename I>
int WriteLog<I>::create_and_open_bdev() {
  CephContext *cct = m_image_ctx.cct;

  bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
                             nullptr, nullptr, nullptr);
  int r = bdev->open(this->m_log_pool_name);
  if (r < 0) {
    lderr(cct) << "failed to open bdev" << dendl;
    delete bdev;
    return r;
  }

  ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (bdev->get_size() != this->m_log_pool_size) {
    lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
               << " (block size " << bdev->get_block_size()
               << ") != pool size " << this->m_log_pool_size << dendl;
    bdev->close();
    delete bdev;
    return -EINVAL;
  }

  return 0;
}
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish,
                                  pwl::DeferredContexts &later) {
  int r;
  CephContext *cct = m_image_ctx.cct;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
    bool succeed = true;
    if (fd >= 0) {
      if (truncate(this->m_log_pool_name.c_str(),
                   this->m_log_pool_size) != 0) {
        succeed = false;
      }
      ::close(fd);
    } else {
      succeed = false;
    }
    if (!succeed) {
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }

    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    /* new pool, calculate and store metadata */

    /* Keep the ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
     * This way, when the ring buffer is fully allocated,
     * m_first_free_entry and m_first_valid_entry can never be equal;
     * they are equal only when the cache is empty. */
    this->m_bytes_allocated_cap = this->m_log_pool_size -
        DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
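    /* Worked example with invented numbers (not values taken from the
     * headers): a 1 GiB pool with DATA_RING_BUFFER_OFFSET = 8 KiB and
     * MIN_WRITE_ALLOC_SSD_SIZE = 4 KiB gives a cap of
     * 1073741824 - 8192 - 4096 = 1073729536 bytes. Since allocation can
     * never claim that final block, first_free_entry always stops short
     * of first_valid_entry on a full ring, and "the two offsets are
     * equal" stays an unambiguous encoding of "the cache is empty". */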
    m_first_free_entry = DATA_RING_BUFFER_OFFSET;
    m_first_valid_entry = DATA_RING_BUFFER_OFFSET;

    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->layout_version = SSD_LAYOUT_VERSION;
    new_root->pool_size = this->m_log_pool_size;
    new_root->flushed_sync_gen = this->m_flushed_sync_gen;
    new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
    new_root->first_free_entry = m_first_free_entry;
    new_root->first_valid_entry = m_first_valid_entry;
    new_root->num_log_entries = 0;
    pool_root = *new_root;

    r = update_pool_root_sync(new_root);
    if (r != 0) {
      lderr(cct) << "failed to initialize pool ("
                 << this->m_log_pool_name << ")" << dendl;
      bdev->close();
      delete bdev;
      on_finish->complete(r);
      return false;
    }
  } else {
    ceph_assert(m_cache_state->present);
    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }

    bufferlist bl;
    SuperBlock superblock;
    ::IOContext ioctx(cct, nullptr);
    r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
    if (r < 0) {
      lderr(cct) << "read ssd cache superblock failed " << dendl;
      goto err_close_bdev;
    }
    auto p = bl.cbegin();
    decode(superblock, p);
    pool_root = superblock.root;
    ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
                  << " first_valid_entry=" << pool_root.first_valid_entry
                  << " first_free_entry=" << pool_root.first_free_entry
                  << " flushed_sync_gen=" << pool_root.flushed_sync_gen
                  << dendl;
    ceph_assert(is_valid_pool_root(pool_root));
    if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
      lderr(cct) << "pool layout version is "
                 << pool_root.layout_version
                 << " expected " << SSD_LAYOUT_VERSION << dendl;
      goto err_close_bdev;
    }
    if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
      lderr(cct) << "pool block size is " << pool_root.block_size
                 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE << dendl;
      goto err_close_bdev;
    }

    this->m_log_pool_size = pool_root.pool_size;
    this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
    this->m_first_valid_entry = pool_root.first_valid_entry;
    this->m_first_free_entry = pool_root.first_free_entry;
    this->m_bytes_allocated_cap = this->m_log_pool_size -
                                  DATA_RING_BUFFER_OFFSET -
                                  MIN_WRITE_ALLOC_SSD_SIZE;

    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_bdev:
  bdev->close();
  delete bdev;
  on_finish->complete(-EINVAL);
  return false;
}
template <typename I>
void WriteLog<I>::remove_pool_file() {
  ceph_assert(bdev);
  bdev->close();
  delete bdev;
  bdev = nullptr;
  ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;

  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
                              << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \""
                             << this->m_log_pool_name << "\": " << dendl;
    } else {
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
                              << this->m_log_pool_name << dendl;
  }
}
template <typename I>
void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  std::map<uint64_t, bool> missing_sync_points;

  // Walk the ring from first_valid_entry to first_free_entry: read one
  // control block at a time, decode the log entries it holds, append
  // them to the in-memory list, and advance past the control block plus
  // each entry's aligned data to find the next control block.
  for (uint64_t next_log_pos = this->m_first_valid_entry;
       next_log_pos != this->m_first_free_entry; ) {
    // read the entries from SSD cache and decode
    bufferlist bl_entries;
    ::IOContext ioctx_entry(cct, nullptr);
    bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
               &ioctx_entry, false);
    std::vector<WriteLogCacheEntry> ssd_log_entries;
    auto pl = bl_entries.cbegin();
    decode(ssd_log_entries, pl);
    ldout(cct, 5) << "decoded ssd log entries" << dendl;
    uint64_t curr_log_pos = next_log_pos;
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;

    for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
      this->update_entries(&log_entry, &*it, missing_sync_points,
                           sync_point_entries, curr_log_pos);
      log_entry->ram_entry = *it;
      log_entry->log_entry_index = curr_log_pos;
      log_entry->completed = true;
      m_log_entries.push_back(log_entry);
      next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
    }

    // along with the write_bytes, add control block size too
    next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
    if (next_log_pos >= this->m_log_pool_size) {
      next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
    }
  }
  this->update_sync_points(missing_sync_points, sync_point_entries, later);
  if (m_first_valid_entry > m_first_free_entry) {
    m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
                        m_first_free_entry - DATA_RING_BUFFER_OFFSET;
  } else {
    m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
  }
}
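
/* Accounting illustration (invented offsets): in a 100-block ring whose
 * data area starts one block in, first_valid_entry at block 90 and
 * first_free_entry at block 10 means the live data wraps: blocks 90..99
 * plus blocks 1..9, i.e. (pool_size - first_valid_entry) +
 * (first_free_entry - DATA_RING_BUFFER_OFFSET) bytes, which is the
 * wrapped branch above. */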
// For SSD, m_bytes_allocated is not accounted here; it is tracked at
// append (append_ops) and retire (retire_entries) time instead.
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_cached += log_entry->write_bytes();
  }
}
template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  // Set up the buffer and count the resources this request needs
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries,
                              &num_unpublished_reserves);

  ceph_assert(!num_lanes);
  if (num_log_entries) {
    bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
    num_log_entries = 0;
  }
  ceph_assert(!num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes,
                                          num_log_entries,
                                          num_unpublished_reserves);
  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}
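
/* Note on the accounting above: every log entry reserves a full control
 * block up front, e.g. three entries with 4 KiB blocks (an invented
 * size) add 12 KiB beyond the data bytes. When several entries later
 * share one control block in append_ops(), the surplus is credited back
 * to m_bytes_allocated there. */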
template <typename I>
bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
  for (auto &op : ops) {
    if (op->get_log_entry()->is_sync_point()) {
      return true;
    }
  }
  return false;
}
template <typename I>
void WriteLog<I>::enlist_op_appender() {
  this->m_async_append_ops++;
  this->m_async_op_tracker.start_op();
  Context *append_ctx = new LambdaContext([this](int r) {
    append_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}
/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) {
  bool need_finisher = false;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

    bool persist_on_flush = this->get_persist_on_flush();
    need_finisher = !this->m_appending &&
        ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
         !persist_on_flush);

    // Only flush logs into SSD when there is internal/external flush request
    if (!need_finisher) {
      need_finisher = has_sync_point_logs(ops);
    }
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);

    // To preserve the order of overlapping IOs, release_cell() may be
    // called only after the ops are added to m_ops_to_append.
    // As soon as m_lock is released, the appended ops can be picked up
    // by append_scheduled_ops() in another thread and req can be freed.
    if (req != nullptr) {
      if (persist_on_flush) {
        req->complete_user_request(0);
      }
      req->release_cell();
    }
  }

  if (need_finisher) {
    this->enlist_op_appender();
  }

  for (auto &op : appending) {
    op->appending();
  }
}
template <typename I>
void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
                                        bool do_early_flush,
                                        C_BlockIORequestT *req) {
  this->schedule_append(ops, req);
}
template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  ldout(m_image_ctx.cct, 20) << dendl;

  bool ops_remain = false;  // unused; no-op for the SSD cache
  bool appending = false;   // unused; no-op for the SSD cache
  this->append_scheduled(ops, ops_remain, appending);

  if (ops.size()) {
    alloc_op_log_entries(ops);
    append_op_log_entries(ops);
  } else {
    this->m_async_append_ops--;
    this->m_async_op_tracker.finish_op();
  }
}
/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
  ceph_assert(!ops.empty());
  ldout(m_image_ctx.cct, 20) << dendl;
  Context *ctx = new LambdaContext([this, ops](int r) {
    ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;

    auto captured_ops = std::move(ops);
    this->complete_op_log_entries(std::move(captured_ops), r);

    bool need_finisher = false;
    {
      std::lock_guard locker1(m_lock);
      bool persist_on_flush = this->get_persist_on_flush();
      need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
                       !persist_on_flush);

      if (!need_finisher) {
        need_finisher = has_sync_point_logs(this->m_ops_to_append);
      }
    }
    if (need_finisher) {
      this->enlist_op_appender();
    }
    this->m_async_update_superblock--;
    this->m_async_op_tracker.finish_op();
  });
  uint64_t *new_first_free_entry = new(uint64_t);
  Context *append_ctx = new LambdaContext(
    [this, new_first_free_entry, ops, ctx](int r) {
      std::shared_ptr<WriteLogPoolRoot> new_root;
      {
        ldout(m_image_ctx.cct, 20) << "Finished appending at "
                                   << *new_first_free_entry << dendl;
        utime_t now = ceph_clock_now();
        for (auto &operation : ops) {
          operation->log_append_comp_time = now;
        }

        std::lock_guard locker(this->m_log_append_lock);
        std::lock_guard locker1(m_lock);
        ceph_assert(this->m_appending);
        this->m_appending = false;
        new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
        pool_root.first_free_entry = *new_first_free_entry;
        new_root->first_free_entry = *new_first_free_entry;
        delete new_first_free_entry;
        schedule_update_root(new_root, ctx);
      }
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
  // Append the logs and update first_free_entry
  append_ops(ops, append_ctx, new_first_free_entry);

  this->dispatch_deferred_writes();
}
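
/* Shape of the completion chain built above, in execution order:
 * append_ops() writes the entries; append_ctx then stamps
 * log_append_comp_time, clears m_appending, advances
 * pool_root.first_free_entry and schedules the superblock update; once
 * that lands, ctx completes the ops' contexts and re-enlists the
 * appender if more work queued up in the meantime. */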
template <typename I>
void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
  log_entry->remove_cache_bl();
}
template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
  std::unique_lock locker(m_lock);

  for (auto &operation : ops) {
    auto &log_entry = operation->get_log_entry();
    log_entry->ram_entry.set_entry_valid(true);
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
  if (m_cache_state->empty && !m_log_entries.empty()) {
    m_cache_state->empty = false;
    this->update_image_cache_state();
    this->write_image_cache_state(locker);
  }
}
template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  // snapshot so we behave consistently
  bool invalidating = this->m_invalidating;

  if (invalidating || !has_write_entry) {
    for (auto &log_entry : entries_to_flush) {
      GuardedRequestFunctionContext *guarded_ctx =
        new GuardedRequestFunctionContext([this, log_entry, invalidating]
          (GuardedRequestFunctionContext &guard_ctx) {
            log_entry->m_cell = guard_ctx.cell;
            Context *ctx = this->construct_flush_entry(log_entry, invalidating);

            if (!invalidating) {
              ctx = new LambdaContext([this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
            }
            ctx->complete(0);
          });
      this->detain_flush_guard_request(log_entry, guarded_ctx);
    }
  } else {
    int count = entries_to_flush.size();
    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
    std::vector<bufferlist*> read_bls;

    write_entries.reserve(count);
    read_bls.reserve(count);

    for (auto &log_entry : entries_to_flush) {
      if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
        write_entries.push_back(write_entry);
        read_bls.push_back(bl);
      }
    }

    Context *ctx = new LambdaContext(
      [this, entries_to_flush, read_bls](int r) {
        int i = 0;
        GuardedRequestFunctionContext *guarded_ctx = nullptr;

        for (auto &log_entry : entries_to_flush) {
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];

            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);

                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
                    auto captured_entry_bl = std::move(entry_bl);
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback_bl(this->m_image_writeback, ctx,
                                            std::move(captured_entry_bl));
                  }), 0);
              });
          } else {
            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }
          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
      });

    aio_read_data_blocks(write_entries, read_bls, ctx);
  }
}
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes =
      this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
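  /* Illustration (ratios invented for the example): with a 1 GiB
   * m_bytes_allocated_cap, RETIRE_HIGH_WATER = 0.5 and
   * AGGRESSIVE_RETIRE_HIGH_WATER = 0.75 would start retiring past
   * ~512 MiB allocated and switch to the larger
   * MAX_ALLOC_PER_TRANSACTION batch past ~768 MiB; the real ratios are
   * defined in the pwl types header. */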

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_shutting_down ||
        this->m_invalidating || m_bytes_allocated > high_water_bytes) {
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (m_bytes_allocated > high_water_bytes)
                                 << dendl;
      retire_entries((this->m_shutting_down || this->m_invalidating ||
                      m_bytes_allocated > aggressive_high_water_bytes)
                     ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();
    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    // Reschedule if it's still requested
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}
/*
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint64_t initial_first_valid_entry;
  uint64_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    // Entry readers can't be added while we hold m_entry_reader_lock
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = m_first_valid_entry;
    first_valid_entry = m_first_valid_entry;
    while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
      GenericLogEntriesVector retiring_subentries;
      uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
      uint64_t data_length = 0;
      for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
        if (this->can_retire_entry(*it)) {
          // log_entry_index is valid after appending to SSD
          if ((*it)->log_entry_index != control_block_pos) {
            ldout(cct, 20) << "old log_entry_index=" << control_block_pos
                           << ", new log_entry_index="
                           << (*it)->log_entry_index
                           << ", data length=" << data_length << dendl;
            ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
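            /* Layout check, illustrated with invented offsets: entries
             * of one span share a control block, so the next span must
             * start at control_block_pos + MIN_WRITE_ALLOC_SSD_SIZE +
             * data_length. E.g. a control block at 100 KiB followed by
             * 12 KiB of aligned data (with 4 KiB blocks) puts the next
             * control block at 116 KiB; if that lands past pool_size it
             * wraps, offset by DATA_RING_BUFFER_OFFSET. */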
            if ((*it)->log_entry_index < control_block_pos) {
              ceph_assert((*it)->log_entry_index ==
                  (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
                  this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
            } else {
              ceph_assert((*it)->log_entry_index == control_block_pos +
                  data_length + MIN_WRITE_ALLOC_SSD_SIZE);
            }
            break;
          } else {
            retiring_subentries.push_back(*it);
            if ((*it)->is_write_entry()) {
              data_length += (*it)->get_aligned_data_size();
            }
          }
        } else {
          retiring_subentries.clear();
          break;
        }
      }

      // SSD: retiring_subentries in a span
      if (!retiring_subentries.empty()) {
        for (auto it = retiring_subentries.begin();
             it != retiring_subentries.end(); it++) {
          ceph_assert(m_log_entries.front() == *it);
          m_log_entries.pop_front();
          if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
            auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
            if (gen_write_entry) {
              this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
            }
          }
        }

        ldout(cct, 20) << "span with " << retiring_subentries.size()
                       << " entries: control_block_pos=" << control_block_pos
                       << " data_length=" << data_length
                       << dendl;
        retiring_entries.insert(
            retiring_entries.end(), retiring_subentries.begin(),
            retiring_subentries.end());

        first_valid_entry = control_block_pos + data_length +
            MIN_WRITE_ALLOC_SSD_SIZE;
        if (first_valid_entry >= this->m_log_pool_size) {
          first_valid_entry = first_valid_entry % this->m_log_pool_size +
              DATA_RING_BUFFER_OFFSET;
        }
      } else {
        break;
      }
    }
  }
  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
                   << dendl;

    // Advance first valid entry and release buffers
    uint64_t flushed_sync_gen;
    std::lock_guard append_locker(this->m_log_append_lock);
    {
      std::lock_guard locker(m_lock);
      flushed_sync_gen = this->m_flushed_sync_gen;
    }

    ceph_assert(first_valid_entry != initial_first_valid_entry);
    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->flushed_sync_gen = flushed_sync_gen;
    new_root->first_valid_entry = first_valid_entry;
    pool_root.flushed_sync_gen = flushed_sync_gen;
    pool_root.first_valid_entry = first_valid_entry;

    Context *ctx = new LambdaContext(
      [this, first_valid_entry, initial_first_valid_entry,
       retiring_entries](int r) {
        uint64_t allocated_bytes = 0;
        uint64_t cached_bytes = 0;
        uint64_t former_log_pos = 0;
        for (auto &entry : retiring_entries) {
          ceph_assert(entry->log_entry_index != 0);
          if (entry->log_entry_index != former_log_pos) {
            // Space for control blocks
            allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
            former_log_pos = entry->log_entry_index;
          }
          if (entry->is_write_entry()) {
            cached_bytes += entry->write_bytes();
            // space for userdata
            allocated_bytes += entry->get_aligned_data_size();
          }
        }
        bool need_update_state = false;
        {
          std::lock_guard locker(m_lock);
          m_first_valid_entry = first_valid_entry;
          ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
          ceph_assert(this->m_bytes_allocated >= allocated_bytes);
          this->m_bytes_allocated -= allocated_bytes;
          ceph_assert(this->m_bytes_cached >= cached_bytes);
          this->m_bytes_cached -= cached_bytes;
          if (!m_cache_state->empty && m_log_entries.empty()) {
            m_cache_state->empty = true;
            this->update_image_cache_state();
            need_update_state = true;
          }

          ldout(m_image_ctx.cct, 20)
            << "Finished root update: initial_first_valid_entry="
            << initial_first_valid_entry << ", m_first_valid_entry="
            << m_first_valid_entry << ", release space = "
            << allocated_bytes << ", m_bytes_allocated="
            << m_bytes_allocated << ", release cached space="
            << cached_bytes << ", m_bytes_cached="
            << this->m_bytes_cached << dendl;

          this->m_alloc_failed_since_retire = false;
        }
        if (need_update_state) {
          std::unique_lock locker(m_lock);
          this->write_image_cache_state(locker);
        }

        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
        m_async_update_superblock--;
        this->m_async_op_tracker.finish_op();
      });

    std::lock_guard locker(m_lock);
    schedule_update_root(new_root, ctx);
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}
template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
                             uint64_t* new_first_free_entry) {
  GenericLogEntriesVector log_entries;
  CephContext *cct = m_image_ctx.cct;
  uint64_t span_payload_len = 0;
  uint64_t bytes_to_free = 0;
  ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

  *new_first_free_entry = pool_root.first_free_entry;
  AioTransContext* aio = new AioTransContext(cct, ctx);

  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    operation->log_append_start_time = now;
    auto log_entry = operation->get_log_entry();

    if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
        span_payload_len >= SPAN_MAX_DATA_LEN) {
      if (log_entries.size() > 1) {
        bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
      }
      write_log_entries(log_entries, aio, new_first_free_entry);
      log_entries.clear();
      span_payload_len = 0;
    }
    log_entries.push_back(log_entry);
    span_payload_len += log_entry->write_bytes();
  }
  if (!span_payload_len || !log_entries.empty()) {
    if (log_entries.size() > 1) {
      bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
    }
    write_log_entries(log_entries, aio, new_first_free_entry);
  }

  {
    std::lock_guard locker1(m_lock);
    m_first_free_entry = *new_first_free_entry;
    m_bytes_allocated -= bytes_to_free;
  }

  bdev->aio_submit(&aio->ioc);
}
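
/* Batching sketch: entries accumulate into a span until it holds
 * CONTROL_BLOCK_MAX_LOG_ENTRIES entries or SPAN_MAX_DATA_LEN payload
 * bytes, then the whole span shares a single control block. Because
 * alloc_resources() reserved one control block per entry, a span of n
 * entries hands back (n - 1) * MIN_WRITE_ALLOC_SSD_SIZE bytes, e.g.
 * four entries with 4 KiB blocks (an invented size) free 12 KiB. */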
template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
                                    AioTransContext *aio, uint64_t *pos) {
  CephContext *cct = m_image_ctx.cct;
  ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
  ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
              *pos < this->m_log_pool_size &&
              *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

  // The first block is for log entries
  uint64_t control_block_pos = *pos;
  *pos += MIN_WRITE_ALLOC_SSD_SIZE;
  if (*pos == this->m_log_pool_size) {
    *pos = DATA_RING_BUFFER_OFFSET;
  }

  std::vector<WriteLogCacheEntry> persist_log_entries;
  bufferlist data_bl;
  for (auto &log_entry : log_entries) {
    log_entry->log_entry_index = control_block_pos;
    // Append data buffer for write operations
    if (log_entry->is_write_entry()) {
      auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
      auto cache_bl = write_entry->get_cache_bl();
      auto align_size = write_entry->get_aligned_data_size();
      data_bl.append(cache_bl);
      data_bl.append_zero(align_size - cache_bl.length());

      write_entry->ram_entry.write_data_pos = *pos;
      *pos += align_size;
      if (*pos >= this->m_log_pool_size) {
        *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
      }
    }
    // push_back _after_ setting write_data_pos
    persist_log_entries.push_back(log_entry->ram_entry);
  }

  // Write the control block followed by the aligned data
  bufferlist bl;
  encode(persist_log_entries, bl);
  ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  bl.append(data_bl);
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (control_block_pos + bl.length() > this->m_log_pool_size) {
    // exceeds the pool boundary, need to split
    uint64_t size = bl.length();
    bufferlist bl1;
    bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
    ceph_assert(bl.length() == (size - bl1.length()));

    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << size << " spans boundary, split into "
                   << control_block_pos << "~" << bl1.length()
                   << " and " << DATA_RING_BUFFER_OFFSET << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
    bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  } else {
    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  }
}
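
/* Worked example of the split above (invented numbers): with a
 * 1000-block pool and control_block_pos at block 998, a 5-block write
 * (control block + 4 data blocks) keeps blocks 998-999 in place and
 * wraps the remaining 3 blocks to DATA_RING_BUFFER_OFFSET, mirroring
 * how *pos wrapped while write_data_pos was being assigned. */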
template <typename I>
void WriteLog<I>::schedule_update_root(
    std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "New root: pool_size=" << root->pool_size
                 << " first_valid_entry=" << root->first_valid_entry
                 << " first_free_entry=" << root->first_free_entry
                 << " flushed_sync_gen=" << root->flushed_sync_gen
                 << dendl;
  ceph_assert(is_valid_pool_root(*root));

  bool need_finisher;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
  std::shared_ptr<WriteLogPoolRootUpdate> entry =
      std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
  this->m_async_update_superblock++;
  this->m_async_op_tracker.start_op();
  m_poolroot_to_update.emplace_back(entry);
  if (need_finisher) {
    enlist_op_update_root();
  }
}
template <typename I>
void WriteLog<I>::enlist_op_update_root() {
  Context *append_ctx = new LambdaContext([this](int r) {
    update_root_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}
template <typename I>
void WriteLog<I>::update_root_scheduled_ops() {
  ldout(m_image_ctx.cct, 20) << dendl;

  std::shared_ptr<WriteLogPoolRoot> root;
  WriteLogPoolRootUpdateList root_updates;
  Context *ctx = nullptr;
  {
    std::lock_guard locker(m_lock);
    if (m_updating_pool_root) {
      /* Another thread is already updating the pool root */
      ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
                                 << dendl;
      return;
    }
    if (m_poolroot_to_update.size()) {
      m_updating_pool_root = true;
      root_updates.swap(m_poolroot_to_update);
    }
  }
  ceph_assert(!root_updates.empty());
  ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
                             << dendl;
  // We just update the last one, and call all the completions.
  auto entry = root_updates.back();
  root = entry->root;
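  /* Coalescing, by example (hypothetical): if three root updates queued
   * up while a superblock write was in flight, only the newest root is
   * written below, but all three queued contexts still complete with
   * the write's result. */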
  ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
    ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
    for (auto it = updates.begin(); it != updates.end(); it++) {
      Context *it_ctx = (*it)->ctx;
      it_ctx->complete(r);
    }
  });
  Context *append_ctx = new LambdaContext([this, ctx](int r) {
    ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
    bool need_finisher = false;
    {
      std::lock_guard locker(m_lock);
      m_updating_pool_root = false;
      need_finisher = !m_poolroot_to_update.empty();
    }
    if (need_finisher) {
      enlist_op_update_root();
    }
    ctx->complete(r);
  });
  AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
  update_pool_root(root, aio);
}
template <typename I>
void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
                                   AioTransContext *aio) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
  bdev->aio_submit(&aio->ioc);
}
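
/* The superblock lives in the first MIN_WRITE_ALLOC_SSD_SIZE bytes of
 * the device, so the encoded root is zero-padded to exactly one block
 * to keep this aio_write block-aligned (the same assumption the
 * ceph_assert above enforces). */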
template <typename I>
int WriteLog<I>::update_pool_root_sync(
    std::shared_ptr<WriteLogPoolRoot> root) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  return bdev->write(0, bl, false);
}
template <typename I>
void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
                                      bufferlist *bl, Context *ctx) {
  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
  std::vector<bufferlist *> bls {bl};
  aio_read_data_blocks(log_entries, bls, ctx);
}
template <typename I>
void WriteLog<I>::aio_read_data_blocks(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
    std::vector<bufferlist *> &bls, Context *ctx) {
  ceph_assert(log_entries.size() == bls.size());

  // get the valid part
  Context *read_ctx = new LambdaContext(
    [log_entries, bls, ctx](int r) {
      for (unsigned int i = 0; i < log_entries.size(); i++) {
        bufferlist valid_data_bl;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
        auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
                                                        : write_entry->ram_entry.ws_datalen;

        valid_data_bl.substr_of(*bls[i], 0, length);
        bls[i]->clear();
        bls[i]->append(valid_data_bl);
        write_entry->dec_bl_refs();
      }
      ctx->complete(r);
    });

  CephContext *cct = m_image_ctx.cct;
  AioTransContext *aio = new AioTransContext(cct, read_ctx);
  for (unsigned int i = 0; i < log_entries.size(); i++) {
    WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;

    ceph_assert(log_entry->is_write() || log_entry->is_writesame());
    uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
                                           log_entry->ws_datalen;
    uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);

    ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
                   << "~" << len << dendl;
    ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
                log_entry->write_data_pos < pool_root.pool_size);
    ceph_assert(align_len);
    if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
      // spans boundary, need to split
      uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
      uint64_t len2 = align_len - len1;

      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << " spans boundary, split into "
                     << log_entry->write_data_pos << "~" << len1
                     << " and " << DATA_RING_BUFFER_OFFSET << "~"
                     << len2 << dendl;
      bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
      bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
    } else {
      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << dendl;
      bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
    }
  }
  bdev->aio_submit(&aio->ioc);
}
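
/* Read-side counterpart of the write split (invented numbers): a
 * payload at write_data_pos = pool_size - 4 KiB with align_len = 16 KiB
 * reads 4 KiB from the tail of the ring and 12 KiB more from
 * DATA_RING_BUFFER_OFFSET into the same bufferlist; the completion then
 * trims the result back to the entry's true, unaligned length. */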
template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  m_image_ctx.op_work_queue->queue(user_req, r);
}
} // namespace ssd
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;