// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "WriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/plugin/Api.h"
#include <map>
#include <vector>

#undef dout_subsys
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::rwl::WriteLog: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace cache {
namespace pwl {
using namespace std;
using namespace librbd::cache::pwl;
namespace rwl {

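/* Maximum number of log entries appended (and flushed to pmem) in one batch;
 * reuses the per-transaction allocation limit. */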
const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;

template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}

template <typename I>
WriteLog<I>::WriteLog(
  I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
  ImageWritebackInterface& image_writeback,
  plugin::Api<I>& plugin_api)
: AbstractWriteLog<I>(image_ctx, cache_state, create_builder(), image_writeback,
                      plugin_api),
  m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl))
{
}

template <typename I>
WriteLog<I>::~WriteLog() {
  m_log_pool = nullptr;
  delete m_builderobj;
}

template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
    Extent hit_extent, pwl::C_ReadRequest *read_ctx) {
  /* Make a bl for this hit extent. This will add references to the
   * write_entry->pmem_bp */
  buffer::list hit_bl;

  /* Create buffer object referring to pmem pool for this read hit */
  auto write_entry = map_entry.log_entry;

  buffer::list entry_bl_copy;
  write_entry->copy_cache_bl(&entry_bl_copy);
  entry_bl_copy.begin(read_buffer_offset).copy(entry_hit_length, hit_bl);
  ceph_assert(hit_bl.length() == entry_hit_length);

  /* Add hit extent to read extents */
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(hit_extent, hit_bl);
  read_ctx->read_extents.push_back(hit_extent_buf);
}

template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, Context *ctx) {
  ctx->complete(0);
}

/*
 * Allocate the (already reserved) write log entries for a set of operations.
 *
 * Locking:
 * Acquires m_lock
 */
template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  /* Allocate the (already reserved) log entries */
  std::unique_lock locker(m_lock);

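  /* Hand each operation the next slot in the circular entry array. The index
   * wraps modulo m_total_log_entries. The persistent copy is written later in
   * append_op_log_entries(); here we only set up the in-memory entry and its
   * destination pointer. */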
  for (auto &operation : ops) {
    uint32_t entry_index = this->m_first_free_entry;
    this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
    auto &log_entry = operation->get_log_entry();
    log_entry->log_entry_index = entry_index;
    log_entry->ram_entry.entry_index = entry_index;
    log_entry->cache_entry = &pmem_log_entries[entry_index];
    log_entry->ram_entry.set_entry_valid(true);
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
  if (m_cache_state->empty && !m_log_entries.empty()) {
    m_cache_state->empty = false;
    this->update_image_cache_state();
    this->write_image_cache_state(locker);
  }
}

/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
int WriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
{
  CephContext *cct = m_image_ctx.cct;
  GenericLogOperationsVector entries_to_flush;
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  int ret = 0;

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  if (ops.empty()) {
    return 0;
  }
  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);

  /* Write log entries to ring and persist */
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (!entries_to_flush.empty()) {
      /* Flush these and reset the list if the current entry wraps to the
       * tail of the ring */
      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
          operation->get_log_entry()->log_entry_index) {
        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
                                   << "operation=[" << *operation << "]" << dendl;
        flush_op_log_entries(entries_to_flush);
        entries_to_flush.clear();
        now = ceph_clock_now();
      }
    }
    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
                               << operation->get_log_entry()->log_entry_index
                               << " from " << &operation->get_log_entry()->ram_entry
                               << " to " << operation->get_log_entry()->cache_entry
                               << " operation=[" << *operation << "]" << dendl;
    operation->log_append_start_time = now;
    *operation->get_log_entry()->cache_entry = operation->get_log_entry()->ram_entry;
    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
                               << operation->get_log_entry()->log_entry_index
                               << " pmem_entry=[" << *operation->get_log_entry()->cache_entry
                               << "]" << dendl;
    entries_to_flush.push_back(operation);
  }
  flush_op_log_entries(entries_to_flush);

  /* Drain once for all */
  pmemobj_drain(m_log_pool);

  /*
   * Atomically advance the log head pointer and publish the
   * allocations for all the data buffers they refer to.
   */
  utime_t tx_start = ceph_clock_now();
  TX_BEGIN(m_log_pool) {
    D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
    for (auto &operation : ops) {
      if (operation->reserved_allocated()) {
        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
      } else {
        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
      }
    }
  } TX_ONCOMMIT {
  } TX_ONABORT {
    lderr(cct) << "failed to commit " << ops.size()
               << " log entries (" << this->m_log_pool_name << ")" << dendl;
    ceph_assert(false);
    ret = -EIO;
  } TX_FINALLY {
  } TX_END;

  utime_t tx_end = ceph_clock_now();
  m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
  m_perfcounter->hinc(
    l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
  for (auto &operation : ops) {
    operation->log_append_comp_time = tx_end;
  }

  return ret;
}

/*
 * Flush the persistent write log entries for a set of ops. The entries
 * must be contiguous in persistent memory.
 */
template <typename I>
void WriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
{
  if (ops.empty()) {
    return;
  }

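  /* Entries appended together occupy a contiguous, ascending range of the
   * ring (append_op_log_entries() starts a new batch at the wrap point), so
   * the single pmemobj_flush() below covers the whole batch. */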
  if (ops.size() > 1) {
    ceph_assert(ops.front()->get_log_entry()->cache_entry < ops.back()->get_log_entry()->cache_entry);
  }

  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size()
                             << " start address="
                             << ops.front()->get_log_entry()->cache_entry
                             << " bytes="
                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry))
                             << dendl;
  pmemobj_flush(m_log_pool,
                ops.front()->get_log_entry()->cache_entry,
                ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry)));
}

template <typename I>
void WriteLog<I>::remove_pool_file() {
  if (m_log_pool) {
    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
    pmemobj_close(m_log_pool);
  }
  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
                             << pmemobj_errormsg() << dendl;
    } else {
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
  }
}

template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  int r = -EINVAL;
  TOID(struct WriteLogPoolRoot) pool_root;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    if ((m_log_pool =
         pmemobj_create(this->m_log_pool_name.c_str(),
                        this->m_pwl_pool_layout_name,
                        this->m_log_pool_size,
                        (S_IWUSR | S_IRUSR))) == NULL) {
      lderr(cct) << "failed to create pool: " << this->m_log_pool_name
                 << ". error: " << pmemobj_errormsg() << dendl;
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    /* new pool, calculate and store metadata */
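    /* Each potential small write consumes a minimum-sized data allocation,
     * its allocator overhead, and one WriteLogCacheEntry slot in the ring;
     * that per-write footprint bounds how many log entries fit in the pool. */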
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogCacheEntry);
    uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
    if (num_small_writes > MAX_LOG_ENTRIES) {
      num_small_writes = MAX_LOG_ENTRIES;
    }
    if (num_small_writes <= 2) {
      lderr(cct) << "num_small_writes needs to be > 2" << dendl;
      goto err_close_pool;
    }
    this->m_bytes_allocated_cap = effective_pool_size;
    /* Log ring empty */
    m_first_free_entry = 0;
    m_first_valid_entry = 0;
    TX_BEGIN(m_log_pool) {
      TX_ADD(pool_root);
      D_RW(pool_root)->header.layout_version = RWL_LAYOUT_VERSION;
      D_RW(pool_root)->log_entries =
        TX_ZALLOC(struct WriteLogCacheEntry,
                  sizeof(struct WriteLogCacheEntry) * num_small_writes);
      D_RW(pool_root)->pool_size = this->m_log_pool_size;
      D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
      D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
      D_RW(pool_root)->num_log_entries = num_small_writes;
      D_RW(pool_root)->first_free_entry = m_first_free_entry;
      D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
    } TX_ONCOMMIT {
      this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
      this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
    } TX_ONABORT {
      this->m_total_log_entries = 0;
      this->m_free_log_entries = 0;
      lderr(cct) << "failed to initialize pool: " << this->m_log_pool_name
                 << ". pmemobj TX errno: " << pmemobj_tx_errno() << dendl;
      r = -pmemobj_tx_errno();
      goto err_close_pool;
    } TX_FINALLY {
    } TX_END;
  } else {
    ceph_assert(m_cache_state->present);
    /* Open existing pool */
    if ((m_log_pool =
         pmemobj_open(this->m_log_pool_name.c_str(),
                      this->m_pwl_pool_layout_name)) == NULL) {
      lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
                 << pmemobj_errormsg() << dendl;
      on_finish->complete(-errno);
      return false;
    }
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
    if (D_RO(pool_root)->header.layout_version != RWL_LAYOUT_VERSION) {
      // TODO: will handle upgrading version in the future
      lderr(cct) << "pool layout version is "
                 << D_RO(pool_root)->header.layout_version
                 << " expected " << RWL_LAYOUT_VERSION << dendl;
      goto err_close_pool;
    }
    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
      lderr(cct) << "pool block size is " << D_RO(pool_root)->block_size
                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
      goto err_close_pool;
    }
    this->m_log_pool_size = D_RO(pool_root)->pool_size;
    this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
    this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
    m_first_free_entry = D_RO(pool_root)->first_free_entry;
    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
    if (m_first_free_entry < m_first_valid_entry) {
      /* Valid entries wrap around the end of the ring, so first_free is lower
       * than first_valid.  If first_valid was == first_free+1, the entry at
       * first_free would be empty. The last entry is never used, so in
       * that case there would be zero free log entries. */
      this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) - 1;
    } else {
      /* first_valid is <= first_free. If they are == we have zero valid log
       * entries, and n-1 free log entries */
      this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) - 1;
    }
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    this->m_bytes_allocated_cap = effective_pool_size;
    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_pool:
  pmemobj_close(m_log_pool);
  on_finish->complete(r);
  return false;
}

/*
 * Loads the log entries from an existing log.
 *
 * Creates the in-memory structures to represent the state of the
 * re-opened log.
 *
 * Finds the last appended sync point, and any sync points referred to
 * in log entries, but missing from the log. These missing sync points
 * are created and scheduled for append. Some rudimentary consistency
 * checking is done.
 *
 * Rebuilds the m_blocks_to_log_entries map, to make log entries
 * readable.
 *
 * Places all writes on the dirty entries list, which causes them all
 * to be flushed.
 *
 */

template <typename I>
void WriteLog<I>::load_existing_entries(DeferredContexts &later) {
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
  uint64_t entry_index = m_first_valid_entry;
  /* The map below allows us to find sync point log entries by sync
   * gen number, which is necessary so write entries can be linked to
   * their sync points. */
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  /* The map below tracks sync points referred to in writes but not
   * appearing in the sync_point_entries map. We'll use this to
   * determine which sync points are missing and need to be
   * created. */
  std::map<uint64_t, bool> missing_sync_points;

  /*
   * Read the existing log entries. Construct an in-memory log entry
   * object of the appropriate type for each. Add these to the global
   * log entries list.
   *
   * Write entries will not link to their sync points yet. We'll do
   * that in the next pass. Here we'll accumulate a map of sync point
   * gen numbers that are referred to in writes but do not appear in
   * the log.
   */
  while (entry_index != m_first_free_entry) {
    WriteLogCacheEntry *pmem_entry = &pmem_log_entries[entry_index];
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
    ceph_assert(pmem_entry->entry_index == entry_index);

    this->update_entries(&log_entry, pmem_entry, missing_sync_points,
                         sync_point_entries, entry_index);

    log_entry->ram_entry = *pmem_entry;
    log_entry->cache_entry = pmem_entry;
    log_entry->log_entry_index = entry_index;
    log_entry->completed = true;

    m_log_entries.push_back(log_entry);

    entry_index = (entry_index + 1) % this->m_total_log_entries;
  }

  this->update_sync_points(missing_sync_points, sync_point_entries, later);
}

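/* Account for a re-loaded entry: data buffers are allocated in units of at
 * least MIN_WRITE_ALLOC_SIZE, while cached bytes track the exact write size. */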
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_allocated += std::max(log_entry->write_bytes(), MIN_WRITE_ALLOC_SIZE);
    this->m_bytes_cached += log_entry->write_bytes();
  }
}

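/* Point the in-memory write entry at its data buffer within the pmem pool. */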
template <typename I>
void WriteLog<I>::write_data_to_buffer(
    std::shared_ptr<pwl::WriteLogEntry> ws_entry,
    WriteLogCacheEntry *pmem_entry) {
  ws_entry->cache_buffer = D_RW(pmem_entry->write_data);
}

/**
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint32_t initial_first_valid_entry;
  uint32_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    /* Entry readers can't be added while we hold m_entry_reader_lock */
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = this->m_first_valid_entry;
    first_valid_entry = this->m_first_valid_entry;
    while (!m_log_entries.empty() && retiring_entries.size() < frees_per_tx &&
           this->can_retire_entry(m_log_entries.front())) {
      auto entry = m_log_entries.front();
      if (entry->log_entry_index != first_valid_entry) {
        lderr(cct) << "retiring entry index (" << entry->log_entry_index
                   << ") and first valid log entry index (" << first_valid_entry
                   << ") must be ==." << dendl;
      }
      ceph_assert(entry->log_entry_index == first_valid_entry);
      first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
      m_log_entries.pop_front();
      retiring_entries.push_back(entry);
      /* Remove entry from map so there will be no more readers */
      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
        if (gen_write_entry) {
          this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
        }
      }
    }
  }

  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
    TOID(struct WriteLogPoolRoot) pool_root;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    utime_t tx_start;
    utime_t tx_end;
    /* Advance first valid entry and release buffers */
    {
      uint64_t flushed_sync_gen;
      std::lock_guard append_locker(this->m_log_append_lock);
      {
        std::lock_guard locker(m_lock);
        flushed_sync_gen = this->m_flushed_sync_gen;
      }

      tx_start = ceph_clock_now();
      TX_BEGIN(m_log_pool) {
        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
                                     << D_RO(pool_root)->flushed_sync_gen << " to "
                                     << flushed_sync_gen << dendl;
          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
        }
        D_RW(pool_root)->first_valid_entry = first_valid_entry;
        for (auto &entry: retiring_entries) {
          if (entry->write_bytes()) {
            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
                           << "." << entry->ram_entry.write_data.oid.off << dendl;
            TX_FREE(entry->ram_entry.write_data);
          } else {
            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
          }
        }
      } TX_ONCOMMIT {
      } TX_ONABORT {
        lderr(cct) << "failed to commit free of " << retiring_entries.size()
                   << " log entries (" << this->m_log_pool_name << ")" << dendl;
        ceph_assert(false);
      } TX_FINALLY {
      } TX_END;
      tx_end = ceph_clock_now();
    }
    m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
    m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
                        retiring_entries.size());

    bool need_update_state = false;
    /* Update runtime copy of first_valid, and free entries counts */
    {
      std::lock_guard locker(m_lock);

      ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
      this->m_first_valid_entry = first_valid_entry;
      this->m_free_log_entries += retiring_entries.size();
      if (!m_cache_state->empty && m_log_entries.empty()) {
        m_cache_state->empty = true;
        this->update_image_cache_state();
        need_update_state = true;
      }
      for (auto &entry: retiring_entries) {
        if (entry->write_bytes()) {
          ceph_assert(this->m_bytes_cached >= entry->write_bytes());
          this->m_bytes_cached -= entry->write_bytes();
          uint64_t entry_allocation_size = entry->write_bytes();
          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
          }
          ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
          this->m_bytes_allocated -= entry_allocation_size;
        }
      }
      this->m_alloc_failed_since_retire = false;
      this->wake_up();
    }
    if (need_update_state) {
      std::unique_lock locker(m_lock);
      this->write_image_cache_state(locker);
    }
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}

template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  bool invalidating = this->m_invalidating; // snapshot so we behave consistently

  for (auto &log_entry : entries_to_flush) {
    GuardedRequestFunctionContext *guarded_ctx =
      new GuardedRequestFunctionContext([this, log_entry, invalidating]
        (GuardedRequestFunctionContext &guard_ctx) {
          log_entry->m_cell = guard_ctx.cell;
          Context *ctx = this->construct_flush_entry(log_entry, invalidating);

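          /* When not invalidating, defer the actual writeback to the image
           * work queue instead of running it inline under the flush guard. */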
          if (!invalidating) {
            ctx = new LambdaContext(
              [this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }

          ctx->complete(0);
        });
    this->detain_flush_guard_request(log_entry, guarded_ctx);
  }
}

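/* Number of scheduled ops whose pmem data buffers are flushed together in one
 * pass of flush_then_append_scheduled_ops(). */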
const unsigned long int ops_flushed_together = 4;
/*
 * Performs the pmem buffer flush on all scheduled ops, then schedules
 * the log event append operation for all of them.
 */
template <typename I>
void WriteLog<I>::flush_then_append_scheduled_ops(void)
{
  GenericLogOperations ops;
  bool ops_remain = false;
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    {
      ops.clear();
      std::lock_guard locker(m_lock);
      if (m_ops_to_flush.size()) {
        auto last_in_batch = m_ops_to_flush.begin();
        unsigned int ops_to_flush = m_ops_to_flush.size();
        if (ops_to_flush > ops_flushed_together) {
          ops_to_flush = ops_flushed_together;
        }
        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
        std::advance(last_in_batch, ops_to_flush);
        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
        ops_remain = !m_ops_to_flush.empty();
        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", remain "
                                   << m_ops_to_flush.size() << dendl;
      } else {
        ops_remain = false;
      }
    }
    if (ops_remain) {
      enlist_op_flusher();
    }

    /* Ops subsequently scheduled for flush may finish before these,
     * which is fine. We're unconcerned with completion order until we
     * get to the log message append step. */
    if (ops.size()) {
      flush_pmem_buffer(ops);
      schedule_append_ops(ops, nullptr);
    }
  } while (ops_remain);
  append_scheduled_ops();
}

/*
 * Performs the log event append operation for all of the scheduled
 * events.
 */
template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  int append_result = 0;
  bool ops_remain = false;
  bool appending = false; /* true if we set m_appending */
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    ops.clear();
    this->append_scheduled(ops, ops_remain, appending, true);

    if (ops.size()) {
      std::lock_guard locker(this->m_log_append_lock);
      alloc_op_log_entries(ops);
      append_result = append_op_log_entries(ops);
    }

    int num_ops = ops.size();
    if (num_ops) {
      /* New entries may be flushable. Completion will wake up flusher. */
      this->complete_op_log_entries(std::move(ops), append_result);
    }
  } while (ops_remain);
}

template <typename I>
void WriteLog<I>::enlist_op_flusher()
{
  this->m_async_flush_ops++;
  this->m_async_op_tracker.start_op();
  Context *flush_ctx = new LambdaContext([this](int r) {
    flush_then_append_scheduled_ops();
    this->m_async_flush_ops--;
    this->m_async_op_tracker.finish_op();
  });
  this->m_work_queue.queue(flush_ctx);
}

template <typename I>
void WriteLog<I>::setup_schedule_append(
    pwl::GenericLogOperationsVector &ops, bool do_early_flush,
    C_BlockIORequestT *req) {
  if (do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * expedite it */
    flush_pmem_buffer(ops);
    this->schedule_append(ops);
  } else {
    /* This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    schedule_flush_and_append(ops);
  }
}

/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req)
{
  bool need_finisher;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

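    /* Only enlist a new appender if nothing is already queued and no append
     * is in progress; an in-flight appender will pick these ops up itself. */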
    need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
  }

  if (need_finisher) {
    // Enlist op appender
    this->m_async_append_ops++;
    this->m_async_op_tracker.start_op();
    Context *append_ctx = new LambdaContext([this](int r) {
      append_scheduled_ops();
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
    this->m_work_queue.queue(append_ctx);
  }

  for (auto &op : appending) {
    op->appending();
  }
}

/*
 * Takes custody of ops. They'll all get their pmem blocks flushed,
 * then get their log entries appended.
 */
template <typename I>
void WriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
{
  GenericLogOperations to_flush(ops.begin(), ops.end());
  bool need_finisher;
  ldout(m_image_ctx.cct, 20) << dendl;
  {
    std::lock_guard locker(m_lock);

    need_finisher = m_ops_to_flush.empty();
    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
  }

  if (need_finisher) {
    enlist_op_flusher();
  }
}

template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
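  /* Retirement thresholds, expressed both as allocated bytes and as log entry
   * counts. Crossing a high water mark (or a prior allocation failure)
   * triggers retirement, which then continues toward the low water marks
   * within a time limit. */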
  uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
  uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
  uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
  uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_invalidating ||
        this->m_bytes_allocated > high_water_bytes ||
        (m_log_entries.size() > high_water_entries)) {
      int retired = 0;
      utime_t started = ceph_clock_now();
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (this->m_bytes_allocated > high_water_bytes)
                                 << ", allocated_entries > high_water="
                                 << (m_log_entries.size() > high_water_entries)
                                 << dendl;
      while (this->m_alloc_failed_since_retire || this->m_invalidating ||
             (this->m_bytes_allocated > high_water_bytes) ||
             (m_log_entries.size() > high_water_entries) ||
             (((this->m_bytes_allocated > low_water_bytes) ||
               (m_log_entries.size() > low_water_entries)) &&
              (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
        if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
                             (this->m_bytes_allocated > aggressive_high_water_bytes) ||
                             (m_log_entries.size() > aggressive_high_water_entries) ||
                             this->m_alloc_failed_since_retire)
                            ? MAX_ALLOC_PER_TRANSACTION
                            : MAX_FREE_PER_TRANSACTION)) {
          break;
        }
        retired++;
        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
      }
      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();

    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    /* Reschedule if it's still requested */
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}

/*
 * Flush the pmem regions for the data blocks of a set of operations
 *
 * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
 */
template <typename I>
template <typename V>
void WriteLog<I>::flush_pmem_buffer(V& ops)
{
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_start_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }

  for (auto &operation : ops) {
    if (operation->is_writing_op()) {
      auto log_entry = static_pointer_cast<WriteLogEntry>(operation->get_log_entry());
      pmemobj_flush(m_log_pool, log_entry->cache_buffer, log_entry->write_bytes());
    }
  }

  /* Drain once for all */
  pmemobj_drain(m_log_pool);

  now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_comp_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }
}

/**
 * Update/persist the last flushed sync point in the log
 */
template <typename I>
void WriteLog<I>::persist_last_flushed_sync_gen()
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  uint64_t flushed_sync_gen;

  std::lock_guard append_locker(this->m_log_append_lock);
  {
    std::lock_guard locker(m_lock);
    flushed_sync_gen = this->m_flushed_sync_gen;
  }

  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
                               << D_RO(pool_root)->flushed_sync_gen << " to "
                               << flushed_sync_gen << dendl;
    TX_BEGIN(m_log_pool) {
      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
    } TX_ONCOMMIT {
    } TX_ONABORT {
      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
      ceph_assert(false);
    } TX_FINALLY {
    } TX_END;
  }
}

template <typename I>
void WriteLog<I>::reserve_cache(C_BlockIORequestT *req,
                                bool &alloc_succeeds, bool &no_space) {
  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  for (auto &buffer : buffers) {
    utime_t before_reserve = ceph_clock_now();
    buffer.buffer_oid = pmemobj_reserve(m_log_pool,
                                        &buffer.buffer_alloc_action,
                                        buffer.allocation_size,
                                        0 /* Object type */);
    buffer.allocation_lat = ceph_clock_now() - before_reserve;
    if (TOID_IS_NULL(buffer.buffer_oid)) {
      ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
                                << pmemobj_errormsg() << ". "
                                << *req << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries need to be retired */

      if (this->m_free_log_entries == this->m_total_log_entries - 1) {
        /* When the cache is empty, there is still no space to allocate.
         * Defragment. */
        pmemobj_defrag(m_log_pool, NULL, 0, NULL);
      }
      break;
    } else {
      buffer.allocated = true;
    }
    ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
                               << "." << buffer.buffer_oid.oid.off
                               << ", size=" << buffer.allocation_size << dendl;
  }
}

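/* Copy each operation's bufferlist into its reserved cache buffer, walking
 * the operation list and the allocation list in lockstep. */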
template<typename I>
void WriteLog<I>::copy_bl_to_buffer(
    WriteRequestResources *resources, std::unique_ptr<WriteLogOperationSet> &op_set) {
  auto allocation = resources->buffers.begin();
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_cache_buffer(allocation);
    allocation++;
  }
}

template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  ldout(m_image_ctx.cct, 20) << dendl;
  // Set up buffers and count all the required resources
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries, &num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes, num_log_entries,
                                          num_unpublished_reserves);

  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  if (!alloc_succeeds) {
    /* On alloc failure, free any buffers we did allocate */
    for (auto &buffer : buffers) {
      if (buffer.allocated) {
        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
      }
    }
  }

  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}

template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  user_req->complete(r);
  // Set user_req to null since it has been deleted
  user_req = nullptr;
}

} // namespace rwl
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::rwl::WriteLog<librbd::ImageCtx>;