1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "common/ceph_context.h"
7 #include "common/dout.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/rados/librados.hpp"
12 #include "include/rbd/librbd.hpp"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/internal.h"
17 #include "librbd/LibrbdWriteback.h"
18 #include "librbd/ObjectMap.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Utils.h"
21 #include "librbd/io/AioCompletion.h"
22 #include "librbd/io/ObjectRequest.h"
24 #include "include/assert.h"
26 #define dout_subsys ceph_subsys_rbd
28 #define dout_prefix *_dout << "librbdwriteback: "
33 * callback to finish a rados completion as a Context
36 * @param arg Context* recast as void*
38 void context_cb(rados_completion_t c
, void *arg
)
40 Context
*con
= reinterpret_cast<Context
*>(arg
);
41 con
->complete(rados_aio_get_return_value(c
));
/**
 * context to wrap another context in a Mutex
 *
 * @param cct CephContext used for logging
 * @param c context to finish
 * @param cache_lock mutex to lock
 */
// C_ReadRequest: Context constructed from a CephContext, another Context and
// a Mutex pointer, stored as m_cct, m_ctx and m_cache_lock.
// NOTE(review): this listing omits some original lines (see the gaps in the
// embedded line numbers). The member declarations and whatever finish() does
// with m_ctx while the lock is held are not visible here -- confirm against
// the full file.
51 class C_ReadRequest
: public Context
{
53 C_ReadRequest(CephContext
*cct
, Context
*c
, Mutex
*cache_lock
)
54 : m_cct(cct
), m_ctx(c
), m_cache_lock(cache_lock
) {
// finish(): emits a level-20 log line, takes *m_cache_lock through a scoped
// Mutex::Locker, and emits a second level-20 log line.
56 void finish(int r
) override
{
57 ldout(m_cct
, 20) << "aio_cb completing " << dendl
;
59 Mutex::Locker
cache_locker(*m_cache_lock
);
62 ldout(m_cct
, 20) << "aio_cb finished" << dendl
;
// C_OrderedWrite: Context tied to one LibrbdWriteback::write_result_d and the
// LibrbdWriteback that owns it.
// NOTE(review): the third constructor parameter (referenced as `wb` in the
// initializer list), the m_cct declaration and some finish() lines are
// missing from this listing -- confirm against the full file.
70 class C_OrderedWrite
: public Context
{
72 C_OrderedWrite(CephContext
*cct
, LibrbdWriteback::write_result_d
*result
,
74 : m_cct(cct
), m_result(result
), m_wb_handler(wb
) {}
75 ~C_OrderedWrite() override
{}
// finish(): under the handler's m_lock, asserts the result has not already
// been marked done, marks it done, and asks the handler to complete writes
// for the result's oid. Log lines bracket the work.
76 void finish(int r
) override
{
77 ldout(m_cct
, 20) << "C_OrderedWrite completing " << m_result
<< dendl
;
79 Mutex::Locker
l(m_wb_handler
->m_lock
);
80 assert(!m_result
->done
);
81 m_result
->done
= true;
83 m_wb_handler
->complete_writes(m_result
->oid
);
85 ldout(m_cct
, 20) << "C_OrderedWrite finished " << m_result
<< dendl
;
89 LibrbdWriteback::write_result_d
*m_result
;
90 LibrbdWriteback
*m_wb_handler
;
// C_WriteJournalCommit: Context that captures a write (oid, object_no, off,
// bl, snapc), the completion to notify (req_comp) and a journal tid. The
// constructor logs that the write is being delayed until that journal tid is
// safe and starts with request_sent == false.
// NOTE(review): this listing omits several original lines (most member
// declarations, the remainder of complete(), the body of finish(), and the
// tail of send_request()); the comments below describe only what is visible.
93 struct C_WriteJournalCommit
: public Context
{
94 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
103 uint64_t journal_tid
;
106 C_WriteJournalCommit(ImageCtx
*_image_ctx
, const std::string
&_oid
,
107 uint64_t _object_no
, uint64_t _off
,
108 const bufferlist
&_bl
, const SnapContext
& _snapc
,
109 Context
*_req_comp
, uint64_t _journal_tid
)
110 : image_ctx(_image_ctx
), oid(_oid
), object_no(_object_no
), off(_off
),
111 bl(_bl
), snapc(_snapc
), req_comp(_req_comp
), journal_tid(_journal_tid
),
112 request_sent(false) {
113 CephContext
*cct
= image_ctx
->cct
;
114 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
115 << "delaying write until journal tid "
116 << journal_tid
<< " safe" << dendl
;
// complete(): when the request was already sent or r is negative, req_comp is
// completed with r; a sent request that returned 0 first commits its journal
// extents. What happens otherwise is not visible in this listing.
119 void complete(int r
) override
{
120 if (request_sent
|| r
< 0) {
121 if (request_sent
&& r
== 0) {
122 // only commit IO events that are safely recorded to the backing image
123 // since the cache will retry all IOs that fail
124 commit_io_event_extent(0);
127 req_comp
->complete(r
);
134 void finish(int r
) override
{
// commit_io_event_extent(): maps this object's (object_no, off, bl.length())
// extent to file extents via Striper::extent_to_file and calls the journal's
// commit_io_event_extent for journal_tid on each one.
137 void commit_io_event_extent(int r
) {
138 CephContext
*cct
= image_ctx
->cct
;
139 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
140 << "write committed: updating journal commit position"
143 // all IO operations are flushed prior to closing the journal
144 assert(image_ctx
->journal
!= NULL
);
146 Extents file_extents
;
147 Striper::extent_to_file(cct
, &image_ctx
->layout
, object_no
, off
,
148 bl
.length(), file_extents
);
149 for (Extents::iterator it
= file_extents
.begin();
150 it
!= file_extents
.end(); ++it
) {
151 image_ctx
->journal
->commit_io_event_extent(journal_tid
, it
->first
,
// send_request(): logs that the journal commit finished, asserts that the
// exclusive lock is owned, and begins constructing an io::ObjectWriteRequest.
156 void send_request() {
157 CephContext
*cct
= image_ctx
->cct
;
158 ldout(cct
, 20) << this << " C_WriteJournalCommit: "
159 << "journal committed: sending write request" << dendl
;
161 assert(image_ctx
->exclusive_lock
->is_lock_owner());
164 auto req
= new io::ObjectWriteRequest(image_ctx
, oid
, object_no
, off
,
// C_CommitIOEventExtent: Context holding an image context, a journal tid and
// an (offset, length) extent. finish() asserts the journal is still present
// and calls its commit_io_event_extent() with those values.
// NOTE(review): most member declarations, the end of the initializer list and
// the trailing argument(s) of the journal call are missing from this listing.
170 struct C_CommitIOEventExtent
: public Context
{
172 uint64_t journal_tid
;
176 C_CommitIOEventExtent(ImageCtx
*image_ctx
, uint64_t journal_tid
,
177 uint64_t offset
, uint64_t length
)
178 : image_ctx(image_ctx
), journal_tid(journal_tid
), offset(offset
),
182 void finish(int r
) override
{
183 // all IO operations are flushed prior to closing the journal
184 assert(image_ctx
->journal
!= nullptr);
186 image_ctx
->journal
->commit_io_event_extent(journal_tid
, offset
, length
,
191 LibrbdWriteback::LibrbdWriteback(ImageCtx
*ictx
, Mutex
& lock
)
192 : m_tid(0), m_lock(lock
), m_ictx(ictx
) {
// read(): issues an asynchronous object read of [off, off+len) into pbl.
// onfinish is wrapped in a C_ReadRequest together with m_lock. If an object
// map exists and reports that object_no cannot exist, the wrapped request is
// queued on op_work_queue with -ENOENT. The visible code also builds a
// librados read operation with op_flags applied, obtains read flags for
// snapid, and submits it through data_ctx.aio_operate with a rados callback
// bound to the wrapped request.
// NOTE(review): some original lines are missing from this listing, including
// whatever follows the -ENOENT queueing and the trailing aio_operate
// arguments. oloc, trunc_size and trunc_seq are not referenced in the visible
// lines.
195 void LibrbdWriteback::read(const object_t
& oid
, uint64_t object_no
,
196 const object_locator_t
& oloc
,
197 uint64_t off
, uint64_t len
, snapid_t snapid
,
198 bufferlist
*pbl
, uint64_t trunc_size
,
199 __u32 trunc_seq
, int op_flags
, Context
*onfinish
)
201 // on completion, take the mutex and then call onfinish.
202 Context
*req
= new C_ReadRequest(m_ictx
->cct
, onfinish
, &m_lock
);
// the object map is consulted under a read lock on snap_lock
205 RWLock::RLocker
snap_locker(m_ictx
->snap_lock
);
206 if (m_ictx
->object_map
!= nullptr &&
207 !m_ictx
->object_map
->object_may_exist(object_no
)) {
208 m_ictx
->op_work_queue
->queue(req
, -ENOENT
);
213 librados::ObjectReadOperation op
;
214 op
.read(off
, len
, pbl
, NULL
);
215 op
.set_op_flags2(op_flags
);
216 int flags
= m_ictx
->get_read_flags(snapid
);
218 librados::AioCompletion
*rados_completion
=
219 util::create_rados_callback(req
);
220 int r
= m_ictx
->data_ctx
.aio_operate(oid
.name
, rados_completion
, &op
,
222 rados_completion
->release();
// may_copy_on_write(): computes whether the object named by oid overlaps the
// parent image. The image's current snap_id and its parent overlap are read
// while holding snap_lock and parent_lock for read (released in reverse
// order). The whole object (offset 0, layout.object_size bytes) is then
// mapped with Striper::extent_to_file, pruned against the overlap, and `may`
// is true when any overlap remains.
// In the visible lines read_off and read_len appear only in the log message
// and the snapid parameter is not referenced.
// NOTE(review): the final extent_to_file argument and the return statement
// are missing from this listing.
226 bool LibrbdWriteback::may_copy_on_write(const object_t
& oid
, uint64_t read_off
, uint64_t read_len
, snapid_t snapid
)
228 m_ictx
->snap_lock
.get_read();
229 librados::snap_t snap_id
= m_ictx
->snap_id
;
230 m_ictx
->parent_lock
.get_read();
231 uint64_t overlap
= 0;
232 m_ictx
->get_parent_overlap(snap_id
, &overlap
);
233 m_ictx
->parent_lock
.put_read();
234 m_ictx
->snap_lock
.put_read();
236 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
238 // reverse map this object extent onto the parent
239 vector
<pair
<uint64_t,uint64_t> > objectx
;
240 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
,
241 object_no
, 0, m_ictx
->layout
.object_size
,
243 uint64_t object_overlap
= m_ictx
->prune_parent_extents(objectx
, overlap
);
244 bool may
= object_overlap
> 0;
245 ldout(m_ictx
->cct
, 10) << "may_copy_on_write " << oid
<< " " << read_off
246 << "~" << read_len
<< " = " << may
<< dendl
;
// write(): records a pending write for oid and dispatches it. A
// write_result_d is allocated for (oid.name, oncommit) and pushed onto the
// per-object queue in m_writes; a C_OrderedWrite is created to observe the
// write's completion. When journal_tid is non-zero the visible code calls
// journal->flush_event() for that tid with a new C_WriteJournalCommit; an
// io::ObjectWriteRequest built directly from (off, bl, snapc, req_comp) is
// also visible.
// NOTE(review): the end of the parameter list (including the declaration of
// `oncommit`), the remaining C_WriteJournalCommit arguments, the branch
// structure around the direct request, and the return value are missing from
// this listing -- confirm against the full file.
250 ceph_tid_t
LibrbdWriteback::write(const object_t
& oid
,
251 const object_locator_t
& oloc
,
252 uint64_t off
, uint64_t len
,
253 const SnapContext
& snapc
,
254 const bufferlist
&bl
,
255 ceph::real_time mtime
, uint64_t trunc_size
,
256 __u32 trunc_seq
, ceph_tid_t journal_tid
,
259 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
261 write_result_d
*result
= new write_result_d(oid
.name
, oncommit
);
262 m_writes
[oid
.name
].push(result
);
263 ldout(m_ictx
->cct
, 20) << "write will wait for result " << result
<< dendl
;
264 C_OrderedWrite
*req_comp
= new C_OrderedWrite(m_ictx
->cct
, result
, this);
266 // all IO operations are flushed prior to closing the journal
267 assert(journal_tid
== 0 || m_ictx
->journal
!= NULL
);
268 if (journal_tid
!= 0) {
269 m_ictx
->journal
->flush_event(
270 journal_tid
, new C_WriteJournalCommit(m_ictx
, oid
.name
, object_no
, off
,
274 auto req
= new io::ObjectWriteRequest(m_ictx
, oid
.name
, object_no
,
275 off
, bl
, snapc
, req_comp
, 0);
// overwrite_extent(): handles an extent whose journal event
// (original_journal_tid) is being replaced. The object extent is mapped to
// file extents with Striper::extent_to_file. For each one, when
// new_journal_tid is non-zero the commit of the original event is deferred
// behind journal->flush_event(new_journal_tid, ...) through a
// C_CommitIOEventExtent; a direct journal->commit_io_event_extent() call for
// original_journal_tid is also visible.
// Requires original_journal_tid != 0 and a non-null journal (asserted).
// NOTE(review): the `len` parameter declaration, the remaining
// extent_to_file arguments, and the branch structure and trailing arguments
// of the direct commit call are missing from this listing.
282 void LibrbdWriteback::overwrite_extent(const object_t
& oid
, uint64_t off
,
284 ceph_tid_t original_journal_tid
,
285 ceph_tid_t new_journal_tid
) {
286 typedef std::vector
<std::pair
<uint64_t,uint64_t> > Extents
;
288 ldout(m_ictx
->cct
, 20) << __func__
<< ": " << oid
<< " "
289 << off
<< "~" << len
<< " "
290 << "journal_tid=" << original_journal_tid
<< ", "
291 << "new_journal_tid=" << new_journal_tid
<< dendl
;
293 uint64_t object_no
= oid_to_object_no(oid
.name
, m_ictx
->object_prefix
);
295 // all IO operations are flushed prior to closing the journal
296 assert(original_journal_tid
!= 0 && m_ictx
->journal
!= NULL
);
298 Extents file_extents
;
299 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
, object_no
, off
,
301 for (Extents::iterator it
= file_extents
.begin();
302 it
!= file_extents
.end(); ++it
) {
303 if (new_journal_tid
!= 0) {
304 // ensure new journal event is safely committed to disk before
305 // committing old event
306 m_ictx
->journal
->flush_event(
307 new_journal_tid
, new C_CommitIOEventExtent(m_ictx
,
308 original_journal_tid
,
309 it
->first
, it
->second
));
311 m_ictx
->journal
->commit_io_event_extent(original_journal_tid
, it
->first
,
// complete_writes(): must be called with m_lock held (asserted). Looks up
// the queue of pending results for oid in m_writes, moves results taken from
// the front of that queue into a local `finished` list, then walks that list
// calling oncommit->complete(ret) on each result.
// NOTE(review): the interior of the while loop (its exit condition and how
// the queue is advanced) and any cleanup after the callbacks are missing
// from this listing -- confirm against the full file.
317 void LibrbdWriteback::complete_writes(const std::string
& oid
)
319 assert(m_lock
.is_locked());
320 std::queue
<write_result_d
*>& results
= m_writes
[oid
];
321 ldout(m_ictx
->cct
, 20) << "complete_writes() oid " << oid
<< dendl
;
322 std::list
<write_result_d
*> finished
;
324 while (!results
.empty()) {
325 write_result_d
*result
= results
.front();
328 finished
.push_back(result
);
335 for (std::list
<write_result_d
*>::iterator it
= finished
.begin();
336 it
!= finished
.end(); ++it
) {
337 write_result_d
*result
= *it
;
338 ldout(m_ictx
->cct
, 20) << "complete_writes() completing " << result
340 result
->oncommit
->complete(result
->ret
);