namespace cache {
namespace pwl {
+using namespace std;
using namespace librbd::cache::pwl;
typedef AbstractWriteLog<ImageCtx>::Extent Extent;
plugin::Api<I>& plugin_api)
: m_builder(builder),
m_write_log_guard(image_ctx.cct),
+ m_flush_guard(image_ctx.cct),
+ m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
"librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
ceph_assert(kv.first == m_current_sync_gen+1);
init_flush_new_sync_point(later);
ceph_assert(kv.first == m_current_sync_gen);
- sync_point_entries[kv.first] = m_current_sync_point->log_entry;;
+ sync_point_entries[kv.first] = m_current_sync_point->log_entry;
}
/*
ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
<< ", While there's no cache in the image metatata." << dendl;
if (remove(m_log_pool_name.c_str()) != 0) {
- lderr(cct) << "Failed to remove the pool file " << m_log_pool_name
+ lderr(cct) << "failed to remove the pool file " << m_log_pool_name
<< dendl;
on_finish->complete(-errno);
return;
}
} else if ((m_cache_state->present) &&
(access(m_log_pool_name.c_str(), F_OK) != 0)) {
- ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl;
+  lderr(cct) << "can't find the existing pool file: " << m_log_pool_name
+ << ". error: " << cpp_strerror(-errno) << dendl;
on_finish->complete(-errno);
return;
}
C_ReadRequest *read_ctx = m_builder->create_read_request(
cct, now, m_perfcounter, bl, on_finish);
ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
- << "image_extents=" << image_extents << ", "
- << "bl=" << bl << ", "
- << "on_finish=" << on_finish << dendl;
+ << "image_extents=" << image_extents
+ << ", bl=" << bl
+ << ", on_finish=" << on_finish << dendl;
ceph_assert(m_initialized);
bl->clear();
}
}
- ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents << ", "
- << "miss_bl=" << read_ctx->miss_bl << dendl;
+ ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
+ << ", miss_bl=" << read_ctx->miss_bl << dendl;
complete_read(log_entries_to_read, bls_to_read, ctx);
}
ceph_assert(m_initialized);
- /* Split images because PMDK's space management is not perfect, there are
- * fragment problems. The larger the block size difference of the block,
- * the easier the fragmentation problem will occur, resulting in the
- * remaining space can not be allocated in large size. We plan to manage
- * pmem space and allocation by ourselves in the future.
+ /* Split image extents larger than 1M. This isn't strictly necessary but
+ * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
+ * We plan to manage pmem space and allocation by ourselves in the future.
*/
Extents split_image_extents;
uint64_t max_extent_size = get_max_extent();
std::advance(last_in_batch, ops_to_append);
ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
ops_remain = true; /* Always check again before leaving */
- ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
- << m_ops_to_append.size() << " remain" << dendl;
+ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
+ << m_ops_to_append.size() << dendl;
} else if (isRWL) {
ops_remain = false;
if (appending) {
{
std::lock_guard locker(m_lock);
if (m_free_lanes < num_lanes) {
- req->set_io_waited_for_lanes(true);
ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
<< num_lanes
<< ", have " << m_free_lanes << ") "
/* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
}
if (m_free_log_entries < num_log_entries) {
- req->set_io_waited_for_entries(true);
ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
<< num_log_entries
<< ", have " << m_free_log_entries << ") "
}
/* Don't attempt buffer allocate if we've exceeded the "full" threshold */
if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
- if (!req->has_io_waited_for_buffers()) {
- req->set_io_waited_for_buffers(true);
- ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap="
- << m_bytes_allocated_cap
- << ", allocated=" << m_bytes_allocated
- << ") in write [" << *req << "]" << dendl;
- }
+ ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
+ << m_bytes_allocated_cap
+ << ", allocated=" << m_bytes_allocated
+ << ") in write [" << *req << "]" << dendl;
alloc_succeeds = false;
no_space = true; /* Entries must be retired */
}
m_bytes_allocated += bytes_allocated;
m_bytes_cached += bytes_cached;
m_bytes_dirty += bytes_dirtied;
- if (req->has_io_waited_for_buffers()) {
- req->set_io_waited_for_buffers(false);
- }
} else {
alloc_succeeds = false;
}
}
template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
- bool invalidating) {
- CephContext *cct = m_image_ctx.cct;
+void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+ GuardedRequestFunctionContext *guarded_ctx) {
+ ldout(m_image_ctx.cct, 20) << dendl;
- ldout(cct, 20) << "" << dendl;
- ceph_assert(m_entry_reader_lock.is_locked());
- ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
- if (!m_flush_ops_in_flight ||
- (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
- m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+ BlockExtent extent;
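+  /* A sync point entry must be ordered against every in-flight flush, so it
+   * guards the whole volume; other log entries guard only their own extent. */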
+ if (log_entry->is_sync_point()) {
+ extent = block_extent(whole_volume_extent());
+ } else {
+ extent = log_entry->ram_entry.block_extent();
}
- m_flush_ops_in_flight += 1;
- m_flush_ops_will_send += 1;
- /* For write same this is the bytes affected by the flush op, not the bytes transferred */
- m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+ auto req = GuardedRequest(extent, guarded_ctx, false);
+ BlockGuardCell *cell = nullptr;
+
+ {
+ std::lock_guard locker(m_flush_guard_lock);
+ m_flush_guard.detain(req.block_extent, &req, &cell);
+ }
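+  /* A non-null cell means no overlapping flush is in flight, so the guarded
+   * context can be dispatched immediately. Otherwise it remains queued in the
+   * guard and is dispatched when the conflicting flush releases its cell. */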
+ if (cell) {
+ req.guard_ctx->cell = cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+}
+
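+/*
+ * Build the writeback completion for a log entry. When the writeback
+ * completes, the entry's flush guard cell is released (re-driving any
+ * flushes blocked behind it) and a flush is issued through the lower
+ * cache before the flush op completes.
+ */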
+template <typename I>
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+ bool invalidating) {
+ ldout(m_image_ctx.cct, 20) << "" << dendl;
/* Flush write completion action */
utime_t writeback_start_time = ceph_clock_now();
});
/* Flush through lower cache before completing */
ctx = new LambdaContext(
- [this, ctx](int r) {
+ [this, ctx, log_entry](int r) {
+ {
+
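+        /* The log entry has been written back (r holds the result). Release
+         * its flush guard cell and re-drive any flushes blocked behind it. */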
+ WriteLogGuard::BlockOperations block_reqs;
+ BlockGuardCell *detained_cell = nullptr;
+
+ std::lock_guard locker{m_flush_guard_lock};
+ m_flush_guard.release(log_entry->m_cell, &block_reqs);
+
+ for (auto &req : block_reqs) {
+ m_flush_guard.detain(req.block_extent, &req, &detained_cell);
+ if (detained_cell) {
+ req.guard_ctx->cell = detained_cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+ }
+ }
+
if (r < 0) {
lderr(m_image_ctx.cct) << "failed to flush log entry"
<< cpp_strerror(r) << dendl;
break;
}
- if (m_flush_ops_will_send) {
- ldout(cct, 20) << "Previous flush-ops is still not sent" << dendl;
- break;
- }
auto candidate = m_dirty_log_entries.front();
bool flushable = can_flush_entry(candidate);
if (flushable) {
if (!has_write_entry)
has_write_entry = candidate->is_write_entry();
m_dirty_log_entries.pop_front();
+
+      // Account for this candidate's flush here, as soon as it is dequeued;
+      // this accounting was moved out of construct_flush_entry.
+ {
+ if (!m_flush_ops_in_flight ||
+ (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+ m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
+ }
+ m_flush_ops_in_flight += 1;
+ /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+ m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
+ }
} else {
ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
break;
<< invalidate << ")" << dendl;
if (m_log_entries.size()) {
ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
- << m_log_entries.size() << ", "
- << "front()=" << *m_log_entries.front()
+ << m_log_entries.size()
+ << ", front()=" << *m_log_entries.front()
<< dendl;
}
if (invalidate) {
ldout(cct, 20) << dendl;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(log_entry);
return log_entry->can_retire();
}
template <typename I>
void AbstractWriteLog<I>::check_image_cache_state_clean() {
ceph_assert(m_deferred_ios.empty());
- ceph_assert(m_ops_to_append.empty());;
+ ceph_assert(m_ops_to_append.empty());
ceph_assert(m_async_flush_ops == 0);
ceph_assert(m_async_append_ops == 0);
ceph_assert(m_dirty_log_entries.empty());