]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <errno.h> | |
5 | ||
6 | #include "common/ceph_context.h" | |
7 | #include "common/dout.h" | |
8 | #include "common/Mutex.h" | |
9 | #include "common/WorkQueue.h" | |
10 | #include "include/Context.h" | |
11 | #include "include/rados/librados.hpp" | |
12 | #include "include/rbd/librbd.hpp" | |
13 | ||
14 | #include "librbd/ExclusiveLock.h" | |
15 | #include "librbd/ImageCtx.h" | |
16 | #include "librbd/internal.h" | |
17 | #include "librbd/LibrbdWriteback.h" | |
18 | #include "librbd/ObjectMap.h" | |
19 | #include "librbd/Journal.h" | |
20 | #include "librbd/Utils.h" | |
21 | #include "librbd/io/AioCompletion.h" | |
22 | #include "librbd/io/ObjectRequest.h" | |
b32b8144 | 23 | #include "librbd/io/ReadResult.h" |
7c673cae FG |
24 | |
25 | #include "include/assert.h" | |
26 | ||
27 | #define dout_subsys ceph_subsys_rbd | |
28 | #undef dout_prefix | |
29 | #define dout_prefix *_dout << "librbdwriteback: " | |
30 | ||
31 | namespace librbd { | |
32 | ||
33 | /** | |
34 | * callback to finish a rados completion as a Context | |
35 | * | |
36 | * @param c completion | |
37 | * @param arg Context* recast as void* | |
38 | */ | |
39 | void context_cb(rados_completion_t c, void *arg) | |
40 | { | |
41 | Context *con = reinterpret_cast<Context *>(arg); | |
42 | con->complete(rados_aio_get_return_value(c)); | |
43 | } | |
44 | ||
45 | /** | |
46 | * context to wrap another context in a Mutex | |
47 | * | |
48 | * @param cct cct | |
49 | * @param c context to finish | |
50 | * @param l mutex to lock | |
51 | */ | |
52 | class C_ReadRequest : public Context { | |
53 | public: | |
54 | C_ReadRequest(CephContext *cct, Context *c, Mutex *cache_lock) | |
55 | : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) { | |
56 | } | |
57 | void finish(int r) override { | |
58 | ldout(m_cct, 20) << "aio_cb completing " << dendl; | |
59 | { | |
60 | Mutex::Locker cache_locker(*m_cache_lock); | |
61 | m_ctx->complete(r); | |
62 | } | |
63 | ldout(m_cct, 20) << "aio_cb finished" << dendl; | |
64 | } | |
65 | private: | |
66 | CephContext *m_cct; | |
67 | Context *m_ctx; | |
68 | Mutex *m_cache_lock; | |
69 | }; | |
70 | ||
71 | class C_OrderedWrite : public Context { | |
72 | public: | |
73 | C_OrderedWrite(CephContext *cct, LibrbdWriteback::write_result_d *result, | |
31f18b77 FG |
74 | const ZTracer::Trace &trace, LibrbdWriteback *wb) |
75 | : m_cct(cct), m_result(result), m_trace(trace), m_wb_handler(wb) {} | |
7c673cae FG |
76 | ~C_OrderedWrite() override {} |
77 | void finish(int r) override { | |
78 | ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl; | |
79 | { | |
80 | Mutex::Locker l(m_wb_handler->m_lock); | |
81 | assert(!m_result->done); | |
82 | m_result->done = true; | |
83 | m_result->ret = r; | |
84 | m_wb_handler->complete_writes(m_result->oid); | |
85 | } | |
86 | ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl; | |
31f18b77 | 87 | m_trace.event("finish"); |
7c673cae FG |
88 | } |
89 | private: | |
90 | CephContext *m_cct; | |
91 | LibrbdWriteback::write_result_d *m_result; | |
31f18b77 | 92 | ZTracer::Trace m_trace; |
7c673cae FG |
93 | LibrbdWriteback *m_wb_handler; |
94 | }; | |
95 | ||
96 | struct C_WriteJournalCommit : public Context { | |
97 | typedef std::vector<std::pair<uint64_t,uint64_t> > Extents; | |
98 | ||
99 | ImageCtx *image_ctx; | |
100 | std::string oid; | |
101 | uint64_t object_no; | |
102 | uint64_t off; | |
103 | bufferlist bl; | |
104 | SnapContext snapc; | |
7c673cae | 105 | uint64_t journal_tid; |
31f18b77 FG |
106 | ZTracer::Trace trace; |
107 | Context *req_comp; | |
108 | bool request_sent = false; | |
7c673cae FG |
109 | |
110 | C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid, | |
111 | uint64_t _object_no, uint64_t _off, | |
112 | const bufferlist &_bl, const SnapContext& _snapc, | |
31f18b77 FG |
113 | uint64_t _journal_tid, |
114 | const ZTracer::Trace &trace, Context *_req_comp) | |
7c673cae | 115 | : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off), |
31f18b77 FG |
116 | bl(_bl), snapc(_snapc), journal_tid(_journal_tid), |
117 | trace(trace), req_comp(_req_comp) { | |
7c673cae FG |
118 | CephContext *cct = image_ctx->cct; |
119 | ldout(cct, 20) << this << " C_WriteJournalCommit: " | |
120 | << "delaying write until journal tid " | |
121 | << journal_tid << " safe" << dendl; | |
122 | } | |
123 | ||
124 | void complete(int r) override { | |
125 | if (request_sent || r < 0) { | |
126 | if (request_sent && r == 0) { | |
127 | // only commit IO events that are safely recorded to the backing image | |
128 | // since the cache will retry all IOs that fail | |
129 | commit_io_event_extent(0); | |
130 | } | |
131 | ||
132 | req_comp->complete(r); | |
133 | delete this; | |
134 | } else { | |
135 | send_request(); | |
136 | } | |
137 | } | |
138 | ||
139 | void finish(int r) override { | |
140 | } | |
141 | ||
142 | void commit_io_event_extent(int r) { | |
143 | CephContext *cct = image_ctx->cct; | |
144 | ldout(cct, 20) << this << " C_WriteJournalCommit: " | |
145 | << "write committed: updating journal commit position" | |
146 | << dendl; | |
147 | ||
148 | // all IO operations are flushed prior to closing the journal | |
149 | assert(image_ctx->journal != NULL); | |
150 | ||
151 | Extents file_extents; | |
152 | Striper::extent_to_file(cct, &image_ctx->layout, object_no, off, | |
153 | bl.length(), file_extents); | |
154 | for (Extents::iterator it = file_extents.begin(); | |
155 | it != file_extents.end(); ++it) { | |
156 | image_ctx->journal->commit_io_event_extent(journal_tid, it->first, | |
157 | it->second, r); | |
158 | } | |
159 | } | |
160 | ||
161 | void send_request() { | |
162 | CephContext *cct = image_ctx->cct; | |
163 | ldout(cct, 20) << this << " C_WriteJournalCommit: " | |
164 | << "journal committed: sending write request" << dendl; | |
165 | ||
166 | assert(image_ctx->exclusive_lock->is_lock_owner()); | |
167 | ||
168 | request_sent = true; | |
b32b8144 FG |
169 | auto req = new io::ObjectWriteRequest<>(image_ctx, oid, object_no, off, |
170 | bl, snapc, 0, trace, this); | |
7c673cae FG |
171 | req->send(); |
172 | } | |
173 | }; | |
174 | ||
175 | struct C_CommitIOEventExtent : public Context { | |
176 | ImageCtx *image_ctx; | |
177 | uint64_t journal_tid; | |
178 | uint64_t offset; | |
179 | uint64_t length; | |
180 | ||
181 | C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid, | |
182 | uint64_t offset, uint64_t length) | |
183 | : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset), | |
184 | length(length) { | |
185 | } | |
186 | ||
187 | void finish(int r) override { | |
188 | // all IO operations are flushed prior to closing the journal | |
189 | assert(image_ctx->journal != nullptr); | |
190 | ||
191 | image_ctx->journal->commit_io_event_extent(journal_tid, offset, length, | |
192 | r); | |
193 | } | |
194 | }; | |
195 | ||
196 | LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock) | |
197 | : m_tid(0), m_lock(lock), m_ictx(ictx) { | |
198 | } | |
199 | ||
200 | void LibrbdWriteback::read(const object_t& oid, uint64_t object_no, | |
201 | const object_locator_t& oloc, | |
202 | uint64_t off, uint64_t len, snapid_t snapid, | |
203 | bufferlist *pbl, uint64_t trunc_size, | |
31f18b77 FG |
204 | __u32 trunc_seq, int op_flags, |
205 | const ZTracer::Trace &parent_trace, | |
206 | Context *onfinish) | |
7c673cae | 207 | { |
b32b8144 FG |
208 | ZTracer::Trace trace; |
209 | if (parent_trace.valid()) { | |
210 | trace.init("", &m_ictx->trace_endpoint, &parent_trace); | |
211 | trace.copy_name("cache read " + oid.name); | |
212 | trace.event("start"); | |
7c673cae FG |
213 | } |
214 | ||
b32b8144 FG |
215 | // on completion, take the mutex and then call onfinish. |
216 | onfinish = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock); | |
217 | ||
218 | // re-use standard object read state machine | |
219 | auto aio_comp = io::AioCompletion::create_and_start(onfinish, m_ictx, | |
220 | io::AIO_TYPE_READ); | |
221 | aio_comp->read_result = io::ReadResult{pbl}; | |
222 | aio_comp->set_request_count(1); | |
223 | ||
224 | auto req_comp = new io::ReadResult::C_SparseReadRequest<>( | |
225 | aio_comp, {{0, len}}, false); | |
226 | auto req = io::ObjectReadRequest<>::create(m_ictx, oid.name, object_no, off, | |
227 | len, snapid, op_flags, true, | |
228 | trace, req_comp); | |
229 | req_comp->request = req; | |
230 | req->send(); | |
7c673cae FG |
231 | } |
232 | ||
233 | bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) | |
234 | { | |
235 | m_ictx->snap_lock.get_read(); | |
236 | librados::snap_t snap_id = m_ictx->snap_id; | |
237 | m_ictx->parent_lock.get_read(); | |
238 | uint64_t overlap = 0; | |
239 | m_ictx->get_parent_overlap(snap_id, &overlap); | |
240 | m_ictx->parent_lock.put_read(); | |
241 | m_ictx->snap_lock.put_read(); | |
242 | ||
243 | uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); | |
244 | ||
245 | // reverse map this object extent onto the parent | |
246 | vector<pair<uint64_t,uint64_t> > objectx; | |
247 | Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, | |
248 | object_no, 0, m_ictx->layout.object_size, | |
249 | objectx); | |
250 | uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap); | |
251 | bool may = object_overlap > 0; | |
252 | ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off | |
253 | << "~" << read_len << " = " << may << dendl; | |
254 | return may; | |
255 | } | |
256 | ||
257 | ceph_tid_t LibrbdWriteback::write(const object_t& oid, | |
258 | const object_locator_t& oloc, | |
259 | uint64_t off, uint64_t len, | |
260 | const SnapContext& snapc, | |
261 | const bufferlist &bl, | |
262 | ceph::real_time mtime, uint64_t trunc_size, | |
263 | __u32 trunc_seq, ceph_tid_t journal_tid, | |
31f18b77 | 264 | const ZTracer::Trace &parent_trace, |
7c673cae FG |
265 | Context *oncommit) |
266 | { | |
31f18b77 FG |
267 | ZTracer::Trace trace; |
268 | if (parent_trace.valid()) { | |
269 | trace.init("", &m_ictx->trace_endpoint, &parent_trace); | |
270 | trace.copy_name("writeback " + oid.name); | |
271 | trace.event("start"); | |
272 | } | |
273 | ||
7c673cae FG |
274 | uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); |
275 | ||
276 | write_result_d *result = new write_result_d(oid.name, oncommit); | |
277 | m_writes[oid.name].push(result); | |
278 | ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl; | |
31f18b77 FG |
279 | C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, trace, |
280 | this); | |
7c673cae FG |
281 | |
282 | // all IO operations are flushed prior to closing the journal | |
283 | assert(journal_tid == 0 || m_ictx->journal != NULL); | |
284 | if (journal_tid != 0) { | |
285 | m_ictx->journal->flush_event( | |
31f18b77 FG |
286 | journal_tid, new C_WriteJournalCommit( |
287 | m_ictx, oid.name, object_no, off, bl, snapc, journal_tid, trace, | |
288 | req_comp)); | |
7c673cae | 289 | } else { |
b32b8144 | 290 | auto req = new io::ObjectWriteRequest<>( |
31f18b77 | 291 | m_ictx, oid.name, object_no, off, bl, snapc, 0, trace, req_comp); |
7c673cae FG |
292 | req->send(); |
293 | } | |
294 | return ++m_tid; | |
295 | } | |
296 | ||
297 | ||
298 | void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off, | |
299 | uint64_t len, | |
300 | ceph_tid_t original_journal_tid, | |
301 | ceph_tid_t new_journal_tid) { | |
302 | typedef std::vector<std::pair<uint64_t,uint64_t> > Extents; | |
303 | ||
304 | ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " " | |
305 | << off << "~" << len << " " | |
306 | << "journal_tid=" << original_journal_tid << ", " | |
307 | << "new_journal_tid=" << new_journal_tid << dendl; | |
308 | ||
309 | uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); | |
310 | ||
311 | // all IO operations are flushed prior to closing the journal | |
312 | assert(original_journal_tid != 0 && m_ictx->journal != NULL); | |
313 | ||
314 | Extents file_extents; | |
315 | Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off, | |
316 | len, file_extents); | |
317 | for (Extents::iterator it = file_extents.begin(); | |
318 | it != file_extents.end(); ++it) { | |
319 | if (new_journal_tid != 0) { | |
320 | // ensure new journal event is safely committed to disk before | |
321 | // committing old event | |
322 | m_ictx->journal->flush_event( | |
323 | new_journal_tid, new C_CommitIOEventExtent(m_ictx, | |
324 | original_journal_tid, | |
325 | it->first, it->second)); | |
326 | } else { | |
327 | m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first, | |
328 | it->second, 0); | |
329 | } | |
330 | } | |
331 | } | |
332 | ||
333 | void LibrbdWriteback::complete_writes(const std::string& oid) | |
334 | { | |
335 | assert(m_lock.is_locked()); | |
336 | std::queue<write_result_d*>& results = m_writes[oid]; | |
337 | ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl; | |
338 | std::list<write_result_d*> finished; | |
339 | ||
340 | while (!results.empty()) { | |
341 | write_result_d *result = results.front(); | |
342 | if (!result->done) | |
343 | break; | |
344 | finished.push_back(result); | |
345 | results.pop(); | |
346 | } | |
347 | ||
348 | if (results.empty()) | |
349 | m_writes.erase(oid); | |
350 | ||
351 | for (std::list<write_result_d*>::iterator it = finished.begin(); | |
352 | it != finished.end(); ++it) { | |
353 | write_result_d *result = *it; | |
354 | ldout(m_ictx->cct, 20) << "complete_writes() completing " << result | |
355 | << dendl; | |
356 | result->oncommit->complete(result->ret); | |
357 | delete result; | |
358 | } | |
359 | } | |
360 | } |