]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include <errno.h>
#include <shared_mutex>

#include "librbd/cache/ObjectCacherWriteback.h"
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/ceph_mutex.h"
9f95a23c TL |
10 | #include "osdc/Striper.h" |
11 | #include "include/Context.h" | |
f67539c2 | 12 | #include "include/neorados/RADOS.hpp" |
9f95a23c TL |
13 | #include "include/rados/librados.hpp" |
14 | #include "include/rbd/librbd.hpp" | |
15 | ||
16 | #include "librbd/ExclusiveLock.h" | |
17 | #include "librbd/ImageCtx.h" | |
18 | #include "librbd/internal.h" | |
19 | #include "librbd/ObjectMap.h" | |
20 | #include "librbd/Journal.h" | |
21 | #include "librbd/Utils.h" | |
f67539c2 | 22 | #include "librbd/asio/ContextWQ.h" |
9f95a23c TL |
23 | #include "librbd/io/AioCompletion.h" |
24 | #include "librbd/io/ObjectDispatchSpec.h" | |
f67539c2 | 25 | #include "librbd/io/ObjectDispatcherInterface.h" |
9f95a23c | 26 | #include "librbd/io/ReadResult.h" |
f67539c2 | 27 | #include "librbd/io/Utils.h" |
9f95a23c TL |
28 | |
29 | #include "include/ceph_assert.h" | |
30 | ||
31 | #define dout_subsys ceph_subsys_rbd | |
32 | #undef dout_prefix | |
33 | #define dout_prefix *_dout << "librbd::cache::ObjectCacherWriteback: " | |
34 | ||
20effc67 TL |
35 | using namespace std; |
36 | ||
9f95a23c TL |
37 | namespace librbd { |
38 | namespace cache { | |
39 | ||
40 | /** | |
41 | * context to wrap another context in a Mutex | |
42 | * | |
43 | * @param cct cct | |
44 | * @param c context to finish | |
45 | * @param l mutex to lock | |
46 | */ | |
47 | class C_ReadRequest : public Context { | |
48 | public: | |
49 | C_ReadRequest(CephContext *cct, Context *c, ceph::mutex *cache_lock) | |
50 | : m_cct(cct), m_ctx(c), m_cache_lock(cache_lock) { | |
51 | } | |
52 | void finish(int r) override { | |
53 | ldout(m_cct, 20) << "aio_cb completing " << dendl; | |
54 | { | |
55 | std::lock_guard cache_locker{*m_cache_lock}; | |
56 | m_ctx->complete(r); | |
57 | } | |
58 | ldout(m_cct, 20) << "aio_cb finished" << dendl; | |
59 | } | |
60 | private: | |
61 | CephContext *m_cct; | |
62 | Context *m_ctx; | |
63 | ceph::mutex *m_cache_lock; | |
64 | }; | |
65 | ||
66 | class C_OrderedWrite : public Context { | |
67 | public: | |
68 | C_OrderedWrite(CephContext *cct, | |
69 | ObjectCacherWriteback::write_result_d *result, | |
70 | const ZTracer::Trace &trace, ObjectCacherWriteback *wb) | |
71 | : m_cct(cct), m_result(result), m_trace(trace), m_wb_handler(wb) {} | |
72 | ~C_OrderedWrite() override {} | |
73 | void finish(int r) override { | |
74 | ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl; | |
75 | { | |
76 | std::lock_guard l{m_wb_handler->m_lock}; | |
77 | ceph_assert(!m_result->done); | |
78 | m_result->done = true; | |
79 | m_result->ret = r; | |
80 | m_wb_handler->complete_writes(m_result->oid); | |
81 | } | |
82 | ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl; | |
83 | m_trace.event("finish"); | |
84 | } | |
85 | private: | |
86 | CephContext *m_cct; | |
87 | ObjectCacherWriteback::write_result_d *m_result; | |
88 | ZTracer::Trace m_trace; | |
89 | ObjectCacherWriteback *m_wb_handler; | |
90 | }; | |
91 | ||
92 | struct C_CommitIOEventExtent : public Context { | |
93 | ImageCtx *image_ctx; | |
94 | uint64_t journal_tid; | |
95 | uint64_t offset; | |
96 | uint64_t length; | |
97 | ||
98 | C_CommitIOEventExtent(ImageCtx *image_ctx, uint64_t journal_tid, | |
99 | uint64_t offset, uint64_t length) | |
100 | : image_ctx(image_ctx), journal_tid(journal_tid), offset(offset), | |
101 | length(length) { | |
102 | } | |
103 | ||
104 | void finish(int r) override { | |
105 | // all IO operations are flushed prior to closing the journal | |
106 | ceph_assert(image_ctx->journal != nullptr); | |
107 | ||
108 | image_ctx->journal->commit_io_event_extent(journal_tid, offset, length, r); | |
109 | } | |
110 | }; | |
111 | ||
// NOTE(review): `lock` aliases the object cacher's cache lock owned by the
// caller; it must outlive this writeback handler.
ObjectCacherWriteback::ObjectCacherWriteback(ImageCtx *ictx, ceph::mutex& lock)
  : m_tid(0), m_lock(lock), m_ictx(ictx) {
}
115 | ||
/**
 * Issue a read for an object extent on behalf of the object cacher.
 *
 * The read is dispatched through the standard object read state machine
 * below the cache layer; the caller's completion is wrapped so that it
 * is invoked with the cache lock held.
 *
 * @param oid object name (used for tracing only here)
 * @param object_no object index within the image
 * @param off, len extent to read within the object
 * @param snapid snapshot to read from (CEPH_NOSNAP for head)
 * @param pbl destination buffer for the read data
 * @param op_flags librados op flags with RBD read flags embedded
 * @param onfinish completion invoked with the read result
 */
void ObjectCacherWriteback::read(const object_t& oid, uint64_t object_no,
                                 const object_locator_t& oloc,
                                 uint64_t off, uint64_t len, snapid_t snapid,
                                 bufferlist *pbl, uint64_t trunc_size,
                                 __u32 trunc_seq, int op_flags,
                                 const ZTracer::Trace &parent_trace,
                                 Context *onfinish)
{
  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &m_ictx->trace_endpoint, &parent_trace);
    trace.copy_name("cache read " + oid.name);
    trace.event("start");
  }

  // on completion, take the mutex and then call onfinish.
  onfinish = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);

  // re-use standard object read state machine
  auto aio_comp = io::AioCompletion::create_and_start(onfinish, m_ictx,
                                                      io::AIO_TYPE_READ);
  aio_comp->read_result = io::ReadResult{pbl};
  aio_comp->set_request_count(1);

  // single extent [off, len) mapping to buffer offset 0
  auto req_comp = new io::ReadResult::C_ObjectReadRequest(
    aio_comp, {{off, len, {{0, len}}}});

  auto io_context = m_ictx->duplicate_data_io_context();
  if (snapid != CEPH_NOSNAP) {
    io_context->read_snap(snapid);
  }

  // extract the embedded RBD read flags from the op_flags
  int read_flags = (op_flags & READ_FLAGS_MASK) >> READ_FLAGS_SHIFT;
  op_flags &= ~READ_FLAGS_MASK;

  // dispatch below the cache layer so we never re-enter this cache
  auto req = io::ObjectDispatchSpec::create_read(
    m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, object_no, &req_comp->extents,
    io_context, op_flags, read_flags, trace, nullptr, req_comp);
  req->send();
}
157 | ||
158 | bool ObjectCacherWriteback::may_copy_on_write(const object_t& oid, | |
159 | uint64_t read_off, | |
160 | uint64_t read_len, | |
161 | snapid_t snapid) | |
162 | { | |
163 | m_ictx->image_lock.lock_shared(); | |
164 | librados::snap_t snap_id = m_ictx->snap_id; | |
165 | uint64_t overlap = 0; | |
166 | m_ictx->get_parent_overlap(snap_id, &overlap); | |
167 | m_ictx->image_lock.unlock_shared(); | |
168 | ||
169 | uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); | |
170 | ||
171 | // reverse map this object extent onto the parent | |
172 | vector<pair<uint64_t,uint64_t> > objectx; | |
f67539c2 TL |
173 | io::util::extent_to_file( |
174 | m_ictx, object_no, 0, m_ictx->layout.object_size, objectx); | |
9f95a23c TL |
175 | uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap); |
176 | bool may = object_overlap > 0; | |
177 | ldout(m_ictx->cct, 10) << "may_copy_on_write " << oid << " " << read_off | |
178 | << "~" << read_len << " = " << may << dendl; | |
179 | return may; | |
180 | } | |
181 | ||
/**
 * Writeback a dirty object extent from the cache to the image.
 *
 * Queues a write_result_d so completions for the same object are
 * delivered back to the cacher in submission order (drained by
 * complete_writes()), then dispatches the write below the cache layer.
 *
 * @param oid object name
 * @param off, len extent within the object
 * @param snapc snapshot context to write with
 * @param bl data to write
 * @param journal_tid journal event backing this write (0 if none)
 * @param oncommit completion invoked once the write is committed
 * @return monotonically increasing tid identifying this writeback
 */
ceph_tid_t ObjectCacherWriteback::write(const object_t& oid,
                                        const object_locator_t& oloc,
                                        uint64_t off, uint64_t len,
                                        const SnapContext& snapc,
                                        const bufferlist &bl,
                                        ceph::real_time mtime,
                                        uint64_t trunc_size,
                                        __u32 trunc_seq, ceph_tid_t journal_tid,
                                        const ZTracer::Trace &parent_trace,
                                        Context *oncommit)
{
  ZTracer::Trace trace;
  if (parent_trace.valid()) {
    trace.init("", &m_ictx->trace_endpoint, &parent_trace);
    trace.copy_name("writeback " + oid.name);
    trace.event("start");
  }

  uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);

  // track the in-flight write so its completion can be ordered per object
  write_result_d *result = new write_result_d(oid.name, oncommit);
  m_writes[oid.name].push(result);
  ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;

  // private copy of the buffer — presumably the cacher may reuse bl's
  // buffers while the write is in flight; TODO confirm
  bufferlist bl_copy(bl);

  // C_OrderedWrite takes m_lock; run it via the async callback so the
  // dispatcher's completion context does not complete it inline
  Context *ctx = new C_OrderedWrite(m_ictx->cct, result, trace, this);
  ctx = util::create_async_context_callback(*m_ictx, ctx);

  auto io_context = m_ictx->duplicate_data_io_context();
  if (!snapc.empty()) {
    io_context->write_snap_context(
      {{snapc.seq, {snapc.snaps.begin(), snapc.snaps.end()}}});
  }

  auto req = io::ObjectDispatchSpec::create_write(
    m_ictx, io::OBJECT_DISPATCH_LAYER_CACHE, object_no, off, std::move(bl_copy),
    io_context, 0, 0, std::nullopt, journal_tid, trace, ctx);
  // flush semantics; retried on error rather than failing the cache
  req->object_dispatch_flags = (
    io::OBJECT_DISPATCH_FLAG_FLUSH |
    io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR);
  req->send();

  return ++m_tid;
}
227 | ||
228 | ||
229 | void ObjectCacherWriteback::overwrite_extent(const object_t& oid, uint64_t off, | |
230 | uint64_t len, | |
231 | ceph_tid_t original_journal_tid, | |
232 | ceph_tid_t new_journal_tid) { | |
233 | typedef std::vector<std::pair<uint64_t,uint64_t> > Extents; | |
234 | ||
235 | ldout(m_ictx->cct, 20) << __func__ << ": " << oid << " " | |
236 | << off << "~" << len << " " | |
237 | << "journal_tid=" << original_journal_tid << ", " | |
238 | << "new_journal_tid=" << new_journal_tid << dendl; | |
239 | ||
240 | uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); | |
241 | ||
242 | // all IO operations are flushed prior to closing the journal | |
243 | ceph_assert(original_journal_tid != 0 && m_ictx->journal != NULL); | |
244 | ||
245 | Extents file_extents; | |
f67539c2 | 246 | io::util::extent_to_file(m_ictx, object_no, off, len, file_extents); |
9f95a23c TL |
247 | for (Extents::iterator it = file_extents.begin(); |
248 | it != file_extents.end(); ++it) { | |
249 | if (new_journal_tid != 0) { | |
250 | // ensure new journal event is safely committed to disk before | |
251 | // committing old event | |
252 | m_ictx->journal->flush_event( | |
253 | new_journal_tid, new C_CommitIOEventExtent(m_ictx, | |
254 | original_journal_tid, | |
255 | it->first, it->second)); | |
256 | } else { | |
257 | m_ictx->journal->commit_io_event_extent(original_journal_tid, it->first, | |
258 | it->second, 0); | |
259 | } | |
260 | } | |
261 | } | |
262 | ||
263 | void ObjectCacherWriteback::complete_writes(const std::string& oid) | |
264 | { | |
265 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
266 | std::queue<write_result_d*>& results = m_writes[oid]; | |
267 | ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl; | |
268 | std::list<write_result_d*> finished; | |
269 | ||
270 | while (!results.empty()) { | |
271 | write_result_d *result = results.front(); | |
272 | if (!result->done) | |
273 | break; | |
274 | finished.push_back(result); | |
275 | results.pop(); | |
276 | } | |
277 | ||
278 | if (results.empty()) | |
279 | m_writes.erase(oid); | |
280 | ||
281 | for (std::list<write_result_d*>::iterator it = finished.begin(); | |
282 | it != finished.end(); ++it) { | |
283 | write_result_d *result = *it; | |
284 | ldout(m_ictx->cct, 20) << "complete_writes() completing " << result | |
285 | << dendl; | |
286 | result->oncommit->complete(result->ret); | |
287 | delete result; | |
288 | } | |
289 | } | |
290 | ||
291 | } // namespace cache | |
292 | } // namespace librbd |