namespace pwl {
namespace ssd {
+using namespace std;
using namespace librbd::cache::pwl;
static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
// Make a bl for this hit extent. This will add references to the
// write_entry->cache_bl */
ldout(m_image_ctx.cct, 5) << dendl;
- auto write_entry = static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
+ auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
buffer::list hit_bl;
write_entry->copy_cache_bl(&hit_bl);
bool writesame = write_entry->is_writesame_entry();
::IOContext ioctx(cct, nullptr);
r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
if (r < 0) {
- lderr(cct) << "Read ssd cache superblock failed " << dendl;
- goto error_handle;
+ lderr(cct) << "read ssd cache superblock failed " << dendl;
+ goto err_close_bdev;
}
auto p = bl.cbegin();
decode(superblock, p);
<< dendl;
ceph_assert(is_valid_pool_root(pool_root));
if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
- lderr(cct) << "Pool layout version is "
+ lderr(cct) << "pool layout version is "
<< pool_root.layout_version
<< " expected " << SSD_LAYOUT_VERSION
<< dendl;
- goto error_handle;
+ goto err_close_bdev;
}
if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
- lderr(cct) << "Pool block size is " << pool_root.block_size
+ lderr(cct) << "pool block size is " << pool_root.block_size
<< " expected " << MIN_WRITE_ALLOC_SSD_SIZE
<< dendl;
- goto error_handle;
+ goto err_close_bdev;
}
this->m_log_pool_size = pool_root.pool_size;
}
return true;
-error_handle:
+err_close_bdev:
bdev->close();
delete bdev;
on_finish->complete(-EINVAL);
this->m_async_append_ops++;
this->m_async_op_tracker.start_op();
Context *append_ctx = new LambdaContext([this](int r) {
- append_scheduled_ops();
- });
+ append_scheduled_ops();
+ });
this->m_work_queue.queue(append_ctx);
}
+
/*
* Takes custody of ops. They'll all get their log entries appended,
* and have their on_write_persist contexts completed once they and
GenericLogOperations ops;
ldout(m_image_ctx.cct, 20) << dendl;
- bool ops_remain = false; //no-op variable for SSD
- bool appending = false; //no-op variable for SSD
+ bool ops_remain = false; // unused, no-op variable for SSD
+ bool appending = false; // unused, no-op variable for SSD
this->append_scheduled(ops, ops_remain, appending);
if (ops.size()) {
});
uint64_t *new_first_free_entry = new(uint64_t);
Context *append_ctx = new LambdaContext(
- [this, new_first_free_entry, ops, ctx](int r) {
+ [this, new_first_free_entry, ops, ctx](int r) {
std::shared_ptr<WriteLogPoolRoot> new_root;
{
ldout(m_image_ctx.cct, 20) << "Finished appending at "
if (invalidating || !has_write_entry) {
for (auto &log_entry : entries_to_flush) {
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
- if (!invalidating) {
- ctx = new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- this->m_flush_ops_will_send -= 1;
- }), 0);
- });
- }
- post_unlock.add(ctx);
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, log_entry, invalidating]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext([this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+ ctx->complete(0);
+ });
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
}
} else {
int count = entries_to_flush.size();
- std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
std::vector<bufferlist *> read_bls;
- std::vector<Context *> contexts;
- log_entries.reserve(count);
+ write_entries.reserve(count);
read_bls.reserve(count);
- contexts.reserve(count);
for (auto &log_entry : entries_to_flush) {
- // log_entry already removed from m_dirty_log_entries and
- // in construct_flush_entry() it will inc(m_flush_ops_in_flight).
- // We call this func here to make ops can track.
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
if (log_entry->is_write_entry()) {
bufferlist *bl = new bufferlist;
auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
write_entry->inc_bl_refs();
- log_entries.push_back(write_entry);
+ write_entries.push_back(write_entry);
read_bls.push_back(bl);
}
- contexts.push_back(ctx);
}
Context *ctx = new LambdaContext(
- [this, entries_to_flush, read_bls, contexts](int r) {
- int i = 0, j = 0;
+ [this, entries_to_flush, read_bls](int r) {
+ int i = 0;
+ GuardedRequestFunctionContext *guarded_ctx = nullptr;
for (auto &log_entry : entries_to_flush) {
- Context *ctx = contexts[j++];
-
if (log_entry->is_write_entry()) {
bufferlist captured_entry_bl;
-
captured_entry_bl.claim_append(*read_bls[i]);
delete read_bls[i++];
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
- auto captured_entry_bl = std::move(entry_bl);
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback_bl(this->m_image_writeback, ctx,
- std::move(captured_entry_bl));
- this->m_flush_ops_will_send -= 1;
- }), 0);
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+ auto captured_entry_bl = std::move(entry_bl);
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback_bl(this->m_image_writeback, ctx,
+ std::move(captured_entry_bl));
+ }), 0);
+ });
} else {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- this->m_flush_ops_will_send -= 1;
- }), 0);
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
}
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
}
});
- aio_read_data_blocks(log_entries, read_bls, ctx);
+ aio_read_data_blocks(write_entries, read_bls, ctx);
}
}
this->m_bytes_cached -= cached_bytes;
ldout(m_image_ctx.cct, 20)
- << "Finished root update: " << "initial_first_valid_entry="
- << initial_first_valid_entry << ", " << "m_first_valid_entry="
- << m_first_valid_entry << "," << "release space = "
- << allocated_bytes << "," << "m_bytes_allocated="
- << m_bytes_allocated << "," << "release cached space="
- << cached_bytes << "," << "m_bytes_cached="
+ << "Finished root update: initial_first_valid_entry="
+ << initial_first_valid_entry << ", m_first_valid_entry="
+ << m_first_valid_entry << ", release space = "
+ << allocated_bytes << ", m_bytes_allocated="
+ << m_bytes_allocated << ", release cached space="
+ << cached_bytes << ", m_bytes_cached="
<< this->m_bytes_cached << dendl;
this->m_alloc_failed_since_retire = false;
});
Context *append_ctx = new LambdaContext([this, ctx](int r) {
ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
- bool need_finisher = false;;
+ bool need_finisher = false;
assert(r == 0);
{
std::lock_guard locker(m_lock);
bls[i]->append(valid_data_bl);
write_entry->dec_bl_refs();
}
- ctx->complete(r);
+ ctx->complete(r);
});
CephContext *cct = m_image_ctx.cct;