1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/WorkQueue.h"
5 #include "librbd/ImageCtx.h"
6 #include "librbd/Journal.h"
7 #include "librbd/Utils.h"
8 #include "librbd/io/ObjectDispatchSpec.h"
9 #include "librbd/io/ObjectDispatcher.h"
10 #include "librbd/io/Utils.h"
11 #include "librbd/cache/ParentCacheObjectDispatch.h"
12 #include "osd/osd_types.h"
13 #include "osdc/WritebackHandler.h"
17 #define dout_subsys ceph_subsys_rbd
19 #define dout_prefix *_dout << "librbd::cache::ParentCacheObjectDispatch: " \
20 << this << " " << __func__ << ": "
22 using namespace ceph::immutable_obj_cache
;
23 using librbd::util::data_object_name
;
29 ParentCacheObjectDispatch
<I
>::ParentCacheObjectDispatch(
30 I
* image_ctx
) : m_image_ctx(image_ctx
), m_cache_client(nullptr),
31 m_initialized(false), m_connecting(false) {
32 ceph_assert(m_image_ctx
->data_ctx
.is_valid());
33 std::string controller_path
=
34 ((CephContext
*)(m_image_ctx
->cct
))->_conf
.get_val
<std::string
>("immutable_object_cache_sock");
35 m_cache_client
= new CacheClient(controller_path
.c_str(), m_image_ctx
->cct
);
39 ParentCacheObjectDispatch
<I
>::~ParentCacheObjectDispatch() {
40 delete m_cache_client
;
44 void ParentCacheObjectDispatch
<I
>::init(Context
* on_finish
) {
45 auto cct
= m_image_ctx
->cct
;
46 ldout(cct
, 5) << dendl
;
48 if (m_image_ctx
->parent
!= nullptr) {
49 ldout(cct
, 5) << "child image: skipping" << dendl
;
53 Context
* create_session_ctx
= new LambdaContext([this, on_finish
](int ret
) {
54 m_connecting
.store(false);
55 if (on_finish
!= nullptr) {
56 on_finish
->complete(ret
);
60 m_connecting
.store(true);
61 create_cache_session(create_session_ctx
, false);
63 m_image_ctx
->io_object_dispatcher
->register_object_dispatch(this);
68 bool ParentCacheObjectDispatch
<I
>::read(
69 uint64_t object_no
, uint64_t object_off
,
70 uint64_t object_len
, librados::snap_t snap_id
, int op_flags
,
71 const ZTracer::Trace
&parent_trace
, ceph::bufferlist
* read_data
,
72 io::ExtentMap
* extent_map
, int* object_dispatch_flags
,
73 io::DispatchResult
* dispatch_result
, Context
** on_finish
,
74 Context
* on_dispatched
) {
75 auto cct
= m_image_ctx
->cct
;
76 ldout(cct
, 20) << "object_no=" << object_no
<< " " << object_off
<< "~"
77 << object_len
<< dendl
;
78 ceph_assert(m_initialized
);
79 string oid
= data_object_name(m_image_ctx
, object_no
);
81 /* if RO daemon still don't startup, or RO daemon crash,
82 * or session occur any error, try to re-connect daemon.*/
83 if (!m_cache_client
->is_session_work()) {
84 if (!m_connecting
.exchange(true)) {
85 /* Since we don't have a giant lock protecting the full re-connect process,
86 * if thread A first passes the if (!m_cache_client->is_session_work()),
87 * thread B could have also passed it and reconnected
88 * before thread A resumes and executes if (!m_connecting.exchange(true)).
89 * This will result in thread A re-connecting a working session.
90 * So, we need to check if session is normal again. If session work,
91 * we need set m_connecting to false. */
92 if (!m_cache_client
->is_session_work()) {
93 Context
* on_finish
= new LambdaContext([this](int ret
) {
94 m_connecting
.store(false);
96 create_cache_session(on_finish
, true);
98 m_connecting
.store(false);
101 ldout(cct
, 5) << "Parent cache try to re-connect to RO daemon. "
102 << "dispatch current request to lower object layer" << dendl
;
106 ceph_assert(m_cache_client
->is_session_work());
108 CacheGenContextURef ctx
= make_gen_lambda_context
<ObjectCacheRequest
*,
109 std::function
<void(ObjectCacheRequest
*)>>
110 ([this, read_data
, dispatch_result
, on_dispatched
,
111 oid
, object_off
, object_len
](ObjectCacheRequest
* ack
) {
112 handle_read_cache(ack
, object_off
, object_len
, read_data
,
113 dispatch_result
, on_dispatched
);
116 m_cache_client
->lookup_object(m_image_ctx
->data_ctx
.get_namespace(),
117 m_image_ctx
->data_ctx
.get_id(),
118 (uint64_t)snap_id
, oid
, std::move(ctx
));
122 template <typename I
>
123 void ParentCacheObjectDispatch
<I
>::handle_read_cache(
124 ObjectCacheRequest
* ack
, uint64_t read_off
, uint64_t read_len
,
125 ceph::bufferlist
* read_data
, io::DispatchResult
* dispatch_result
,
126 Context
* on_dispatched
) {
127 auto cct
= m_image_ctx
->cct
;
128 ldout(cct
, 20) << dendl
;
130 if(ack
->type
!= RBDSC_READ_REPLY
) {
131 // go back to read rados
132 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
133 on_dispatched
->complete(0);
137 ceph_assert(ack
->type
== RBDSC_READ_REPLY
);
138 std::string file_path
= ((ObjectCacheReadReplyData
*)ack
)->cache_path
;
139 ceph_assert(file_path
!= "");
141 // try to read from parent image cache
142 int r
= read_object(file_path
, read_data
, read_off
, read_len
, on_dispatched
);
144 // cache read error, fall back to read rados
145 *dispatch_result
= io::DISPATCH_RESULT_CONTINUE
;
146 on_dispatched
->complete(0);
150 *dispatch_result
= io::DISPATCH_RESULT_COMPLETE
;
151 on_dispatched
->complete(r
);
154 template <typename I
>
155 int ParentCacheObjectDispatch
<I
>::handle_register_client(bool reg
) {
156 auto cct
= m_image_ctx
->cct
;
157 ldout(cct
, 20) << dendl
;
160 lderr(cct
) << "Parent cache register fails." << dendl
;
165 template <typename I
>
166 int ParentCacheObjectDispatch
<I
>::create_cache_session(Context
* on_finish
, bool is_reconnect
) {
167 auto cct
= m_image_ctx
->cct
;
168 ldout(cct
, 20) << dendl
;
170 Context
* register_ctx
= new LambdaContext([this, cct
, on_finish
](int ret
) {
172 lderr(cct
) << "Parent cache fail to register client." << dendl
;
174 ceph_assert(m_cache_client
->is_session_work());
176 handle_register_client(ret
< 0 ? false : true);
177 on_finish
->complete(ret
);
180 Context
* connect_ctx
= new LambdaContext(
181 [this, cct
, register_ctx
](int ret
) {
183 lderr(cct
) << "Parent cache fail to connect RO daeomn." << dendl
;
184 register_ctx
->complete(ret
);
188 ldout(cct
, 20) << "Parent cache connected to RO daemon." << dendl
;
190 m_cache_client
->register_client(register_ctx
);
193 if (m_cache_client
!= nullptr && is_reconnect
) {
194 // CacheClient's destruction will cleanup all details on old session.
195 delete m_cache_client
;
197 // create new CacheClient to connect RO daemon.
198 std::string controller_path
=
199 ((CephContext
*)(m_image_ctx
->cct
))->_conf
.get_val
<std::string
>("immutable_object_cache_sock");
200 m_cache_client
= new CacheClient(controller_path
.c_str(), m_image_ctx
->cct
);
203 m_cache_client
->run();
205 m_cache_client
->connect(connect_ctx
);
209 template <typename I
>
210 int ParentCacheObjectDispatch
<I
>::read_object(
211 std::string file_path
, ceph::bufferlist
* read_data
, uint64_t offset
,
212 uint64_t length
, Context
*on_finish
) {
214 auto *cct
= m_image_ctx
->cct
;
215 ldout(cct
, 20) << "file path: " << file_path
<< dendl
;
218 int ret
= read_data
->pread_file(file_path
.c_str(), offset
, length
, &error
);
220 ldout(cct
, 5) << "read from file return error: " << error
221 << "file path= " << file_path
225 return read_data
->length();
229 } // namespace librbd
231 template class librbd::cache::ParentCacheObjectDispatch
<librbd::ImageCtx
>;