// vim: ts=8 sw=2 smarttab
#include "librbd/io/SimpleSchedulerObjectDispatch.h"
+#include "include/neorados/RADOS.hpp"
+#include "common/ceph_time.h"
#include "common/Timer.h"
-#include "common/WorkQueue.h"
#include "common/errno.h"
+#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/io/FlushTracker.h"
namespace io {
using namespace boost::accumulators;
+using ceph::operator<<;
using librbd::util::data_object_name;
static const int LATENCY_STATS_WINDOW_SIZE = 10;
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::ObjectRequests::try_delay_request(
- uint64_t object_off, ceph::bufferlist&& data, const ::SnapContext &snapc,
+ uint64_t object_off, ceph::bufferlist&& data, IOContext io_context,
int op_flags, int object_dispatch_flags, Context* on_dispatched) {
if (!m_delayed_requests.empty()) {
- if (snapc.seq != m_snapc.seq || op_flags != m_op_flags ||
- data.length() == 0 || intersects(object_off, data.length())) {
+ if (!m_io_context || *m_io_context != *io_context ||
+ op_flags != m_op_flags || data.length() == 0 ||
+ intersects(object_off, data.length())) {
return false;
}
} else {
- m_snapc = snapc;
+ m_io_context = io_context;
m_op_flags = op_flags;
}
auto req = ObjectDispatchSpec::create_write(
image_ctx, OBJECT_DISPATCH_LAYER_SCHEDULER,
- m_object_no, offset, std::move(merged_requests.data), m_snapc,
- m_op_flags, 0, {}, ctx);
+ m_object_no, offset, std::move(merged_requests.data), m_io_context,
+ m_op_flags, 0, std::nullopt, 0, {}, ctx);
req->object_dispatch_flags = m_object_dispatch_flags;
req->send();
ldout(cct, 5) << dendl;
// add ourself to the IO object dispatcher chain
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
}
template <typename I>
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::read(
- uint64_t object_no, uint64_t object_off, uint64_t object_len,
- librados::snap_t snap_id, int op_flags, const ZTracer::Trace &parent_trace,
- ceph::bufferlist* read_data, ExtentMap* extent_map,
- int* object_dispatch_flags, DispatchResult* dispatch_result,
- Context** on_finish, Context* on_dispatched) {
+ uint64_t object_no, ReadExtents* extents, IOContext io_context,
+ int op_flags, int read_flags, const ZTracer::Trace &parent_trace,
+ uint64_t* version, int* object_dispatch_flags,
+ DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
auto cct = m_image_ctx->cct;
- ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " "
- << object_off << "~" << object_len << dendl;
+ ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " << extents
+ << dendl;
std::lock_guard locker{m_lock};
- if (intersects(object_no, object_off, object_len)) {
- dispatch_delayed_requests(object_no);
+ for (auto& extent : *extents) {
+ if (intersects(object_no, extent.offset, extent.length)) {
+ dispatch_delayed_requests(object_no);
+ break;
+ }
}
return false;
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::discard(
uint64_t object_no, uint64_t object_off, uint64_t object_len,
- const ::SnapContext &snapc, int discard_flags,
+ IOContext io_context, int discard_flags,
const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
uint64_t* journal_tid, DispatchResult* dispatch_result,
Context** on_finish, Context* on_dispatched) {
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::write(
uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
- const ::SnapContext &snapc, int op_flags,
+ IOContext io_context, int op_flags, int write_flags,
+ std::optional<uint64_t> assert_version,
const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
uint64_t* journal_tid, DispatchResult* dispatch_result,
Context** on_finish, Context* on_dispatched) {
<< object_off << "~" << data.length() << dendl;
std::lock_guard locker{m_lock};
- if (try_delay_write(object_no, object_off, std::move(data), snapc, op_flags,
- *object_dispatch_flags, on_dispatched)) {
+
+ // don't try to batch assert version writes
+ if (assert_version.has_value() ||
+ (write_flags & OBJECT_WRITE_FLAG_CREATE_EXCLUSIVE) != 0) {
+ dispatch_delayed_requests(object_no);
+ return false;
+ }
+
+ if (try_delay_write(object_no, object_off, std::move(data), io_context,
+ op_flags, *object_dispatch_flags, on_dispatched)) {
auto dispatch_seq = ++m_dispatch_seq;
m_flush_tracker->start_io(dispatch_seq);
bool SimpleSchedulerObjectDispatch<I>::write_same(
uint64_t object_no, uint64_t object_off, uint64_t object_len,
LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data,
- const ::SnapContext &snapc, int op_flags,
+ IOContext io_context, int op_flags,
const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
uint64_t* journal_tid, DispatchResult* dispatch_result,
Context** on_finish, Context* on_dispatched) {
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::compare_and_write(
uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data,
- ceph::bufferlist&& write_data, const ::SnapContext &snapc, int op_flags,
+ ceph::bufferlist&& write_data, IOContext io_context, int op_flags,
const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
int* object_dispatch_flags, uint64_t* journal_tid,
DispatchResult* dispatch_result, Context** on_finish,
template <typename I>
bool SimpleSchedulerObjectDispatch<I>::try_delay_write(
uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
- const ::SnapContext &snapc, int op_flags, int object_dispatch_flags,
+ IOContext io_context, int op_flags, int object_dispatch_flags,
Context* on_dispatched) {
ceph_assert(ceph_mutex_is_locked(m_lock));
auto cct = m_image_ctx->cct;
auto &object_requests = it->second;
bool delayed = object_requests->try_delay_request(
- object_off, std::move(data), snapc, op_flags, object_dispatch_flags,
+ object_off, std::move(data), io_context, op_flags, object_dispatch_flags,
on_dispatched);
ldout(cct, 20) << "delayed: " << delayed << dendl;
ldout(cct, 20) << "running timer task " << m_timer_task << dendl;
m_timer_task = nullptr;
- m_image_ctx->op_work_queue->queue(
- new LambdaContext(
- [this, object_no](int r) {
- std::lock_guard locker{m_lock};
- dispatch_delayed_requests(object_no);
- }), 0);
+ m_image_ctx->asio_engine->post(
+ [this, object_no]() {
+ std::lock_guard locker{m_lock};
+ dispatch_delayed_requests(object_no);
+ });
});
ldout(cct, 20) << "scheduling task " << m_timer_task << " at "