#include "common/dout.h"
#include "common/Mutex.h"
#include "common/WorkQueue.h"
+#include "osdc/Striper.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
-#include "librbd/io/ObjectRequest.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
#include "librbd/io/ReadResult.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
namespace librbd {
- /**
- * callback to finish a rados completion as a Context
- *
- * @param c completion
- * @param arg Context* recast as void*
- */
- void context_cb(rados_completion_t c, void *arg)
- {
- Context *con = reinterpret_cast<Context *>(arg);
- con->complete(rados_aio_get_return_value(c));
- }
-
/**
* context to wrap another context in a Mutex
*
ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl;
{
Mutex::Locker l(m_wb_handler->m_lock);
- assert(!m_result->done);
+ ceph_assert(!m_result->done);
m_result->done = true;
m_result->ret = r;
m_wb_handler->complete_writes(m_result->oid);
LibrbdWriteback *m_wb_handler;
};
- struct C_WriteJournalCommit : public Context {
- typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
-
- ImageCtx *image_ctx;
- std::string oid;
- uint64_t object_no;
- uint64_t off;
- bufferlist bl;
- SnapContext snapc;
- uint64_t journal_tid;
- ZTracer::Trace trace;
- Context *req_comp;
- bool request_sent = false;
-
- C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
- uint64_t _object_no, uint64_t _off,
- const bufferlist &_bl, const SnapContext& _snapc,
- uint64_t _journal_tid,
- const ZTracer::Trace &trace, Context *_req_comp)
- : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
- bl(_bl), snapc(_snapc), journal_tid(_journal_tid),
- trace(trace), req_comp(_req_comp) {
- CephContext *cct = image_ctx->cct;
- ldout(cct, 20) << this << " C_WriteJournalCommit: "
- << "delaying write until journal tid "
- << journal_tid << " safe" << dendl;
- }
-
- void complete(int r) override {
- if (request_sent || r < 0) {
- if (request_sent && r == 0) {
- // only commit IO events that are safely recorded to the backing image
- // since the cache will retry all IOs that fail
- commit_io_event_extent(0);
- }
-
- req_comp->complete(r);
- delete this;
- } else {
- send_request();
- }
- }
-
- void finish(int r) override {
- }
-
- void commit_io_event_extent(int r) {
- CephContext *cct = image_ctx->cct;
- ldout(cct, 20) << this << " C_WriteJournalCommit: "
- << "write committed: updating journal commit position"
- << dendl;
-
- // all IO operations are flushed prior to closing the journal
- assert(image_ctx->journal != NULL);
-
- Extents file_extents;
- Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
- bl.length(), file_extents);
- for (Extents::iterator it = file_extents.begin();
- it != file_extents.end(); ++it) {
- image_ctx->journal->commit_io_event_extent(journal_tid, it->first,
- it->second, r);
- }
- }
-
- void send_request() {
- CephContext *cct = image_ctx->cct;
- ldout(cct, 20) << this << " C_WriteJournalCommit: "
- << "journal committed: sending write request" << dendl;
-
- assert(image_ctx->exclusive_lock->is_lock_owner());
-
- request_sent = true;
- auto req = new io::ObjectWriteRequest<>(image_ctx, oid, object_no, off,
- bl, snapc, 0, trace, this);
- req->send();
- }
- };
-
struct C_CommitIOEventExtent : public Context {
ImageCtx *image_ctx;
uint64_t journal_tid;
void finish(int r) override {
// all IO operations are flushed prior to closing the journal
- assert(image_ctx->journal != nullptr);
+ ceph_assert(image_ctx->journal != nullptr);
image_ctx->journal->commit_io_event_extent(journal_tid, offset, length,
r);
aio_comp->read_result = io::ReadResult{pbl};
aio_comp->set_request_count(1);
- auto req_comp = new io::ReadResult::C_SparseReadRequest<>(
- aio_comp, {{0, len}}, false);
- auto req = io::ObjectReadRequest<>::create(m_ictx, oid.name, object_no, off,
- len, snapid, op_flags, true,
- trace, req_comp);
- req_comp->request = req;
+ auto req_comp = new io::ReadResult::C_ObjectReadRequest(
+ aio_comp, off, len, {{0, len}});
+
+ auto req = io::ObjectDispatchSpec::create_read(
+ m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, oid.name, object_no, off, len,
+ snapid, op_flags, trace, &req_comp->bl, &req_comp->extent_map, req_comp);
req->send();
}
write_result_d *result = new write_result_d(oid.name, oncommit);
m_writes[oid.name].push(result);
ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
- C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, trace,
- this);
- // all IO operations are flushed prior to closing the journal
- assert(journal_tid == 0 || m_ictx->journal != NULL);
- if (journal_tid != 0) {
- m_ictx->journal->flush_event(
- journal_tid, new C_WriteJournalCommit(
- m_ictx, oid.name, object_no, off, bl, snapc, journal_tid, trace,
- req_comp));
- } else {
- auto req = new io::ObjectWriteRequest<>(
- m_ictx, oid.name, object_no, off, bl, snapc, 0, trace, req_comp);
- req->send();
- }
+ bufferlist bl_copy(bl);
+
+ Context *ctx = new C_OrderedWrite(m_ictx->cct, result, trace, this);
+ ctx = util::create_async_context_callback(*m_ictx, ctx);
+
+ auto req = io::ObjectDispatchSpec::create_write(
+ m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, oid.name, object_no, off,
+ std::move(bl_copy), snapc, 0, journal_tid, trace, ctx);
+ req->object_dispatch_flags = (
+ io::OBJECT_DISPATCH_FLAG_FLUSH |
+ io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR);
+ req->send();
+
return ++m_tid;
}
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
// all IO operations are flushed prior to closing the journal
- assert(original_journal_tid != 0 && m_ictx->journal != NULL);
+ ceph_assert(original_journal_tid != 0 && m_ictx->journal != NULL);
Extents file_extents;
Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
void LibrbdWriteback::complete_writes(const std::string& oid)
{
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
std::queue<write_result_d*>& results = m_writes[oid];
ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl;
std::list<write_result_d*> finished;