// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "WriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/plugin/Api.h"
#include <map>
#include <vector>

#undef dout_subsys
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::rwl::WriteLog: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace cache {
namespace pwl {
using namespace std;
using namespace librbd::cache::pwl;
namespace rwl {

const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;

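/*
 * Create the builder the base class uses to construct RWL-specific
 * log entries and operations. Called from the constructor's
 * initializer list, so it must not rely on any other members.
 */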
template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}

template <typename I>
WriteLog<I>::WriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(), image_writeback,
                        plugin_api),
    m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl))
{
}

template <typename I>
WriteLog<I>::~WriteLog() {
  m_log_pool = nullptr;
  delete m_builderobj;
}

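/*
 * Copy the hit range of a read out of the cache entry's (pmem-backed)
 * buffer into a new bufferlist and append it to the read request's
 * extent list. The data is available immediately, so nothing needs to
 * be deferred to complete_read().
 */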
template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
    Extent hit_extent, pwl::C_ReadRequest *read_ctx) {
  /* Make a bl for this hit extent. This will add references to the
   * write_entry->pmem_bp */
  buffer::list hit_bl;

  /* Create buffer object referring to pmem pool for this read hit */
  auto write_entry = map_entry.log_entry;

  buffer::list entry_bl_copy;
  write_entry->copy_cache_bl(&entry_bl_copy);
  entry_bl_copy.begin(read_buffer_offset).copy(entry_hit_length, hit_bl);
  ceph_assert(hit_bl.length() == entry_hit_length);

  /* Add hit extent to read extents */
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(hit_extent, hit_bl);
  read_ctx->read_extents.push_back(hit_extent_buf);
}

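/*
 * All hit data was already copied in collect_read_extents(), so the
 * read completes immediately; log_entries_to_read and bls_to_read are
 * unused here.
 */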
template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read, Context *ctx) {
  ctx->complete(0);
}

/*
 * Allocate the (already reserved) write log entries for a set of operations.
 *
 * Locking:
 * Acquires m_lock; the caller must hold m_log_append_lock.
 */
template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  /* Allocate the (already reserved) log entries */
  std::unique_lock locker(m_lock);

  for (auto &operation : ops) {
    uint32_t entry_index = this->m_first_free_entry;
    this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
    auto &log_entry = operation->get_log_entry();
    log_entry->log_entry_index = entry_index;
    log_entry->ram_entry.entry_index = entry_index;
    log_entry->cache_entry = &pmem_log_entries[entry_index];
    log_entry->ram_entry.set_entry_valid(true);
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
  if (m_cache_state->empty && !m_log_entries.empty()) {
    m_cache_state->empty = false;
    this->update_image_cache_state();
    this->write_image_cache_state(locker);
  }
}

/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
int WriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
{
  CephContext *cct = m_image_ctx.cct;
  GenericLogOperationsVector entries_to_flush;
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  int ret = 0;

  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));

  if (ops.empty()) {
    return 0;
  }
  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);

  /* Write log entries to ring and persist */
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (!entries_to_flush.empty()) {
      /* Flush these and reset the list if the current entry wraps to the
       * tail of the ring */
      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
          operation->get_log_entry()->log_entry_index) {
        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
                                   << "operation=[" << *operation << "]" << dendl;
        flush_op_log_entries(entries_to_flush);
        entries_to_flush.clear();
        now = ceph_clock_now();
      }
    }
    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
                               << operation->get_log_entry()->log_entry_index
                               << " from " << &operation->get_log_entry()->ram_entry
                               << " to " << operation->get_log_entry()->cache_entry
                               << " operation=[" << *operation << "]" << dendl;
    operation->log_append_start_time = now;
    *operation->get_log_entry()->cache_entry = operation->get_log_entry()->ram_entry;
    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
                               << operation->get_log_entry()->log_entry_index
                               << " pmem_entry=[" << *operation->get_log_entry()->cache_entry
                               << "]" << dendl;
    entries_to_flush.push_back(operation);
  }
  flush_op_log_entries(entries_to_flush);

  /* Drain once for all */
  pmemobj_drain(m_log_pool);

  /*
   * Atomically advance the log head pointer and publish the
   * allocations for all the data buffers they refer to.
   */
  utime_t tx_start = ceph_clock_now();
  TX_BEGIN(m_log_pool) {
    D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
    for (auto &operation : ops) {
      if (operation->reserved_allocated()) {
        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
      } else {
        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
      }
    }
  } TX_ONCOMMIT {
  } TX_ONABORT {
    lderr(cct) << "failed to commit " << ops.size()
               << " log entries (" << this->m_log_pool_name << ")" << dendl;
    ceph_assert(false);
    ret = -EIO;
  } TX_FINALLY {
  } TX_END;

  utime_t tx_end = ceph_clock_now();
  m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
  m_perfcounter->hinc(
    l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
  for (auto &operation : ops) {
    operation->log_append_comp_time = tx_end;
  }

  return ret;
}

/*
 * Flush the persistent write log entries for a set of ops. The entries must
 * be contiguous in persistent memory.
 */
template <typename I>
void WriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
{
  if (ops.empty()) {
    return;
  }

  if (ops.size() > 1) {
    ceph_assert(ops.front()->get_log_entry()->cache_entry < ops.back()->get_log_entry()->cache_entry);
  }

  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size()
                             << " start address="
                             << ops.front()->get_log_entry()->cache_entry
                             << " bytes="
                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry))
                             << dendl;
  pmemobj_flush(m_log_pool,
                ops.front()->get_log_entry()->cache_entry,
                ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry)));
}

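/*
 * Close the pmem pool and, if the cache is clean, remove the pool
 * file and mark the cache as no longer present.
 */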
template <typename I>
void WriteLog<I>::remove_pool_file() {
  if (m_log_pool) {
    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
    pmemobj_close(m_log_pool);
  }
  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
                             << pmemobj_errormsg() << dendl;
    } else {
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
  }
}

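/*
 * Create a new pmem pool (sizing the log ring from the usable pool
 * size) or open and validate an existing one, restoring the ring
 * pointers and reloading existing entries. Returns false, after
 * completing on_finish with an error, if the pool can't be created,
 * opened, or validated.
 */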
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  int r = -EINVAL;
  TOID(struct WriteLogPoolRoot) pool_root;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    if ((m_log_pool =
         pmemobj_create(this->m_log_pool_name.c_str(),
                        this->m_pwl_pool_layout_name,
                        this->m_log_pool_size,
                        (S_IWUSR | S_IRUSR))) == NULL) {
      lderr(cct) << "failed to create pool: " << this->m_log_pool_name
                 << ". error: " << pmemobj_errormsg() << dendl;
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    /* new pool, calculate and store metadata */
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogCacheEntry);
    uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
    if (num_small_writes > MAX_LOG_ENTRIES) {
      num_small_writes = MAX_LOG_ENTRIES;
    }
    if (num_small_writes <= 2) {
      lderr(cct) << "num_small_writes needs to be > 2" << dendl;
      goto err_close_pool;
    }
    this->m_bytes_allocated_cap = effective_pool_size;
    /* Log ring empty */
    m_first_free_entry = 0;
    m_first_valid_entry = 0;
    TX_BEGIN(m_log_pool) {
      TX_ADD(pool_root);
      D_RW(pool_root)->header.layout_version = RWL_LAYOUT_VERSION;
      D_RW(pool_root)->log_entries =
        TX_ZALLOC(struct WriteLogCacheEntry,
                  sizeof(struct WriteLogCacheEntry) * num_small_writes);
      D_RW(pool_root)->pool_size = this->m_log_pool_size;
      D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
      D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
      D_RW(pool_root)->num_log_entries = num_small_writes;
      D_RW(pool_root)->first_free_entry = m_first_free_entry;
      D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
    } TX_ONCOMMIT {
      this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
      this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
    } TX_ONABORT {
      this->m_total_log_entries = 0;
      this->m_free_log_entries = 0;
      lderr(cct) << "failed to initialize pool: " << this->m_log_pool_name
                 << ". pmemobj TX errno: " << pmemobj_tx_errno() << dendl;
      r = -pmemobj_tx_errno();
      goto err_close_pool;
    } TX_FINALLY {
    } TX_END;
  } else {
    ceph_assert(m_cache_state->present);
    /* Open existing pool */
    if ((m_log_pool =
         pmemobj_open(this->m_log_pool_name.c_str(),
                      this->m_pwl_pool_layout_name)) == NULL) {
      lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
                 << pmemobj_errormsg() << dendl;
      on_finish->complete(-errno);
      return false;
    }
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
    if (D_RO(pool_root)->header.layout_version != RWL_LAYOUT_VERSION) {
      // TODO: will handle upgrading version in the future
      lderr(cct) << "pool layout version is "
                 << D_RO(pool_root)->header.layout_version
                 << " expected " << RWL_LAYOUT_VERSION << dendl;
      goto err_close_pool;
    }
    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
      lderr(cct) << "pool block size is " << D_RO(pool_root)->block_size
                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
      goto err_close_pool;
    }
    this->m_log_pool_size = D_RO(pool_root)->pool_size;
    this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
    this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
    m_first_free_entry = D_RO(pool_root)->first_free_entry;
    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
    if (m_first_free_entry < m_first_valid_entry) {
      /* Valid entries wrap around the end of the ring, so first_free is lower
       * than first_valid. If first_valid was == first_free+1, the entry at
       * first_free would be empty. The last entry is never used, so in
       * that case there would be zero free log entries. */
      this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) - 1;
    } else {
      /* first_valid is <= first_free. If they are == we have zero valid log
       * entries, and n-1 free log entries */
      this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) - 1;
    }
    size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
    this->m_bytes_allocated_cap = effective_pool_size;
    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_pool:
  pmemobj_close(m_log_pool);
  on_finish->complete(r);
  return false;
}

/*
 * Loads the log entries from an existing log.
 *
 * Creates the in-memory structures to represent the state of the
 * re-opened log.
 *
 * Finds the last appended sync point, and any sync points referred to
 * in log entries, but missing from the log. These missing sync points
 * are created and scheduled for append. Some rudimentary consistency
 * checking is done.
 *
 * Rebuilds the m_blocks_to_log_entries map, to make log entries
 * readable.
 *
 * Places all writes on the dirty entries list, which causes them all
 * to be flushed.
 */

template <typename I>
void WriteLog<I>::load_existing_entries(DeferredContexts &later) {
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
  uint64_t entry_index = m_first_valid_entry;
  /* The map below allows us to find sync point log entries by sync
   * gen number, which is necessary so write entries can be linked to
   * their sync points. */
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  /* The map below tracks sync points referred to in writes but not
   * appearing in the sync_point_entries map. We'll use this to
   * determine which sync points are missing and need to be
   * created. */
  std::map<uint64_t, bool> missing_sync_points;

  /*
   * Read the existing log entries. Construct an in-memory log entry
   * object of the appropriate type for each. Add these to the global
   * log entries list.
   *
   * Write entries will not link to their sync points yet. We'll do
   * that in the next pass. Here we'll accumulate a map of sync point
   * gen numbers that are referred to in writes but do not appear in
   * the log.
   */
  while (entry_index != m_first_free_entry) {
    WriteLogCacheEntry *pmem_entry = &pmem_log_entries[entry_index];
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
    ceph_assert(pmem_entry->entry_index == entry_index);

    this->update_entries(&log_entry, pmem_entry, missing_sync_points,
                         sync_point_entries, entry_index);

    log_entry->ram_entry = *pmem_entry;
    log_entry->cache_entry = pmem_entry;
    log_entry->log_entry_index = entry_index;
    log_entry->completed = true;

    m_log_entries.push_back(log_entry);

    entry_index = (entry_index + 1) % this->m_total_log_entries;
  }

  this->update_sync_points(missing_sync_points, sync_point_entries, later);
}

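/*
 * Account for a loaded write entry in the allocated and cached byte
 * counters. Allocations are rounded up to MIN_WRITE_ALLOC_SIZE.
 */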
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_allocated += std::max(log_entry->write_bytes(), MIN_WRITE_ALLOC_SIZE);
    this->m_bytes_cached += log_entry->write_bytes();
  }
}

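/* Point the in-memory write entry at its data buffer in the pmem pool */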
template <typename I>
void WriteLog<I>::write_data_to_buffer(
    std::shared_ptr<pwl::WriteLogEntry> ws_entry,
    WriteLogCacheEntry *pmem_entry) {
  ws_entry->cache_buffer = D_RW(pmem_entry->write_data);
}

/**
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint32_t initial_first_valid_entry;
  uint32_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    /* Entry readers can't be added while we hold m_entry_reader_lock */
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = this->m_first_valid_entry;
    first_valid_entry = this->m_first_valid_entry;
    while (!m_log_entries.empty() && retiring_entries.size() < frees_per_tx &&
           this->can_retire_entry(m_log_entries.front())) {
      auto entry = m_log_entries.front();
      if (entry->log_entry_index != first_valid_entry) {
        lderr(cct) << "retiring entry index (" << entry->log_entry_index
                   << ") and first valid log entry index (" << first_valid_entry
                   << ") must be ==." << dendl;
      }
      ceph_assert(entry->log_entry_index == first_valid_entry);
      first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
      m_log_entries.pop_front();
      retiring_entries.push_back(entry);
      /* Remove entry from map so there will be no more readers */
      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
        if (gen_write_entry) {
          this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
        }
      }
    }
  }

  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
    TOID(struct WriteLogPoolRoot) pool_root;
    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);

    utime_t tx_start;
    utime_t tx_end;
    /* Advance first valid entry and release buffers */
    {
      uint64_t flushed_sync_gen;
      std::lock_guard append_locker(this->m_log_append_lock);
      {
        std::lock_guard locker(m_lock);
        flushed_sync_gen = this->m_flushed_sync_gen;
      }

      tx_start = ceph_clock_now();
      TX_BEGIN(m_log_pool) {
        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
                                     << D_RO(pool_root)->flushed_sync_gen << " to "
                                     << flushed_sync_gen << dendl;
          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
        }
        D_RW(pool_root)->first_valid_entry = first_valid_entry;
        for (auto &entry: retiring_entries) {
          if (entry->write_bytes()) {
            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
                           << "." << entry->ram_entry.write_data.oid.off << dendl;
            TX_FREE(entry->ram_entry.write_data);
          } else {
            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
          }
        }
      } TX_ONCOMMIT {
      } TX_ONABORT {
        lderr(cct) << "failed to commit free of " << retiring_entries.size()
                   << " log entries (" << this->m_log_pool_name << ")" << dendl;
        ceph_assert(false);
      } TX_FINALLY {
      } TX_END;
      tx_end = ceph_clock_now();
    }
    m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
    m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
                        retiring_entries.size());

    bool need_update_state = false;
    /* Update runtime copy of first_valid, and free entries counts */
    {
      std::lock_guard locker(m_lock);

      ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
      this->m_first_valid_entry = first_valid_entry;
      this->m_free_log_entries += retiring_entries.size();
      if (!m_cache_state->empty && m_log_entries.empty()) {
        m_cache_state->empty = true;
        this->update_image_cache_state();
        need_update_state = true;
      }
      for (auto &entry: retiring_entries) {
        if (entry->write_bytes()) {
          ceph_assert(this->m_bytes_cached >= entry->write_bytes());
          this->m_bytes_cached -= entry->write_bytes();
          uint64_t entry_allocation_size = entry->write_bytes();
          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
          }
          ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
          this->m_bytes_allocated -= entry_allocation_size;
        }
      }
      this->m_alloc_failed_since_retire = false;
      this->wake_up();
    }
    if (need_update_state) {
      std::unique_lock locker(m_lock);
      this->write_image_cache_state(locker);
    }
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}

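/*
 * Build a guarded writeback context for each entry to be flushed and
 * detain it on the flush guard. Unless the cache is invalidating, the
 * actual writeback is queued to the image work queue.
 */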
template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  bool invalidating = this->m_invalidating; // snapshot so we behave consistently

  for (auto &log_entry : entries_to_flush) {
    GuardedRequestFunctionContext *guarded_ctx =
      new GuardedRequestFunctionContext([this, log_entry, invalidating]
        (GuardedRequestFunctionContext &guard_ctx) {
          log_entry->m_cell = guard_ctx.cell;
          Context *ctx = this->construct_flush_entry(log_entry, invalidating);

          if (!invalidating) {
            ctx = new LambdaContext(
              [this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }

          ctx->complete(0);
      });
    this->detain_flush_guard_request(log_entry, guarded_ctx);
  }
}

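/* Max number of scheduled ops to flush per batch in
 * flush_then_append_scheduled_ops() */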
const unsigned long int ops_flushed_together = 4;
/*
 * Performs the pmem buffer flush on all scheduled ops, then schedules
 * the log event append operation for all of them.
 */
template <typename I>
void WriteLog<I>::flush_then_append_scheduled_ops(void)
{
  GenericLogOperations ops;
  bool ops_remain = false;
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    {
      ops.clear();
      std::lock_guard locker(m_lock);
      if (m_ops_to_flush.size()) {
        auto last_in_batch = m_ops_to_flush.begin();
        unsigned int ops_to_flush = m_ops_to_flush.size();
        if (ops_to_flush > ops_flushed_together) {
          ops_to_flush = ops_flushed_together;
        }
        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
        std::advance(last_in_batch, ops_to_flush);
        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
        ops_remain = !m_ops_to_flush.empty();
        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", remain "
                                   << m_ops_to_flush.size() << dendl;
      } else {
        ops_remain = false;
      }
    }
    if (ops_remain) {
      enlist_op_flusher();
    }

    /* Ops subsequently scheduled for flush may finish before these,
     * which is fine. We're unconcerned with completion order until we
     * get to the log message append step. */
    if (ops.size()) {
      flush_pmem_buffer(ops);
      schedule_append_ops(ops, nullptr);
    }
  } while (ops_remain);
  append_scheduled_ops();
}

/*
 * Performs the log event append operation for all of the scheduled
 * events.
 */
template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  int append_result = 0;
  bool ops_remain = false;
  bool appending = false; /* true if we set m_appending */
  ldout(m_image_ctx.cct, 20) << dendl;
  do {
    ops.clear();
    this->append_scheduled(ops, ops_remain, appending, true);

    if (ops.size()) {
      std::lock_guard locker(this->m_log_append_lock);
      alloc_op_log_entries(ops);
      append_result = append_op_log_entries(ops);
    }

    int num_ops = ops.size();
    if (num_ops) {
      /* New entries may be flushable. Completion will wake up flusher. */
      this->complete_op_log_entries(std::move(ops), append_result);
    }
  } while (ops_remain);
}

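/* Queue a context that will flush, then append, the scheduled ops */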
template <typename I>
void WriteLog<I>::enlist_op_flusher()
{
  this->m_async_flush_ops++;
  this->m_async_op_tracker.start_op();
  Context *flush_ctx = new LambdaContext([this](int r) {
    flush_then_append_scheduled_ops();
    this->m_async_flush_ops--;
    this->m_async_op_tracker.finish_op();
  });
  this->m_work_queue.queue(flush_ctx);
}

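/*
 * Either flush the pmem payload inline on the caller's thread (when
 * the caller is waiting for persist) or defer the flush and append to
 * a worker.
 */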
template <typename I>
void WriteLog<I>::setup_schedule_append(
    pwl::GenericLogOperationsVector &ops, bool do_early_flush,
    C_BlockIORequestT *req) {
  if (do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * expedite it */
    flush_pmem_buffer(ops);
    this->schedule_append(ops);
  } else {
    /* This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    schedule_flush_and_append(ops);
  }
}

/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req)
{
  bool need_finisher;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

    need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
  }

  if (need_finisher) {
    // enlist op appender
    this->m_async_append_ops++;
    this->m_async_op_tracker.start_op();
    Context *append_ctx = new LambdaContext([this](int r) {
      append_scheduled_ops();
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
    this->m_work_queue.queue(append_ctx);
  }

  for (auto &op : appending) {
    op->appending();
  }
}

/*
 * Takes custody of ops. They'll all get their pmem blocks flushed,
 * then get their log entries appended.
 */
template <typename I>
void WriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
{
  GenericLogOperations to_flush(ops.begin(), ops.end());
  bool need_finisher;
  ldout(m_image_ctx.cct, 20) << dendl;
  {
    std::lock_guard locker(m_lock);

    need_finisher = m_ops_to_flush.empty();
    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
  }

  if (need_finisher) {
    enlist_op_flusher();
  }
}

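/*
 * Housekeeping, run via wake_up(): retire log entries when allocation
 * has failed or usage is over the high-water marks (more aggressively
 * when over the aggressive marks, invalidating, or shutting down),
 * then dispatch deferred writes and write back dirty entries.
 */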
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
  uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
  uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
  uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_invalidating ||
        this->m_bytes_allocated > high_water_bytes ||
        (m_log_entries.size() > high_water_entries)) {
      int retired = 0;
      utime_t started = ceph_clock_now();
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (this->m_bytes_allocated > high_water_bytes)
                                 << ", allocated_entries > high_water="
                                 << (m_log_entries.size() > high_water_entries)
                                 << dendl;
      while (this->m_alloc_failed_since_retire || this->m_invalidating ||
             (this->m_bytes_allocated > high_water_bytes) ||
             (m_log_entries.size() > high_water_entries) ||
             (((this->m_bytes_allocated > low_water_bytes) ||
               (m_log_entries.size() > low_water_entries)) &&
              (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
        if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
                             (this->m_bytes_allocated > aggressive_high_water_bytes) ||
                             (m_log_entries.size() > aggressive_high_water_entries) ||
                             this->m_alloc_failed_since_retire)
                            ? MAX_ALLOC_PER_TRANSACTION
                            : MAX_FREE_PER_TRANSACTION)) {
          break;
        }
        retired++;
        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
      }
      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();

    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    /* Reschedule if it's still requested */
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}

/*
 * Flush the pmem regions for the data blocks of a set of operations
 *
 * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
 */
template <typename I>
template <typename V>
void WriteLog<I>::flush_pmem_buffer(V& ops)
{
  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_start_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }

  for (auto &operation : ops) {
    if (operation->is_writing_op()) {
      auto log_entry = static_pointer_cast<WriteLogEntry>(operation->get_log_entry());
      pmemobj_flush(m_log_pool, log_entry->cache_buffer, log_entry->write_bytes());
    }
  }

  /* Drain once for all */
  pmemobj_drain(m_log_pool);

  now = ceph_clock_now();
  for (auto &operation : ops) {
    if (operation->reserved_allocated()) {
      operation->buf_persist_comp_time = now;
    } else {
      ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
                                 << *operation << dendl;
    }
  }
}

/**
 * Update/persist the last flushed sync point in the log
 */
template <typename I>
void WriteLog<I>::persist_last_flushed_sync_gen()
{
  TOID(struct WriteLogPoolRoot) pool_root;
  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
  uint64_t flushed_sync_gen;

  std::lock_guard append_locker(this->m_log_append_lock);
  {
    std::lock_guard locker(m_lock);
    flushed_sync_gen = this->m_flushed_sync_gen;
  }

  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
                               << D_RO(pool_root)->flushed_sync_gen << " to "
                               << flushed_sync_gen << dendl;
    TX_BEGIN(m_log_pool) {
      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
    } TX_ONCOMMIT {
    } TX_ONABORT {
      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
      ceph_assert(false);
    } TX_FINALLY {
    } TX_END;
  }
}

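/*
 * Reserve a pmem buffer for each buffer in the request. On failure,
 * signal (via no_space) that entries need to be retired to free
 * space; if the log is already empty, defragment the pool to coalesce
 * the free space instead.
 */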
template <typename I>
void WriteLog<I>::reserve_cache(C_BlockIORequestT *req,
                                bool &alloc_succeeds, bool &no_space) {
  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  for (auto &buffer : buffers) {
    utime_t before_reserve = ceph_clock_now();
    buffer.buffer_oid = pmemobj_reserve(m_log_pool,
                                        &buffer.buffer_alloc_action,
                                        buffer.allocation_size,
                                        0 /* Object type */);
    buffer.allocation_lat = ceph_clock_now() - before_reserve;
    if (TOID_IS_NULL(buffer.buffer_oid)) {
      ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
                                << pmemobj_errormsg() << ". "
                                << *req << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries need to be retired */

      if (this->m_free_log_entries == this->m_total_log_entries - 1) {
        /* When the cache is empty, there is still no space to allocate.
         * Defragment. */
        pmemobj_defrag(m_log_pool, NULL, 0, NULL);
      }
      break;
    } else {
      buffer.allocated = true;
    }
    ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
                               << "." << buffer.buffer_oid.oid.off
                               << ", size=" << buffer.allocation_size << dendl;
  }
}

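/* Copy each operation's bufferlist into its reserved cache buffer */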
template <typename I>
void WriteLog<I>::copy_bl_to_buffer(
    WriteRequestResources *resources, std::unique_ptr<WriteLogOperationSet> &op_set) {
  auto allocation = resources->buffers.begin();
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_cache_buffer(allocation);
    allocation++;
  }
}

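/*
 * Compute the resources (buffers, lanes, log entries) a request needs
 * and attempt to reserve them; on failure, cancel any pmem buffer
 * reservations already made.
 */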
template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  ldout(m_image_ctx.cct, 20) << dendl;
  // Set up the buffer and get the number of each required resource
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries, &num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes, num_log_entries,
                                          num_unpublished_reserves);

  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
  if (!alloc_succeeds) {
    /* On alloc failure, free any buffers we did allocate */
    for (auto &buffer : buffers) {
      if (buffer.allocated) {
        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
      }
    }
  }

  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}

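/* Complete the user's request context, which deletes itself, and
 * clear the caller's pointer to it */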
template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  user_req->complete(r);
  // Set user_req to null, as it is deleted by complete()
  user_req = nullptr;
}

} // namespace rwl
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::rwl::WriteLog<librbd::ImageCtx>;