// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "WriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include <map>
#include <vector>

#undef dout_subsys
#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
                           << this << " " << __func__ << ": "

namespace librbd {
namespace cache {
namespace pwl {
namespace ssd {

using namespace std;
using namespace librbd::cache::pwl;

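/*
 * On-disk layout used throughout this file: the superblock (an encoded
 * SuperBlock holding the WriteLogPoolRoot) lives at offset 0, and the
 * region starting at DATA_RING_BUFFER_OFFSET is a ring buffer of log
 * "spans": one MIN_WRITE_ALLOC_SSD_SIZE control block (an encoded vector
 * of WriteLogCacheEntry) followed by each write entry's data, padded to
 * MIN_WRITE_ALLOC_SSD_SIZE. first_valid_entry and first_free_entry in the
 * root delimit the occupied portion of the ring.
 */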
static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
  return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_valid_entry < root.pool_size &&
         root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_free_entry < root.pool_size &&
         root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
}

template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
  m_builderobj = new Builder<This>();
  return m_builderobj;
}

template <typename I>
WriteLog<I>::WriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    cache::ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
                        image_writeback, plugin_api)
{
}

template <typename I>
WriteLog<I>::~WriteLog() {
  delete m_builderobj;
}

template <typename I>
void WriteLog<I>::collect_read_extents(
    uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    uint64_t entry_hit_length, Extent hit_extent,
    pwl::C_ReadRequest *read_ctx) {
  // Make a bl for this hit extent. This will add references to the
  // write_entry->cache_bl
  ldout(m_image_ctx.cct, 5) << dendl;
  auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
  buffer::list hit_bl;
  write_entry->copy_cache_bl(&hit_bl);
  bool writesame = write_entry->is_writesame_entry();
  auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
      hit_extent, hit_bl, true, read_buffer_offset, writesame);
  read_ctx->read_extents.push_back(hit_extent_buf);

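  // For the SSD cache the payload may have been released from RAM already;
  // in that case hit_bl comes back empty and the data must be read back
  // from the SSD into this read extent's bufferlist (see complete_read()).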
  if (!hit_bl.length()) {
    ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
    auto read_extent = read_ctx->read_extents.back();
    write_entry->inc_bl_refs();
    log_entries_to_read.push_back(std::move(write_entry));
    bls_to_read.push_back(&read_extent->m_bl);
  }
}

template <typename I>
void WriteLog<I>::complete_read(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
    std::vector<bufferlist*> &bls_to_read,
    Context *ctx) {
  if (!log_entries_to_read.empty()) {
    aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
  } else {
    ctx->complete(0);
  }
}

template <typename I>
int WriteLog<I>::create_and_open_bdev() {
  CephContext *cct = m_image_ctx.cct;

  bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
                             nullptr, nullptr, nullptr);
  int r = bdev->open(this->m_log_pool_name);
  if (r < 0) {
    lderr(cct) << "failed to open bdev" << dendl;
    delete bdev;
    return r;
  }

  ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (bdev->get_size() != this->m_log_pool_size) {
    lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
               << " (block size " << bdev->get_block_size()
               << ") != pool size " << this->m_log_pool_size << dendl;
    bdev->close();
    delete bdev;
    return -EINVAL;
  }

  return 0;
}

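/*
 * Create a new pool file (and write an initial root) if none exists yet,
 * otherwise open the existing file and validate the superblock: the layout
 * version and block size must match what this code writes, and the root
 * offsets must pass is_valid_pool_root().
 */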
template <typename I>
bool WriteLog<I>::initialize_pool(Context *on_finish,
                                  pwl::DeferredContexts &later) {
  int r;
  CephContext *cct = m_image_ctx.cct;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
    int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
    bool succeed = true;
    if (fd >= 0) {
      if (truncate(this->m_log_pool_name.c_str(),
                   this->m_log_pool_size) != 0) {
        succeed = false;
      }
      ::close(fd);
    } else {
      succeed = false;
    }
    if (!succeed) {
      m_cache_state->present = false;
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      /* TODO: filter/replace errnos that are meaningless to the caller */
      on_finish->complete(-errno);
      return false;
    }

    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }
    m_cache_state->present = true;
    m_cache_state->clean = true;
    m_cache_state->empty = true;
    /* new pool, calculate and store metadata */

    /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
     * In this way, when all ring buffer spaces are allocated,
     * m_first_free_entry and m_first_valid_entry will not be equal.
     * Equal only means the cache is empty. */
    this->m_bytes_allocated_cap = this->m_log_pool_size -
        DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
    /* Log ring empty */
    m_first_free_entry = DATA_RING_BUFFER_OFFSET;
    m_first_valid_entry = DATA_RING_BUFFER_OFFSET;

    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->layout_version = SSD_LAYOUT_VERSION;
    new_root->pool_size = this->m_log_pool_size;
    new_root->flushed_sync_gen = this->m_flushed_sync_gen;
    new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
    new_root->first_free_entry = m_first_free_entry;
    new_root->first_valid_entry = m_first_valid_entry;
    new_root->num_log_entries = 0;
    pool_root = *new_root;

    r = update_pool_root_sync(new_root);
    if (r != 0) {
      lderr(cct) << "failed to initialize pool ("
                 << this->m_log_pool_name << ")" << dendl;
      bdev->close();
      delete bdev;
      on_finish->complete(r);
      return false;
    }
  } else {
    ceph_assert(m_cache_state->present);
    r = create_and_open_bdev();
    if (r < 0) {
      on_finish->complete(r);
      return false;
    }

    bufferlist bl;
    SuperBlock superblock;
    ::IOContext ioctx(cct, nullptr);
    r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
    if (r < 0) {
      lderr(cct) << "read ssd cache superblock failed " << dendl;
      goto err_close_bdev;
    }
    auto p = bl.cbegin();
    decode(superblock, p);
    pool_root = superblock.root;
    ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
                  << " first_valid_entry=" << pool_root.first_valid_entry
                  << " first_free_entry=" << pool_root.first_free_entry
                  << " flushed_sync_gen=" << pool_root.flushed_sync_gen
                  << dendl;
    ceph_assert(is_valid_pool_root(pool_root));
    if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
      lderr(cct) << "pool layout version is "
                 << pool_root.layout_version
                 << " expected " << SSD_LAYOUT_VERSION
                 << dendl;
      goto err_close_bdev;
    }
    if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
      lderr(cct) << "pool block size is " << pool_root.block_size
                 << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
                 << dendl;
      goto err_close_bdev;
    }

    this->m_log_pool_size = pool_root.pool_size;
    this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
    this->m_first_valid_entry = pool_root.first_valid_entry;
    this->m_first_free_entry = pool_root.first_free_entry;
    this->m_bytes_allocated_cap = this->m_log_pool_size -
                                  DATA_RING_BUFFER_OFFSET -
                                  MIN_WRITE_ALLOC_SSD_SIZE;

    load_existing_entries(later);
    m_cache_state->clean = this->m_dirty_log_entries.empty();
    m_cache_state->empty = m_log_entries.empty();
  }
  return true;

err_close_bdev:
  bdev->close();
  delete bdev;
  on_finish->complete(-EINVAL);
  return false;
}

template <typename I>
void WriteLog<I>::remove_pool_file() {
  ceph_assert(bdev);
  bdev->close();
  delete bdev;
  bdev = nullptr;
  ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;

  if (m_cache_state->clean) {
    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
                              << this->m_log_pool_name << dendl;
    if (remove(this->m_log_pool_name.c_str()) != 0) {
      lderr(m_image_ctx.cct) << "failed to remove empty pool \""
                             << this->m_log_pool_name << "\": " << dendl;
    } else {
      m_cache_state->present = false;
    }
  } else {
    ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
                              << this->m_log_pool_name << dendl;
  }
}

template <typename I>
void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
  std::map<uint64_t, bool> missing_sync_points;

  // Walk the on-SSD log from first_valid_entry to first_free_entry: decode
  // each control block, append its entries to the in-memory list, and use
  // the accumulated (aligned) write_bytes to locate the next control block.
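  // Note: only the metadata is loaded here; the data blocks stay on SSD
  // and are read back on demand (see aio_read_data_blocks()).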
  for (uint64_t next_log_pos = this->m_first_valid_entry;
       next_log_pos != this->m_first_free_entry; ) {
    // read the entries from SSD cache and decode
    bufferlist bl_entries;
    ::IOContext ioctx_entry(cct, nullptr);
    bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
               &ioctx_entry, false);
    std::vector<WriteLogCacheEntry> ssd_log_entries;
    auto pl = bl_entries.cbegin();
    decode(ssd_log_entries, pl);
    ldout(cct, 5) << "decoded ssd log entries" << dendl;
    uint64_t curr_log_pos = next_log_pos;
    std::shared_ptr<GenericLogEntry> log_entry = nullptr;

    for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
      this->update_entries(&log_entry, &*it, missing_sync_points,
                           sync_point_entries, curr_log_pos);
      log_entry->ram_entry = *it;
      log_entry->log_entry_index = curr_log_pos;
      log_entry->completed = true;
      m_log_entries.push_back(log_entry);
      next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
    }
    // along with the write_bytes, add control block size too
    next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
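    // Wrap around: positions past the end of the pool continue at
    // DATA_RING_BUFFER_OFFSET, since the area below it is reserved for
    // the superblock.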
    if (next_log_pos >= this->m_log_pool_size) {
      next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
    }
  }
  this->update_sync_points(missing_sync_points, sync_point_entries, later);
  if (m_first_valid_entry > m_first_free_entry) {
    m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
        m_first_free_entry - DATA_RING_BUFFER_OFFSET;
  } else {
    m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
  }
}

// For SSD, m_bytes_allocated is not accounted here; this only tracks the
// number of bytes cached.
template <typename I>
void WriteLog<I>::inc_allocated_cached_bytes(
    std::shared_ptr<pwl::GenericLogEntry> log_entry) {
  if (log_entry->is_write_entry()) {
    this->m_bytes_cached += log_entry->write_bytes();
  }
}

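// For the SSD cache every appended log entry consumes, in the worst case, a
// full MIN_WRITE_ALLOC_SSD_SIZE control-block slot in addition to its aligned
// data, so the entry count is folded into bytes_allocated below. Lanes and
// unpublished reserves are unused here and are asserted to be zero.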
template <typename I>
bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
  bool alloc_succeeds = true;
  uint64_t bytes_allocated = 0;
  uint64_t bytes_cached = 0;
  uint64_t bytes_dirtied = 0;
  uint64_t num_lanes = 0;
  uint64_t num_unpublished_reserves = 0;
  uint64_t num_log_entries = 0;

  // Set up the buffer and get the number of required resources
  req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
                              &num_lanes, &num_log_entries,
                              &num_unpublished_reserves);

  ceph_assert(!num_lanes);
  if (num_log_entries) {
    bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
    num_log_entries = 0;
  }
  ceph_assert(!num_unpublished_reserves);

  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
                                          bytes_allocated, num_lanes,
                                          num_log_entries,
                                          num_unpublished_reserves);
  req->set_allocated(alloc_succeeds);
  return alloc_succeeds;
}

template <typename I>
bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
  for (auto &op : ops) {
    if (op->get_log_entry()->is_sync_point()) {
      return true;
    }
  }
  return false;
}

template <typename I>
void WriteLog<I>::enlist_op_appender() {
  this->m_async_append_ops++;
  this->m_async_op_tracker.start_op();
  Context *append_ctx = new LambdaContext([this](int r) {
    append_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}

/*
 * Takes custody of ops. They'll all get their log entries appended,
 * and have their on_write_persist contexts completed once they and
 * all prior log entries are persisted everywhere.
 */
template <typename I>
void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) {
  bool need_finisher = false;
  GenericLogOperationsVector appending;

  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
  {
    std::lock_guard locker(m_lock);

    bool persist_on_flush = this->get_persist_on_flush();
    need_finisher = !this->m_appending &&
        ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
         !persist_on_flush);

    // Only flush logs into SSD when there is internal/external flush request
    if (!need_finisher) {
      need_finisher = has_sync_point_logs(ops);
    }
    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);

    // To preserve the order of overlapping IOs, release_cell() may be
    // called only after the ops are added to m_ops_to_append.
    // As soon as m_lock is released, the appended ops can be picked up
    // by append_scheduled_ops() in another thread and req can be freed.
    if (req != nullptr) {
      if (persist_on_flush) {
        req->complete_user_request(0);
      }
      req->release_cell();
    }
  }

  if (need_finisher) {
    this->enlist_op_appender();
  }

  for (auto &op : appending) {
    op->appending();
  }
}

template <typename I>
void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
                                        bool do_early_flush,
                                        C_BlockIORequestT *req) {
  this->schedule_append(ops, req);
}

template <typename I>
void WriteLog<I>::append_scheduled_ops(void) {
  GenericLogOperations ops;
  ldout(m_image_ctx.cct, 20) << dendl;

  bool ops_remain = false;  // unused, no-op variable for SSD
  bool appending = false;   // unused, no-op variable for SSD
  this->append_scheduled(ops, ops_remain, appending);

  if (ops.size()) {
    alloc_op_log_entries(ops);
    append_op_log_entries(ops);
  } else {
    this->m_async_append_ops--;
    this->m_async_op_tracker.finish_op();
  }
}

/*
 * Write and persist the (already allocated) write log entries and
 * data buffer allocations for a set of ops. The data buffer for each
 * of these must already have been persisted to its reserved area.
 */
template <typename I>
void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
  ceph_assert(!ops.empty());
  ldout(m_image_ctx.cct, 20) << dendl;
  Context *ctx = new LambdaContext([this, ops](int r) {
    assert(r == 0);
    ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;

    auto captured_ops = std::move(ops);
    this->complete_op_log_entries(std::move(captured_ops), r);

    bool need_finisher = false;
    {
      std::lock_guard locker1(m_lock);
      bool persist_on_flush = this->get_persist_on_flush();
      need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
                       !persist_on_flush);

      if (!need_finisher) {
        need_finisher = has_sync_point_logs(this->m_ops_to_append);
      }
    }

    if (need_finisher) {
      this->enlist_op_appender();
    }
    this->m_async_update_superblock--;
    this->m_async_op_tracker.finish_op();
  });
  uint64_t *new_first_free_entry = new(uint64_t);
  Context *append_ctx = new LambdaContext(
    [this, new_first_free_entry, ops, ctx](int r) {
      std::shared_ptr<WriteLogPoolRoot> new_root;
      {
        ldout(m_image_ctx.cct, 20) << "Finished appending at "
                                   << *new_first_free_entry << dendl;
        utime_t now = ceph_clock_now();
        for (auto &operation : ops) {
          operation->log_append_comp_time = now;
        }

        std::lock_guard locker(this->m_log_append_lock);
        std::lock_guard locker1(m_lock);
        assert(this->m_appending);
        this->m_appending = false;
        new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
        pool_root.first_free_entry = *new_first_free_entry;
        new_root->first_free_entry = *new_first_free_entry;
        delete new_first_free_entry;
        schedule_update_root(new_root, ctx);
      }
      this->m_async_append_ops--;
      this->m_async_op_tracker.finish_op();
    });
  // Append logs and update first_free_entry
  append_ops(ops, append_ctx, new_first_free_entry);

  if (ops.size()) {
    this->dispatch_deferred_writes();
  }
}

template <typename I>
void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
  log_entry->remove_cache_bl();
}

template <typename I>
void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
  std::lock_guard locker(m_lock);

  for (auto &operation : ops) {
    auto &log_entry = operation->get_log_entry();
    log_entry->ram_entry.set_entry_valid(true);
    m_log_entries.push_back(log_entry);
    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
  }
  if (m_cache_state->empty && !m_log_entries.empty()) {
    m_cache_state->empty = false;
    this->update_image_cache_state();
  }
}

template <typename I>
void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
                                          DeferredContexts &post_unlock,
                                          bool has_write_entry) {
  // snapshot so we behave consistently
  bool invalidating = this->m_invalidating;

  if (invalidating || !has_write_entry) {
    for (auto &log_entry : entries_to_flush) {
      GuardedRequestFunctionContext *guarded_ctx =
        new GuardedRequestFunctionContext([this, log_entry, invalidating]
          (GuardedRequestFunctionContext &guard_ctx) {
            log_entry->m_cell = guard_ctx.cell;
            Context *ctx = this->construct_flush_entry(log_entry, invalidating);

            if (!invalidating) {
              ctx = new LambdaContext([this, log_entry, ctx](int r) {
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
            }
            ctx->complete(0);
          });
      this->detain_flush_guard_request(log_entry, guarded_ctx);
    }
  } else {
    int count = entries_to_flush.size();
    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
    std::vector<bufferlist *> read_bls;

    write_entries.reserve(count);
    read_bls.reserve(count);

    for (auto &log_entry : entries_to_flush) {
      if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
        write_entries.push_back(write_entry);
        read_bls.push_back(bl);
      }
    }

    Context *ctx = new LambdaContext(
      [this, entries_to_flush, read_bls](int r) {
        int i = 0;
        GuardedRequestFunctionContext *guarded_ctx = nullptr;

        for (auto &log_entry : entries_to_flush) {
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];

            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);

                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
                    auto captured_entry_bl = std::move(entry_bl);
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback_bl(this->m_image_writeback, ctx,
                                            std::move(captured_entry_bl));
                  }), 0);
              });
          } else {
            guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
              (GuardedRequestFunctionContext &guard_ctx) {
                log_entry->m_cell = guard_ctx.cell;
                Context *ctx = this->construct_flush_entry(log_entry, false);
                m_image_ctx.op_work_queue->queue(new LambdaContext(
                  [this, log_entry, ctx](int r) {
                    ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
                                               << " " << *log_entry << dendl;
                    log_entry->writeback(this->m_image_writeback, ctx);
                  }), 0);
              });
          }
          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
      });

    aio_read_data_blocks(write_entries, read_bls, ctx);
  }
}

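/*
 * Background work: retire old log entries when a prior allocation failed,
 * when shutting down or invalidating, or when usage exceeds RETIRE_HIGH_WATER
 * of m_bytes_allocated_cap (up to MAX_ALLOC_PER_TRANSACTION entries in the
 * aggressive cases, MAX_FREE_PER_TRANSACTION otherwise), then dispatch
 * deferred writes and write back dirty entries.
 */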
template <typename I>
void WriteLog<I>::process_work() {
  CephContext *cct = m_image_ctx.cct;
  int max_iterations = 4;
  bool wake_up_requested = false;
  uint64_t aggressive_high_water_bytes =
      this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;

  ldout(cct, 20) << dendl;

  do {
    {
      std::lock_guard locker(m_lock);
      this->m_wake_up_requested = false;
    }
    if (this->m_alloc_failed_since_retire || this->m_shutting_down ||
        this->m_invalidating || m_bytes_allocated > high_water_bytes) {
      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
                                 << ", allocated > high_water="
                                 << (m_bytes_allocated > high_water_bytes)
                                 << dendl;
      retire_entries((this->m_shutting_down || this->m_invalidating ||
                      m_bytes_allocated > aggressive_high_water_bytes)
                       ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
    }
    this->dispatch_deferred_writes();
    this->process_writeback_dirty_entries();
    {
      std::lock_guard locker(m_lock);
      wake_up_requested = this->m_wake_up_requested;
    }
  } while (wake_up_requested && --max_iterations > 0);

  {
    std::lock_guard locker(m_lock);
    this->m_wake_up_scheduled = false;
    // Reschedule if it's still requested
    if (this->m_wake_up_requested) {
      this->wake_up();
    }
  }
}

/**
 * Retire up to frees_per_tx of the oldest log entries that are
 * eligible to be retired. Returns true if anything was retired.
 */
template <typename I>
bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
  CephContext *cct = m_image_ctx.cct;
  GenericLogEntriesVector retiring_entries;
  uint64_t initial_first_valid_entry;
  uint64_t first_valid_entry;

  std::lock_guard retire_locker(this->m_log_retire_lock);
  ldout(cct, 20) << "Look for entries to retire" << dendl;
  {
    // Entry readers can't be added while we hold m_entry_reader_lock
    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    initial_first_valid_entry = m_first_valid_entry;
    first_valid_entry = m_first_valid_entry;
    while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
      GenericLogEntriesVector retiring_subentries;
      uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
      uint64_t data_length = 0;
      for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
        if (this->can_retire_entry(*it)) {
          // log_entry_index is valid after appending to SSD
          if ((*it)->log_entry_index != control_block_pos) {
            ldout(cct, 20) << "Old log_entry_index is " << control_block_pos
                           << ", New log_entry_index is "
                           << (*it)->log_entry_index
                           << ", data length is " << data_length << dendl;
            ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
            if ((*it)->log_entry_index < control_block_pos) {
              ceph_assert((*it)->log_entry_index ==
                  (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
                  this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
            } else {
              ceph_assert((*it)->log_entry_index == control_block_pos +
                  data_length + MIN_WRITE_ALLOC_SSD_SIZE);
            }
            break;
          } else {
            retiring_subentries.push_back(*it);
            if ((*it)->is_write_entry()) {
              data_length += (*it)->get_aligned_data_size();
            }
          }
        } else {
          retiring_subentries.clear();
          break;
        }
      }
      // SSD: retiring_subentries in a span
      if (!retiring_subentries.empty()) {
        for (auto it = retiring_subentries.begin();
             it != retiring_subentries.end(); it++) {
          ceph_assert(m_log_entries.front() == *it);
          m_log_entries.pop_front();
          if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
            auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
            if (gen_write_entry) {
              this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
            }
          }
        }

        ldout(cct, 20) << "span with " << retiring_subentries.size()
                       << " entries: control_block_pos=" << control_block_pos
                       << " data_length=" << data_length
                       << dendl;
        retiring_entries.insert(
          retiring_entries.end(), retiring_subentries.begin(),
          retiring_subentries.end());

        first_valid_entry = control_block_pos + data_length +
            MIN_WRITE_ALLOC_SSD_SIZE;
        if (first_valid_entry >= this->m_log_pool_size) {
          first_valid_entry = first_valid_entry % this->m_log_pool_size +
              DATA_RING_BUFFER_OFFSET;
        }
      } else {
        break;
      }
    }
  }
  if (retiring_entries.size()) {
    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
                   << dendl;

    // Advance first valid entry and release buffers
    uint64_t flushed_sync_gen;
    std::lock_guard append_locker(this->m_log_append_lock);
    {
      std::lock_guard locker(m_lock);
      flushed_sync_gen = this->m_flushed_sync_gen;
    }

    ceph_assert(first_valid_entry != initial_first_valid_entry);
    auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
    new_root->flushed_sync_gen = flushed_sync_gen;
    new_root->first_valid_entry = first_valid_entry;
    pool_root.flushed_sync_gen = flushed_sync_gen;
    pool_root.first_valid_entry = first_valid_entry;

    Context *ctx = new LambdaContext(
      [this, first_valid_entry, initial_first_valid_entry,
       retiring_entries](int r) {
        uint64_t allocated_bytes = 0;
        uint64_t cached_bytes = 0;
        uint64_t former_log_pos = 0;
        for (auto &entry : retiring_entries) {
          ceph_assert(entry->log_entry_index != 0);
          if (entry->log_entry_index != former_log_pos) {
            // Space for control blocks
            allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
            former_log_pos = entry->log_entry_index;
          }
          if (entry->is_write_entry()) {
            cached_bytes += entry->write_bytes();
            // space for userdata
            allocated_bytes += entry->get_aligned_data_size();
          }
        }
        {
          std::lock_guard locker(m_lock);
          m_first_valid_entry = first_valid_entry;
          ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
          ceph_assert(this->m_bytes_allocated >= allocated_bytes);
          this->m_bytes_allocated -= allocated_bytes;
          ceph_assert(this->m_bytes_cached >= cached_bytes);
          this->m_bytes_cached -= cached_bytes;
          if (!m_cache_state->empty && m_log_entries.empty()) {
            m_cache_state->empty = true;
            this->update_image_cache_state();
          }

          ldout(m_image_ctx.cct, 20)
            << "Finished root update: initial_first_valid_entry="
            << initial_first_valid_entry << ", m_first_valid_entry="
            << m_first_valid_entry << ", release space = "
            << allocated_bytes << ", m_bytes_allocated="
            << m_bytes_allocated << ", release cached space="
            << cached_bytes << ", m_bytes_cached="
            << this->m_bytes_cached << dendl;

          this->m_alloc_failed_since_retire = false;
          this->wake_up();
        }

        this->dispatch_deferred_writes();
        this->process_writeback_dirty_entries();
        m_async_update_superblock--;
        this->m_async_op_tracker.finish_op();
      });

    std::lock_guard locker(m_lock);
    schedule_update_root(new_root, ctx);
  } else {
    ldout(cct, 20) << "Nothing to retire" << dendl;
    return false;
  }
  return true;
}

template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
                             uint64_t* new_first_free_entry) {
  GenericLogEntriesVector log_entries;
  CephContext *cct = m_image_ctx.cct;
  uint64_t span_payload_len = 0;
  uint64_t bytes_to_free = 0;
  ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

  *new_first_free_entry = pool_root.first_free_entry;
  AioTransContext* aio = new AioTransContext(cct, ctx);

  utime_t now = ceph_clock_now();
  for (auto &operation : ops) {
    operation->log_append_start_time = now;
    auto log_entry = operation->get_log_entry();

    if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
        span_payload_len >= SPAN_MAX_DATA_LEN) {
      if (log_entries.size() > 1) {
        bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
      }
      write_log_entries(log_entries, aio, new_first_free_entry);
      log_entries.clear();
      span_payload_len = 0;
    }
    log_entries.push_back(log_entry);
    span_payload_len += log_entry->write_bytes();
  }
  if (!span_payload_len || !log_entries.empty()) {
    if (log_entries.size() > 1) {
      bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
    }
    write_log_entries(log_entries, aio, new_first_free_entry);
  }

  {
    std::lock_guard locker1(m_lock);
    m_first_free_entry = *new_first_free_entry;
    m_bytes_allocated -= bytes_to_free;
  }

  bdev->aio_submit(&aio->ioc);
}

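/*
 * Write one span to the ring buffer: a single MIN_WRITE_ALLOC_SSD_SIZE
 * control block holding the encoded WriteLogCacheEntry vector, followed by
 * each write entry's data padded to MIN_WRITE_ALLOC_SSD_SIZE. *pos is
 * advanced past the span (wrapping to DATA_RING_BUFFER_OFFSET as needed),
 * and writes that cross the end of the pool are split into two aio_writes.
 */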
template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
                                    AioTransContext *aio, uint64_t *pos) {
  CephContext *cct = m_image_ctx.cct;
  ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
  ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
              *pos < this->m_log_pool_size &&
              *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

  // The first block is for log entries
  uint64_t control_block_pos = *pos;
  *pos += MIN_WRITE_ALLOC_SSD_SIZE;
  if (*pos == this->m_log_pool_size) {
    *pos = DATA_RING_BUFFER_OFFSET;
  }

  std::vector<WriteLogCacheEntry> persist_log_entries;
  bufferlist data_bl;
  for (auto &log_entry : log_entries) {
    log_entry->log_entry_index = control_block_pos;
    // Append data buffer for write operations
    if (log_entry->is_write_entry()) {
      auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
      auto cache_bl = write_entry->get_cache_bl();
      auto align_size = write_entry->get_aligned_data_size();
      data_bl.append(cache_bl);
      data_bl.append_zero(align_size - cache_bl.length());

      write_entry->ram_entry.write_data_pos = *pos;
      *pos += align_size;
      if (*pos >= this->m_log_pool_size) {
        *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
      }
    }
    // push_back _after_ setting write_data_pos
    persist_log_entries.push_back(log_entry->ram_entry);
  }

  // aio write
  bufferlist bl;
  encode(persist_log_entries, bl);
  ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  bl.append(data_bl);
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  if (control_block_pos + bl.length() > this->m_log_pool_size) {
    // exceeds border, need to split
    uint64_t size = bl.length();
    bufferlist bl1;
    bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
    ceph_assert(bl.length() == (size - bl1.length()));

    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << size << " spans boundary, split into "
                   << control_block_pos << "~" << bl1.length()
                   << " and " << DATA_RING_BUFFER_OFFSET << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
    bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  } else {
    ldout(cct, 20) << "write " << control_block_pos << "~"
                   << bl.length() << dendl;
    bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
                    WRITE_LIFE_NOT_SET);
  }
}

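/*
 * Root (superblock) updates are queued under m_lock and written one at a
 * time; update_root_scheduled_ops() coalesces whatever has accumulated,
 * writes only the newest root, and then completes every queued context.
 */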
template <typename I>
void WriteLog<I>::schedule_update_root(
    std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "New root: pool_size=" << root->pool_size
                 << " first_valid_entry=" << root->first_valid_entry
                 << " first_free_entry=" << root->first_free_entry
                 << " flushed_sync_gen=" << root->flushed_sync_gen
                 << dendl;
  ceph_assert(is_valid_pool_root(*root));

  bool need_finisher;
  {
    ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
    need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
    std::shared_ptr<WriteLogPoolRootUpdate> entry =
      std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
    this->m_async_update_superblock++;
    this->m_async_op_tracker.start_op();
    m_poolroot_to_update.emplace_back(entry);
  }
  if (need_finisher) {
    enlist_op_update_root();
  }
}

template <typename I>
void WriteLog<I>::enlist_op_update_root() {
  Context *append_ctx = new LambdaContext([this](int r) {
    update_root_scheduled_ops();
  });
  this->m_work_queue.queue(append_ctx);
}

template <typename I>
void WriteLog<I>::update_root_scheduled_ops() {
  ldout(m_image_ctx.cct, 20) << dendl;

  std::shared_ptr<WriteLogPoolRoot> root;
  WriteLogPoolRootUpdateList root_updates;
  Context *ctx = nullptr;
  {
    std::lock_guard locker(m_lock);
    if (m_updating_pool_root) {
      /* Another thread is already updating the pool root */
      ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
                                 << dendl;
      return;
    }
    if (m_poolroot_to_update.size()) {
      m_updating_pool_root = true;
      root_updates.swap(m_poolroot_to_update);
    }
  }
  ceph_assert(!root_updates.empty());
  ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
                             << dendl;
  // We just update the last one, and call all the completions.
  auto entry = root_updates.back();
  root = entry->root;

  ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
    ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
    for (auto it = updates.begin(); it != updates.end(); it++) {
      Context *it_ctx = (*it)->ctx;
      it_ctx->complete(r);
    }
  });
  Context *append_ctx = new LambdaContext([this, ctx](int r) {
    ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
    bool need_finisher = false;
    assert(r == 0);
    {
      std::lock_guard locker(m_lock);
      m_updating_pool_root = false;
      need_finisher = !m_poolroot_to_update.empty();
    }
    if (need_finisher) {
      enlist_op_update_root();
    }
    ctx->complete(r);
  });
  AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
  update_pool_root(root, aio);
}

template <typename I>
void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
                                   AioTransContext *aio) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
  bdev->aio_submit(&aio->ioc);
}

template <typename I>
int WriteLog<I>::update_pool_root_sync(
    std::shared_ptr<WriteLogPoolRoot> root) {
  bufferlist bl;
  SuperBlock superblock;
  superblock.root = *root;
  encode(superblock, bl);
  bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
  ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
  return bdev->write(0, bl, false);
}

template <typename I>
void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
                                      bufferlist *bl, Context *ctx) {
  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
  std::vector<bufferlist *> bls {bl};
  aio_read_data_blocks(log_entries, bls, ctx);
}

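/*
 * Data is read back at MIN_WRITE_ALLOC_SSD_SIZE granularity; the completion
 * trims each bufferlist to the entry's valid length (write_bytes, or
 * ws_datalen for writesame) and drops the extra bl reference taken by the
 * caller. Reads that cross the end of the pool are split into two aio_reads.
 */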
template <typename I>
void WriteLog<I>::aio_read_data_blocks(
    std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
    std::vector<bufferlist *> &bls, Context *ctx) {
  ceph_assert(log_entries.size() == bls.size());

  // get the valid part
  Context *read_ctx = new LambdaContext(
    [log_entries, bls, ctx](int r) {
      for (unsigned int i = 0; i < log_entries.size(); i++) {
        bufferlist valid_data_bl;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
        auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
                                                        : write_entry->ram_entry.ws_datalen;

        valid_data_bl.substr_of(*bls[i], 0, length);
        bls[i]->clear();
        bls[i]->append(valid_data_bl);
        write_entry->dec_bl_refs();
      }
      ctx->complete(r);
    });

  CephContext *cct = m_image_ctx.cct;
  AioTransContext *aio = new AioTransContext(cct, read_ctx);
  for (unsigned int i = 0; i < log_entries.size(); i++) {
    WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;

    ceph_assert(log_entry->is_write() || log_entry->is_writesame());
    uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
                                           log_entry->ws_datalen;
    uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);

    ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
                   << "~" << len << dendl;
    ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
                log_entry->write_data_pos < pool_root.pool_size);
    ceph_assert(align_len);
    if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
      // spans boundary, need to split
      uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
      uint64_t len2 = align_len - len1;

      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << " spans boundary, split into "
                     << log_entry->write_data_pos << "~" << len1
                     << " and " << DATA_RING_BUFFER_OFFSET << "~"
                     << len2 << dendl;
      bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
      bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
    } else {
      ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
                     << align_len << dendl;
      bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
    }
  }
  bdev->aio_submit(&aio->ioc);
}

template <typename I>
void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
  m_image_ctx.op_work_queue->queue(user_req, r);
}

} // namespace ssd
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;