template <typename I>
void WriteLog<I>::collect_read_extents(
uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
- std::vector<WriteLogCacheEntry*> &log_entries_to_read,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
Extent hit_extent, pwl::C_ReadRequest *read_ctx) {
/* Make a bl for this hit extent. This will add references to the
template <typename I>
void WriteLog<I>::complete_read(
- std::vector<WriteLogCacheEntry*> &log_entries_to_read,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,  // element type changes from raw WriteLogCacheEntry* to shared_ptr<GenericWriteLogEntry>; the body shown only completes ctx with 0
std::vector<bufferlist*> &bls_to_read, Context *ctx) {
ctx->complete(0);
}
ldout(m_image_ctx.cct, 05) << "APPENDING: index="
<< operation->get_log_entry()->log_entry_index << " "
<< "operation=[" << *operation << "]" << dendl;
- operation->log_append_time = now;
+ operation->log_append_start_time = now;
*operation->get_log_entry()->cache_entry = operation->get_log_entry()->ram_entry;
ldout(m_image_ctx.cct, 20) << "APPENDING: index="
<< operation->get_log_entry()->log_entry_index << " "
}
template <typename I>
-void WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
+bool WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {  // return type changes from void to bool; in the lines shown, each failure path completes on_finish with an error and then returns false
CephContext *cct = m_image_ctx.cct;
TOID(struct WriteLogPoolRoot) pool_root;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
if ((m_log_pool =
pmemobj_create(this->m_log_pool_name.c_str(),
this->m_pwl_pool_layout_name,
- this->m_log_pool_config_size,
+ this->m_log_pool_size,  // size passed to pmemobj_create now comes from m_log_pool_size instead of m_log_pool_config_size
(S_IWUSR | S_IRUSR))) == NULL) {
lderr(cct) << "failed to create pool (" << this->m_log_pool_name << ")"
<< pmemobj_errormsg() << dendl;
m_cache_state->empty = true;
/* TODO: filter/replace errnos that are meaningless to the caller */
on_finish->complete(-errno);
- return;
+ return false;  // pool creation failed; on_finish was already completed with -errno
}
m_cache_state->present = true;
m_cache_state->clean = true;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
/* new pool, calculate and store metadata */
- size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+ size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);  // effective size is now derived from m_log_pool_size
size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogCacheEntry);
uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
if (num_small_writes > MAX_LOG_ENTRIES) {
if (num_small_writes <= 2) {
lderr(cct) << "num_small_writes needs to > 2" << dendl;
on_finish->complete(-EINVAL);
- return;
+ return false;  // on_finish was already completed with -EINVAL
}
- this->m_log_pool_actual_size = this->m_log_pool_config_size;
this->m_bytes_allocated_cap = effective_pool_size;
/* Log ring empty */
m_first_free_entry = 0;
m_first_valid_entry = 0;
TX_BEGIN(m_log_pool) {
TX_ADD(pool_root);
- D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
+ D_RW(pool_root)->header.layout_version = RWL_LAYOUT_VERSION;  // constant renamed: RWL_POOL_VERSION becomes RWL_LAYOUT_VERSION
D_RW(pool_root)->log_entries =
TX_ZALLOC(struct WriteLogCacheEntry,
sizeof(struct WriteLogCacheEntry) * num_small_writes);
- D_RW(pool_root)->pool_size = this->m_log_pool_actual_size;
+ D_RW(pool_root)->pool_size = this->m_log_pool_size;  // value stored in the pool root now comes from m_log_pool_size; the m_log_pool_actual_size assignment above is removed
D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
D_RW(pool_root)->num_log_entries = num_small_writes;
this->m_free_log_entries = 0;
lderr(cct) << "failed to initialize pool (" << this->m_log_pool_name << ")" << dendl;
on_finish->complete(-pmemobj_tx_errno());
- return;
+ return false;  // on_finish was already completed with -pmemobj_tx_errno()
} TX_FINALLY {
} TX_END;
} else {
lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
<< pmemobj_errormsg() << dendl;
on_finish->complete(-errno);
- return;
+ return false;  // on_finish was already completed with -errno
}
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
- if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
+ if (D_RO(pool_root)->header.layout_version != RWL_LAYOUT_VERSION) {  // stored layout version is compared against the renamed constant
// TODO: will handle upgrading version in the future
lderr(cct) << "Pool layout version is "
<< D_RO(pool_root)->header.layout_version
- << " expected " << RWL_POOL_VERSION << dendl;
+ << " expected " << RWL_LAYOUT_VERSION << dendl;
on_finish->complete(-EINVAL);
- return;
+ return false;  // layout version mismatch; on_finish was already completed with -EINVAL
}
if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
<< " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
on_finish->complete(-EINVAL);
- return;
+ return false;  // block size mismatch; on_finish was already completed with -EINVAL
}
- this->m_log_pool_actual_size = D_RO(pool_root)->pool_size;
+ this->m_log_pool_size = D_RO(pool_root)->pool_size;  // m_log_pool_size is overwritten with the pool_size read from the pool root
this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
m_first_free_entry = D_RO(pool_root)->first_free_entry;
* entries, and n-1 free log entries */
this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
}
- size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+ size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);  // uses the m_log_pool_size value assigned from the pool root above
this->m_bytes_allocated_cap = effective_pool_size;
load_existing_entries(later);
m_cache_state->clean = this->m_dirty_log_entries.empty();
m_cache_state->empty = m_log_entries.empty();
}
+ return true;  // reached only when none of the return-false paths shown above was taken
}
/*
entry_index = (entry_index + 1) % this->m_total_log_entries;
}
- this->update_sync_points(missing_sync_points, sync_point_entries, later, MIN_WRITE_ALLOC_SIZE);
+ this->update_sync_points(missing_sync_points, sync_point_entries, later);
+}
+
+template <typename I>
+void WriteLog<I>::inc_allocated_cached_bytes(  // adds one log entry to the m_bytes_allocated and m_bytes_cached counters
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) {
+ if (log_entry->is_write_entry()) {  // entries that are not write entries leave both counters untouched
+ this->m_bytes_allocated += std::max(log_entry->write_bytes(), MIN_WRITE_ALLOC_SIZE);  // allocated bytes are charged at no less than MIN_WRITE_ALLOC_SIZE per entry
+ this->m_bytes_cached += log_entry->write_bytes();  // cached bytes use the unrounded write_bytes() value
+ }
}
template <typename I>
}
template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
- std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,  // replaces construct_flush_entry_ctx(): takes a collection of entries and returns nothing
+ DeferredContexts &post_unlock,  // receives one context per entry via add() below
+ bool has_write_entry) {  // not referenced in the body lines shown here
bool invalidating = this->m_invalidating; // snapshot so we behave consistently
- Context *ctx = this->construct_flush_entry(log_entry, invalidating);
- if (invalidating) {
- return ctx;
- }
- return new LambdaContext(
- [this, log_entry, ctx](int r) {
- m_image_ctx.op_work_queue->queue(new LambdaContext(
+ for (auto &log_entry : entries_to_flush) {  // the per-entry logic of the removed function now runs once per element
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);  // context returned by construct_flush_entry() for this entry
+
+ if (!invalidating) {  // when not invalidating, ctx is replaced by a wrapper that queues the writeback on op_work_queue
+ ctx = new LambdaContext(
[this, log_entry, ctx](int r) {
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback(this->m_image_writeback, ctx);
- }), 0);
- });
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);  // the captured ctx is the one obtained from construct_flush_entry()
+ this->m_flush_ops_will_send -= 1;  // counter is decremented right after writeback() is called
+ }), 0);
+ });
+ }
+ post_unlock.add(ctx);  // wrapped or not, the context is handed to post_unlock instead of being returned
+ }
}
const unsigned long int ops_flushed_together = 4;
* get to the log message append step. */
if (ops.size()) {
flush_pmem_buffer(ops);
- schedule_append_ops(ops);
+ schedule_append_ops(ops, nullptr);
}
} while (ops_remain);
append_scheduled_ops();
* all prior log entries are persisted everywhere.
*/
template <typename I>
-void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops)
+void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req)
{
bool need_finisher;
GenericLogOperationsVector appending;
template <typename V>
void WriteLog<I>::flush_pmem_buffer(V& ops)
{
+ utime_t now = ceph_clock_now();  // one timestamp, taken on entry, shared by every op in ops
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ operation->buf_persist_start_time = now;  // start time is stamped before the pmemobj_drain() call below
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
+ << *operation << dendl;
+ }
+ }
+
for (auto &operation : ops) {
if(operation->is_writing_op()) {
auto log_entry = static_pointer_cast<WriteLogEntry>(operation->get_log_entry());
/* Drain once for all */
pmemobj_drain(m_log_pool);
- utime_t now = ceph_clock_now();
+ now = ceph_clock_now();  // reuses the variable declared at the top; the clock is re-sampled after the drain
for (auto &operation : ops) {
if (operation->reserved_allocated()) {
operation->buf_persist_comp_time = now;
} else {
- ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: "  // same log statement as the removed line, split across two lines
+ << *operation << dendl;
}
}
}
<< *req << dendl;
alloc_succeeds = false;
no_space = true; /* Entries need to be retired */
+
+ if (this->m_free_log_entries == this->m_total_log_entries - 1) {
+ /* When the cache is empty, there is still no space to allocate.
+ * Defragment. */
+ pmemobj_defrag(m_log_pool, NULL, 0, NULL);
+ }
break;
} else {
buffer.allocated = true;
req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
&num_lanes, &num_log_entries, &num_unpublished_reserves);
- alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, bytes_allocated,
- num_lanes, num_log_entries, num_unpublished_reserves,
- this->m_bytes_allocated_cap);
+ alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
+ bytes_allocated, num_lanes, num_log_entries,
+ num_unpublished_reserves);
std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
if (!alloc_succeeds) {