]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/LibrbdWriteback.cc
update sources to v12.2.3
[ceph.git] / ceph / src / librbd / LibrbdWriteback.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <errno.h>
5
6#include "common/ceph_context.h"
7#include "common/dout.h"
8#include "common/Mutex.h"
9#include "common/WorkQueue.h"
10#include "include/Context.h"
11#include "include/rados/librados.hpp"
12#include "include/rbd/librbd.hpp"
13
14#include "librbd/ExclusiveLock.h"
15#include "librbd/ImageCtx.h"
16#include "librbd/internal.h"
17#include "librbd/LibrbdWriteback.h"
18#include "librbd/ObjectMap.h"
19#include "librbd/Journal.h"
20#include "librbd/Utils.h"
21#include "librbd/io/AioCompletion.h"
22#include "librbd/io/ObjectRequest.h"
b32b8144 23#include "librbd/io/ReadResult.h"
7c673cae
FG
24
25#include "include/assert.h"
26
27#define dout_subsys ceph_subsys_rbd
28#undef dout_prefix
29#define dout_prefix *_dout << "librbdwriteback: "
30
31namespace librbd {
32
33 /**
34 * callback to finish a rados completion as a Context
35 *
36 * @param c completion
37 * @param arg Context* recast as void*
38 */
39 void context_cb(rados_completion_t c, void *arg)
40 {
41 Context *con = reinterpret_cast<Context *>(arg);
42 con->complete(rados_aio_get_return_value(c));
43 }
44
45 /**
46 * context to wrap another context in a Mutex
47 *
48 * @param cct cct
49 * @param c context to finish
50 * @param l mutex to lock
51 */
52 class C_ReadRequest : public Context {
53 public:
54 C_ReadRequest(CephContext *cct, Context *c, Mutex *cache_lock)
55 : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) {
56 }
57 void finish(int r) override {
58 ldout(m_cct, 20) << "aio_cb completing " << dendl;
59 {
60 Mutex::Locker cache_locker(*m_cache_lock);
61 m_ctx->complete(r);
62 }
63 ldout(m_cct, 20) << "aio_cb finished" << dendl;
64 }
65 private:
66 CephContext *m_cct;
67 Context *m_ctx;
68 Mutex *m_cache_lock;
69 };
70
71 class C_OrderedWrite : public Context {
72 public:
73 C_OrderedWrite(CephContext *cct, LibrbdWriteback::write_result_d *result,
31f18b77
FG
74 const ZTracer::Trace &trace, LibrbdWriteback *wb)
75 : m_cct(cct), m_result(result), m_trace(trace), m_wb_handler(wb) {}
7c673cae
FG
76 ~C_OrderedWrite() override {}
77 void finish(int r) override {
78 ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl;
79 {
80 Mutex::Locker l(m_wb_handler->m_lock);
81 assert(!m_result->done);
82 m_result->done = true;
83 m_result->ret = r;
84 m_wb_handler->complete_writes(m_result->oid);
85 }
86 ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl;
31f18b77 87 m_trace.event("finish");
7c673cae
FG
88 }
89 private:
90 CephContext *m_cct;
91 LibrbdWriteback::write_result_d *m_result;
31f18b77 92 ZTracer::Trace m_trace;
7c673cae
FG
93 LibrbdWriteback *m_wb_handler;
94 };
95
96 struct C_WriteJournalCommit : public Context {
97 typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
98
99 ImageCtx *image_ctx;
100 std::string oid;
101 uint64_t object_no;
102 uint64_t off;
103 bufferlist bl;
104 SnapContext snapc;
7c673cae 105 uint64_t journal_tid;
31f18b77
FG
106 ZTracer::Trace trace;
107 Context *req_comp;
108 bool request_sent = false;
7c673cae
FG
109
110 C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
111 uint64_t _object_no, uint64_t _off,
112 const bufferlist &_bl, const SnapContext& _snapc,
31f18b77
FG
113 uint64_t _journal_tid,
114 const ZTracer::Trace &trace, Context *_req_comp)
7c673cae 115 : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
31f18b77
FG
116 bl(_bl), snapc(_snapc), journal_tid(_journal_tid),
117 trace(trace), req_comp(_req_comp) {
7c673cae
FG
118 CephContext *cct = image_ctx->cct;
119 ldout(cct, 20) << this << " C_WriteJournalCommit: "
120 << "delaying write until journal tid "
121 << journal_tid << " safe" << dendl;
122 }
123
124 void complete(int r) override {
125 if (request_sent || r < 0) {
126 if (request_sent && r == 0) {
127 // only commit IO events that are safely recorded to the backing image
128 // since the cache will retry all IOs that fail
129 commit_io_event_extent(0);
130 }
131
132 req_comp->complete(r);
133 delete this;
134 } else {
135 send_request();
136 }
137 }
138
139 void finish(int r) override {
140 }
141
142 void commit_io_event_extent(int r) {
143 CephContext *cct = image_ctx->cct;
144 ldout(cct, 20) << this << " C_WriteJournalCommit: "
145 << "write committed: updating journal commit position"
146 << dendl;
147
148 // all IO operations are flushed prior to closing the journal
149 assert(image_ctx->journal != NULL);
150
151 Extents file_extents;
152 Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
153 bl.length(), file_extents);
154 for (Extents::iterator it = file_extents.begin();
155 it != file_extents.end(); ++it) {
156 image_ctx->journal->commit_io_event_extent(journal_tid, it->first,
157 it->second, r);
158 }
159 }
160
161 void send_request() {
162 CephContext *cct = image_ctx->cct;
163 ldout(cct, 20) << this << " C_WriteJournalCommit: "
164 << "journal committed: sending write request" << dendl;
165
166 assert(image_ctx->exclusive_lock->is_lock_owner());
167
168 request_sent = true;
b32b8144
FG
169 auto req = new io::ObjectWriteRequest<>(image_ctx, oid, object_no, off,
170 bl, snapc, 0, trace, this);
7c673cae
FG
171 req->send();
172 }
173 };
174
175 struct C_CommitIOEventExtent : public Context {
176 ImageCtx *image_ctx;
177 uint64_t journal_tid;
178 uint64_t offset;
179 uint64_t length;
180
181 C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid,
182 uint64_t offset, uint64_t length)
183 : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset),
184 length(length) {
185 }
186
187 void finish(int r) override {
188 // all IO operations are flushed prior to closing the journal
189 assert(image_ctx->journal != nullptr);
190
191 image_ctx->journal->commit_io_event_extent(journal_tid, offset, length,
192 r);
193 }
194 };
195
196 LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
197 : m_tid(0), m_lock(lock), m_ictx(ictx) {
198 }
199
200 void LibrbdWriteback::read(const object_t& oid, uint64_t object_no,
201 const object_locator_t& oloc,
202 uint64_t off, uint64_t len, snapid_t snapid,
203 bufferlist *pbl, uint64_t trunc_size,
31f18b77
FG
204 __u32 trunc_seq, int op_flags,
205 const ZTracer::Trace &parent_trace,
206 Context *onfinish)
7c673cae 207 {
b32b8144
FG
208 ZTracer::Trace trace;
209 if (parent_trace.valid()) {
210 trace.init("", &m_ictx->trace_endpoint, &parent_trace);
211 trace.copy_name("cache read " + oid.name);
212 trace.event("start");
7c673cae
FG
213 }
214
b32b8144
FG
215 // on completion, take the mutex and then call onfinish.
216 onfinish = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);
217
218 // re-use standard object read state machine
219 auto aio_comp = io::AioCompletion::create_and_start(onfinish, m_ictx,
220 io::AIO_TYPE_READ);
221 aio_comp->read_result = io::ReadResult{pbl};
222 aio_comp->set_request_count(1);
223
224 auto req_comp = new io::ReadResult::C_SparseReadRequest<>(
225 aio_comp, {{0, len}}, false);
226 auto req = io::ObjectReadRequest<>::create(m_ictx, oid.name, object_no, off,
227 len, snapid, op_flags, true,
228 trace, req_comp);
229 req_comp->request = req;
230 req->send();
7c673cae
FG
231 }
232
233 bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid)
234 {
235 m_ictx->snap_lock.get_read();
236 librados::snap_t snap_id = m_ictx->snap_id;
237 m_ictx->parent_lock.get_read();
238 uint64_t overlap = 0;
239 m_ictx->get_parent_overlap(snap_id, &overlap);
240 m_ictx->parent_lock.put_read();
241 m_ictx->snap_lock.put_read();
242
243 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
244
245 // reverse map this object extent onto the parent
246 vector<pair<uint64_t,uint64_t> > objectx;
247 Striper::extent_to_file(m_ictx->cct, &m_ictx->layout,
248 object_no, 0, m_ictx->layout.object_size,
249 objectx);
250 uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
251 bool may = object_overlap > 0;
252 ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off
253 << "~" << read_len << " = " << may << dendl;
254 return may;
255 }
256
257 ceph_tid_t LibrbdWriteback::write(const object_t& oid,
258 const object_locator_t& oloc,
259 uint64_t off, uint64_t len,
260 const SnapContext& snapc,
261 const bufferlist &bl,
262 ceph::real_time mtime, uint64_t trunc_size,
263 __u32 trunc_seq, ceph_tid_t journal_tid,
31f18b77 264 const ZTracer::Trace &parent_trace,
7c673cae
FG
265 Context *oncommit)
266 {
31f18b77
FG
267 ZTracer::Trace trace;
268 if (parent_trace.valid()) {
269 trace.init("", &m_ictx->trace_endpoint, &parent_trace);
270 trace.copy_name("writeback " + oid.name);
271 trace.event("start");
272 }
273
7c673cae
FG
274 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
275
276 write_result_d *result = new write_result_d(oid.name, oncommit);
277 m_writes[oid.name].push(result);
278 ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
31f18b77
FG
279 C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, trace,
280 this);
7c673cae
FG
281
282 // all IO operations are flushed prior to closing the journal
283 assert(journal_tid == 0 || m_ictx->journal != NULL);
284 if (journal_tid != 0) {
285 m_ictx->journal->flush_event(
31f18b77
FG
286 journal_tid, new C_WriteJournalCommit(
287 m_ictx, oid.name, object_no, off, bl, snapc, journal_tid, trace,
288 req_comp));
7c673cae 289 } else {
b32b8144 290 auto req = new io::ObjectWriteRequest<>(
31f18b77 291 m_ictx, oid.name, object_no, off, bl, snapc, 0, trace, req_comp);
7c673cae
FG
292 req->send();
293 }
294 return ++m_tid;
295 }
296
297
298 void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
299 uint64_t len,
300 ceph_tid_t original_journal_tid,
301 ceph_tid_t new_journal_tid) {
302 typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
303
304 ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " "
305 << off << "~" << len << " "
306 << "journal_tid=" << original_journal_tid << ", "
307 << "new_journal_tid=" << new_journal_tid << dendl;
308
309 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
310
311 // all IO operations are flushed prior to closing the journal
312 assert(original_journal_tid != 0 && m_ictx->journal != NULL);
313
314 Extents file_extents;
315 Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
316 len, file_extents);
317 for (Extents::iterator it = file_extents.begin();
318 it != file_extents.end(); ++it) {
319 if (new_journal_tid != 0) {
320 // ensure new journal event is safely committed to disk before
321 // committing old event
322 m_ictx->journal->flush_event(
323 new_journal_tid, new C_CommitIOEventExtent(m_ictx,
324 original_journal_tid,
325 it->first, it->second));
326 } else {
327 m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first,
328 it->second, 0);
329 }
330 }
331 }
332
333 void LibrbdWriteback::complete_writes(const std::string& oid)
334 {
335 assert(m_lock.is_locked());
336 std::queue<write_result_d*>& results = m_writes[oid];
337 ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl;
338 std::list<write_result_d*> finished;
339
340 while (!results.empty()) {
341 write_result_d *result = results.front();
342 if (!result->done)
343 break;
344 finished.push_back(result);
345 results.pop();
346 }
347
348 if (results.empty())
349 m_writes.erase(oid);
350
351 for (std::list<write_result_d*>::iterator it = finished.begin();
352 it != finished.end(); ++it) {
353 write_result_d *result = *it;
354 ldout(m_ictx->cct, 20) << "complete_writes() completing " << result
355 << dendl;
356 result->oncommit->complete(result->ret);
357 delete result;
358 }
359 }
360}