"tp_pwl", 4, ""),
m_cache_state(cache_state),
m_image_ctx(image_ctx),
- m_log_pool_config_size(DEFAULT_POOL_SIZE),
+ m_log_pool_size(DEFAULT_POOL_SIZE),
m_image_writeback(image_writeback),
m_plugin_api(plugin_api),
m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
"Histogram of syncpoint's logentry numbers vs bytes number");
plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
+ plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
- plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
- plb.add_u64_counter(l_librbd_pwl_flush, "flush", "Flush (flush RWL)");
+ plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
+ plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "Write-back to OSD latency");
plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");
template <typename I>
void AbstractWriteLog<I>::periodic_stats() {
std::lock_guard locker(m_lock);
- ldout(m_image_ctx.cct, 1) << "STATS: "
- << "m_free_log_entries=" << m_free_log_entries << ", "
- << "m_log_entries=" << m_log_entries.size() << ", "
- << "m_dirty_log_entries=" << m_dirty_log_entries.size() << ", "
- << "m_bytes_allocated=" << m_bytes_allocated << ", "
- << "m_bytes_cached=" << m_bytes_cached << ", "
- << "m_bytes_dirty=" << m_bytes_dirty << ", "
- << "bytes available=" << m_bytes_allocated_cap - m_bytes_allocated << ", "
- << "m_current_sync_gen=" << m_current_sync_gen << ", "
- << "m_flushed_sync_gen=" << m_flushed_sync_gen << ", "
+ ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size()
+ << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
+ << ", m_free_log_entries=" << m_free_log_entries
+ << ", m_bytes_allocated=" << m_bytes_allocated
+ << ", m_bytes_cached=" << m_bytes_cached
+ << ", m_bytes_dirty=" << m_bytes_dirty
+ << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
+ << ", m_first_valid_entry=" << m_first_valid_entry
+ << ", m_first_free_entry=" << m_first_free_entry
+ << ", m_current_sync_gen=" << m_current_sync_gen
+ << ", m_flushed_sync_gen=" << m_flushed_sync_gen
<< dendl;
}
void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
- int entry_index) {
+ uint64_t entry_index) {
bool writer = cache_entry->is_writer();
if (cache_entry->is_sync_point()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
template <typename I>
void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
- DeferredContexts &later, uint32_t alloc_size ) {
+ DeferredContexts &later) {
/* Create missing sync points. These must not be appended until the
* entry reload is complete and the write map is up to
* date. Currently this is handled by the deferred contexts object
gen_write_entry->set_flushed(true);
sync_point_entry->writes_flushed++;
}
- if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
- /* This entry is a basic write */
- uint64_t bytes_allocated = alloc_size;
- if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) {
- bytes_allocated = gen_write_entry->ram_entry.write_bytes;
- }
- m_bytes_allocated += bytes_allocated;
- m_bytes_cached += gen_write_entry->ram_entry.write_bytes;
- }
+
+ /* account this entry in m_bytes_allocated and m_bytes_cached */
+ inc_allocated_cached_bytes(log_entry);
}
}
} else {
ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
m_log_pool_name = m_cache_state->path;
- m_log_pool_config_size = max(m_cache_state->size, MIN_POOL_SIZE);
+ m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
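+ // p2align() rounds the pool size down to a POOL_SIZE_ALIGN boundary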
+ m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
+ ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
+ << " (adjusted from " << m_cache_state->size << ")" << dendl;
if ((!m_cache_state->present) &&
(access(m_log_pool_name.c_str(), F_OK) == 0)) {
(access(m_log_pool_name.c_str(), F_OK) != 0)) {
ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl;
on_finish->complete(-errno);
+ return;
}
- initialize_pool(on_finish, later);
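+ // initialize_pool() now reports success via its return value; bail out early on failure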
+ bool succeeded = initialize_pool(on_finish, later);
+ if (!succeeded) {
+ return;
+ }
ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
<< " log entries, " << m_free_log_entries << " of which are free."
void AbstractWriteLog<I>::init(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << dendl;
- perf_start(m_image_ctx.id);
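+ // name the perf counters after the image: librbd-pwl-<image id>-<pool name>-<image name>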
+ auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
+ std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
+ std::string("-") + m_image_ctx.name;
+ perf_start(pname);
ceph_assert(!m_initialized);
}
{
std::lock_guard locker(m_lock);
- ceph_assert(m_dirty_log_entries.size() == 0);
+ check_image_cache_state_clean();
m_wake_up_enabled = false;
m_cache_state->clean = true;
m_log_entries.clear();
}
update_image_cache_state(next_ctx);
});
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
+ // Wait for in progress IOs to complete
+ next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
+ m_async_op_tracker.wait_for_ops(next_ctx);
+ });
ctx = new LambdaContext(
[this, ctx](int r) {
Context *next_ctx = override_ctx(r, ctx);
}
flush_dirty_entries(next_ctx);
});
- ctx = new LambdaContext(
- [this, ctx](int r) {
- Context *next_ctx = override_ctx(r, ctx);
- ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
- // Wait for in progress IOs to complete
- next_ctx = util::create_async_context_callback(m_image_ctx, next_ctx);
- m_async_op_tracker.wait_for_ops(next_ctx);
- });
ctx = new LambdaContext(
[this, ctx](int r) {
ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
bl->clear();
m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
- std::vector<WriteLogCacheEntry*> log_entries_to_read;
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
std::vector<bufferlist*> bls_to_read;
m_async_op_tracker.start_op();
}
template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
{
GenericLogOperations to_append(ops.begin(), ops.end());
- schedule_append_ops(to_append);
+ schedule_append_ops(to_append, req);
}
template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op)
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
{
GenericLogOperations to_append { op };
- schedule_append_ops(to_append);
+ schedule_append_ops(to_append, req);
}
/*
}
op->complete(result);
m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
- op->log_append_time - op->dispatch_time);
+ op->log_append_start_time - op->dispatch_time);
m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
utime_t(now - op->dispatch_time).to_nsec(),
log_entry->ram_entry.write_bytes);
- utime_t app_lat = op->log_append_comp_time - op->log_append_time;
+ utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
log_entry->ram_entry.write_bytes);
- m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_time);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
}
// New entries may be flushable
{
std::lock_guard locker(m_lock);
dispatch_here = m_deferred_ios.empty();
// Only flush req's total_bytes is the max uint64
- if ((req->image_extents_summary.total_bytes ==
- std::numeric_limits<uint64_t>::max())) {
+ if (req->image_extents_summary.total_bytes ==
+ std::numeric_limits<uint64_t>::max() &&
+ static_cast<C_FlushRequestT *>(req)->internal == true) {
dispatch_here = true;
}
}
}
template <typename I>
-bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
- uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
- uint64_t &num_lanes, uint64_t &num_log_entries,
- uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap){
+bool AbstractWriteLog<I>::check_allocation(
+ C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
+ uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
+ uint32_t num_unpublished_reserves) {
bool alloc_succeeds = true;
bool no_space = false;
{
no_space = true; /* Entries must be retired */
}
/* Don't attempt buffer allocate if we've exceeded the "full" threshold */
- if (m_bytes_allocated + bytes_allocated > bytes_allocated_cap) {
+ if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
if (!req->has_io_waited_for_buffers()) {
req->set_io_waited_for_buffers(true);
- ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
- << bytes_allocated_cap
+ ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap="
+ << m_bytes_allocated_cap
<< ", allocated=" << m_bytes_allocated
<< ") in write [" << *req << "]" << dendl;
}
/* We need one free log entry per extent (each is a separate entry), and
* one free "lane" for remote replication. */
if ((m_free_lanes >= num_lanes) &&
- (m_free_log_entries >= num_log_entries)) {
+ (m_free_log_entries >= num_log_entries) &&
+ (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
m_free_lanes -= num_lanes;
m_free_log_entries -= num_log_entries;
m_unpublished_reserves += num_unpublished_reserves;
if (req->has_io_waited_for_buffers()) {
req->set_io_waited_for_buffers(false);
}
-
} else {
alloc_succeeds = false;
}
m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
}
m_flush_ops_in_flight += 1;
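+ // counted as in flight, but the writeback for this op has not been issued yet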
+ m_flush_ops_will_send += 1;
/* For write same this is the bytes affected by the flush op, not the bytes transferred */
m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
/* Flush write completion action */
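+ // note when the writeback begins so the completion callback can record its latency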
+ utime_t writeback_start_time = ceph_clock_now();
Context *ctx = new LambdaContext(
- [this, log_entry, invalidating](int r) {
+ [this, log_entry, writeback_start_time, invalidating](int r) {
+ utime_t writeback_comp_time = ceph_clock_now();
+ m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
+ writeback_comp_time - writeback_start_time);
{
std::lock_guard locker(m_lock);
if (r < 0) {
CephContext *cct = m_image_ctx.cct;
bool all_clean = false;
int flushed = 0;
+ bool has_write_entry = false;
ldout(cct, 20) << "Look for dirty entries" << dendl;
{
DeferredContexts post_unlock;
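+ // collect flushable dirty entries first, then build their flush contexts in one batch below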
+ GenericLogEntries entries_to_flush;
+
std::shared_lock entry_reader_locker(m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
- std::lock_guard locker(m_lock);
if (m_shutting_down) {
ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
/* Do flush complete only when all flush ops are finished */
all_clean = !m_flush_ops_in_flight;
break;
}
+
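+ // don't construct more flush contexts until the previously constructed ones have been sent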
+ if (m_flush_ops_will_send) {
+ ldout(cct, 20) << "Previous flush-ops is still not sent" << dendl;
+ break;
+ }
auto candidate = m_dirty_log_entries.front();
bool flushable = can_flush_entry(candidate);
if (flushable) {
- post_unlock.add(construct_flush_entry_ctx(candidate));
+ entries_to_flush.push_back(candidate);
flushed++;
+ if (!has_write_entry)
+ has_write_entry = candidate->is_write_entry();
m_dirty_log_entries.pop_front();
} else {
ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
break;
}
}
+
+ construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
}
if (all_clean) {
}
m_async_op_tracker.start_op();
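+ // capture only the next sync point entry so this callback does not keep the flushed entry alive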
m_work_queue.queue(new LambdaContext(
- [this, log_entry](int r) {
+ [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
bool handled_by_next;
{
std::lock_guard locker(m_lock);
- handled_by_next = handle_flushed_sync_point(log_entry->next_sync_point_entry);
+ handled_by_next = handle_flushed_sync_point(std::move(next));
}
if (!handled_by_next) {
persist_last_flushed_sync_gen();
if (invalidate) {
m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
} else {
- m_perfcounter->inc(l_librbd_pwl_flush, 1);
+ m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
}
}
return log_entry->can_retire();
}
+template <typename I>
+void AbstractWriteLog<I>::check_image_cache_state_clean() {
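+ // shutdown sanity check: no queued, in-flight, or dirty work may remain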
+ ceph_assert(m_deferred_ios.empty());
+ ceph_assert(m_ops_to_append.empty());
+ ceph_assert(m_async_flush_ops == 0);
+ ceph_assert(m_async_append_ops == 0);
+ ceph_assert(m_dirty_log_entries.empty());
+ ceph_assert(m_ops_to_flush.empty());
+ ceph_assert(m_flush_ops_in_flight == 0);
+ ceph_assert(m_flush_bytes_in_flight == 0);
+ ceph_assert(m_bytes_dirty == 0);
+ ceph_assert(m_work_queue.empty());
+}
+
} // namespace pwl
} // namespace cache
} // namespace librbd