// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "WriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/plugin/Api.h"

#undef dout_subsys
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::rwl::WriteLog: " << this \
                           << " " << __func__ << ": "
namespace librbd {
namespace cache {
namespace pwl {
namespace rwl {

using namespace librbd::cache::pwl;

const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
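/* OPS_APPENDED_TOGETHER bounds the size of one append batch; it is used
 * below to reserve the entries_to_flush vector in append_op_log_entries(). */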

template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}
template <typename I>
WriteLog<I>::WriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(), image_writeback,
                        plugin_api),
    m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl))
{
}
template <typename I>
WriteLog<I>::~WriteLog() {
  delete m_builderobj;
}
template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
    Extent hit_extent, pwl::C_ReadRequest *read_ctx) {
  /* Make a bl for this hit extent. This will add references to the
   * write_entry->pmem_bp */
  buffer::list hit_bl;

  /* Create buffer object referring to pmem pool for this read hit */
  auto write_entry = map_entry.log_entry;

  buffer::list entry_bl_copy;
  write_entry->copy_cache_bl(&entry_bl_copy);
  entry_bl_copy.begin(read_buffer_offset).copy(entry_hit_length, hit_bl);
  ceph_assert(hit_bl.length() == entry_hit_length);
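
  /* The hit range is copied out of the cache entry's buffer above, so the
   * read completion owns plain DRAM memory and does not pin the pmem entry. */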
  /* Add hit extent to read extents */
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(hit_extent, hit_bl);
  read_ctx->read_extents.push_back(hit_extent_buf);
}
template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, Context *ctx) {
  ctx->complete(0);
}
/*
 * Allocate the (already reserved) write log entries for a set of operations.
 */
template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  /* Allocate the (already reserved) log entries */
  std::unique_lock locker(m_lock);
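
  /* Entry slots are handed out in ring order: m_first_free_entry advances
   * modulo m_total_log_entries and wraps back to slot 0 at the end. */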
  for (auto &operation : ops) {
    uint32_t entry_index = this->m_first_free_entry;
    this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
    auto &log_entry = operation->get_log_entry();
    log_entry->log_entry_index = entry_index;
    log_entry->ram_entry.entry_index = entry_index;
    log_entry->cache_entry = &pmem_log_entries[entry_index];
    log_entry->ram_entry.set_entry_valid(true);
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
  if (m_cache_state->empty && !m_log_entries.empty()) {
    m_cache_state->empty = false;
    this->update_image_cache_state();
    this->write_image_cache_state(locker);
  }
}
/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
int WriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
{
  CephContext *cct = m_image_ctx.cct;
  GenericLogOperationsVector entries_to_flush;
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  int ret = 0;

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  if (ops.empty()) {
    return 0;
  }
  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
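
  /* Entries are copied into the pmem ring and flushed in contiguous runs; a
   * batch is cut whenever the ring wraps, because flush_op_log_entries()
   * hands pmemobj_flush() a single contiguous address range. */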
  /* Write log entries to ring and persist */
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (!entries_to_flush.empty()) {
      /* Flush these and reset the list if the current entry wraps to the
       * tail of the ring */
      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
          operation->get_log_entry()->log_entry_index) {
        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
                                   << "operation=[" << *operation << "]" << dendl;
        flush_op_log_entries(entries_to_flush);
        entries_to_flush.clear();
        now = ceph_clock_now();
      }
    }
    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
                               << operation->get_log_entry()->log_entry_index
                               << " from " << &operation->get_log_entry()->ram_entry
                               << " to " << operation->get_log_entry()->cache_entry
                               << " operation=[" << *operation << "]" << dendl;
    operation->log_append_start_time = now;
    *operation->get_log_entry()->cache_entry = operation->get_log_entry()->ram_entry;
    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
                               << operation->get_log_entry()->log_entry_index
                               << " pmem_entry=[" << *operation->get_log_entry()->cache_entry
                               << "]" << dendl;
    entries_to_flush.push_back(operation);
  }
  flush_op_log_entries(entries_to_flush);

  /* Drain once for all */
  pmemobj_drain(m_log_pool);
  /*
   * Atomically advance the log head pointer and publish the
   * allocations for all the data buffers they refer to.
   */
  utime_t tx_start = ceph_clock_now();
  TX_BEGIN(m_log_pool) {
    D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
    for (auto &operation : ops) {
      if (operation->reserved_allocated()) {
        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
      } else {
        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
      }
    }
  } TX_ONCOMMIT {
  } TX_ONABORT {
    lderr(cct) << "failed to commit " << ops.size()
               << " log entries (" << this->m_log_pool_name << ")" << dendl;
    ceph_assert(false);
    ret = -EIO;
  } TX_FINALLY {
  } TX_END;

  utime_t tx_end = ceph_clock_now();
  m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
  m_perfcounter->hinc(
    l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
  for (auto &operation : ops) {
    operation->log_append_comp_time = tx_end;
  }

  return ret;
}
/*
 * Flush the persistent write log entries for a set of ops. The entries must
 * be contiguous in persistent memory.
 */
template <typename I>
void WriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
{
  if (ops.empty()) {
    return;
  }

  if (ops.size() > 1) {
    ceph_assert(ops.front()->get_log_entry()->cache_entry <
                ops.back()->get_log_entry()->cache_entry);
  }

  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size()
                             << " start address="
                             << ops.front()->get_log_entry()->cache_entry
                             << " bytes="
                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry))
                             << dendl;
  pmemobj_flush(m_log_pool,
                ops.front()->get_log_entry()->cache_entry,
                ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry)));
}
template <typename I>
void WriteLog<I>::remove_pool_file() {
  if (m_log_pool) {
    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
    pmemobj_close(m_log_pool);
  }
  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
                             << pmemobj_errormsg() << dendl;
    } else {
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
  }
}
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  int r = -EINVAL;
  TOID(struct WriteLogPoolRoot) pool_root;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
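
  /* If the pool file does not exist yet, create it and initialize its root
   * object transactionally; otherwise open it and validate the layout
   * version and block size before loading the existing entries. */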
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    if ((m_log_pool =
         pmemobj_create(this->m_log_pool_name.c_str(),
                        this->m_pwl_pool_layout_name,
                        this->m_log_pool_size,
                        (S_IWUSR | S_IRUSR))) == NULL) {
      lderr(cct) << "failed to create pool: " << this->m_log_pool_name
                 << ". error: " << pmemobj_errormsg() << dendl;
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    /* new pool, calculate and store metadata */
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES +
                              sizeof(struct WriteLogCacheEntry);
    uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
    if (num_small_writes > MAX_LOG_ENTRIES) {
      num_small_writes = MAX_LOG_ENTRIES;
    }
    if (num_small_writes <= 2) {
      lderr(cct) << "num_small_writes needs to be > 2" << dendl;
      on_finish->complete(-EINVAL);
      return false;
    }
    this->m_bytes_allocated_cap = effective_pool_size;
    /* Log ring empty */
    m_first_free_entry = 0;
    m_first_valid_entry = 0;
    TX_BEGIN(m_log_pool) {
      TX_ADD(pool_root);
      D_RW(pool_root)->header.layout_version = RWL_LAYOUT_VERSION;
      D_RW(pool_root)->log_entries =
        TX_ZALLOC(struct WriteLogCacheEntry,
                  sizeof(struct WriteLogCacheEntry) * num_small_writes);
      D_RW(pool_root)->pool_size = this->m_log_pool_size;
      D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
      D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
      D_RW(pool_root)->num_log_entries = num_small_writes;
      D_RW(pool_root)->first_free_entry = m_first_free_entry;
      D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
    } TX_ONCOMMIT {
      this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
      this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
    } TX_ONABORT {
      this->m_total_log_entries = 0;
      this->m_free_log_entries = 0;
      lderr(cct) << "failed to initialize pool: " << this->m_log_pool_name
                 << ". pmemobj TX errno: " << pmemobj_tx_errno() << dendl;
      r = -pmemobj_tx_errno();
      goto err_close_pool;
    } TX_FINALLY {
    } TX_END;
  } else {
    ceph_assert(m_cache_state->present);
    /* Open existing pool */
    if ((m_log_pool =
         pmemobj_open(this->m_log_pool_name.c_str(),
                      this->m_pwl_pool_layout_name)) == NULL) {
      lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
                 << pmemobj_errormsg() << dendl;
      on_finish->complete(-errno);
      return false;
    }
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
    if (D_RO(pool_root)->header.layout_version != RWL_LAYOUT_VERSION) {
      // TODO: will handle upgrading version in the future
      lderr(cct) << "pool layout version is "
                 << D_RO(pool_root)->header.layout_version
                 << " expected " << RWL_LAYOUT_VERSION << dendl;
      goto err_close_pool;
    }
    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
      lderr(cct) << "pool block size is " << D_RO(pool_root)->block_size
                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
      goto err_close_pool;
    }
    this->m_log_pool_size = D_RO(pool_root)->pool_size;
    this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
    this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
    m_first_free_entry = D_RO(pool_root)->first_free_entry;
    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
    if (m_first_free_entry < m_first_valid_entry) {
      /* Valid entries wrap around the end of the ring, so first_free is lower
       * than first_valid. If first_valid was == first_free+1, the entry at
       * first_free would be empty. The last entry is never used, so in
       * that case there would be zero free log entries. */
      this->m_free_log_entries = this->m_total_log_entries -
        (m_first_valid_entry - m_first_free_entry) - 1;
    } else {
      /* first_valid is <= first_free. If they are == we have zero valid log
       * entries, and n-1 free log entries */
      this->m_free_log_entries = this->m_total_log_entries -
        (m_first_free_entry - m_first_valid_entry) - 1;
    }
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    this->m_bytes_allocated_cap = effective_pool_size;
    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_pool:
  pmemobj_close(m_log_pool);
  on_finish->complete(r);
  return false;
}
/*
 * Loads the log entries from an existing log.
 *
 * Creates the in-memory structures to represent the state of the
 * re-opened log.
 *
 * Finds the last appended sync point, and any sync points referred to
 * in log entries, but missing from the log. These missing sync points
 * are created and scheduled for append. Some rudimentary consistency
 * checking is done.
 *
 * Rebuilds the m_blocks_to_log_entries map, to make log entries
 * readable.
 *
 * Places all writes on the dirty entries list, which causes them all
 * to be flushed.
 */
template <typename I>
void WriteLog<I>::load_existing_entries(DeferredContexts &later) {
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
  uint64_t entry_index = m_first_valid_entry;
  /* The map below allows us to find sync point log entries by sync
   * gen number, which is necessary so write entries can be linked to
   * their sync points. */
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  /* The map below tracks sync points referred to in writes but not
   * appearing in the sync_point_entries map. We'll use this to
   * determine which sync points are missing and need to be
   * created. */
  std::map<uint64_t, bool> missing_sync_points;
  /*
   * Read the existing log entries. Construct an in-memory log entry
   * object of the appropriate type for each. Add these to the global
   * log entries list.
   *
   * Write entries will not link to their sync points yet. We'll do
   * that in the next pass. Here we'll accumulate a map of sync point
   * gen numbers that are referred to in writes but do not appear in
   * the log.
   */
  while (entry_index != m_first_free_entry) {
    WriteLogCacheEntry *pmem_entry = &pmem_log_entries[entry_index];
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
    ceph_assert(pmem_entry->entry_index == entry_index);

    this->update_entries(&log_entry, pmem_entry, missing_sync_points,
                         sync_point_entries, entry_index);

    log_entry->ram_entry = *pmem_entry;
    log_entry->cache_entry = pmem_entry;
    log_entry->log_entry_index = entry_index;
    log_entry->completed = true;

    m_log_entries.push_back(log_entry);

    entry_index = (entry_index + 1) % this->m_total_log_entries;
  }

  this->update_sync_points(missing_sync_points, sync_point_entries, later);
}
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_allocated += std::max(log_entry->write_bytes(), MIN_WRITE_ALLOC_SIZE);
    this->m_bytes_cached += log_entry->write_bytes();
  }
}
template <typename I>
void WriteLog<I>::write_data_to_buffer(
    std::shared_ptr<pwl::WriteLogEntry> ws_entry,
    WriteLogCacheEntry *pmem_entry) {
  ws_entry->cache_buffer = D_RW(pmem_entry->write_data);
}
/*
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint32_t initial_first_valid_entry;
  uint32_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    /* Entry readers can't be added while we hold m_entry_reader_lock */
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = this->m_first_valid_entry;
    first_valid_entry = this->m_first_valid_entry;
    while (!m_log_entries.empty() && retiring_entries.size() < frees_per_tx &&
           this->can_retire_entry(m_log_entries.front())) {
      auto entry = m_log_entries.front();
      if (entry->log_entry_index != first_valid_entry) {
        lderr(cct) << "retiring entry index (" << entry->log_entry_index
                   << ") and first valid log entry index (" << first_valid_entry
                   << ") must be ==." << dendl;
      }
      ceph_assert(entry->log_entry_index == first_valid_entry);
      first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
      m_log_entries.pop_front();
      retiring_entries.push_back(entry);
      /* Remove entry from map so there will be no more readers */
      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
        if (gen_write_entry) {
          this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
        }
      }
    }
  }
  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
    TOID(struct WriteLogPoolRoot) pool_root;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    utime_t tx_start;
    utime_t tx_end;
    /* Advance first valid entry and release buffers */
    {
      uint64_t flushed_sync_gen;
      std::lock_guard append_locker(this->m_log_append_lock);
      {
        std::lock_guard locker(m_lock);
        flushed_sync_gen = this->m_flushed_sync_gen;
      }
      tx_start = ceph_clock_now();
      TX_BEGIN(m_log_pool) {
        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
                                     << D_RO(pool_root)->flushed_sync_gen << " to "
                                     << flushed_sync_gen << dendl;
          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
        }
        D_RW(pool_root)->first_valid_entry = first_valid_entry;
        for (auto &entry : retiring_entries) {
          if (entry->write_bytes()) {
            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
                           << "." << entry->ram_entry.write_data.oid.off << dendl;
            TX_FREE(entry->ram_entry.write_data);
          } else {
            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
          }
        }
      } TX_ONCOMMIT {
      } TX_ONABORT {
        lderr(cct) << "failed to commit free of " << retiring_entries.size()
                   << " log entries (" << this->m_log_pool_name << ")" << dendl;
        ceph_assert(false);
      } TX_FINALLY {
      } TX_END;
      tx_end = ceph_clock_now();
    }
    m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
    m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
                        retiring_entries.size());
    bool need_update_state = false;
    /* Update runtime copy of first_valid, and free entries counts */
    {
      std::lock_guard locker(m_lock);

      ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
      this->m_first_valid_entry = first_valid_entry;
      this->m_free_log_entries += retiring_entries.size();
      if (!m_cache_state->empty && m_log_entries.empty()) {
        m_cache_state->empty = true;
        this->update_image_cache_state();
        need_update_state = true;
      }
      for (auto &entry : retiring_entries) {
        if (entry->write_bytes()) {
          ceph_assert(this->m_bytes_cached >= entry->write_bytes());
          this->m_bytes_cached -= entry->write_bytes();
          uint64_t entry_allocation_size = entry->write_bytes();
          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
          }
          ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
          this->m_bytes_allocated -= entry_allocation_size;
        }
      }
      this->m_alloc_failed_since_retire = false;
      this->wake_up();
    }
    if (need_update_state) {
      std::unique_lock locker(m_lock);
      this->write_image_cache_state(locker);
    }
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}
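
/*
 * Builds a guarded writeback context for each entry to be flushed and
 * detains it on the flush guard, so writebacks to overlapping image extents
 * are ordered.
 */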
template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  bool invalidating = this->m_invalidating; // snapshot so we behave consistently

  for (auto &log_entry : entries_to_flush) {
    GuardedRequestFunctionContext *guarded_ctx =
      new GuardedRequestFunctionContext([this, log_entry, invalidating]
        (GuardedRequestFunctionContext &guard_ctx) {
          log_entry->m_cell = guard_ctx.cell;
          Context *ctx = this->construct_flush_entry(log_entry, invalidating);

          if (!invalidating) {
            ctx = new LambdaContext(
              [this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }
          ctx->complete(0);
        });
    this->detain_flush_guard_request(log_entry, guarded_ctx);
  }
}
const unsigned long int ops_flushed_together = 4;
/*
 * Performs the pmem buffer flush on all scheduled ops, then schedules
 * the log event append operation for all of them.
 */
template <typename I>
void WriteLog<I>::flush_then_append_scheduled_ops(void)
{
  GenericLogOperations ops;
  bool ops_remain = false;
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    {
      ops.clear();
      std::lock_guard locker(m_lock);
      if (m_ops_to_flush.size()) {
        auto last_in_batch = m_ops_to_flush.begin();
        unsigned int ops_to_flush = m_ops_to_flush.size();
        if (ops_to_flush > ops_flushed_together) {
          ops_to_flush = ops_flushed_together;
        }
        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
        std::advance(last_in_batch, ops_to_flush);
        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
        ops_remain = !m_ops_to_flush.empty();
        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", remain "
                                   << m_ops_to_flush.size() << dendl;
      } else {
        ops_remain = false;
      }
    }
    if (ops_remain) {
      enlist_op_flusher();
    }

    /* Ops subsequently scheduled for flush may finish before these,
     * which is fine. We're unconcerned with completion order until we
     * get to the log message append step. */
    if (ops.size()) {
      flush_pmem_buffer(ops);
      schedule_append_ops(ops, nullptr);
    }
  } while (ops_remain);
  append_scheduled_ops();
}
/*
 * Performs the log event append operation for all of the scheduled
 * ops.
 */
template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  int append_result = 0;
  bool ops_remain = false;
  bool appending = false; /* true if we set m_appending */
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    ops.clear();
    this->append_scheduled(ops, ops_remain, appending, true);

    if (ops.size()) {
      std::lock_guard locker(this->m_log_append_lock);
      alloc_op_log_entries(ops);
      append_result = append_op_log_entries(ops);
    }

    int num_ops = ops.size();
    if (num_ops) {
      /* New entries may be flushable. Completion will wake up flusher. */
      this->complete_op_log_entries(std::move(ops), append_result);
    }
  } while (ops_remain);
}
template <typename I>
void WriteLog<I>::enlist_op_flusher()
{
  this->m_async_flush_ops++;
  this->m_async_op_tracker.start_op();
  Context *flush_ctx = new LambdaContext([this](int r) {
    flush_then_append_scheduled_ops();
    this->m_async_flush_ops--;
    this->m_async_op_tracker.finish_op();
  });
  this->m_work_queue.queue(flush_ctx);
}
template <typename I>
void WriteLog<I>::setup_schedule_append(
    pwl::GenericLogOperationsVector &ops, bool do_early_flush,
    C_BlockIORequestT *req) {
  if (do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * expedite it */
    flush_pmem_buffer(ops);
    this->schedule_append(ops);
  } else {
    /* This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    schedule_flush_and_append(ops);
  }
}
/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req)
{
  bool need_finisher;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
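
  /* The splice below empties ops; appending keeps a copy of the operation
   * pointers so op->appending() can still be called on each one afterwards. */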
  {
    std::lock_guard locker(m_lock);

    need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
  }

  if (need_finisher) {
    this->m_async_append_ops++;
    this->m_async_op_tracker.start_op();
    Context *append_ctx = new LambdaContext([this](int r) {
      append_scheduled_ops();
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
    this->m_work_queue.queue(append_ctx);
  }

  for (auto &op : appending) {
    op->appending();
  }
}
/*
 * Takes custody of ops. They'll all get their pmem blocks flushed,
 * then get their log entries appended.
 */
template <typename I>
void WriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
{
  GenericLogOperations to_flush(ops.begin(), ops.end());
  bool need_finisher;
  ldout(m_image_ctx.cct, 20) << dendl;
  {
    std::lock_guard locker(m_lock);

    need_finisher = m_ops_to_flush.empty();
    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
  }

  if (need_finisher) {
    enlist_op_flusher();
  }
}
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes =
    this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
  uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
  uint64_t aggressive_high_water_entries =
    this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
  uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;
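
  /* Retiring normally frees up to MAX_FREE_PER_TRANSACTION entries per pass,
   * but switches to the larger MAX_ALLOC_PER_TRANSACTION batches while
   * shutting down, invalidating, or above the aggressive high water marks. */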
  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_invalidating ||
        this->m_bytes_allocated > high_water_bytes ||
        (m_log_entries.size() > high_water_entries)) {
      int retired = 0;
      utime_t started = ceph_clock_now();
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (this->m_bytes_allocated > high_water_bytes)
                                 << ", allocated_entries > high_water="
                                 << (m_log_entries.size() > high_water_entries)
                                 << dendl;
      while (this->m_alloc_failed_since_retire || this->m_invalidating ||
             (this->m_bytes_allocated > high_water_bytes) ||
             (m_log_entries.size() > high_water_entries) ||
             (((this->m_bytes_allocated > low_water_bytes) ||
               (m_log_entries.size() > low_water_entries)) &&
              (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
        if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
                             (this->m_bytes_allocated > aggressive_high_water_bytes) ||
                             (m_log_entries.size() > aggressive_high_water_entries) ||
                             this->m_alloc_failed_since_retire)
                              ? MAX_ALLOC_PER_TRANSACTION
                              : MAX_FREE_PER_TRANSACTION)) {
          break;
        }
        retired++;
        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
      }
      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();
    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);
  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    /* Reschedule if it's still requested */
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}
/*
 * Flush the pmem regions for the data blocks of a set of operations
 *
 * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
 */
template <typename I>
template <typename V>
void WriteLog<I>::flush_pmem_buffer(V& ops)
{
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_start_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }
  for (auto &operation : ops) {
    if (operation->is_writing_op()) {
      auto log_entry = static_pointer_cast<WriteLogEntry>(operation->get_log_entry());
      pmemobj_flush(m_log_pool, log_entry->cache_buffer, log_entry->write_bytes());
    }
  }
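
  /* pmemobj_flush() only initiates the flush of each buffer; the single
   * pmemobj_drain() below waits for all of them at once, which is cheaper
   * than a synchronous pmemobj_persist() per buffer. */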
  /* Drain once for all */
  pmemobj_drain(m_log_pool);

  now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_comp_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }
}
/*
 * Update/persist the last flushed sync point in the log
 */
template <typename I>
void WriteLog<I>::persist_last_flushed_sync_gen()
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  uint64_t flushed_sync_gen;

  std::lock_guard append_locker(this->m_log_append_lock);
  {
    std::lock_guard locker(m_lock);
    flushed_sync_gen = this->m_flushed_sync_gen;
  }

  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
                               << D_RO(pool_root)->flushed_sync_gen << " to "
                               << flushed_sync_gen << dendl;
    TX_BEGIN(m_log_pool) {
      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
    } TX_ONCOMMIT {
    } TX_ONABORT {
      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
      ceph_assert(false);
    } TX_FINALLY {
    } TX_END;
  }
}
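
/*
 * Reserve pmem space for the data buffers of a request. Reservations are
 * published later, in the same transaction that appends the log entries
 * (see append_op_log_entries()).
 */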
template <typename I>
void WriteLog<I>::reserve_cache(C_BlockIORequestT *req,
                                bool &alloc_succeeds, bool &no_space) {
  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  for (auto &buffer : buffers) {
    utime_t before_reserve = ceph_clock_now();
    buffer.buffer_oid = pmemobj_reserve(m_log_pool,
                                        &buffer.buffer_alloc_action,
                                        buffer.allocation_size,
                                        0 /* Object type */);
    buffer.allocation_lat = ceph_clock_now() - before_reserve;
    if (TOID_IS_NULL(buffer.buffer_oid)) {
      ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
                                << pmemobj_errormsg() << ". "
                                << *req << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries need to be retired */

      if (this->m_free_log_entries == this->m_total_log_entries - 1) {
        /* When the cache is empty, there is still no space to allocate.
         * Defragment the pool and retry on a later allocation. */
        pmemobj_defrag(m_log_pool, NULL, 0, NULL);
      }
      break;
    } else {
      buffer.allocated = true;
    }
    ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
                               << "." << buffer.buffer_oid.oid.off
                               << ", size=" << buffer.allocation_size << dendl;
  }
}
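
/*
 * Copy each operation's bufferlist into the cache buffer reserved for it,
 * stepping through the request's buffer allocations in parallel.
 */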
template <typename I>
void WriteLog<I>::copy_bl_to_buffer(
    WriteRequestResources *resources, std::unique_ptr<WriteLogOperationSet> &op_set) {
  auto allocation = resources->buffers.begin();
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_cache_buffer(allocation);
    allocation++;
  }
}
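
/*
 * Allocate the resources (log entries, pmem data buffers, lanes) needed by a
 * block I/O request. On failure, any pmem reservations already made for the
 * request are cancelled. Returns true only if every allocation succeeded.
 */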
template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  ldout(m_image_ctx.cct, 20) << dendl;
  // Set up the buffer resources and count what this request requires
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries, &num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes, num_log_entries,
                                          num_unpublished_reserves);

  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  if (!alloc_succeeds) {
    /* On alloc failure, free any buffers we did allocate */
    for (auto &buffer : buffers) {
      if (buffer.allocated) {
        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
      }
    }
  }

  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}
template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  user_req->complete(r);
  // Set user_req as null as it is deleted
  user_req = nullptr;
}
} // namespace rwl
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::rwl::WriteLog<librbd::ImageCtx>;