]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/cache/pwl/ssd/WriteLog.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / librbd / cache / pwl / ssd / WriteLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "WriteLog.h"
5 #include "include/buffer.h"
6 #include "include/Context.h"
7 #include "include/ceph_assert.h"
8 #include "common/deleter.h"
9 #include "common/dout.h"
10 #include "common/environment.h"
11 #include "common/errno.h"
12 #include "common/WorkQueue.h"
13 #include "common/Timer.h"
14 #include "common/perf_counters.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/asio/ContextWQ.h"
17 #include "librbd/cache/pwl/ImageCacheState.h"
18 #include "librbd/cache/pwl/LogEntry.h"
19 #include <map>
20 #include <vector>
21
22 #undef dout_subsys
23 #define dout_subsys ceph_subsys_rbd_pwl
24 #undef dout_prefix
25 #define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
26 << this << " " << __func__ << ": "
27
28 namespace librbd {
29 namespace cache {
30 namespace pwl {
31 namespace ssd {
32
33 using namespace std;
34 using namespace librbd::cache::pwl;
35
36 static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
37 return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
38 root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
39 root.first_valid_entry < root.pool_size &&
40 root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
41 root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
42 root.first_free_entry < root.pool_size &&
43 root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
44 }
45
46 template <typename I>
47 Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
48 m_builderobj = new Builder<This>();
49 return m_builderobj;
50 }
51
52 template <typename I>
53 WriteLog<I>::WriteLog(
54 I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
55 cache::ImageWritebackInterface& image_writeback,
56 plugin::Api<I>& plugin_api)
57 : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
58 image_writeback, plugin_api)
59 {
60 }
61
62 template <typename I>
63 WriteLog<I>::~WriteLog() {
64 delete m_builderobj;
65 }
66
67 template <typename I>
68 void WriteLog<I>::collect_read_extents(
69 uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
70 std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
71 std::vector<bufferlist*> &bls_to_read,
72 uint64_t entry_hit_length, Extent hit_extent,
73 pwl::C_ReadRequest *read_ctx) {
74 // Make a bl for this hit extent. This will add references to the
75 // write_entry->cache_bl */
76 ldout(m_image_ctx.cct, 5) << dendl;
77 auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
78 buffer::list hit_bl;
79 write_entry->copy_cache_bl(&hit_bl);
80 bool writesame = write_entry->is_writesame_entry();
81 auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
82 hit_extent, hit_bl, true, read_buffer_offset, writesame);
83 read_ctx->read_extents.push_back(hit_extent_buf);
84
85 if (!hit_bl.length()) {
86 ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
87 auto read_extent = read_ctx->read_extents.back();
88 write_entry->inc_bl_refs();
89 log_entries_to_read.push_back(std::move(write_entry));
90 bls_to_read.push_back(&read_extent->m_bl);
91 }
92 }
93
94 template <typename I>
95 void WriteLog<I>::complete_read(
96 std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
97 std::vector<bufferlist*> &bls_to_read,
98 Context *ctx) {
99 if (!log_entries_to_read.empty()) {
100 aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
101 } else {
102 ctx->complete(0);
103 }
104 }
105
106 template <typename I>
107 int WriteLog<I>::create_and_open_bdev() {
108 CephContext *cct = m_image_ctx.cct;
109
110 bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
111 nullptr, nullptr, nullptr);
112 int r = bdev->open(this->m_log_pool_name);
113 if (r < 0) {
114 lderr(cct) << "failed to open bdev" << dendl;
115 delete bdev;
116 return r;
117 }
118
119 ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
120 if (bdev->get_size() != this->m_log_pool_size) {
121 lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
122 << " (block size " << bdev->get_block_size()
123 << ") != pool size " << this->m_log_pool_size << dendl;
124 bdev->close();
125 delete bdev;
126 return -EINVAL;
127 }
128
129 return 0;
130 }
131
132 template <typename I>
133 bool WriteLog<I>::initialize_pool(Context *on_finish,
134 pwl::DeferredContexts &later) {
135 int r;
136 CephContext *cct = m_image_ctx.cct;
137
138 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
139 if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
140 int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
141 bool succeed = true;
142 if (fd >= 0) {
143 if (truncate(this->m_log_pool_name.c_str(),
144 this->m_log_pool_size) != 0) {
145 succeed = false;
146 }
147 ::close(fd);
148 } else {
149 succeed = false;
150 }
151 if (!succeed) {
152 m_cache_state->present = false;
153 m_cache_state->clean = true;
154 m_cache_state->empty = true;
155 /* TODO: filter/replace errnos that are meaningless to the caller */
156 on_finish->complete(-errno);
157 return false;
158 }
159
160 r = create_and_open_bdev();
161 if (r < 0) {
162 on_finish->complete(r);
163 return false;
164 }
165 m_cache_state->present = true;
166 m_cache_state->clean = true;
167 m_cache_state->empty = true;
168 /* new pool, calculate and store metadata */
169
170 /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
171 * In this way, when all ring buffer spaces are allocated,
172 * m_first_free_entry and m_first_valid_entry will not be equal.
173 * Equal only means the cache is empty. */
174 this->m_bytes_allocated_cap = this->m_log_pool_size -
175 DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
176 /* Log ring empty */
177 m_first_free_entry = DATA_RING_BUFFER_OFFSET;
178 m_first_valid_entry = DATA_RING_BUFFER_OFFSET;
179
180 auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
181 new_root->layout_version = SSD_LAYOUT_VERSION;
182 new_root->pool_size = this->m_log_pool_size;
183 new_root->flushed_sync_gen = this->m_flushed_sync_gen;
184 new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
185 new_root->first_free_entry = m_first_free_entry;
186 new_root->first_valid_entry = m_first_valid_entry;
187 new_root->num_log_entries = 0;
188 pool_root = *new_root;
189
190 r = update_pool_root_sync(new_root);
191 if (r != 0) {
192 lderr(cct) << "failed to initialize pool ("
193 << this->m_log_pool_name << ")" << dendl;
194 bdev->close();
195 delete bdev;
196 on_finish->complete(r);
197 return false;
198 }
199 } else {
200 m_cache_state->present = true;
201 r = create_and_open_bdev();
202 if (r < 0) {
203 on_finish->complete(r);
204 return false;
205 }
206
207 bufferlist bl;
208 SuperBlock superblock;
209 ::IOContext ioctx(cct, nullptr);
210 r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
211 if (r < 0) {
212 lderr(cct) << "read ssd cache superblock failed " << dendl;
213 goto err_close_bdev;
214 }
215 auto p = bl.cbegin();
216 decode(superblock, p);
217 pool_root = superblock.root;
218 ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
219 << " first_valid_entry=" << pool_root.first_valid_entry
220 << " first_free_entry=" << pool_root.first_free_entry
221 << " flushed_sync_gen=" << pool_root.flushed_sync_gen
222 << dendl;
223 ceph_assert(is_valid_pool_root(pool_root));
224 if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
225 lderr(cct) << "pool layout version is "
226 << pool_root.layout_version
227 << " expected " << SSD_LAYOUT_VERSION
228 << dendl;
229 goto err_close_bdev;
230 }
231 if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
232 lderr(cct) << "pool block size is " << pool_root.block_size
233 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
234 << dendl;
235 goto err_close_bdev;
236 }
237
238 this->m_log_pool_size = pool_root.pool_size;
239 this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
240 this->m_first_valid_entry = pool_root.first_valid_entry;
241 this->m_first_free_entry = pool_root.first_free_entry;
242 this->m_bytes_allocated_cap = this->m_log_pool_size -
243 DATA_RING_BUFFER_OFFSET -
244 MIN_WRITE_ALLOC_SSD_SIZE;
245
246 load_existing_entries(later);
247 m_cache_state->clean = this->m_dirty_log_entries.empty();
248 m_cache_state->empty = m_log_entries.empty();
249 }
250 return true;
251
252 err_close_bdev:
253 bdev->close();
254 delete bdev;
255 on_finish->complete(-EINVAL);
256 return false;
257 }
258
259 template <typename I>
260 void WriteLog<I>::remove_pool_file() {
261 ceph_assert(bdev);
262 bdev->close();
263 delete bdev;
264 bdev = nullptr;
265 ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;
266
267 if (m_cache_state->clean) {
268 ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
269 << this->m_log_pool_name << dendl;
270 if (remove(this->m_log_pool_name.c_str()) != 0) {
271 lderr(m_image_ctx.cct) << "failed to remove empty pool \""
272 << this->m_log_pool_name << "\": " << dendl;
273 } else {
274 m_cache_state->clean = true;
275 m_cache_state->empty = true;
276 m_cache_state->present = false;
277 }
278 } else {
279 ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
280 << this->m_log_pool_name << dendl;
281 }
282 }
283
284 template <typename I>
285 void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
286 CephContext *cct = m_image_ctx.cct;
287 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
288 std::map<uint64_t, bool> missing_sync_points;
289
290 // Iterate through the log_entries and append all the write_bytes
291 // of each entry to fetch the pos of next 4k of log_entries. Iterate
292 // through the log entries and append them to the in-memory vector
293 for (uint64_t next_log_pos = this->m_first_valid_entry;
294 next_log_pos != this->m_first_free_entry; ) {
295 // read the entries from SSD cache and decode
296 bufferlist bl_entries;
297 ::IOContext ioctx_entry(cct, nullptr);
298 bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
299 &ioctx_entry, false);
300 std::vector<WriteLogCacheEntry> ssd_log_entries;
301 auto pl = bl_entries.cbegin();
302 decode(ssd_log_entries, pl);
303 ldout(cct, 5) << "decoded ssd log entries" << dendl;
304 uint64_t curr_log_pos = next_log_pos;
305 std::shared_ptr<GenericLogEntry> log_entry = nullptr;
306
307 for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
308 this->update_entries(&log_entry, &*it, missing_sync_points,
309 sync_point_entries, curr_log_pos);
310 log_entry->ram_entry = *it;
311 log_entry->log_entry_index = curr_log_pos;
312 log_entry->completed = true;
313 m_log_entries.push_back(log_entry);
314 next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
315 }
316 // along with the write_bytes, add control block size too
317 next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
318 if (next_log_pos >= this->m_log_pool_size) {
319 next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
320 }
321 }
322 this->update_sync_points(missing_sync_points, sync_point_entries, later);
323 if (m_first_valid_entry > m_first_free_entry) {
324 m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
325 m_first_free_entry - DATA_RING_BUFFER_OFFSET;
326 } else {
327 m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
328 }
329 }
330
331 // For SSD we don't calc m_bytes_allocated in this
332 template <typename I>
333 void WriteLog<I>::inc_allocated_cached_bytes(
334 std::shared_ptr<pwl::GenericLogEntry> log_entry) {
335 if (log_entry->is_write_entry()) {
336 this->m_bytes_cached += log_entry->write_bytes();
337 }
338 }
339
340 template <typename I>
341 bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
342 bool alloc_succeeds = true;
343 uint64_t bytes_allocated = 0;
344 uint64_t bytes_cached = 0;
345 uint64_t bytes_dirtied = 0;
346 uint64_t num_lanes = 0;
347 uint64_t num_unpublished_reserves = 0;
348 uint64_t num_log_entries = 0;
349
350 // Setup buffer, and get all the number of required resources
351 req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
352 &num_lanes, &num_log_entries,
353 &num_unpublished_reserves);
354
355 ceph_assert(!num_lanes);
356 if (num_log_entries) {
357 bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
358 num_log_entries = 0;
359 }
360 ceph_assert(!num_unpublished_reserves);
361
362 alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
363 bytes_allocated, num_lanes,
364 num_log_entries,
365 num_unpublished_reserves);
366 req->set_allocated(alloc_succeeds);
367 return alloc_succeeds;
368 }
369
370 template <typename I>
371 bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
372 for (auto &op : ops) {
373 if (op->get_log_entry()->is_sync_point()) {
374 return true;
375 break;
376 }
377 }
378 return false;
379 }
380
381 template<typename I>
382 void WriteLog<I>::enlist_op_appender() {
383 this->m_async_append_ops++;
384 this->m_async_op_tracker.start_op();
385 Context *append_ctx = new LambdaContext([this](int r) {
386 append_scheduled_ops();
387 });
388 this->m_work_queue.queue(append_ctx);
389 }
390
391 /*
392 * Takes custody of ops. They'll all get their log entries appended,
393 * and have their on_write_persist contexts completed once they and
394 * all prior log entries are persisted everywhere.
395 */
396 template<typename I>
397 void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) {
398 bool need_finisher = false;
399 GenericLogOperationsVector appending;
400
401 std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
402 {
403 std::lock_guard locker(m_lock);
404
405 bool persist_on_flush = this->get_persist_on_flush();
406 need_finisher = !this->m_appending &&
407 ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
408 !persist_on_flush);
409
410 // Only flush logs into SSD when there is internal/external flush request
411 if (!need_finisher) {
412 need_finisher = has_sync_point_logs(ops);
413 }
414 this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
415
416 // To preserve the order of overlapping IOs, release_cell() may be
417 // called only after the ops are added to m_ops_to_append.
418 // As soon as m_lock is released, the appended ops can be picked up
419 // by append_scheduled_ops() in another thread and req can be freed.
420 if (req != nullptr) {
421 if (persist_on_flush) {
422 req->complete_user_request(0);
423 }
424 req->release_cell();
425 }
426 }
427
428 if (need_finisher) {
429 this->enlist_op_appender();
430 }
431
432 for (auto &op : appending) {
433 op->appending();
434 }
435 }
436
437 template <typename I>
438 void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
439 bool do_early_flush,
440 C_BlockIORequestT *req) {
441 this->schedule_append(ops, req);
442 }
443
444 template <typename I>
445 void WriteLog<I>::append_scheduled_ops(void) {
446 GenericLogOperations ops;
447 ldout(m_image_ctx.cct, 20) << dendl;
448
449 bool ops_remain = false; // unused, no-op variable for SSD
450 bool appending = false; // unused, no-op variable for SSD
451 this->append_scheduled(ops, ops_remain, appending);
452
453 if (ops.size()) {
454 alloc_op_log_entries(ops);
455 append_op_log_entries(ops);
456 } else {
457 this->m_async_append_ops--;
458 this->m_async_op_tracker.finish_op();
459 }
460 }
461
462 /*
463 * Write and persist the (already allocated) write log entries and
464 * data buffer allocations for a set of ops. The data buffer for each
465 * of these must already have been persisted to its reserved area.
466 */
467 template <typename I>
468 void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
469 ceph_assert(!ops.empty());
470 ldout(m_image_ctx.cct, 20) << dendl;
471 Context *ctx = new LambdaContext([this, ops](int r) {
472 assert(r == 0);
473 ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;
474
475 auto captured_ops = std::move(ops);
476 this->complete_op_log_entries(std::move(captured_ops), r);
477
478 bool need_finisher = false;
479 {
480 std::lock_guard locker1(m_lock);
481 bool persist_on_flush = this->get_persist_on_flush();
482 need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
483 !persist_on_flush);
484
485 if (!need_finisher) {
486 need_finisher = has_sync_point_logs(this->m_ops_to_append);
487 }
488 }
489
490 if (need_finisher) {
491 this->enlist_op_appender();
492 }
493 this->m_async_update_superblock--;
494 this->m_async_op_tracker.finish_op();
495 });
496 uint64_t *new_first_free_entry = new(uint64_t);
497 Context *append_ctx = new LambdaContext(
498 [this, new_first_free_entry, ops, ctx](int r) {
499 std::shared_ptr<WriteLogPoolRoot> new_root;
500 {
501 ldout(m_image_ctx.cct, 20) << "Finished appending at "
502 << *new_first_free_entry << dendl;
503 utime_t now = ceph_clock_now();
504 for (auto &operation : ops) {
505 operation->log_append_comp_time = now;
506 }
507
508 std::lock_guard locker(this->m_log_append_lock);
509 std::lock_guard locker1(m_lock);
510 assert(this->m_appending);
511 this->m_appending = false;
512 new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
513 pool_root.first_free_entry = *new_first_free_entry;
514 new_root->first_free_entry = *new_first_free_entry;
515 delete new_first_free_entry;
516 schedule_update_root(new_root, ctx);
517 }
518 this->m_async_append_ops--;
519 this->m_async_op_tracker.finish_op();
520 });
521 // Append logs and update first_free_update
522 append_ops(ops, append_ctx, new_first_free_entry);
523
524 if (ops.size()) {
525 this->dispatch_deferred_writes();
526 }
527 }
528
529 template <typename I>
530 void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
531 log_entry->remove_cache_bl();
532 }
533
534 template <typename I>
535 void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
536 std::lock_guard locker(m_lock);
537
538 for (auto &operation : ops) {
539 auto &log_entry = operation->get_log_entry();
540 log_entry->ram_entry.entry_valid = 1;
541 m_log_entries.push_back(log_entry);
542 ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
543 }
544 }
545
546 template <typename I>
547 void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
548 DeferredContexts &post_unlock,
549 bool has_write_entry) {
550 // snapshot so we behave consistently
551 bool invalidating = this->m_invalidating;
552
553 if (invalidating || !has_write_entry) {
554 for (auto &log_entry : entries_to_flush) {
555 GuardedRequestFunctionContext *guarded_ctx =
556 new GuardedRequestFunctionContext([this, log_entry, invalidating]
557 (GuardedRequestFunctionContext &guard_ctx) {
558 log_entry->m_cell = guard_ctx.cell;
559 Context *ctx = this->construct_flush_entry(log_entry, invalidating);
560
561 if (!invalidating) {
562 ctx = new LambdaContext([this, log_entry, ctx](int r) {
563 m_image_ctx.op_work_queue->queue(new LambdaContext(
564 [this, log_entry, ctx](int r) {
565 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
566 << " " << *log_entry << dendl;
567 log_entry->writeback(this->m_image_writeback, ctx);
568 }), 0);
569 });
570 }
571 ctx->complete(0);
572 });
573 this->detain_flush_guard_request(log_entry, guarded_ctx);
574 }
575 } else {
576 int count = entries_to_flush.size();
577 std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
578 std::vector<bufferlist *> read_bls;
579
580 write_entries.reserve(count);
581 read_bls.reserve(count);
582
583 for (auto &log_entry : entries_to_flush) {
584 if (log_entry->is_write_entry()) {
585 bufferlist *bl = new bufferlist;
586 auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
587 write_entry->inc_bl_refs();
588 write_entries.push_back(write_entry);
589 read_bls.push_back(bl);
590 }
591 }
592
593 Context *ctx = new LambdaContext(
594 [this, entries_to_flush, read_bls](int r) {
595 int i = 0;
596 GuardedRequestFunctionContext *guarded_ctx = nullptr;
597
598 for (auto &log_entry : entries_to_flush) {
599 if (log_entry->is_write_entry()) {
600 bufferlist captured_entry_bl;
601 captured_entry_bl.claim_append(*read_bls[i]);
602 delete read_bls[i++];
603
604 guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
605 (GuardedRequestFunctionContext &guard_ctx) {
606 log_entry->m_cell = guard_ctx.cell;
607 Context *ctx = this->construct_flush_entry(log_entry, false);
608
609 m_image_ctx.op_work_queue->queue(new LambdaContext(
610 [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
611 auto captured_entry_bl = std::move(entry_bl);
612 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
613 << " " << *log_entry << dendl;
614 log_entry->writeback_bl(this->m_image_writeback, ctx,
615 std::move(captured_entry_bl));
616 }), 0);
617 });
618 } else {
619 guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
620 (GuardedRequestFunctionContext &guard_ctx) {
621 log_entry->m_cell = guard_ctx.cell;
622 Context *ctx = this->construct_flush_entry(log_entry, false);
623 m_image_ctx.op_work_queue->queue(new LambdaContext(
624 [this, log_entry, ctx](int r) {
625 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
626 << " " << *log_entry << dendl;
627 log_entry->writeback(this->m_image_writeback, ctx);
628 }), 0);
629 });
630 }
631 this->detain_flush_guard_request(log_entry, guarded_ctx);
632 }
633 });
634
635 aio_read_data_blocks(write_entries, read_bls, ctx);
636 }
637 }
638
639 template <typename I>
640 void WriteLog<I>::process_work() {
641 CephContext *cct = m_image_ctx.cct;
642 int max_iterations = 4;
643 bool wake_up_requested = false;
644 uint64_t aggressive_high_water_bytes =
645 this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
646 uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
647
648 ldout(cct, 20) << dendl;
649
650 do {
651 {
652 std::lock_guard locker(m_lock);
653 this->m_wake_up_requested = false;
654 }
655 if (this->m_alloc_failed_since_retire || (this->m_shutting_down) ||
656 this->m_invalidating || m_bytes_allocated > high_water_bytes) {
657 ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
658 << ", allocated > high_water="
659 << (m_bytes_allocated > high_water_bytes)
660 << dendl;
661 retire_entries((this->m_shutting_down || this->m_invalidating ||
662 m_bytes_allocated > aggressive_high_water_bytes)
663 ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
664 }
665 this->dispatch_deferred_writes();
666 this->process_writeback_dirty_entries();
667 {
668 std::lock_guard locker(m_lock);
669 wake_up_requested = this->m_wake_up_requested;
670 }
671 } while (wake_up_requested && --max_iterations > 0);
672
673 {
674 std::lock_guard locker(m_lock);
675 this->m_wake_up_scheduled = false;
676 // Reschedule if it's still requested
677 if (this->m_wake_up_requested) {
678 this->wake_up();
679 }
680 }
681 }
682
683 /**
684 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
685 * that are eligible to be retired. Returns true if anything was
686 * retired.
687 *
688 */
689 template <typename I>
690 bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
691 CephContext *cct = m_image_ctx.cct;
692 GenericLogEntriesVector retiring_entries;
693 uint64_t initial_first_valid_entry;
694 uint64_t first_valid_entry;
695
696 std::lock_guard retire_locker(this->m_log_retire_lock);
697 ldout(cct, 20) << "Look for entries to retire" << dendl;
698 {
699 // Entry readers can't be added while we hold m_entry_reader_lock
700 RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
701 std::lock_guard locker(m_lock);
702 initial_first_valid_entry = m_first_valid_entry;
703 first_valid_entry = m_first_valid_entry;
704 while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
705 GenericLogEntriesVector retiring_subentries;
706 uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
707 uint64_t data_length = 0;
708 for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
709 if (this->can_retire_entry(*it)) {
710 // log_entry_index is valid after appending to SSD
711 if ((*it)->log_entry_index != control_block_pos) {
712 ldout(cct, 20) << "Old log_entry_index is " << control_block_pos
713 << ",New log_entry_index is "
714 << (*it)->log_entry_index
715 << ",data length is " << data_length << dendl;
716 ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
717 if ((*it)->log_entry_index < control_block_pos) {
718 ceph_assert((*it)->log_entry_index ==
719 (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
720 this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
721 } else {
722 ceph_assert((*it)->log_entry_index == control_block_pos +
723 data_length + MIN_WRITE_ALLOC_SSD_SIZE);
724 }
725 break;
726 } else {
727 retiring_subentries.push_back(*it);
728 if ((*it)->is_write_entry()) {
729 data_length += (*it)->get_aligned_data_size();
730 }
731 }
732 } else {
733 retiring_subentries.clear();
734 break;
735 }
736 }
737 // SSD: retiring_subentries in a span
738 if (!retiring_subentries.empty()) {
739 for (auto it = retiring_subentries.begin();
740 it != retiring_subentries.end(); it++) {
741 ceph_assert(m_log_entries.front() == *it);
742 m_log_entries.pop_front();
743 if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
744 auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
745 if (gen_write_entry) {
746 this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
747 }
748 }
749 }
750
751 ldout(cct, 20) << "span with " << retiring_subentries.size()
752 << " entries: control_block_pos=" << control_block_pos
753 << " data_length=" << data_length
754 << dendl;
755 retiring_entries.insert(
756 retiring_entries.end(), retiring_subentries.begin(),
757 retiring_subentries.end());
758
759 first_valid_entry = control_block_pos + data_length +
760 MIN_WRITE_ALLOC_SSD_SIZE;
761 if (first_valid_entry >= this->m_log_pool_size) {
762 first_valid_entry = first_valid_entry % this->m_log_pool_size +
763 DATA_RING_BUFFER_OFFSET;
764 }
765 } else {
766 break;
767 }
768 }
769 }
770 if (retiring_entries.size()) {
771 ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
772 << dendl;
773
774 // Advance first valid entry and release buffers
775 uint64_t flushed_sync_gen;
776 std::lock_guard append_locker(this->m_log_append_lock);
777 {
778 std::lock_guard locker(m_lock);
779 flushed_sync_gen = this->m_flushed_sync_gen;
780 }
781
782 ceph_assert(first_valid_entry != initial_first_valid_entry);
783 auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
784 new_root->flushed_sync_gen = flushed_sync_gen;
785 new_root->first_valid_entry = first_valid_entry;
786 pool_root.flushed_sync_gen = flushed_sync_gen;
787 pool_root.first_valid_entry = first_valid_entry;
788
789 Context *ctx = new LambdaContext(
790 [this, first_valid_entry, initial_first_valid_entry,
791 retiring_entries](int r) {
792 uint64_t allocated_bytes = 0;
793 uint64_t cached_bytes = 0;
794 uint64_t former_log_pos = 0;
795 for (auto &entry : retiring_entries) {
796 ceph_assert(entry->log_entry_index != 0);
797 if (entry->log_entry_index != former_log_pos ) {
798 // Space for control blocks
799 allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
800 former_log_pos = entry->log_entry_index;
801 }
802 if (entry->is_write_entry()) {
803 cached_bytes += entry->write_bytes();
804 // space for userdata
805 allocated_bytes += entry->get_aligned_data_size();
806 }
807 }
808 {
809 std::lock_guard locker(m_lock);
810 m_first_valid_entry = first_valid_entry;
811 ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
812 ceph_assert(this->m_bytes_allocated >= allocated_bytes);
813 this->m_bytes_allocated -= allocated_bytes;
814 ceph_assert(this->m_bytes_cached >= cached_bytes);
815 this->m_bytes_cached -= cached_bytes;
816
817 ldout(m_image_ctx.cct, 20)
818 << "Finished root update: initial_first_valid_entry="
819 << initial_first_valid_entry << ", m_first_valid_entry="
820 << m_first_valid_entry << ", release space = "
821 << allocated_bytes << ", m_bytes_allocated="
822 << m_bytes_allocated << ", release cached space="
823 << cached_bytes << ", m_bytes_cached="
824 << this->m_bytes_cached << dendl;
825
826 this->m_alloc_failed_since_retire = false;
827 this->wake_up();
828 }
829
830 this->dispatch_deferred_writes();
831 this->process_writeback_dirty_entries();
832 m_async_update_superblock--;
833 this->m_async_op_tracker.finish_op();
834 });
835
836 std::lock_guard locker(m_lock);
837 schedule_update_root(new_root, ctx);
838 } else {
839 ldout(cct, 20) << "Nothing to retire" << dendl;
840 return false;
841 }
842 return true;
843 }
844
845 template <typename I>
846 void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
847 uint64_t* new_first_free_entry) {
848 GenericLogEntriesVector log_entries;
849 CephContext *cct = m_image_ctx.cct;
850 uint64_t span_payload_len = 0;
851 uint64_t bytes_to_free = 0;
852 ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;
853
854 *new_first_free_entry = pool_root.first_free_entry;
855 AioTransContext* aio = new AioTransContext(cct, ctx);
856
857 utime_t now = ceph_clock_now();
858 for (auto &operation : ops) {
859 operation->log_append_start_time = now;
860 auto log_entry = operation->get_log_entry();
861
862 if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
863 span_payload_len >= SPAN_MAX_DATA_LEN) {
864 if (log_entries.size() > 1) {
865 bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
866 }
867 write_log_entries(log_entries, aio, new_first_free_entry);
868 log_entries.clear();
869 span_payload_len = 0;
870 }
871 log_entries.push_back(log_entry);
872 span_payload_len += log_entry->write_bytes();
873 }
874 if (!span_payload_len || !log_entries.empty()) {
875 if (log_entries.size() > 1) {
876 bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
877 }
878 write_log_entries(log_entries, aio, new_first_free_entry);
879 }
880
881 {
882 std::lock_guard locker1(m_lock);
883 m_first_free_entry = *new_first_free_entry;
884 m_bytes_allocated -= bytes_to_free;
885 }
886
887 bdev->aio_submit(&aio->ioc);
888 }
889
890 template <typename I>
891 void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
892 AioTransContext *aio, uint64_t *pos) {
893 CephContext *cct = m_image_ctx.cct;
894 ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
895 ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
896 *pos < this->m_log_pool_size &&
897 *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);
898
899 // The first block is for log entries
900 uint64_t control_block_pos = *pos;
901 *pos += MIN_WRITE_ALLOC_SSD_SIZE;
902 if (*pos == this->m_log_pool_size) {
903 *pos = DATA_RING_BUFFER_OFFSET;
904 }
905
906 std::vector<WriteLogCacheEntry> persist_log_entries;
907 bufferlist data_bl;
908 for (auto &log_entry : log_entries) {
909 log_entry->log_entry_index = control_block_pos;
910 // Append data buffer for write operations
911 if (log_entry->is_write_entry()) {
912 auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
913 auto cache_bl = write_entry->get_cache_bl();
914 auto align_size = write_entry->get_aligned_data_size();
915 data_bl.append(cache_bl);
916 data_bl.append_zero(align_size - cache_bl.length());
917
918 write_entry->ram_entry.write_data_pos = *pos;
919 *pos += align_size;
920 if (*pos >= this->m_log_pool_size) {
921 *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
922 }
923 }
924 // push_back _after_ setting write_data_pos
925 persist_log_entries.push_back(log_entry->ram_entry);
926 }
927
928 //aio write
929 bufferlist bl;
930 encode(persist_log_entries, bl);
931 ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
932 bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
933 bl.append(data_bl);
934 ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
935 if (control_block_pos + bl.length() > this->m_log_pool_size) {
936 //exceeds border, need to split
937 uint64_t size = bl.length();
938 bufferlist bl1;
939 bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
940 ceph_assert(bl.length() == (size - bl1.length()));
941
942 ldout(cct, 20) << "write " << control_block_pos << "~"
943 << size << " spans boundary, split into "
944 << control_block_pos << "~" << bl1.length()
945 << " and " << DATA_RING_BUFFER_OFFSET << "~"
946 << bl.length() << dendl;
947 bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
948 WRITE_LIFE_NOT_SET);
949 bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
950 WRITE_LIFE_NOT_SET);
951 } else {
952 ldout(cct, 20) << "write " << control_block_pos << "~"
953 << bl.length() << dendl;
954 bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
955 WRITE_LIFE_NOT_SET);
956 }
957 }
958
959 template <typename I>
960 void WriteLog<I>::schedule_update_root(
961 std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
962 CephContext *cct = m_image_ctx.cct;
963 ldout(cct, 15) << "New root: pool_size=" << root->pool_size
964 << " first_valid_entry=" << root->first_valid_entry
965 << " first_free_entry=" << root->first_free_entry
966 << " flushed_sync_gen=" << root->flushed_sync_gen
967 << dendl;
968 ceph_assert(is_valid_pool_root(*root));
969
970 bool need_finisher;
971 {
972 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
973 need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
974 std::shared_ptr<WriteLogPoolRootUpdate> entry =
975 std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
976 this->m_async_update_superblock++;
977 this->m_async_op_tracker.start_op();
978 m_poolroot_to_update.emplace_back(entry);
979 }
980 if (need_finisher) {
981 enlist_op_update_root();
982 }
983 }
984
985 template <typename I>
986 void WriteLog<I>::enlist_op_update_root() {
987 Context *append_ctx = new LambdaContext([this](int r) {
988 update_root_scheduled_ops();
989 });
990 this->m_work_queue.queue(append_ctx);
991 }
992
993 template <typename I>
994 void WriteLog<I>::update_root_scheduled_ops() {
995 ldout(m_image_ctx.cct, 20) << dendl;
996
997 std::shared_ptr<WriteLogPoolRoot> root;
998 WriteLogPoolRootUpdateList root_updates;
999 Context *ctx = nullptr;
1000 {
1001 std::lock_guard locker(m_lock);
1002 if (m_updating_pool_root) {
1003 /* Another thread is appending */
1004 ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
1005 << dendl;
1006 return;
1007 }
1008 if (m_poolroot_to_update.size()) {
1009 m_updating_pool_root = true;
1010 root_updates.swap(m_poolroot_to_update);
1011 }
1012 }
1013 ceph_assert(!root_updates.empty());
1014 ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
1015 << dendl;
1016 // We just update the last one, and call all the completions.
1017 auto entry = root_updates.back();
1018 root = entry->root;
1019
1020 ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
1021 ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
1022 for (auto it = updates.begin(); it != updates.end(); it++) {
1023 Context *it_ctx = (*it)->ctx;
1024 it_ctx->complete(r);
1025 }
1026 });
1027 Context *append_ctx = new LambdaContext([this, ctx](int r) {
1028 ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
1029 bool need_finisher = false;
1030 assert(r == 0);
1031 {
1032 std::lock_guard locker(m_lock);
1033 m_updating_pool_root = false;
1034 need_finisher = !m_poolroot_to_update.empty();
1035 }
1036 if (need_finisher) {
1037 enlist_op_update_root();
1038 }
1039 ctx->complete(r);
1040 });
1041 AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
1042 update_pool_root(root, aio);
1043 }
1044
1045 template <typename I>
1046 void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
1047 AioTransContext *aio) {
1048 bufferlist bl;
1049 SuperBlock superblock;
1050 superblock.root = *root;
1051 encode(superblock, bl);
1052 bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
1053 ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
1054 bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
1055 bdev->aio_submit(&aio->ioc);
1056 }
1057
1058 template <typename I>
1059 int WriteLog<I>::update_pool_root_sync(
1060 std::shared_ptr<WriteLogPoolRoot> root) {
1061 bufferlist bl;
1062 SuperBlock superblock;
1063 superblock.root = *root;
1064 encode(superblock, bl);
1065 bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
1066 ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
1067 return bdev->write(0, bl, false);
1068 }
1069
1070 template <typename I>
1071 void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
1072 bufferlist *bl, Context *ctx) {
1073 std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
1074 std::vector<bufferlist *> bls {bl};
1075 aio_read_data_blocks(log_entries, bls, ctx);
1076 }
1077
1078 template <typename I>
1079 void WriteLog<I>::aio_read_data_blocks(
1080 std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
1081 std::vector<bufferlist *> &bls, Context *ctx) {
1082 ceph_assert(log_entries.size() == bls.size());
1083
1084 //get the valid part
1085 Context *read_ctx = new LambdaContext(
1086 [log_entries, bls, ctx](int r) {
1087 for (unsigned int i = 0; i < log_entries.size(); i++) {
1088 bufferlist valid_data_bl;
1089 auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
1090 auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
1091 : write_entry->ram_entry.ws_datalen;
1092
1093 valid_data_bl.substr_of(*bls[i], 0, length);
1094 bls[i]->clear();
1095 bls[i]->append(valid_data_bl);
1096 write_entry->dec_bl_refs();
1097 }
1098 ctx->complete(r);
1099 });
1100
1101 CephContext *cct = m_image_ctx.cct;
1102 AioTransContext *aio = new AioTransContext(cct, read_ctx);
1103 for (unsigned int i = 0; i < log_entries.size(); i++) {
1104 WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;
1105
1106 ceph_assert(log_entry->is_write() || log_entry->is_writesame());
1107 uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
1108 log_entry->ws_datalen;
1109 uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);
1110
1111 ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
1112 << "~" << len << dendl;
1113 ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
1114 log_entry->write_data_pos < pool_root.pool_size);
1115 ceph_assert(align_len);
1116 if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
1117 // spans boundary, need to split
1118 uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
1119 uint64_t len2 = align_len - len1;
1120
1121 ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
1122 << align_len << " spans boundary, split into "
1123 << log_entry->write_data_pos << "~" << len1
1124 << " and " << DATA_RING_BUFFER_OFFSET << "~"
1125 << len2 << dendl;
1126 bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
1127 bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
1128 } else {
1129 ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
1130 << align_len << dendl;
1131 bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
1132 }
1133 }
1134 bdev->aio_submit(&aio->ioc);
1135 }
1136
1137 template <typename I>
1138 void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
1139 m_image_ctx.op_work_queue->queue(user_req, r);
1140 }
1141
1142 } // namespace ssd
1143 } // namespace pwl
1144 } // namespace cache
1145 } // namespace librbd
1146
// Explicit instantiation of the SSD WriteLog template for librbd::ImageCtx,
// so the member function templates defined in this file are emitted here.
1147 template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;