1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "common/ceph_context.h"
7 #include "common/dout.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/rados/librados.hpp"
12 #include "include/rbd/librbd.hpp"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/internal.h"
17 #include "librbd/LibrbdWriteback.h"
18 #include "librbd/ObjectMap.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Utils.h"
21 #include "librbd/io/AioCompletion.h"
22 #include "librbd/io/ObjectRequest.h"
23 #include "librbd/io/ReadResult.h"
25 #include "include/assert.h"
27 #define dout_subsys ceph_subsys_rbd
29 #define dout_prefix *_dout << "librbdwriteback: "
// callback to finish a rados completion as a Context
//
// @param c   completed rados operation; its return value is forwarded
// @param arg Context* recast as void*
//
// NOTE(review): this extraction is fragmentary — the original comment
// delimiters and this function's braces are elided around this point.
39 void context_cb(rados_completion_t c
, void *arg
)
// recover the Context that was passed through the C callback as void*
41 Context
*con
= reinterpret_cast<Context
*>(arg
);
// finish it with the operation's return value; complete() is presumed to
// also free con — confirm against Context's contract
42 con
->complete(rados_aio_get_return_value(c
));
// context to wrap another context in a Mutex
//
// @param c context to finish
// @param l mutex to lock
//
// NOTE(review): member declarations and the closing braces of this class
// are elided in this extraction; only the ctor and part of finish() are
// visible.
52 class C_ReadRequest
: public Context
{
// remember the CephContext (for logging), the context to wrap, and the
// cache lock that must be held while finishing it
54 C_ReadRequest(CephContext
*cct
, Context
*c
, Mutex
*cache_lock
)
55 : m_cct(cct
), m_ctx(c
), m_cache_lock(cache_lock
) {
// complete the wrapped context while holding the cache lock
57 void finish(int r
) override
{
58 ldout(m_cct
, 20) << "aio_cb completing " << dendl
;
// scoped lock: held for the remainder of finish()
60 Mutex::Locker
cache_locker(*m_cache_lock
);
// (orig lines 61-62 elided here — presumably m_ctx->complete(r) under the
// lock; TODO confirm against the full source)
63 ldout(m_cct
, 20) << "aio_cb finished" << dendl
;
// Completion for a single writeback: marks its write_result_d done under
// the handler's lock and asks the handler to complete any writes for this
// object that are now finished, preserving per-object ordering.
// NOTE(review): access specifiers and some declarations (e.g. m_cct) are
// elided in this extraction.
71 class C_OrderedWrite
: public Context
{
73 C_OrderedWrite(CephContext
*cct
, LibrbdWriteback::write_result_d
*result
,
74 const ZTracer::Trace
&trace
, LibrbdWriteback
*wb
)
75 : m_cct(cct
), m_result(result
), m_trace(trace
), m_wb_handler(wb
) {}
76 ~C_OrderedWrite() override
{}
77 void finish(int r
) override
{
78 ldout(m_cct
, 20) << "C_OrderedWrite completing " << m_result
<< dendl
;
// mark the result done under the writeback handler's lock, then let the
// handler flush whatever is now completable in order
80 Mutex::Locker
l(m_wb_handler
->m_lock
);
// a result must only be completed once
81 assert(!m_result
->done
);
82 m_result
->done
= true;
84 m_wb_handler
->complete_writes(m_result
->oid
);
86 ldout(m_cct
, 20) << "C_OrderedWrite finished " << m_result
<< dendl
;
87 m_trace
.event("finish");
// the pending-write record this completion refers to (not owned here)
91 LibrbdWriteback::write_result_d
*m_result
;
92 ZTracer::Trace m_trace
;
// back-pointer to the handler whose m_lock / complete_writes we use
93 LibrbdWriteback
*m_wb_handler
;
// Delays an object write until its journal event is safe, then issues the
// write; once the write itself is safe the journal commit position is
// updated for the affected extents.  complete() is invoked twice: first as
// the journal flush callback, then as the object-write completion.
// NOTE(review): several members (image_ctx, oid, object_no, off, bl,
// snapc, req_comp), brace/else lines, and parts of the method bodies are
// elided in this extraction — see the jumps in the embedded numbering.
96 struct C_WriteJournalCommit
: public Context
{
97 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
// journal event this write is tied to
105 uint64_t journal_tid
;
106 ZTracer::Trace trace
;
// flips once the object write has been dispatched, so complete() can tell
// the journal-commit callback apart from the write completion
108 bool request_sent
= false;
110 C_WriteJournalCommit(ImageCtx
*_image_ctx
, const std::string
&_oid
,
111 uint64_t _object_no
, uint64_t _off
,
112 const bufferlist
&_bl
, const SnapContext
& _snapc
,
113 uint64_t _journal_tid
,
114 const ZTracer::Trace
&trace
, Context
*_req_comp
)
115 : image_ctx(_image_ctx
), oid(_oid
), object_no(_object_no
), off(_off
),
116 bl(_bl
), snapc(_snapc
), journal_tid(_journal_tid
),
117 trace(trace
), req_comp(_req_comp
) {
118 CephContext
*cct
= image_ctx
->cct
;
119 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
120 << "delaying write until journal tid "
121 << journal_tid
<< " safe" << dendl
;
// second invocation (write finished) or any error: propagate to req_comp;
// first invocation with r == 0 presumably falls into an elided else branch
// that calls send_request() — TODO confirm against the full source
124 void complete(int r
) override
{
125 if (request_sent
|| r
< 0) {
126 if (request_sent
&& r
== 0) {
127 // only commit IO events that are safely recorded to the backing image
128 // since the cache will retry all IOs that fail
129 commit_io_event_extent(0);
132 req_comp
->complete(r
);
// body elided — likely intentionally empty because complete() is
// overridden above; TODO confirm
139 void finish(int r
) override
{
// update the journal commit position for every image extent covered by
// this object write
142 void commit_io_event_extent(int r
) {
143 CephContext
*cct
= image_ctx
->cct
;
144 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
145 << "write committed: updating journal commit position"
148 // all IO operations are flushed prior to closing the journal
149 assert(image_ctx
->journal
!= NULL
);
// reverse-map the object extent onto image (file) extents
151 Extents file_extents
;
152 Striper::extent_to_file(cct
, &image_ctx
->layout
, object_no
, off
,
153 bl
.length(), file_extents
);
154 for (Extents::iterator it
= file_extents
.begin();
155 it
!= file_extents
.end(); ++it
) {
156 image_ctx
->journal
->commit_io_event_extent(journal_tid
, it
->first
,
// dispatch the actual object write now that the journal event is safe;
// `this` is the completion, so complete() runs again when the write ends
// (request_sent is presumably set in an elided line — TODO confirm)
161 void send_request() {
162 CephContext
*cct
= image_ctx
->cct
;
163 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
164 << "journal committed: sending write request" << dendl
;
// writes require the exclusive lock to be held by this client
166 assert(image_ctx
->exclusive_lock
->is_lock_owner());
169 auto req
= new io::ObjectWriteRequest
<>(image_ctx
, oid
, object_no
, off
,
170 bl
, snapc
, 0, trace
, this);
// Commits one [offset, length) extent of a journal IO event; used to
// retire an overwritten event only after a newer event is safe (see
// overwrite_extent).
// NOTE(review): the image_ctx/offset/length member declarations and the
// tail of the ctor init-list are elided in this extraction.
175 struct C_CommitIOEventExtent
: public Context
{
// journal event whose extent gets committed
177 uint64_t journal_tid
;
181 C_CommitIOEventExtent(ImageCtx
*image_ctx
, uint64_t journal_tid
,
182 uint64_t offset
, uint64_t length
)
183 : image_ctx(image_ctx
), journal_tid(journal_tid
), offset(offset
),
187 void finish(int r
) override
{
188 // all IO operations are flushed prior to closing the journal
189 assert(image_ctx
->journal
!= nullptr);
// commit the extent regardless of r (trailing args elided here)
191 image_ctx
->journal
->commit_io_event_extent(journal_tid
, offset
, length
,
// Construct the writeback handler for an image.  `lock` is an externally
// owned mutex (stored by reference as m_lock) used to serialize write
// completions; m_tid starts at 0.
196 LibrbdWriteback::LibrbdWriteback(ImageCtx
*ictx
, Mutex
& lock
)
197 : m_tid(0), m_lock(lock
), m_ictx(ictx
) {
// Read [off, off+len) of an object on behalf of the cache, delivering the
// data into *pbl and completing `onfinish` with the cache lock held.
// NOTE(review): the trailing parameters (including `onfinish`) and the
// opening brace are elided in this extraction — confirm upstream.
200 void LibrbdWriteback::read(const object_t
& oid
, uint64_t object_no
,
201 const object_locator_t
& oloc
,
202 uint64_t off
, uint64_t len
, snapid_t snapid
,
203 bufferlist
*pbl
, uint64_t trunc_size
,
204 __u32 trunc_seq
, int op_flags
,
205 const ZTracer::Trace
&parent_trace
,
// start a child trace span for this cache read when tracing is enabled
208 ZTracer::Trace trace
;
209 if (parent_trace
.valid()) {
210 trace
.init("", &m_ictx
->trace_endpoint
, &parent_trace
);
211 trace
.copy_name("cache read " + oid
.name
);
212 trace
.event("start");
215 // on completion, take the mutex and then call onfinish.
216 onfinish
= new C_ReadRequest(m_ictx
->cct
, onfinish
, &m_lock
);
218 // re-use standard object read state machine
219 auto aio_comp
= io::AioCompletion::create_and_start(onfinish
, m_ictx
,
// read payload lands directly in the caller-provided bufferlist
221 aio_comp
->read_result
= io::ReadResult
{pbl
};
// exactly one object request feeds this completion
222 aio_comp
->set_request_count(1);
// sparse-read adapter over the full [0, len) extent of the object
224 auto req_comp
= new io::ReadResult::C_SparseReadRequest
<>(
225 aio_comp
, {{0, len
}}, false);
226 auto req
= io::ObjectReadRequest
<>::create(m_ictx
, oid
.name
, object_no
, off
,
227 len
, snapid
, op_flags
, true,
// link the request so the sparse-read adapter can consume its result
229 req_comp
->request
= req
;
// Whether a read of this object extent may trigger a copy-on-write from
// the parent image: true iff the object still overlaps the parent.
// NOTE(review): the opening brace and the return statement are elided in
// this extraction.
233 bool LibrbdWriteback::may_copy_on_write(const object_t
& oid
, uint64_t read_off
, uint64_t read_len
, snapid_t snapid
)
// sample the current snap id and parent overlap under the proper locks
235 m_ictx
->snap_lock
.get_read();
236 librados::snap_t snap_id
= m_ictx
->snap_id
;
237 m_ictx
->parent_lock
.get_read();
238 uint64_t overlap
= 0;
239 m_ictx
->get_parent_overlap(snap_id
, &overlap
);
240 m_ictx
->parent_lock
.put_read();
241 m_ictx
->snap_lock
.put_read();
// recover the object number from the object name
243 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
245 // reverse map this object extent onto the parent
246 vector
<pair
<uint64_t,uint64_t> > objectx
;
247 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
,
248 object_no
, 0, m_ictx
->layout
.object_size
,
// clip the mapped extents to the parent overlap; any remainder means the
// read could fault in parent data
250 uint64_t object_overlap
= m_ictx
->prune_parent_extents(objectx
, overlap
);
251 bool may
= object_overlap
> 0;
252 ldout(m_ictx
->cct
, 10) << "may_copy_on_write " << oid
<< " " << read_off
253 << "~" << read_len
<< " = " << may
<< dendl
;
// Queue a writeback of [off, off+len) of the given object.  Results for
// the same object are tracked in m_writes so completions stay in
// submission order; a journaled write is deferred (via
// C_WriteJournalCommit) until its journal event is safe.
// NOTE(review): the trailing parameters (including the `oncommit`
// completion), the opening brace, and the return of the tid are elided in
// this extraction.
257 ceph_tid_t
LibrbdWriteback::write(const object_t
& oid
,
258 const object_locator_t
& oloc
,
259 uint64_t off
, uint64_t len
,
260 const SnapContext
& snapc
,
261 const bufferlist
&bl
,
262 ceph::real_time mtime
, uint64_t trunc_size
,
263 __u32 trunc_seq
, ceph_tid_t journal_tid
,
264 const ZTracer::Trace
&parent_trace
,
// child trace span for this writeback when tracing is enabled
267 ZTracer::Trace trace
;
268 if (parent_trace
.valid()) {
269 trace
.init("", &m_ictx
->trace_endpoint
, &parent_trace
);
270 trace
.copy_name("writeback " + oid
.name
);
271 trace
.event("start");
274 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
// record the pending write so per-object completion order is preserved
276 write_result_d
*result
= new write_result_d(oid
.name
, oncommit
);
277 m_writes
[oid
.name
].push(result
);
278 ldout(m_ictx
->cct
, 20) << "write will wait for result " << result
<< dendl
;
279 C_OrderedWrite
*req_comp
= new C_OrderedWrite(m_ictx
->cct
, result
, trace
,
282 // all IO operations are flushed prior to closing the journal
283 assert(journal_tid
== 0 || m_ictx
->journal
!= NULL
);
// journaled write: defer the object write until the journal event is safe
284 if (journal_tid
!= 0) {
285 m_ictx
->journal
->flush_event(
286 journal_tid
, new C_WriteJournalCommit(
287 m_ictx
, oid
.name
, object_no
, off
, bl
, snapc
, journal_tid
, trace
,
// non-journaled path (presumably the elided else branch): issue the
// object write immediately with the ordered completion
290 auto req
= new io::ObjectWriteRequest
<>(
291 m_ictx
, oid
.name
, object_no
, off
, bl
, snapc
, 0, trace
, req_comp
);
// A cached extent is being overwritten before it reached disk: commit the
// original journal event for the overlapped range, ordering that commit
// after the new journal event is safe when one exists.
// NOTE(review): the `len` parameter (used in the log line below) is elided
// from the visible signature in this extraction.
298 void LibrbdWriteback::overwrite_extent(const object_t
& oid
, uint64_t off
,
300 ceph_tid_t original_journal_tid
,
301 ceph_tid_t new_journal_tid
) {
302 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
304 ldout(m_ictx
->cct
, 20) << __func__
<< ": " << oid
<< " "
305 << off
<< "~" << len
<< " "
306 << "journal_tid=" << original_journal_tid
<< ", "
307 << "new_journal_tid=" << new_journal_tid
<< dendl
;
// recover the object number from the object name
309 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
311 // all IO operations are flushed prior to closing the journal
312 assert(original_journal_tid
!= 0 && m_ictx
->journal
!= NULL
);
// reverse-map the object extent onto image (file) extents
314 Extents file_extents
;
315 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
, object_no
, off
,
317 for (Extents::iterator it
= file_extents
.begin();
318 it
!= file_extents
.end(); ++it
) {
319 if (new_journal_tid
!= 0) {
320 // ensure new journal event is safely committed to disk before
321 // committing old event
322 m_ictx
->journal
->flush_event(
323 new_journal_tid
, new C_CommitIOEventExtent(m_ictx
,
324 original_journal_tid
,
325 it
->first
, it
->second
));
// no new journal event (presumably the elided else branch): commit the
// original event's extent directly
327 m_ictx
->journal
->commit_io_event_extent(original_journal_tid
, it
->first
,
333 void LibrbdWriteback::complete_writes(const std::string
& oid
)
335 assert(m_lock
.is_locked());
336 std::queue
<write_result_d
*>& results
= m_writes
[oid
];
337 ldout(m_ictx
->cct
, 20) << "complete_writes() oid " << oid
<< dendl
;
338 std::list
<write_result_d
*> finished
;
340 while (!results
.empty()) {
341 write_result_d
*result
= results
.front();
344 finished
.push_back(result
);
351 for (std::list
<write_result_d
*>::iterator it
= finished
.begin();
352 it
!= finished
.end(); ++it
) {
353 write_result_d
*result
= *it
;
354 ldout(m_ictx
->cct
, 20) << "complete_writes() completing " << result
356 result
->oncommit
->complete(result
->ret
);