]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/cache/pwl/ssd/WriteLog.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / librbd / cache / pwl / ssd / WriteLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "WriteLog.h"
5 #include "include/buffer.h"
6 #include "include/Context.h"
7 #include "include/ceph_assert.h"
8 #include "common/deleter.h"
9 #include "common/dout.h"
10 #include "common/environment.h"
11 #include "common/errno.h"
12 #include "common/WorkQueue.h"
13 #include "common/Timer.h"
14 #include "common/perf_counters.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/asio/ContextWQ.h"
17 #include "librbd/cache/pwl/ImageCacheState.h"
18 #include "librbd/cache/pwl/LogEntry.h"
19 #include <map>
20 #include <vector>
21
22 #undef dout_subsys
23 #define dout_subsys ceph_subsys_rbd_pwl
24 #undef dout_prefix
25 #define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
26 << this << " " << __func__ << ": "
27
28 namespace librbd {
29 namespace cache {
30 namespace pwl {
31 namespace ssd {
32
33 using namespace std;
34 using namespace librbd::cache::pwl;
35
36 static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
37 return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
38 root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
39 root.first_valid_entry < root.pool_size &&
40 root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
41 root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
42 root.first_free_entry < root.pool_size &&
43 root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
44 }
45
46 template <typename I>
47 Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
48 m_builderobj = new Builder<This>();
49 return m_builderobj;
50 }
51
// Construct an SSD-backed write log. create_builder() runs first so the
// base AbstractWriteLog receives the SSD-specific builder.
template <typename I>
WriteLog<I>::WriteLog(
  I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
  cache::ImageWritebackInterface& image_writeback,
  plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
                        image_writeback, plugin_api)
{
}
61
template <typename I>
WriteLog<I>::~WriteLog() {
  // Release the builder allocated by create_builder().
  delete m_builderobj;
}
66
// Record one read "hit" on a cached write entry. The hit extent (with a
// copy of the entry's in-RAM data, if resident) is appended to the read
// request; if the data is not in RAM, the entry and the extent's
// destination bufferlist are queued so complete_read() can fetch the
// data from the SSD.
template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    uint64_t entry_hit_length, Extent hit_extent,
    pwl::C_ReadRequest *read_ctx) {
  // Make a bl for this hit extent. This will add references to the
  // write_entry->cache_bl
  ldout(m_image_ctx.cct, 5) << dendl;
  auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
  buffer::list hit_bl;
  write_entry->copy_cache_bl(&hit_bl);
  bool writesame = write_entry->is_writesame_entry();
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
      hit_extent, hit_bl, true, read_buffer_offset, writesame);
  read_ctx->read_extents.push_back(hit_extent_buf);

  if (!hit_bl.length()) {
    // Data not resident in RAM: queue the entry and the extent's
    // bufferlist for the batched SSD read in complete_read().
    ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
    auto read_extent = read_ctx->read_extents.back();
    write_entry->inc_bl_refs();
    log_entries_to_read.push_back(std::move(write_entry));
    bls_to_read.push_back(&read_extent->m_bl);
  }
}
93
94 template <typename I>
95 void WriteLog<I>::complete_read(
96 std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
97 std::vector<bufferlist*> &bls_to_read,
98 Context *ctx) {
99 if (!log_entries_to_read.empty()) {
100 aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
101 } else {
102 ctx->complete(0);
103 }
104 }
105
106 template <typename I>
107 int WriteLog<I>::create_and_open_bdev() {
108 CephContext *cct = m_image_ctx.cct;
109
110 bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
111 nullptr, nullptr, nullptr);
112 int r = bdev->open(this->m_log_pool_name);
113 if (r < 0) {
114 lderr(cct) << "failed to open bdev" << dendl;
115 delete bdev;
116 return r;
117 }
118
119 ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
120 if (bdev->get_size() != this->m_log_pool_size) {
121 lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
122 << " (block size " << bdev->get_block_size()
123 << ") != pool size " << this->m_log_pool_size << dendl;
124 bdev->close();
125 delete bdev;
126 return -EINVAL;
127 }
128
129 return 0;
130 }
131
// Create (new cache) or open and validate (existing cache) the SSD pool
// file and its block device. For a new pool, fresh root metadata is
// written synchronously; for an existing pool, the superblock is read,
// validated, and the log is replayed into memory. Returns true on
// success; on failure completes on_finish with a negative errno and
// returns false. Caller must hold m_lock.
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish,
                                  pwl::DeferredContexts &later) {
  int r;
  CephContext *cct = m_image_ctx.cct;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    // Pool file does not exist yet: create it and size it.
    int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
    bool succeed = true;
    if (fd >= 0) {
      if (truncate(this->m_log_pool_name.c_str(),
                   this->m_log_pool_size) != 0) {
        succeed = false;
      }
      ::close(fd);
    } else {
      succeed = false;
    }
    if (!succeed) {
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      // NOTE(review): errno is sampled after ::close(fd), which may have
      // clobbered the open()/truncate() error code — confirm.
      on_finish->complete(-errno);
      return false;
    }

    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    /* new pool, calculate and store metadata */

    /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
     * In this way, when all ring buffer spaces are allocated,
     * m_first_free_entry and m_first_valid_entry will not be equal.
     * Equal only means the cache is empty. */
    this->m_bytes_allocated_cap = this->m_log_pool_size -
        DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
    /* Log ring empty */
    m_first_free_entry = DATA_RING_BUFFER_OFFSET;
    m_first_valid_entry = DATA_RING_BUFFER_OFFSET;

    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->layout_version = SSD_LAYOUT_VERSION;
    new_root->pool_size = this->m_log_pool_size;
    new_root->flushed_sync_gen = this->m_flushed_sync_gen;
    new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
    new_root->first_free_entry = m_first_free_entry;
    new_root->first_valid_entry = m_first_valid_entry;
    new_root->num_log_entries = 0;
    pool_root = *new_root;

    // Persist the fresh root synchronously before declaring success.
    r = update_pool_root_sync(new_root);
    if (r != 0) {
      lderr(cct) << "failed to initialize pool ("
                 << this->m_log_pool_name << ")" << dendl;
      bdev->close();
      delete bdev;
      on_finish->complete(r);
      return false;
    }
  } else {
    // Pool file exists: open it and validate the persisted superblock.
    ceph_assert(m_cache_state->present);
    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }

    bufferlist bl;
    SuperBlock superblock;
    ::IOContext ioctx(cct, nullptr);
    // The superblock occupies the first allocation block of the pool.
    r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
    if (r < 0) {
      lderr(cct) << "read ssd cache superblock failed " << dendl;
      goto err_close_bdev;
    }
    auto p = bl.cbegin();
    decode(superblock, p);
    pool_root = superblock.root;
    ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
                  << " first_valid_entry=" << pool_root.first_valid_entry
                  << " first_free_entry=" << pool_root.first_free_entry
                  << " flushed_sync_gen=" << pool_root.flushed_sync_gen
                  << dendl;
    ceph_assert(is_valid_pool_root(pool_root));
    if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
      lderr(cct) << "pool layout version is "
                 << pool_root.layout_version
                 << " expected " << SSD_LAYOUT_VERSION
                 << dendl;
      goto err_close_bdev;
    }
    if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
      lderr(cct) << "pool block size is " << pool_root.block_size
                 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
                 << dendl;
      goto err_close_bdev;
    }

    // Adopt the persisted geometry, then replay the on-disk log.
    this->m_log_pool_size = pool_root.pool_size;
    this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
    this->m_first_valid_entry = pool_root.first_valid_entry;
    this->m_first_free_entry = pool_root.first_free_entry;
    this->m_bytes_allocated_cap = this->m_log_pool_size -
                                  DATA_RING_BUFFER_OFFSET -
                                  MIN_WRITE_ALLOC_SSD_SIZE;

    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_bdev:
  bdev->close();
  delete bdev;
  on_finish->complete(-EINVAL);
  return false;
}
258
259 template <typename I>
260 void WriteLog<I>::remove_pool_file() {
261 ceph_assert(bdev);
262 bdev->close();
263 delete bdev;
264 bdev = nullptr;
265 ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;
266
267 if (m_cache_state->clean) {
268 ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
269 << this->m_log_pool_name << dendl;
270 if (remove(this->m_log_pool_name.c_str()) != 0) {
271 lderr(m_image_ctx.cct) << "failed to remove empty pool \""
272 << this->m_log_pool_name << "\": " << dendl;
273 } else {
274 m_cache_state->present = false;
275 }
276 } else {
277 ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
278 << this->m_log_pool_name << dendl;
279 }
280 }
281
// Replay the on-SSD log into memory: walk the ring buffer from
// m_first_valid_entry to m_first_free_entry, decoding one control block
// per iteration and stepping over its data payload, then rebuild sync
// point state and the allocated-bytes accounting.
template <typename I>
void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  std::map<uint64_t, bool> missing_sync_points;

  // Iterate through the log_entries and append all the write_bytes
  // of each entry to fetch the pos of next 4k of log_entries. Iterate
  // through the log entries and append them to the in-memory vector
  for (uint64_t next_log_pos = this->m_first_valid_entry;
       next_log_pos != this->m_first_free_entry; ) {
    // read the entries from SSD cache and decode
    bufferlist bl_entries;
    ::IOContext ioctx_entry(cct, nullptr);
    // NOTE(review): the read() return value is not checked here; a
    // failed read would surface in the decode below — confirm intended.
    bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
               &ioctx_entry, false);
    std::vector<WriteLogCacheEntry> ssd_log_entries;
    auto pl = bl_entries.cbegin();
    decode(ssd_log_entries, pl);
    ldout(cct, 5) << "decoded ssd log entries" << dendl;
    uint64_t curr_log_pos = next_log_pos;
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;

    for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
      this->update_entries(&log_entry, &*it, missing_sync_points,
                           sync_point_entries, curr_log_pos);
      log_entry->ram_entry = *it;
      // Every entry decoded from one control block carries that block's
      // position as its index.
      log_entry->log_entry_index = curr_log_pos;
      log_entry->completed = true;
      m_log_entries.push_back(log_entry);
      // Data payloads are block aligned; skip past this entry's data.
      next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
    }
    // along with the write_bytes, add control block size too
    next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
    if (next_log_pos >= this->m_log_pool_size) {
      // Wrap around the ring buffer, skipping the superblock region.
      next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
    }
  }
  this->update_sync_points(missing_sync_points, sync_point_entries, later);
  if (m_first_valid_entry > m_first_free_entry) {
    // Valid region wraps: allocated space = pool tail + head.
    m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
        m_first_free_entry - DATA_RING_BUFFER_OFFSET;
  } else {
    m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
  }
}
328
329 // For SSD we don't calc m_bytes_allocated in this
330 template <typename I>
331 void WriteLog<I>::inc_allocated_cached_bytes(
332 std::shared_ptr<pwl::GenericLogEntry> log_entry) {
333 if (log_entry->is_write_entry()) {
334 this->m_bytes_cached += log_entry->write_bytes();
335 }
336 }
337
338 template <typename I>
339 bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
340 bool alloc_succeeds = true;
341 uint64_t bytes_allocated = 0;
342 uint64_t bytes_cached = 0;
343 uint64_t bytes_dirtied = 0;
344 uint64_t num_lanes = 0;
345 uint64_t num_unpublished_reserves = 0;
346 uint64_t num_log_entries = 0;
347
348 // Setup buffer, and get all the number of required resources
349 req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
350 &num_lanes, &num_log_entries,
351 &num_unpublished_reserves);
352
353 ceph_assert(!num_lanes);
354 if (num_log_entries) {
355 bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
356 num_log_entries = 0;
357 }
358 ceph_assert(!num_unpublished_reserves);
359
360 alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
361 bytes_allocated, num_lanes,
362 num_log_entries,
363 num_unpublished_reserves);
364 req->set_allocated(alloc_succeeds);
365 return alloc_succeeds;
366 }
367
368 template <typename I>
369 bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
370 for (auto &op : ops) {
371 if (op->get_log_entry()->is_sync_point()) {
372 return true;
373 break;
374 }
375 }
376 return false;
377 }
378
379 template<typename I>
380 void WriteLog<I>::enlist_op_appender() {
381 this->m_async_append_ops++;
382 this->m_async_op_tracker.start_op();
383 Context *append_ctx = new LambdaContext([this](int r) {
384 append_scheduled_ops();
385 });
386 this->m_work_queue.queue(append_ctx);
387 }
388
/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template<typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) {
  bool need_finisher = false;
  GenericLogOperationsVector appending;

  // Keep a copy so op->appending() can still be called after ops is
  // spliced into m_ops_to_append below.
  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

    bool persist_on_flush = this->get_persist_on_flush();
    // Kick the appender when it is idle and either a full control
    // block's worth of entries is queued or we are in write-through
    // (persist-on-write) mode.
    need_finisher = !this->m_appending &&
       ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
        !persist_on_flush);

    // Only flush logs into SSD when there is internal/external flush request
    if (!need_finisher) {
      need_finisher = has_sync_point_logs(ops);
    }
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);

    // To preserve the order of overlapping IOs, release_cell() may be
    // called only after the ops are added to m_ops_to_append.
    // As soon as m_lock is released, the appended ops can be picked up
    // by append_scheduled_ops() in another thread and req can be freed.
    if (req != nullptr) {
      if (persist_on_flush) {
        req->complete_user_request(0);
      }
      req->release_cell();
    }
  }

  if (need_finisher) {
    this->enlist_op_appender();
  }

  for (auto &op : appending) {
    op->appending();
  }
}
434
// SSD variant: no early-flush special case is needed, so this forwards
// straight to schedule_append(); do_early_flush is intentionally unused.
template <typename I>
void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
                                        bool do_early_flush,
                                        C_BlockIORequestT *req) {
  this->schedule_append(ops, req);
}
441
442 template <typename I>
443 void WriteLog<I>::append_scheduled_ops(void) {
444 GenericLogOperations ops;
445 ldout(m_image_ctx.cct, 20) << dendl;
446
447 bool ops_remain = false; // unused, no-op variable for SSD
448 bool appending = false; // unused, no-op variable for SSD
449 this->append_scheduled(ops, ops_remain, appending);
450
451 if (ops.size()) {
452 alloc_op_log_entries(ops);
453 append_op_log_entries(ops);
454 } else {
455 this->m_async_append_ops--;
456 this->m_async_op_tracker.finish_op();
457 }
458 }
459
/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
  ceph_assert(!ops.empty());
  ldout(m_image_ctx.cct, 20) << dendl;
  // Fires after the superblock root update persists: complete the ops,
  // then decide whether another append pass must be scheduled.
  Context *ctx = new LambdaContext([this, ops](int r) {
    assert(r == 0);
    ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;

    auto captured_ops = std::move(ops);
    this->complete_op_log_entries(std::move(captured_ops), r);

    bool need_finisher = false;
    {
      std::lock_guard locker1(m_lock);
      bool persist_on_flush = this->get_persist_on_flush();
      // Re-append when a full control block's worth of entries is
      // queued, in write-through mode, or when a sync point is waiting.
      need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
                       !persist_on_flush);

      if (!need_finisher) {
        need_finisher = has_sync_point_logs(this->m_ops_to_append);
      }
    }

    if (need_finisher) {
      this->enlist_op_appender();
    }
    this->m_async_update_superblock--;
    this->m_async_op_tracker.finish_op();
  });
  // Heap slot that append_ops() fills with the advanced first_free_entry;
  // owned by (and freed inside) append_ctx below.
  uint64_t *new_first_free_entry = new(uint64_t);
  // Fires when the log/data writes submitted by append_ops() complete:
  // publish the new first_free_entry in the pool root and schedule the
  // superblock update (which in turn fires ctx above).
  Context *append_ctx = new LambdaContext(
    [this, new_first_free_entry, ops, ctx](int r) {
      std::shared_ptr<WriteLogPoolRoot> new_root;
      {
        ldout(m_image_ctx.cct, 20) << "Finished appending at "
                                   << *new_first_free_entry << dendl;
        utime_t now = ceph_clock_now();
        for (auto &operation : ops) {
          operation->log_append_comp_time = now;
        }

        // Lock order: m_log_append_lock before m_lock, matching
        // retire_entries().
        std::lock_guard locker(this->m_log_append_lock);
        std::lock_guard locker1(m_lock);
        assert(this->m_appending);
        this->m_appending = false;
        new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
        pool_root.first_free_entry = *new_first_free_entry;
        new_root->first_free_entry = *new_first_free_entry;
        delete new_first_free_entry;
        schedule_update_root(new_root, ctx);
      }
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
  // Append logs and update first_free_update
  append_ops(ops, append_ctx, new_first_free_entry);

  if (ops.size()) {
    this->dispatch_deferred_writes();
  }
}
526
// Drop the in-RAM copy of an entry's data buffer; the persisted copy on
// the SSD remains authoritative.
template <typename I>
void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
  log_entry->remove_cache_bl();
}
531
532 template <typename I>
533 void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
534 std::unique_lock locker(m_lock);
535
536 for (auto &operation : ops) {
537 auto &log_entry = operation->get_log_entry();
538 log_entry->ram_entry.set_entry_valid(true);
539 m_log_entries.push_back(log_entry);
540 ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
541 }
542 if (m_cache_state->empty && !m_log_entries.empty()) {
543 m_cache_state->empty = false;
544 this->update_image_cache_state();
545 this->write_image_cache_state(locker);
546 }
547 }
548
// Build and detain the writeback (flush-to-image) requests for a batch
// of log entries. When invalidating, or when the batch contains no
// write entries, each entry is flushed directly; otherwise the write
// data is first read back from the SSD in one batched aio and flushed
// with the data attached.
template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  // snapshot so we behave consistently
  bool invalidating = this->m_invalidating;

  if (invalidating || !has_write_entry) {
    for (auto &log_entry : entries_to_flush) {
      GuardedRequestFunctionContext *guarded_ctx =
        new GuardedRequestFunctionContext([this, log_entry, invalidating]
          (GuardedRequestFunctionContext &guard_ctx) {
            log_entry->m_cell = guard_ctx.cell;
            Context *ctx = this->construct_flush_entry(log_entry, invalidating);

            if (!invalidating) {
              // Defer the actual writeback onto the op work queue.
              ctx = new LambdaContext([this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
            }
            ctx->complete(0);
        });
      this->detain_flush_guard_request(log_entry, guarded_ctx);
    }
  } else {
    int count = entries_to_flush.size();
    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
    std::vector<bufferlist *> read_bls;

    write_entries.reserve(count);
    read_bls.reserve(count);

    // Collect the write entries, with one destination bufferlist each,
    // for a single batched SSD read.
    for (auto &log_entry : entries_to_flush) {
      if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
        write_entries.push_back(write_entry);
        read_bls.push_back(bl);
      }
    }

    // Fires after the batched read: hand each write entry its data and
    // flush it; flush non-write entries directly.
    Context *ctx = new LambdaContext(
      [this, entries_to_flush, read_bls](int r) {
        int i = 0;
        GuardedRequestFunctionContext *guarded_ctx = nullptr;

        for (auto &log_entry : entries_to_flush) {
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
            // read_bls parallels the write entries in order; consume and
            // free the next one.
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];

            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);

                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
                    auto captured_entry_bl = std::move(entry_bl);
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback_bl(this->m_image_writeback, ctx,
                                            std::move(captured_entry_bl));
                  }), 0);
              });
          } else {
            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }
          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
      });

    aio_read_data_blocks(write_entries, read_bls, ctx);
  }
}
641
// Background worker: retire old log entries when allocation pressure,
// shutdown, or invalidation demands it, then dispatch deferred writes
// and writeback of dirty entries. Loops a bounded number of times while
// wake-ups keep arriving, and reschedules itself if one arrives late.
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes =
      this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || (this->m_shutting_down) ||
        this->m_invalidating || m_bytes_allocated > high_water_bytes) {
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (m_bytes_allocated > high_water_bytes)
                                 << dendl;
      // Retire a larger batch when under heavy pressure (shutdown,
      // invalidation, or above the aggressive high-water mark).
      retire_entries((this->m_shutting_down || this->m_invalidating ||
                      m_bytes_allocated > aggressive_high_water_bytes)
                     ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();
    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    // Reschedule if it's still requested
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}
685
/**
 * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
 * that are eligible to be retired. Returns true if anything was
 * retired.
 *
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint64_t initial_first_valid_entry;
  uint64_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    // Entry readers can't be added while we hold m_entry_reader_lock
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = m_first_valid_entry;
    first_valid_entry = m_first_valid_entry;
    while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
      GenericLogEntriesVector retiring_subentries;
      uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
      uint64_t data_length = 0;
      // Entries written under one control block must retire together:
      // collect the leading run that shares control_block_pos.
      for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
        if (this->can_retire_entry(*it)) {
          // log_entry_index is valid after appending to SSD
          if ((*it)->log_entry_index != control_block_pos) {
            ldout(cct, 20) << "Old log_entry_index is " << control_block_pos
                           << ",New log_entry_index is "
                           << (*it)->log_entry_index
                           << ",data length is " << data_length << dendl;
            ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
            // The next control block must directly follow this span,
            // allowing for ring-buffer wrap-around.
            if ((*it)->log_entry_index < control_block_pos) {
              ceph_assert((*it)->log_entry_index ==
                  (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
                  this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
            } else {
              ceph_assert((*it)->log_entry_index == control_block_pos +
                  data_length + MIN_WRITE_ALLOC_SSD_SIZE);
            }
            break;
          } else {
            retiring_subentries.push_back(*it);
            if ((*it)->is_write_entry()) {
              data_length += (*it)->get_aligned_data_size();
            }
          }
        } else {
          // A non-retirable entry inside the span blocks the whole
          // control block from being retired this pass.
          retiring_subentries.clear();
          break;
        }
      }
      // SSD: retiring_subentries in a span
      if (!retiring_subentries.empty()) {
        for (auto it = retiring_subentries.begin();
             it != retiring_subentries.end(); it++) {
          ceph_assert(m_log_entries.front() == *it);
          m_log_entries.pop_front();
          if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
            auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
            if (gen_write_entry) {
              this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
            }
          }
        }

        ldout(cct, 20) << "span with " << retiring_subentries.size()
                       << " entries: control_block_pos=" << control_block_pos
                       << " data_length=" << data_length
                       << dendl;
        retiring_entries.insert(
          retiring_entries.end(), retiring_subentries.begin(),
          retiring_subentries.end());

        // Advance past the control block and its data, wrapping at the
        // end of the ring buffer.
        first_valid_entry = control_block_pos + data_length +
            MIN_WRITE_ALLOC_SSD_SIZE;
        if (first_valid_entry >= this->m_log_pool_size) {
          first_valid_entry = first_valid_entry % this->m_log_pool_size +
              DATA_RING_BUFFER_OFFSET;
        }
      } else {
        break;
      }
    }
  }
  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
                   << dendl;

    // Advance first valid entry and release buffers
    uint64_t flushed_sync_gen;
    std::lock_guard append_locker(this->m_log_append_lock);
    {
      std::lock_guard locker(m_lock);
      flushed_sync_gen = this->m_flushed_sync_gen;
    }

    ceph_assert(first_valid_entry != initial_first_valid_entry);
    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->flushed_sync_gen = flushed_sync_gen;
    new_root->first_valid_entry = first_valid_entry;
    pool_root.flushed_sync_gen = flushed_sync_gen;
    pool_root.first_valid_entry = first_valid_entry;

    // Fires after the root update persists: release accounting for the
    // retired entries and kick the worker/deferred IO again.
    Context *ctx = new LambdaContext(
      [this, first_valid_entry, initial_first_valid_entry,
       retiring_entries](int r) {
        uint64_t allocated_bytes = 0;
        uint64_t cached_bytes = 0;
        uint64_t former_log_pos = 0;
        for (auto &entry : retiring_entries) {
          ceph_assert(entry->log_entry_index != 0);
          if (entry->log_entry_index != former_log_pos ) {
            // Space for control blocks
            allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
            former_log_pos = entry->log_entry_index;
          }
          if (entry->is_write_entry()) {
            cached_bytes += entry->write_bytes();
            // space for userdata
            allocated_bytes += entry->get_aligned_data_size();
          }
        }
        bool need_update_state = false;
        {
          std::lock_guard locker(m_lock);
          m_first_valid_entry = first_valid_entry;
          ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
          ceph_assert(this->m_bytes_allocated >= allocated_bytes);
          this->m_bytes_allocated -= allocated_bytes;
          ceph_assert(this->m_bytes_cached >= cached_bytes);
          this->m_bytes_cached -= cached_bytes;
          if (!m_cache_state->empty && m_log_entries.empty()) {
            m_cache_state->empty = true;
            this->update_image_cache_state();
            need_update_state = true;
          }

          ldout(m_image_ctx.cct, 20)
            << "Finished root update: initial_first_valid_entry="
            << initial_first_valid_entry << ", m_first_valid_entry="
            << m_first_valid_entry << ", release space = "
            << allocated_bytes << ", m_bytes_allocated="
            << m_bytes_allocated << ", release cached space="
            << cached_bytes << ", m_bytes_cached="
            << this->m_bytes_cached << dendl;

          this->m_alloc_failed_since_retire = false;
          this->wake_up();
        }
        if (need_update_state) {
          std::unique_lock locker(m_lock);
          this->write_image_cache_state(locker);
        }

        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
        m_async_update_superblock--;
        this->m_async_op_tracker.finish_op();
      });

    std::lock_guard locker(m_lock);
    schedule_update_root(new_root, ctx);
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}
857
// Write a batch of ops to the SSD in control-block-sized spans and
// submit the combined aio. *new_first_free_entry starts at the current
// pool root position, is advanced by each write_log_entries() call, and
// is published to m_first_free_entry under m_lock; ctx fires when all
// submitted writes complete.
template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
                             uint64_t* new_first_free_entry) {
  GenericLogEntriesVector log_entries;
  CephContext *cct = m_image_ctx.cct;
  uint64_t span_payload_len = 0;
  uint64_t bytes_to_free = 0;
  ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

  *new_first_free_entry = pool_root.first_free_entry;
  AioTransContext* aio = new AioTransContext(cct, ctx);

  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    operation->log_append_start_time = now;
    auto log_entry = operation->get_log_entry();

    // Flush the current span when it is full: max entries per control
    // block, or max payload length reached.
    if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
        span_payload_len >= SPAN_MAX_DATA_LEN) {
      if (log_entries.size() > 1) {
        // Entries sharing one control block were each charged a full
        // control block in alloc_resources(); refund the extras.
        bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
      }
      write_log_entries(log_entries, aio, new_first_free_entry);
      log_entries.clear();
      span_payload_len = 0;
    }
    log_entries.push_back(log_entry);
    span_payload_len += log_entry->write_bytes();
  }
  // Flush the trailing span (including a span with zero payload bytes).
  if (!span_payload_len || !log_entries.empty()) {
    if (log_entries.size() > 1) {
      bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
    }
    write_log_entries(log_entries, aio, new_first_free_entry);
  }

  {
    std::lock_guard locker1(m_lock);
    m_first_free_entry = *new_first_free_entry;
    m_bytes_allocated -= bytes_to_free;
  }

  // Submit all queued writes as one aio transaction.
  bdev->aio_submit(&aio->ioc);
}
902
// Serialize one span: a single control block containing the encoded
// WriteLogCacheEntry headers, followed by the block-aligned data of the
// span's write entries. Advances *pos past everything written, wrapping
// at the end of the ring buffer. Writes are queued on aio; submission
// happens in append_ops().
template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
                                    AioTransContext *aio, uint64_t *pos) {
  CephContext *cct = m_image_ctx.cct;
  ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
  ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
              *pos < this->m_log_pool_size &&
              *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

  // The first block is for log entries
  uint64_t control_block_pos = *pos;
  *pos += MIN_WRITE_ALLOC_SSD_SIZE;
  if (*pos == this->m_log_pool_size) {
    *pos = DATA_RING_BUFFER_OFFSET;
  }

  std::vector<WriteLogCacheEntry> persist_log_entries;
  bufferlist data_bl;
  for (auto &log_entry : log_entries) {
    log_entry->log_entry_index = control_block_pos;
    // Append data buffer for write operations
    if (log_entry->is_write_entry()) {
      auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
      auto cache_bl = write_entry->get_cache_bl();
      auto align_size = write_entry->get_aligned_data_size();
      data_bl.append(cache_bl);
      // Pad each payload up to the allocation block size.
      data_bl.append_zero(align_size - cache_bl.length());

      write_entry->ram_entry.write_data_pos = *pos;
      *pos += align_size;
      if (*pos >= this->m_log_pool_size) {
        *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
      }
    }
    // push_back _after_ setting write_data_pos
    persist_log_entries.push_back(log_entry->ram_entry);
  }

  //aio write
  bufferlist bl;
  encode(persist_log_entries, bl);
  // The encoded headers must fit in one control block.
  ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  bl.append(data_bl);
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (control_block_pos + bl.length() > this->m_log_pool_size) {
    //exceeds border, need to split
    uint64_t size = bl.length();
    bufferlist bl1;
    bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
    ceph_assert(bl.length() == (size - bl1.length()));

    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << size << " spans boundary, split into "
                   << control_block_pos << "~" << bl1.length()
                   << " and " << DATA_RING_BUFFER_OFFSET << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
    bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  } else {
    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  }
}
971
972 template <typename I>
973 void WriteLog<I>::schedule_update_root(
974 std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
975 CephContext *cct = m_image_ctx.cct;
976 ldout(cct, 15) << "New root: pool_size=" << root->pool_size
977 << " first_valid_entry=" << root->first_valid_entry
978 << " first_free_entry=" << root->first_free_entry
979 << " flushed_sync_gen=" << root->flushed_sync_gen
980 << dendl;
981 ceph_assert(is_valid_pool_root(*root));
982
983 bool need_finisher;
984 {
985 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
986 need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
987 std::shared_ptr<WriteLogPoolRootUpdate> entry =
988 std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
989 this->m_async_update_superblock++;
990 this->m_async_op_tracker.start_op();
991 m_poolroot_to_update.emplace_back(entry);
992 }
993 if (need_finisher) {
994 enlist_op_update_root();
995 }
996 }
997
998 template <typename I>
999 void WriteLog<I>::enlist_op_update_root() {
1000 Context *append_ctx = new LambdaContext([this](int r) {
1001 update_root_scheduled_ops();
1002 });
1003 this->m_work_queue.queue(append_ctx);
1004 }
1005
1006 template <typename I>
1007 void WriteLog<I>::update_root_scheduled_ops() {
1008 ldout(m_image_ctx.cct, 20) << dendl;
1009
1010 std::shared_ptr<WriteLogPoolRoot> root;
1011 WriteLogPoolRootUpdateList root_updates;
1012 Context *ctx = nullptr;
1013 {
1014 std::lock_guard locker(m_lock);
1015 if (m_updating_pool_root) {
1016 /* Another thread is appending */
1017 ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
1018 << dendl;
1019 return;
1020 }
1021 if (m_poolroot_to_update.size()) {
1022 m_updating_pool_root = true;
1023 root_updates.swap(m_poolroot_to_update);
1024 }
1025 }
1026 ceph_assert(!root_updates.empty());
1027 ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
1028 << dendl;
1029 // We just update the last one, and call all the completions.
1030 auto entry = root_updates.back();
1031 root = entry->root;
1032
1033 ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
1034 ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
1035 for (auto it = updates.begin(); it != updates.end(); it++) {
1036 Context *it_ctx = (*it)->ctx;
1037 it_ctx->complete(r);
1038 }
1039 });
1040 Context *append_ctx = new LambdaContext([this, ctx](int r) {
1041 ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
1042 bool need_finisher = false;
1043 assert(r == 0);
1044 {
1045 std::lock_guard locker(m_lock);
1046 m_updating_pool_root = false;
1047 need_finisher = !m_poolroot_to_update.empty();
1048 }
1049 if (need_finisher) {
1050 enlist_op_update_root();
1051 }
1052 ctx->complete(r);
1053 });
1054 AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
1055 update_pool_root(root, aio);
1056 }
1057
1058 template <typename I>
1059 void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
1060 AioTransContext *aio) {
1061 bufferlist bl;
1062 SuperBlock superblock;
1063 superblock.root = *root;
1064 encode(superblock, bl);
1065 bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
1066 ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
1067 bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
1068 bdev->aio_submit(&aio->ioc);
1069 }
1070
1071 template <typename I>
1072 int WriteLog<I>::update_pool_root_sync(
1073 std::shared_ptr<WriteLogPoolRoot> root) {
1074 bufferlist bl;
1075 SuperBlock superblock;
1076 superblock.root = *root;
1077 encode(superblock, bl);
1078 bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
1079 ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
1080 return bdev->write(0, bl, false);
1081 }
1082
1083 template <typename I>
1084 void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
1085 bufferlist *bl, Context *ctx) {
1086 std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
1087 std::vector<bufferlist *> bls {bl};
1088 aio_read_data_blocks(log_entries, bls, ctx);
1089 }
1090
template <typename I>
// Read the data payloads of a batch of log entries from the SSD ring
// buffer into the caller-provided bufferlists. Reads are issued in aligned
// (MIN_WRITE_ALLOC_SSD_SIZE) units and, once complete, each bufferlist is
// trimmed down to the entry's actual payload length. Entries whose data
// wraps past the end of the pool are split into two reads.
//
// @param log_entries  entries to read (must be write or writesame entries)
// @param bls          one destination bufferlist per entry (same order)
// @param ctx          completed (after trimming) with the read result
void WriteLog<I>::aio_read_data_blocks(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
    std::vector<bufferlist *> &bls, Context *ctx) {
  ceph_assert(log_entries.size() == bls.size());

  //get the valid part
  Context *read_ctx = new LambdaContext(
    [log_entries, bls, ctx](int r) {
      for (unsigned int i = 0; i < log_entries.size(); i++) {
        bufferlist valid_data_bl;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
        // Writesame entries store only ws_datalen bytes of pattern data.
        auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
                                                        : write_entry->ram_entry.ws_datalen;

        // Drop the alignment padding: keep only the first `length` bytes.
        valid_data_bl.substr_of(*bls[i], 0, length);
        bls[i]->clear();
        bls[i]->append(valid_data_bl);
        write_entry->dec_bl_refs();
      }
      ctx->complete(r);
    });

  CephContext *cct = m_image_ctx.cct;
  AioTransContext *aio = new AioTransContext(cct, read_ctx);
  for (unsigned int i = 0; i < log_entries.size(); i++) {
    WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;

    ceph_assert(log_entry->is_write() || log_entry->is_writesame());
    uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
                                           log_entry->ws_datalen;
    // Device reads must cover whole allocation units.
    uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);

    ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
                   << "~" << len << dendl;
    // Payload must live inside the data ring (superblock region excluded).
    ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
                log_entry->write_data_pos < pool_root.pool_size);
    ceph_assert(align_len);
    if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
      // spans boundary, need to split
      uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
      uint64_t len2 = align_len - len1;

      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << " spans boundary, split into "
                     << log_entry->write_data_pos << "~" << len1
                     << " and " << DATA_RING_BUFFER_OFFSET << "~"
                     << len2 << dendl;
      // Both reads append into the same bufferlist, tail then wrapped head.
      bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
      bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
    } else {
      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << dendl;
      bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
    }
  }
  bdev->aio_submit(&aio->ioc);
}
1149
1150 template <typename I>
1151 void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
1152 m_image_ctx.op_work_queue->queue(user_req, r);
1153 }
1154
1155 } // namespace ssd
1156 } // namespace pwl
1157 } // namespace cache
1158 } // namespace librbd
1159
1160 template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;