// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <errno.h>

#include "librbd/cache/ObjectCacherWriteback.h"
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/ceph_mutex.h"
#include "osdc/Striper.h"
#include "include/Context.h"
#include "include/neorados/RADOS.hpp"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"

#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/ReadResult.h"
#include "librbd/io/Utils.h"

#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::ObjectCacherWriteback: "

using namespace std;

namespace librbd {
namespace cache {

/**
 * Context that completes a wrapped context while holding a mutex
 *
 * @param cct context for logging
 * @param c context to finish
 * @param cache_lock mutex to hold while completing @c c
 */
class C_ReadRequest : public Context {
public:
  C_ReadRequest(CephContext *cct, Context *c, ceph::mutex *cache_lock)
    : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) {
  }
  void finish(int r) override {
    ldout(m_cct, 20) << "aio_cb completing " << dendl;
    {
      std::lock_guard cache_locker{*m_cache_lock};
      m_ctx->complete(r);
    }
    ldout(m_cct, 20) << "aio_cb finished" << dendl;
  }
private:
  CephContext *m_cct;
  Context *m_ctx;
  ceph::mutex *m_cache_lock;
};

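// Tracks completion of a single writeback write: marks its queued result
// done under the handler's lock and kicks complete_writes() so results
// for each object are delivered in submission order.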
class C_OrderedWrite : public Context {
public:
  C_OrderedWrite(CephContext *cct,
                 ObjectCacherWriteback::write_result_d *result,
                 const ZTracer::Trace &trace, ObjectCacherWriteback *wb)
    : m_cct(cct), m_result(result), m_trace(trace), m_wb_handler(wb) {}
  ~C_OrderedWrite() override {}
  void finish(int r) override {
    ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl;
    {
      std::lock_guard l{m_wb_handler->m_lock};
      ceph_assert(!m_result->done);
      m_result->done = true;
      m_result->ret = r;
      m_wb_handler->complete_writes(m_result->oid);
    }
    ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl;
    m_trace.event("finish");
  }
private:
  CephContext *m_cct;
  ObjectCacherWriteback::write_result_d *m_result;
  ZTracer::Trace m_trace;
  ObjectCacherWriteback *m_wb_handler;
};

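// Forwards a completion code to Journal::commit_io_event_extent() so the
// corresponding journal event extent can be marked committed.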
struct C_CommitIOEventExtent : public Context {
  ImageCtx *image_ctx;
  uint64_t journal_tid;
  uint64_t offset;
  uint64_t length;

  C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid,
                        uint64_t offset, uint64_t length)
    : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset),
      length(length) {
  }

  void finish(int r) override {
    // all IO operations are flushed prior to closing the journal
    ceph_assert(image_ctx->journal != nullptr);

    image_ctx->journal->commit_io_event_extent(journal_tid, offset, length, r);
  }
};

ObjectCacherWriteback::ObjectCacherWriteback(ImageCtx *ictx, ceph::mutex& lock)
  : m_tid(0), m_lock(lock), m_ictx(ictx) {
}

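// Handle a read issued by the ObjectCacher (i.e. a cache miss): re-route
// it through the standard object dispatcher layers below the cache.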
void ObjectCacherWriteback::read(const object_t& oid, uint64_t object_no,
                                 const object_locator_t& oloc,
                                 uint64_t off, uint64_t len, snapid_t snapid,
                                 bufferlist *pbl, uint64_t trunc_size,
                                 __u32 trunc_seq, int op_flags,
                                 const ZTracer::Trace &parent_trace,
                                 Context *onfinish)
{
  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &m_ictx->trace_endpoint, &parent_trace);
    trace.copy_name("cache read " + oid.name);
    trace.event("start");
  }

  // on completion, take the mutex and then call onfinish.
  onfinish = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);

  // re-use standard object read state machine
  auto aio_comp = io::AioCompletion::create_and_start(onfinish, m_ictx,
                                                      io::AIO_TYPE_READ);
  aio_comp->read_result = io::ReadResult{pbl};
  aio_comp->set_request_count(1);

  auto req_comp = new io::ReadResult::C_ObjectReadRequest(
    aio_comp, {{off, len, {{0, len}}}});

  auto io_context = m_ictx->duplicate_data_io_context();
  if (snapid != CEPH_NOSNAP) {
    io_context->read_snap(snapid);
  }

  // extract the embedded RBD read flags from the op_flags
  int read_flags = (op_flags & READ_FLAGS_MASK) >> READ_FLAGS_SHIFT;
  op_flags &= ~READ_FLAGS_MASK;

  auto req = io::ObjectDispatchSpec::create_read(
    m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, object_no, &req_comp->extents,
    io_context, op_flags, read_flags, trace, nullptr, req_comp);
  req->send();
}

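// Report whether reading this extent might trigger a copy-on-write from
// the parent image, i.e. whether the object still overlaps the parent.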
bool ObjectCacherWriteback::may_copy_on_write(const object_t& oid,
                                              uint64_t read_off,
                                              uint64_t read_len,
                                              snapid_t snapid)
{
  m_ictx->image_lock.lock_shared();
  librados::snap_t snap_id = m_ictx->snap_id;
  uint64_t overlap = 0;
  m_ictx->get_parent_overlap(snap_id, &overlap);
  m_ictx->image_lock.unlock_shared();

  uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);

  // reverse map this object extent onto the parent
  vector<pair<uint64_t,uint64_t> > objectx;
  io::util::extent_to_file(
    m_ictx, object_no, 0, m_ictx->layout.object_size, objectx);
  uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
  bool may = object_overlap > 0;
  ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off
                         << "~" << read_len << " = " << may << dendl;
  return may;
}

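// Write back a dirty extent on behalf of the ObjectCacher. oncommit is
// not completed inline; it is queued and delivered in per-object
// submission order via C_OrderedWrite and complete_writes().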
ceph_tid_t ObjectCacherWriteback::write(const object_t& oid,
                                        const object_locator_t& oloc,
                                        uint64_t off, uint64_t len,
                                        const SnapContext& snapc,
                                        const bufferlist &bl,
                                        ceph::real_time mtime,
                                        uint64_t trunc_size,
                                        __u32 trunc_seq, ceph_tid_t journal_tid,
                                        const ZTracer::Trace &parent_trace,
                                        Context *oncommit)
{
  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &m_ictx->trace_endpoint, &parent_trace);
    trace.copy_name("writeback " + oid.name);
    trace.event("start");
  }

  uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);

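  // queue the pending result so completions for this object are delivered
  // strictly in submission order (drained by complete_writes())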
  write_result_d *result = new write_result_d(oid.name, oncommit);
  m_writes[oid.name].push(result);
  ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;

  bufferlist bl_copy(bl);

  Context *ctx = new C_OrderedWrite(m_ictx->cct, result, trace, this);
  ctx = util::create_async_context_callback(*m_ictx, ctx);

  auto io_context = m_ictx->duplicate_data_io_context();
  if (!snapc.empty()) {
    io_context->write_snap_context(
      {{snapc.seq, {snapc.snaps.begin(), snapc.snaps.end()}}});
  }

  auto req = io::ObjectDispatchSpec::create_write(
    m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, object_no, off, std::move(bl_copy),
    io_context, 0, 0, std::nullopt, journal_tid, trace, ctx);
  req->object_dispatch_flags = (
    io::OBJECT_DISPATCH_FLAG_FLUSH |
    io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR);
  req->send();

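  // hand back a unique, monotonically increasing tid so the caller can
  // match this write when it commits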
  return ++m_tid;
}

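// Invoked by the cache when a pending extent is overwritten: the original
// journal event can be committed once the newer event (if any) is safely
// on disk.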
void ObjectCacherWriteback::overwrite_extent(const object_t& oid, uint64_t off,
                                             uint64_t len,
                                             ceph_tid_t original_journal_tid,
                                             ceph_tid_t new_journal_tid) {
  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;

  ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " "
                         << off << "~" << len << " "
                         << "journal_tid=" << original_journal_tid << ", "
                         << "new_journal_tid=" << new_journal_tid << dendl;

  uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);

  // all IO operations are flushed prior to closing the journal
  ceph_assert(original_journal_tid != 0 && m_ictx->journal != NULL);

  Extents file_extents;
  io::util::extent_to_file(m_ictx, object_no, off, len, file_extents);
  for (Extents::iterator it = file_extents.begin();
       it != file_extents.end(); ++it) {
    if (new_journal_tid != 0) {
      // ensure new journal event is safely committed to disk before
      // committing old event
      m_ictx->journal->flush_event(
        new_journal_tid, new C_CommitIOEventExtent(m_ictx,
                                                   original_journal_tid,
                                                   it->first, it->second));
    } else {
      m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first,
                                              it->second, 0);
    }
  }
}

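// Deliver completed writes for the given object in submission order;
// m_lock must already be held by the caller.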
void ObjectCacherWriteback::complete_writes(const std::string& oid)
{
  ceph_assert(ceph_mutex_is_locked(m_lock));
  std::queue<write_result_d*>& results = m_writes[oid];
  ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl;
  std::list<write_result_d*> finished;

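  // pop completed writes off the front of the queue, stopping at the
  // first write that is still in flight to preserve submission order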
  while (!results.empty()) {
    write_result_d *result = results.front();
    if (!result->done)
      break;
    finished.push_back(result);
    results.pop();
  }

  if (results.empty())
    m_writes.erase(oid);

  for (std::list<write_result_d*>::iterator it = finished.begin();
       it != finished.end(); ++it) {
    write_result_d *result = *it;
    ldout(m_ictx->cct, 20) << "complete_writes() completing " << result
                           << dendl;
    result->oncommit->complete(result->ret);
    delete result;
  }
}

} // namespace cache
} // namespace librbd