]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/LibrbdWriteback.cc
f9292caba2a63c5347c73817668ec892f3fbb21a
[ceph.git] / ceph / src / librbd / LibrbdWriteback.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include "common/ceph_context.h"
7 #include "common/dout.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/rados/librados.hpp"
12 #include "include/rbd/librbd.hpp"
13
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/internal.h"
17 #include "librbd/LibrbdWriteback.h"
18 #include "librbd/ObjectMap.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Utils.h"
21 #include "librbd/io/AioCompletion.h"
22 #include "librbd/io/ObjectRequest.h"
23
24 #include "include/assert.h"
25
26 #define dout_subsys ceph_subsys_rbd
27 #undef dout_prefix
28 #define dout_prefix *_dout << "librbdwriteback: "
29
30 namespace librbd {
31
32 /**
33 * callback to finish a rados completion as a Context
34 *
35 * @param c completion
36 * @param arg Context* recast as void*
37 */
38 void context_cb(rados_completion_t c, void *arg)
39 {
40 Context *con = reinterpret_cast<Context *>(arg);
41 con->complete(rados_aio_get_return_value(c));
42 }
43
44 /**
45 * context to wrap another context in a Mutex
46 *
47 * @param cct cct
48 * @param c context to finish
49 * @param l mutex to lock
50 */
51 class C_ReadRequest : public Context {
52 public:
53 C_ReadRequest(CephContext *cct, Context *c, Mutex *cache_lock)
54 : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) {
55 }
56 void finish(int r) override {
57 ldout(m_cct, 20) << "aio_cb completing " << dendl;
58 {
59 Mutex::Locker cache_locker(*m_cache_lock);
60 m_ctx->complete(r);
61 }
62 ldout(m_cct, 20) << "aio_cb finished" << dendl;
63 }
64 private:
65 CephContext *m_cct;
66 Context *m_ctx;
67 Mutex *m_cache_lock;
68 };
69
70 class C_OrderedWrite : public Context {
71 public:
72 C_OrderedWrite(CephContext *cct, LibrbdWriteback::write_result_d *result,
73 LibrbdWriteback *wb)
74 : m_cct(cct), m_result(result), m_wb_handler(wb) {}
75 ~C_OrderedWrite() override {}
76 void finish(int r) override {
77 ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl;
78 {
79 Mutex::Locker l(m_wb_handler->m_lock);
80 assert(!m_result->done);
81 m_result->done = true;
82 m_result->ret = r;
83 m_wb_handler->complete_writes(m_result->oid);
84 }
85 ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl;
86 }
87 private:
88 CephContext *m_cct;
89 LibrbdWriteback::write_result_d *m_result;
90 LibrbdWriteback *m_wb_handler;
91 };
92
93 struct C_WriteJournalCommit : public Context {
94 typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
95
96 ImageCtx *image_ctx;
97 std::string oid;
98 uint64_t object_no;
99 uint64_t off;
100 bufferlist bl;
101 SnapContext snapc;
102 Context *req_comp;
103 uint64_t journal_tid;
104 bool request_sent;
105
106 C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
107 uint64_t _object_no, uint64_t _off,
108 const bufferlist &_bl, const SnapContext& _snapc,
109 Context *_req_comp, uint64_t _journal_tid)
110 : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
111 bl(_bl), snapc(_snapc), req_comp(_req_comp), journal_tid(_journal_tid),
112 request_sent(false) {
113 CephContext *cct = image_ctx->cct;
114 ldout(cct, 20) << this << " C_WriteJournalCommit: "
115 << "delaying write until journal tid "
116 << journal_tid << " safe" << dendl;
117 }
118
119 void complete(int r) override {
120 if (request_sent || r < 0) {
121 if (request_sent && r == 0) {
122 // only commit IO events that are safely recorded to the backing image
123 // since the cache will retry all IOs that fail
124 commit_io_event_extent(0);
125 }
126
127 req_comp->complete(r);
128 delete this;
129 } else {
130 send_request();
131 }
132 }
133
134 void finish(int r) override {
135 }
136
137 void commit_io_event_extent(int r) {
138 CephContext *cct = image_ctx->cct;
139 ldout(cct, 20) << this << " C_WriteJournalCommit: "
140 << "write committed: updating journal commit position"
141 << dendl;
142
143 // all IO operations are flushed prior to closing the journal
144 assert(image_ctx->journal != NULL);
145
146 Extents file_extents;
147 Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
148 bl.length(), file_extents);
149 for (Extents::iterator it = file_extents.begin();
150 it != file_extents.end(); ++it) {
151 image_ctx->journal->commit_io_event_extent(journal_tid, it->first,
152 it->second, r);
153 }
154 }
155
156 void send_request() {
157 CephContext *cct = image_ctx->cct;
158 ldout(cct, 20) << this << " C_WriteJournalCommit: "
159 << "journal committed: sending write request" << dendl;
160
161 assert(image_ctx->exclusive_lock->is_lock_owner());
162
163 request_sent = true;
164 auto req = new io::ObjectWriteRequest(image_ctx, oid, object_no, off,
165 bl, snapc, this, 0);
166 req->send();
167 }
168 };
169
170 struct C_CommitIOEventExtent : public Context {
171 ImageCtx *image_ctx;
172 uint64_t journal_tid;
173 uint64_t offset;
174 uint64_t length;
175
176 C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid,
177 uint64_t offset, uint64_t length)
178 : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset),
179 length(length) {
180 }
181
182 void finish(int r) override {
183 // all IO operations are flushed prior to closing the journal
184 assert(image_ctx->journal != nullptr);
185
186 image_ctx->journal->commit_io_event_extent(journal_tid, offset, length,
187 r);
188 }
189 };
190
191 LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
192 : m_tid(0), m_lock(lock), m_ictx(ictx) {
193 }
194
195 void LibrbdWriteback::read(const object_t& oid, uint64_t object_no,
196 const object_locator_t& oloc,
197 uint64_t off, uint64_t len, snapid_t snapid,
198 bufferlist *pbl, uint64_t trunc_size,
199 __u32 trunc_seq, int op_flags, Context *onfinish)
200 {
201 // on completion, take the mutex and then call onfinish.
202 Context *req = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);
203
204 {
205 RWLock::RLocker snap_locker(m_ictx->snap_lock);
206 if (m_ictx->object_map != nullptr &&
207 !m_ictx->object_map->object_may_exist(object_no)) {
208 m_ictx->op_work_queue->queue(req, -ENOENT);
209 return;
210 }
211 }
212
213 librados::ObjectReadOperation op;
214 op.read(off, len, pbl, NULL);
215 op.set_op_flags2(op_flags);
216 int flags = m_ictx->get_read_flags(snapid);
217
218 librados::AioCompletion *rados_completion =
219 util::create_rados_callback(req);
220 int r = m_ictx->data_ctx.aio_operate(oid.name, rados_completion, &op,
221 flags, NULL);
222 rados_completion->release();
223 assert(r >= 0);
224 }
225
226 bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid)
227 {
228 m_ictx->snap_lock.get_read();
229 librados::snap_t snap_id = m_ictx->snap_id;
230 m_ictx->parent_lock.get_read();
231 uint64_t overlap = 0;
232 m_ictx->get_parent_overlap(snap_id, &overlap);
233 m_ictx->parent_lock.put_read();
234 m_ictx->snap_lock.put_read();
235
236 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
237
238 // reverse map this object extent onto the parent
239 vector<pair<uint64_t,uint64_t> > objectx;
240 Striper::extent_to_file(m_ictx->cct, &m_ictx->layout,
241 object_no, 0, m_ictx->layout.object_size,
242 objectx);
243 uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap);
244 bool may = object_overlap > 0;
245 ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off
246 << "~" << read_len << " = " << may << dendl;
247 return may;
248 }
249
250 ceph_tid_t LibrbdWriteback::write(const object_t& oid,
251 const object_locator_t& oloc,
252 uint64_t off, uint64_t len,
253 const SnapContext& snapc,
254 const bufferlist &bl,
255 ceph::real_time mtime, uint64_t trunc_size,
256 __u32 trunc_seq, ceph_tid_t journal_tid,
257 Context *oncommit)
258 {
259 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
260
261 write_result_d *result = new write_result_d(oid.name, oncommit);
262 m_writes[oid.name].push(result);
263 ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
264 C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, this);
265
266 // all IO operations are flushed prior to closing the journal
267 assert(journal_tid == 0 || m_ictx->journal != NULL);
268 if (journal_tid != 0) {
269 m_ictx->journal->flush_event(
270 journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off,
271 bl, snapc, req_comp,
272 journal_tid));
273 } else {
274 auto req = new io::ObjectWriteRequest(m_ictx, oid.name, object_no,
275 off, bl, snapc, req_comp, 0);
276 req->send();
277 }
278 return ++m_tid;
279 }
280
281
282 void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
283 uint64_t len,
284 ceph_tid_t original_journal_tid,
285 ceph_tid_t new_journal_tid) {
286 typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
287
288 ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " "
289 << off << "~" << len << " "
290 << "journal_tid=" << original_journal_tid << ", "
291 << "new_journal_tid=" << new_journal_tid << dendl;
292
293 uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
294
295 // all IO operations are flushed prior to closing the journal
296 assert(original_journal_tid != 0 && m_ictx->journal != NULL);
297
298 Extents file_extents;
299 Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
300 len, file_extents);
301 for (Extents::iterator it = file_extents.begin();
302 it != file_extents.end(); ++it) {
303 if (new_journal_tid != 0) {
304 // ensure new journal event is safely committed to disk before
305 // committing old event
306 m_ictx->journal->flush_event(
307 new_journal_tid, new C_CommitIOEventExtent(m_ictx,
308 original_journal_tid,
309 it->first, it->second));
310 } else {
311 m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first,
312 it->second, 0);
313 }
314 }
315 }
316
317 void LibrbdWriteback::complete_writes(const std::string& oid)
318 {
319 assert(m_lock.is_locked());
320 std::queue<write_result_d*>& results = m_writes[oid];
321 ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl;
322 std::list<write_result_d*> finished;
323
324 while (!results.empty()) {
325 write_result_d *result = results.front();
326 if (!result->done)
327 break;
328 finished.push_back(result);
329 results.pop();
330 }
331
332 if (results.empty())
333 m_writes.erase(oid);
334
335 for (std::list<write_result_d*>::iterator it = finished.begin();
336 it != finished.end(); ++it) {
337 write_result_d *result = *it;
338 ldout(m_ictx->cct, 20) << "complete_writes() completing " << result
339 << dendl;
340 result->oncommit->complete(result->ret);
341 delete result;
342 }
343 }
344 }