1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "common/ceph_context.h"
7 #include "common/dout.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/rados/librados.hpp"
12 #include "include/rbd/librbd.hpp"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/internal.h"
17 #include "librbd/LibrbdWriteback.h"
18 #include "librbd/ObjectMap.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Utils.h"
21 #include "librbd/io/AioCompletion.h"
22 #include "librbd/io/ObjectRequest.h"
24 #include "include/assert.h"
26 #define dout_subsys ceph_subsys_rbd
28 #define dout_prefix *_dout << "librbdwriteback: "
33 * callback to finish a rados completion as a Context
36 * @param arg Context* recast as void*
38 void context_cb(rados_completion_t c
, void *arg
)
40 Context
*con
= reinterpret_cast<Context
*>(arg
);
41 con
->complete(rados_aio_get_return_value(c
));
45 * context to wrap another context in a Mutex
48 * @param c context to finish
49 * @param l mutex to lock
51 class C_ReadRequest
: public Context
{
53 C_ReadRequest(CephContext
*cct
, Context
*c
, Mutex
*cache_lock
)
54 : m_cct(cct
), m_ctx(c
), m_cache_lock(cache_lock
) {
56 void finish(int r
) override
{
57 ldout(m_cct
, 20) << "aio_cb completing " << dendl
;
59 Mutex::Locker
cache_locker(*m_cache_lock
);
62 ldout(m_cct
, 20) << "aio_cb finished" << dendl
;
70 class C_OrderedWrite
: public Context
{
72 C_OrderedWrite(CephContext
*cct
, LibrbdWriteback::write_result_d
*result
,
73 const ZTracer::Trace
&trace
, LibrbdWriteback
*wb
)
74 : m_cct(cct
), m_result(result
), m_trace(trace
), m_wb_handler(wb
) {}
75 ~C_OrderedWrite() override
{}
76 void finish(int r
) override
{
77 ldout(m_cct
, 20) << "C_OrderedWrite completing " << m_result
<< dendl
;
79 Mutex::Locker
l(m_wb_handler
->m_lock
);
80 assert(!m_result
->done
);
81 m_result
->done
= true;
83 m_wb_handler
->complete_writes(m_result
->oid
);
85 ldout(m_cct
, 20) << "C_OrderedWrite finished " << m_result
<< dendl
;
86 m_trace
.event("finish");
90 LibrbdWriteback::write_result_d
*m_result
;
91 ZTracer::Trace m_trace
;
92 LibrbdWriteback
*m_wb_handler
;
95 struct C_WriteJournalCommit
: public Context
{
96 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
104 uint64_t journal_tid
;
105 ZTracer::Trace trace
;
107 bool request_sent
= false;
109 C_WriteJournalCommit(ImageCtx
*_image_ctx
, const std::string
&_oid
,
110 uint64_t _object_no
, uint64_t _off
,
111 const bufferlist
&_bl
, const SnapContext
& _snapc
,
112 uint64_t _journal_tid
,
113 const ZTracer::Trace
&trace
, Context
*_req_comp
)
114 : image_ctx(_image_ctx
), oid(_oid
), object_no(_object_no
), off(_off
),
115 bl(_bl
), snapc(_snapc
), journal_tid(_journal_tid
),
116 trace(trace
), req_comp(_req_comp
) {
117 CephContext
*cct
= image_ctx
->cct
;
118 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
119 << "delaying write until journal tid "
120 << journal_tid
<< " safe" << dendl
;
123 void complete(int r
) override
{
124 if (request_sent
|| r
< 0) {
125 if (request_sent
&& r
== 0) {
126 // only commit IO events that are safely recorded to the backing image
127 // since the cache will retry all IOs that fail
128 commit_io_event_extent(0);
131 req_comp
->complete(r
);
138 void finish(int r
) override
{
141 void commit_io_event_extent(int r
) {
142 CephContext
*cct
= image_ctx
->cct
;
143 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
144 << "write committed: updating journal commit position"
147 // all IO operations are flushed prior to closing the journal
148 assert(image_ctx
->journal
!= NULL
);
150 Extents file_extents
;
151 Striper::extent_to_file(cct
, &image_ctx
->layout
, object_no
, off
,
152 bl
.length(), file_extents
);
153 for (Extents::iterator it
= file_extents
.begin();
154 it
!= file_extents
.end(); ++it
) {
155 image_ctx
->journal
->commit_io_event_extent(journal_tid
, it
->first
,
160 void send_request() {
161 CephContext
*cct
= image_ctx
->cct
;
162 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
163 << "journal committed: sending write request" << dendl
;
165 assert(image_ctx
->exclusive_lock
->is_lock_owner());
168 auto req
= new io::ObjectWriteRequest(image_ctx
, oid
, object_no
, off
,
169 bl
, snapc
, 0, trace
, this);
174 struct C_CommitIOEventExtent
: public Context
{
176 uint64_t journal_tid
;
180 C_CommitIOEventExtent(ImageCtx
*image_ctx
, uint64_t journal_tid
,
181 uint64_t offset
, uint64_t length
)
182 : image_ctx(image_ctx
), journal_tid(journal_tid
), offset(offset
),
186 void finish(int r
) override
{
187 // all IO operations are flushed prior to closing the journal
188 assert(image_ctx
->journal
!= nullptr);
190 image_ctx
->journal
->commit_io_event_extent(journal_tid
, offset
, length
,
195 LibrbdWriteback::LibrbdWriteback(ImageCtx
*ictx
, Mutex
& lock
)
196 : m_tid(0), m_lock(lock
), m_ictx(ictx
) {
199 void LibrbdWriteback::read(const object_t
& oid
, uint64_t object_no
,
200 const object_locator_t
& oloc
,
201 uint64_t off
, uint64_t len
, snapid_t snapid
,
202 bufferlist
*pbl
, uint64_t trunc_size
,
203 __u32 trunc_seq
, int op_flags
,
204 const ZTracer::Trace
&parent_trace
,
207 // on completion, take the mutex and then call onfinish.
208 Context
*req
= new C_ReadRequest(m_ictx
->cct
, onfinish
, &m_lock
);
211 RWLock::RLocker
snap_locker(m_ictx
->snap_lock
);
212 if (m_ictx
->object_map
!= nullptr &&
213 !m_ictx
->object_map
->object_may_exist(object_no
)) {
214 m_ictx
->op_work_queue
->queue(req
, -ENOENT
);
219 librados::ObjectReadOperation op
;
220 op
.read(off
, len
, pbl
, NULL
);
221 op
.set_op_flags2(op_flags
);
222 int flags
= m_ictx
->get_read_flags(snapid
);
224 librados::AioCompletion
*rados_completion
=
225 util::create_rados_callback(req
);
226 int r
= m_ictx
->data_ctx
.aio_operate(
227 oid
.name
, rados_completion
, &op
, flags
, nullptr,
228 (parent_trace
.valid() ? parent_trace
.get_info() : nullptr));
229 rados_completion
->release();
233 bool LibrbdWriteback::may_copy_on_write(const object_t
& oid
, uint64_t read_off
, uint64_t read_len
, snapid_t snapid
)
235 m_ictx
->snap_lock
.get_read();
236 librados::snap_t snap_id
= m_ictx
->snap_id
;
237 m_ictx
->parent_lock
.get_read();
238 uint64_t overlap
= 0;
239 m_ictx
->get_parent_overlap(snap_id
, &overlap
);
240 m_ictx
->parent_lock
.put_read();
241 m_ictx
->snap_lock
.put_read();
243 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
245 // reverse map this object extent onto the parent
246 vector
<pair
<uint64_t,uint64_t> > objectx
;
247 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
,
248 object_no
, 0, m_ictx
->layout
.object_size
,
250 uint64_t object_overlap
= m_ictx
->prune_parent_extents(objectx
, overlap
);
251 bool may
= object_overlap
> 0;
252 ldout(m_ictx
->cct
, 10) << "may_copy_on_write " << oid
<< " " << read_off
253 << "~" << read_len
<< " = " << may
<< dendl
;
257 ceph_tid_t
LibrbdWriteback::write(const object_t
& oid
,
258 const object_locator_t
& oloc
,
259 uint64_t off
, uint64_t len
,
260 const SnapContext
& snapc
,
261 const bufferlist
&bl
,
262 ceph::real_time mtime
, uint64_t trunc_size
,
263 __u32 trunc_seq
, ceph_tid_t journal_tid
,
264 const ZTracer::Trace
&parent_trace
,
267 ZTracer::Trace trace
;
268 if (parent_trace
.valid()) {
269 trace
.init("", &m_ictx
->trace_endpoint
, &parent_trace
);
270 trace
.copy_name("writeback " + oid
.name
);
271 trace
.event("start");
274 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
276 write_result_d
*result
= new write_result_d(oid
.name
, oncommit
);
277 m_writes
[oid
.name
].push(result
);
278 ldout(m_ictx
->cct
, 20) << "write will wait for result " << result
<< dendl
;
279 C_OrderedWrite
*req_comp
= new C_OrderedWrite(m_ictx
->cct
, result
, trace
,
282 // all IO operations are flushed prior to closing the journal
283 assert(journal_tid
== 0 || m_ictx
->journal
!= NULL
);
284 if (journal_tid
!= 0) {
285 m_ictx
->journal
->flush_event(
286 journal_tid
, new C_WriteJournalCommit(
287 m_ictx
, oid
.name
, object_no
, off
, bl
, snapc
, journal_tid
, trace
,
290 auto req
= new io::ObjectWriteRequest(
291 m_ictx
, oid
.name
, object_no
, off
, bl
, snapc
, 0, trace
, req_comp
);
298 void LibrbdWriteback::overwrite_extent(const object_t
& oid
, uint64_t off
,
300 ceph_tid_t original_journal_tid
,
301 ceph_tid_t new_journal_tid
) {
302 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
304 ldout(m_ictx
->cct
, 20) << __func__
<< ": " << oid
<< " "
305 << off
<< "~" << len
<< " "
306 << "journal_tid=" << original_journal_tid
<< ", "
307 << "new_journal_tid=" << new_journal_tid
<< dendl
;
309 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
311 // all IO operations are flushed prior to closing the journal
312 assert(original_journal_tid
!= 0 && m_ictx
->journal
!= NULL
);
314 Extents file_extents
;
315 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
, object_no
, off
,
317 for (Extents::iterator it
= file_extents
.begin();
318 it
!= file_extents
.end(); ++it
) {
319 if (new_journal_tid
!= 0) {
320 // ensure new journal event is safely committed to disk before
321 // committing old event
322 m_ictx
->journal
->flush_event(
323 new_journal_tid
, new C_CommitIOEventExtent(m_ictx
,
324 original_journal_tid
,
325 it
->first
, it
->second
));
327 m_ictx
->journal
->commit_io_event_extent(original_journal_tid
, it
->first
,
// Completes the writes queued for object `oid`.
//
// Asserts that m_lock is held, walks the per-object queue m_writes[oid] from
// the front collecting entries into the local `finished` list, and then calls
// oncommit->complete(ret) on every collected entry.
//
// NOTE(review): this listing omits several original lines (the embedded line
// numbers jump 341 -> 344 -> 351 and stop at 356). The loop-exit condition,
// the removal of entries from the queue and the end of the function are not
// visible here -- confirm against the complete file before changing this
// block.
//
// @param oid name of the object whose queued writes should be completed
333 void LibrbdWriteback::complete_writes(const std::string
& oid
)
// callers must already hold m_lock
335 assert(m_lock
.is_locked());
336 std::queue
<write_result_d
*>& results
= m_writes
[oid
];
337 ldout(m_ictx
->cct
, 20) << "complete_writes() oid " << oid
<< dendl
;
// results collected from the queue; their callbacks run in the second loop
338 std::list
<write_result_d
*> finished
;
340 while (!results
.empty()) {
341 write_result_d
*result
= results
.front();
344 finished
.push_back(result
);
// second pass: invoke the commit callback of each collected result
351 for (std::list
<write_result_d
*>::iterator it
= finished
.begin();
352 it
!= finished
.end(); ++it
) {
353 write_result_d
*result
= *it
;
354 ldout(m_ictx
->cct
, 20) << "complete_writes() completing " << result
356 result
->oncommit
->complete(result
->ret
);