]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/cache/ParentCacheObjectDispatch.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / librbd / cache / ParentCacheObjectDispatch.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "common/WorkQueue.h"
#include "librbd/ImageCtx.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcher.h"
#include "librbd/io/Utils.h"
#include "librbd/cache/ParentCacheObjectDispatch.h"
#include "osd/osd_types.h"
#include "osdc/WritebackHandler.h"

#include <cerrno>
#include <vector>
16
17 #define dout_subsys ceph_subsys_rbd
18 #undef dout_prefix
19 #define dout_prefix *_dout << "librbd::cache::ParentCacheObjectDispatch: " \
20 << this << " " << __func__ << ": "
21
22 using namespace ceph::immutable_obj_cache;
23 using librbd::util::data_object_name;
24
25 namespace librbd {
26 namespace cache {
27
28 template <typename I>
29 ParentCacheObjectDispatch<I>::ParentCacheObjectDispatch(
30 I* image_ctx) : m_image_ctx(image_ctx), m_cache_client(nullptr),
31 m_initialized(false), m_connecting(false) {
32 ceph_assert(m_image_ctx->data_ctx.is_valid());
33 std::string controller_path =
34 ((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
35 m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
36 }
37
38 template <typename I>
39 ParentCacheObjectDispatch<I>::~ParentCacheObjectDispatch() {
40 delete m_cache_client;
41 }
42
43 template <typename I>
44 void ParentCacheObjectDispatch<I>::init(Context* on_finish) {
45 auto cct = m_image_ctx->cct;
46 ldout(cct, 5) << dendl;
47
48 if (m_image_ctx->parent != nullptr) {
49 ldout(cct, 5) << "child image: skipping" << dendl;
50 return;
51 }
52
53 Context* create_session_ctx = new LambdaContext([this, on_finish](int ret) {
54 m_connecting.store(false);
55 if (on_finish != nullptr) {
56 on_finish->complete(ret);
57 }
58 });
59
60 m_connecting.store(true);
61 create_cache_session(create_session_ctx, false);
62
63 m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
64 m_initialized = true;
65 }
66
67 template <typename I>
68 bool ParentCacheObjectDispatch<I>::read(
69 uint64_t object_no, uint64_t object_off,
70 uint64_t object_len, librados::snap_t snap_id, int op_flags,
71 const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
72 io::ExtentMap* extent_map, int* object_dispatch_flags,
73 io::DispatchResult* dispatch_result, Context** on_finish,
74 Context* on_dispatched) {
75 auto cct = m_image_ctx->cct;
76 ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
77 << object_len << dendl;
78 ceph_assert(m_initialized);
79 string oid = data_object_name(m_image_ctx, object_no);
80
81 /* if RO daemon still don't startup, or RO daemon crash,
82 * or session occur any error, try to re-connect daemon.*/
83 if (!m_cache_client->is_session_work()) {
84 if (!m_connecting.exchange(true)) {
85 /* Since we don't have a giant lock protecting the full re-connect process,
86 * if thread A first passes the if (!m_cache_client->is_session_work()),
87 * thread B could have also passed it and reconnected
88 * before thread A resumes and executes if (!m_connecting.exchange(true)).
89 * This will result in thread A re-connecting a working session.
90 * So, we need to check if session is normal again. If session work,
91 * we need set m_connecting to false. */
92 if (!m_cache_client->is_session_work()) {
93 Context* on_finish = new LambdaContext([this](int ret) {
94 m_connecting.store(false);
95 });
96 create_cache_session(on_finish, true);
97 } else {
98 m_connecting.store(false);
99 }
100 }
101 ldout(cct, 5) << "Parent cache try to re-connect to RO daemon. "
102 << "dispatch current request to lower object layer" << dendl;
103 return false;
104 }
105
106 ceph_assert(m_cache_client->is_session_work());
107
108 CacheGenContextURef ctx = make_gen_lambda_context<ObjectCacheRequest*,
109 std::function<void(ObjectCacheRequest*)>>
110 ([this, read_data, dispatch_result, on_dispatched,
111 oid, object_off, object_len](ObjectCacheRequest* ack) {
112 handle_read_cache(ack, object_off, object_len, read_data,
113 dispatch_result, on_dispatched);
114 });
115
116 m_cache_client->lookup_object(m_image_ctx->data_ctx.get_namespace(),
117 m_image_ctx->data_ctx.get_id(),
118 (uint64_t)snap_id, oid, std::move(ctx));
119 return true;
120 }
121
122 template <typename I>
123 void ParentCacheObjectDispatch<I>::handle_read_cache(
124 ObjectCacheRequest* ack, uint64_t read_off, uint64_t read_len,
125 ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
126 Context* on_dispatched) {
127 auto cct = m_image_ctx->cct;
128 ldout(cct, 20) << dendl;
129
130 if(ack->type != RBDSC_READ_REPLY) {
131 // go back to read rados
132 *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
133 on_dispatched->complete(0);
134 return;
135 }
136
137 ceph_assert(ack->type == RBDSC_READ_REPLY);
138 std::string file_path = ((ObjectCacheReadReplyData*)ack)->cache_path;
139 ceph_assert(file_path != "");
140
141 // try to read from parent image cache
142 int r = read_object(file_path, read_data, read_off, read_len, on_dispatched);
143 if(r < 0) {
144 // cache read error, fall back to read rados
145 *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
146 on_dispatched->complete(0);
147 return;
148 }
149
150 *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
151 on_dispatched->complete(r);
152 }
153
154 template <typename I>
155 int ParentCacheObjectDispatch<I>::handle_register_client(bool reg) {
156 auto cct = m_image_ctx->cct;
157 ldout(cct, 20) << dendl;
158
159 if (!reg) {
160 lderr(cct) << "Parent cache register fails." << dendl;
161 }
162 return 0;
163 }
164
165 template <typename I>
166 int ParentCacheObjectDispatch<I>::create_cache_session(Context* on_finish, bool is_reconnect) {
167 auto cct = m_image_ctx->cct;
168 ldout(cct, 20) << dendl;
169
170 Context* register_ctx = new LambdaContext([this, cct, on_finish](int ret) {
171 if (ret < 0) {
172 lderr(cct) << "Parent cache fail to register client." << dendl;
173 } else {
174 ceph_assert(m_cache_client->is_session_work());
175 }
176 handle_register_client(ret < 0 ? false : true);
177 on_finish->complete(ret);
178 });
179
180 Context* connect_ctx = new LambdaContext(
181 [this, cct, register_ctx](int ret) {
182 if (ret < 0) {
183 lderr(cct) << "Parent cache fail to connect RO daeomn." << dendl;
184 register_ctx->complete(ret);
185 return;
186 }
187
188 ldout(cct, 20) << "Parent cache connected to RO daemon." << dendl;
189
190 m_cache_client->register_client(register_ctx);
191 });
192
193 if (m_cache_client != nullptr && is_reconnect) {
194 // CacheClient's destruction will cleanup all details on old session.
195 delete m_cache_client;
196
197 // create new CacheClient to connect RO daemon.
198 std::string controller_path =
199 ((CephContext*)(m_image_ctx->cct))->_conf.get_val<std::string>("immutable_object_cache_sock");
200 m_cache_client = new CacheClient(controller_path.c_str(), m_image_ctx->cct);
201 }
202
203 m_cache_client->run();
204
205 m_cache_client->connect(connect_ctx);
206 return 0;
207 }
208
209 template <typename I>
210 int ParentCacheObjectDispatch<I>::read_object(
211 std::string file_path, ceph::bufferlist* read_data, uint64_t offset,
212 uint64_t length, Context *on_finish) {
213
214 auto *cct = m_image_ctx->cct;
215 ldout(cct, 20) << "file path: " << file_path << dendl;
216
217 std::string error;
218 int ret = read_data->pread_file(file_path.c_str(), offset, length, &error);
219 if (ret < 0) {
220 ldout(cct, 5) << "read from file return error: " << error
221 << "file path= " << file_path
222 << dendl;
223 return ret;
224 }
225 return read_data->length();
226 }
227
228 } // namespace cache
229 } // namespace librbd
230
231 template class librbd::cache::ParentCacheObjectDispatch<librbd::ImageCtx>;