1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/cache/ObjectCacherObjectDispatch.h"
5 #include "include/neorados/RADOS.hpp"
6 #include "common/errno.h"
7 #include "librbd/ImageCtx.h"
8 #include "librbd/Journal.h"
9 #include "librbd/Utils.h"
10 #include "librbd/asio/ContextWQ.h"
11 #include "librbd/cache/ObjectCacherWriteback.h"
12 #include "librbd/io/ObjectDispatchSpec.h"
13 #include "librbd/io/ObjectDispatcherInterface.h"
14 #include "librbd/io/ReadResult.h"
15 #include "librbd/io/Types.h"
16 #include "librbd/io/Utils.h"
17 #include "osd/osd_types.h"
18 #include "osdc/WritebackHandler.h"
21 #define dout_subsys ceph_subsys_rbd
23 #define dout_prefix *_dout << "librbd::cache::ObjectCacherObjectDispatch: " \
24 << this << " " << __func__ << ": "
29 using librbd::util::data_object_name
;
33 typedef std::vector
<ObjectExtent
> ObjectExtents
;
35 } // anonymous namespace
38 struct ObjectCacherObjectDispatch
<I
>::C_InvalidateCache
: public Context
{
39 ObjectCacherObjectDispatch
* dispatcher
;
43 C_InvalidateCache(ObjectCacherObjectDispatch
* dispatcher
,
44 bool purge_on_error
, Context
*on_finish
)
45 : dispatcher(dispatcher
), purge_on_error(purge_on_error
),
46 on_finish(on_finish
) {
49 void finish(int r
) override
{
50 ceph_assert(ceph_mutex_is_locked(dispatcher
->m_cache_lock
));
51 auto cct
= dispatcher
->m_image_ctx
->cct
;
53 if (r
== -EBLOCKLISTED
) {
54 lderr(cct
) << "blocklisted during flush (purging)" << dendl
;
55 dispatcher
->m_object_cacher
->purge_set(dispatcher
->m_object_set
);
56 } else if (r
< 0 && purge_on_error
) {
57 lderr(cct
) << "failed to invalidate cache (purging): "
58 << cpp_strerror(r
) << dendl
;
59 dispatcher
->m_object_cacher
->purge_set(dispatcher
->m_object_set
);
61 lderr(cct
) << "failed to invalidate cache: " << cpp_strerror(r
) << dendl
;
64 auto unclean
= dispatcher
->m_object_cacher
->release_set(
65 dispatcher
->m_object_set
);
69 lderr(cct
) << "could not release all objects from cache: "
70 << unclean
<< " bytes remain" << dendl
;
76 on_finish
->complete(r
);
81 ObjectCacherObjectDispatch
<I
>::ObjectCacherObjectDispatch(
82 I
* image_ctx
, size_t max_dirty
, bool writethrough_until_flush
)
83 : m_image_ctx(image_ctx
), m_max_dirty(max_dirty
),
84 m_writethrough_until_flush(writethrough_until_flush
),
85 m_cache_lock(ceph::make_mutex(util::unique_lock_name(
86 "librbd::cache::ObjectCacherObjectDispatch::cache_lock", this))) {
87 ceph_assert(m_image_ctx
->data_ctx
.is_valid());
91 ObjectCacherObjectDispatch
<I
>::~ObjectCacherObjectDispatch() {
92 delete m_object_cacher
;
95 delete m_writeback_handler
;
99 void ObjectCacherObjectDispatch
<I
>::init() {
100 auto cct
= m_image_ctx
->cct
;
101 ldout(cct
, 5) << dendl
;
104 ldout(cct
, 5) << "enabling caching..." << dendl
;
105 m_writeback_handler
= new ObjectCacherWriteback(m_image_ctx
, m_cache_lock
);
107 auto init_max_dirty
= m_max_dirty
;
108 if (m_writethrough_until_flush
) {
113 m_image_ctx
->config
.template get_val
<Option::size_t>("rbd_cache_size");
115 m_image_ctx
->config
.template get_val
<Option::size_t>("rbd_cache_target_dirty");
117 m_image_ctx
->config
.template get_val
<double>("rbd_cache_max_dirty_age");
118 auto block_writes_upfront
=
119 m_image_ctx
->config
.template get_val
<bool>("rbd_cache_block_writes_upfront");
120 auto max_dirty_object
=
121 m_image_ctx
->config
.template get_val
<uint64_t>("rbd_cache_max_dirty_object");
123 ldout(cct
, 5) << "Initial cache settings:"
124 << " size=" << cache_size
125 << " num_objects=" << 10
126 << " max_dirty=" << init_max_dirty
127 << " target_dirty=" << target_dirty
128 << " max_dirty_age=" << max_dirty_age
<< dendl
;
130 m_object_cacher
= new ObjectCacher(cct
, m_image_ctx
->perfcounter
->get_name(),
131 *m_writeback_handler
, m_cache_lock
,
132 nullptr, nullptr, cache_size
,
133 10, /* reset this in init */
134 init_max_dirty
, target_dirty
,
135 max_dirty_age
, block_writes_upfront
);
137 // size object cache appropriately
138 if (max_dirty_object
== 0) {
139 max_dirty_object
= std::min
<uint64_t>(
140 2000, std::max
<uint64_t>(10, cache_size
/ 100 /
141 sizeof(ObjectCacher::Object
)));
143 ldout(cct
, 5) << " cache bytes " << cache_size
144 << " -> about " << max_dirty_object
<< " objects" << dendl
;
145 m_object_cacher
->set_max_objects(max_dirty_object
);
147 m_object_set
= new ObjectCacher::ObjectSet(nullptr,
148 m_image_ctx
->data_ctx
.get_id(), 0);
149 m_object_cacher
->start();
150 m_cache_lock
.unlock();
152 // add ourself to the IO object dispatcher chain
153 if (m_max_dirty
> 0) {
154 m_image_ctx
->disable_zero_copy
= true;
156 m_image_ctx
->io_object_dispatcher
->register_dispatch(this);
159 template <typename I
>
160 void ObjectCacherObjectDispatch
<I
>::shut_down(Context
* on_finish
) {
161 auto cct
= m_image_ctx
->cct
;
162 ldout(cct
, 5) << dendl
;
164 // chain shut down in reverse order
166 // shut down the cache
167 on_finish
= new LambdaContext([this, on_finish
](int r
) {
168 m_object_cacher
->stop();
169 on_finish
->complete(r
);
172 // ensure we aren't holding the cache lock post-flush
173 on_finish
= util::create_async_context_callback(*m_image_ctx
, on_finish
);
175 // invalidate any remaining cache entries
176 on_finish
= new C_InvalidateCache(this, true, on_finish
);
178 // flush all pending writeback state
179 std::lock_guard locker
{m_cache_lock
};
180 m_object_cacher
->release_set(m_object_set
);
181 m_object_cacher
->flush_set(m_object_set
, on_finish
);
184 template <typename I
>
185 bool ObjectCacherObjectDispatch
<I
>::read(
186 uint64_t object_no
, io::ReadExtents
* extents
, IOContext io_context
,
187 int op_flags
, int read_flags
, const ZTracer::Trace
&parent_trace
,
188 uint64_t* version
, int* object_dispatch_flags
,
189 io::DispatchResult
* dispatch_result
, Context
** on_finish
,
190 Context
* on_dispatched
) {
191 // IO chained in reverse order
192 auto cct
= m_image_ctx
->cct
;
193 ldout(cct
, 20) << "object_no=" << object_no
<< " " << *extents
<< dendl
;
195 if (extents
->size() == 0) {
196 ldout(cct
, 20) << "no extents to read" << dendl
;
200 if (version
!= nullptr) {
201 // we currently don't cache read versions
202 // and don't support reading more than one extent
206 // ensure we aren't holding the cache lock post-read
207 on_dispatched
= util::create_async_context_callback(*m_image_ctx
,
210 // embed the RBD-internal read flags in the genenric RADOS op_flags and
211 op_flags
= ((op_flags
& ~ObjectCacherWriteback::READ_FLAGS_MASK
) |
212 ((read_flags
<< ObjectCacherWriteback::READ_FLAGS_SHIFT
) &
213 ObjectCacherWriteback::READ_FLAGS_MASK
));
215 ceph::bufferlist
* bl
;
216 if (extents
->size() > 1) {
217 auto req
= new io::ReadResult::C_ObjectReadMergedExtents(
218 cct
, extents
, on_dispatched
);
222 bl
= &extents
->front().bl
;
225 m_image_ctx
->image_lock
.lock_shared();
226 auto rd
= m_object_cacher
->prepare_read(
227 io_context
->read_snap().value_or(CEPH_NOSNAP
), bl
, op_flags
);
228 m_image_ctx
->image_lock
.unlock_shared();
231 for (auto& read_extent
: *extents
) {
232 ObjectExtent
extent(data_object_name(m_image_ctx
, object_no
), object_no
,
233 read_extent
.offset
, read_extent
.length
, 0);
234 extent
.oloc
.pool
= m_image_ctx
->data_ctx
.get_id();
235 extent
.buffer_extents
.push_back({off
, read_extent
.length
});
236 rd
->extents
.push_back(extent
);
237 off
+= read_extent
.length
;
240 ZTracer::Trace
trace(parent_trace
);
241 *dispatch_result
= io::DISPATCH_RESULT_COMPLETE
;
244 int r
= m_object_cacher
->readx(rd
, m_object_set
, on_dispatched
, &trace
);
245 m_cache_lock
.unlock();
247 on_dispatched
->complete(r
);
252 template <typename I
>
253 bool ObjectCacherObjectDispatch
<I
>::discard(
254 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
255 IOContext io_context
, int discard_flags
,
256 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
257 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
258 Context
** on_finish
, Context
* on_dispatched
) {
259 auto cct
= m_image_ctx
->cct
;
260 ldout(cct
, 20) << "object_no=" << object_no
<< " " << object_off
<< "~"
261 << object_len
<< dendl
;
263 ObjectExtents object_extents
;
264 object_extents
.emplace_back(data_object_name(m_image_ctx
, object_no
),
265 object_no
, object_off
, object_len
, 0);
267 // discard the cache state after changes are committed to disk (and to
268 // prevent races w/ readahead)
269 auto ctx
= *on_finish
;
270 *on_finish
= new LambdaContext(
271 [this, object_extents
, ctx
](int r
) {
273 m_object_cacher
->discard_set(m_object_set
, object_extents
);
274 m_cache_lock
.unlock();
279 // ensure we aren't holding the cache lock post-write
280 on_dispatched
= util::create_async_context_callback(*m_image_ctx
,
283 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
285 // ensure any in-flight writeback is complete before advancing
286 // the discard request
287 std::lock_guard locker
{m_cache_lock
};
288 m_object_cacher
->discard_writeback(m_object_set
, object_extents
,
293 template <typename I
>
294 bool ObjectCacherObjectDispatch
<I
>::write(
295 uint64_t object_no
, uint64_t object_off
, ceph::bufferlist
&& data
,
296 IOContext io_context
, int op_flags
, int write_flags
,
297 std::optional
<uint64_t> assert_version
,
298 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
299 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
300 Context
** on_finish
, Context
* on_dispatched
) {
301 auto cct
= m_image_ctx
->cct
;
302 ldout(cct
, 20) << "object_no=" << object_no
<< " " << object_off
<< "~"
303 << data
.length() << dendl
;
305 // ensure we aren't holding the cache lock post-write
306 on_dispatched
= util::create_async_context_callback(*m_image_ctx
,
309 // cache layer does not handle version checking
310 if (assert_version
.has_value() ||
311 (write_flags
& io::OBJECT_WRITE_FLAG_CREATE_EXCLUSIVE
) != 0) {
312 ObjectExtents object_extents
;
313 object_extents
.emplace_back(data_object_name(m_image_ctx
, object_no
),
314 object_no
, object_off
, data
.length(), 0);
316 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
318 // ensure any in-flight writeback is complete before advancing
320 std::lock_guard locker
{m_cache_lock
};
321 m_object_cacher
->discard_writeback(m_object_set
, object_extents
,
327 if (io_context
->write_snap_context()) {
328 auto write_snap_context
= *io_context
->write_snap_context();
329 snapc
= SnapContext(write_snap_context
.first
,
330 {write_snap_context
.second
.begin(),
331 write_snap_context
.second
.end()});
334 m_image_ctx
->image_lock
.lock_shared();
335 ObjectCacher::OSDWrite
*wr
= m_object_cacher
->prepare_write(
336 snapc
, data
, ceph::real_time::min(), op_flags
, *journal_tid
);
337 m_image_ctx
->image_lock
.unlock_shared();
339 ObjectExtent
extent(data_object_name(m_image_ctx
, object_no
),
340 object_no
, object_off
, data
.length(), 0);
341 extent
.oloc
.pool
= m_image_ctx
->data_ctx
.get_id();
342 extent
.buffer_extents
.push_back({0, data
.length()});
343 wr
->extents
.push_back(extent
);
345 ZTracer::Trace
trace(parent_trace
);
346 *dispatch_result
= io::DISPATCH_RESULT_COMPLETE
;
348 std::lock_guard locker
{m_cache_lock
};
349 m_object_cacher
->writex(wr
, m_object_set
, on_dispatched
, &trace
);
353 template <typename I
>
354 bool ObjectCacherObjectDispatch
<I
>::write_same(
355 uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
356 io::LightweightBufferExtents
&& buffer_extents
, ceph::bufferlist
&& data
,
357 IOContext io_context
, int op_flags
,
358 const ZTracer::Trace
&parent_trace
, int* object_dispatch_flags
,
359 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
360 Context
** on_finish
, Context
* on_dispatched
) {
361 auto cct
= m_image_ctx
->cct
;
362 ldout(cct
, 20) << "object_no=" << object_no
<< " " << object_off
<< "~"
363 << object_len
<< dendl
;
365 // ObjectCacher doesn't support write-same so convert to regular write
366 io::LightweightObjectExtent
extent(object_no
, object_off
, object_len
, 0);
367 extent
.buffer_extents
= std::move(buffer_extents
);
370 io::util::assemble_write_same_extent(extent
, data
, &ws_data
, true);
372 return write(object_no
, object_off
, std::move(ws_data
), io_context
, op_flags
,
373 0, std::nullopt
, parent_trace
, object_dispatch_flags
,
374 journal_tid
, dispatch_result
, on_finish
, on_dispatched
);
377 template <typename I
>
378 bool ObjectCacherObjectDispatch
<I
>::compare_and_write(
379 uint64_t object_no
, uint64_t object_off
, ceph::bufferlist
&& cmp_data
,
380 ceph::bufferlist
&& write_data
, IOContext io_context
, int op_flags
,
381 const ZTracer::Trace
&parent_trace
, uint64_t* mismatch_offset
,
382 int* object_dispatch_flags
, uint64_t* journal_tid
,
383 io::DispatchResult
* dispatch_result
, Context
** on_finish
,
384 Context
* on_dispatched
) {
385 auto cct
= m_image_ctx
->cct
;
386 ldout(cct
, 20) << "object_no=" << object_no
<< " " << object_off
<< "~"
387 << cmp_data
.length() << dendl
;
389 // pass-through the compare-and-write request since it's not a supported
390 // operation of the ObjectCacher
392 // ensure we aren't holding the cache lock post-flush
393 on_dispatched
= util::create_async_context_callback(*m_image_ctx
,
396 // flush any pending writes from the cache
397 ZTracer::Trace
trace(parent_trace
);
398 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
400 ObjectExtents object_extents
;
401 object_extents
.emplace_back(data_object_name(m_image_ctx
, object_no
),
402 object_no
, object_off
, cmp_data
.length(), 0);
404 std::lock_guard cache_locker
{m_cache_lock
};
405 m_object_cacher
->flush_set(m_object_set
, object_extents
, &trace
,
410 template <typename I
>
411 bool ObjectCacherObjectDispatch
<I
>::flush(
412 io::FlushSource flush_source
, const ZTracer::Trace
&parent_trace
,
413 uint64_t* journal_tid
, io::DispatchResult
* dispatch_result
,
414 Context
** on_finish
, Context
* on_dispatched
) {
415 auto cct
= m_image_ctx
->cct
;
416 ldout(cct
, 20) << dendl
;
418 // ensure we aren't holding the cache lock post-flush
419 on_dispatched
= util::create_async_context_callback(*m_image_ctx
,
422 std::lock_guard locker
{m_cache_lock
};
423 if (flush_source
== io::FLUSH_SOURCE_USER
&& !m_user_flushed
) {
424 m_user_flushed
= true;
425 if (m_writethrough_until_flush
&& m_max_dirty
> 0) {
426 m_object_cacher
->set_max_dirty(m_max_dirty
);
427 ldout(cct
, 5) << "saw first user flush, enabling writeback" << dendl
;
431 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
432 m_object_cacher
->flush_set(m_object_set
, on_dispatched
);
436 template <typename I
>
437 bool ObjectCacherObjectDispatch
<I
>::invalidate_cache(Context
* on_finish
) {
438 auto cct
= m_image_ctx
->cct
;
439 ldout(cct
, 5) << dendl
;
441 // ensure we aren't holding the cache lock post-flush
442 on_finish
= util::create_async_context_callback(*m_image_ctx
, on_finish
);
444 // invalidate any remaining cache entries
445 on_finish
= new C_InvalidateCache(this, false, on_finish
);
447 std::lock_guard locker
{m_cache_lock
};
448 m_object_cacher
->release_set(m_object_set
);
449 m_object_cacher
->flush_set(m_object_set
, on_finish
);
453 template <typename I
>
454 bool ObjectCacherObjectDispatch
<I
>::reset_existence_cache(
455 Context
* on_finish
) {
456 auto cct
= m_image_ctx
->cct
;
457 ldout(cct
, 5) << dendl
;
459 std::lock_guard locker
{m_cache_lock
};
460 m_object_cacher
->clear_nonexistence(m_object_set
);
465 } // namespace librbd
// explicit instantiation for the concrete image context type
template class librbd::cache::ObjectCacherObjectDispatch<librbd::ImageCtx>;