]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/immutable_object_cache/CacheClient.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / tools / immutable_object_cache / CacheClient.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "CacheClient.h"
5#include "common/Cond.h"
f91f0fd5 6#include "common/version.h"
9f95a23c
TL
7
// Logging configuration for this file: the context and subsystem symbols
// below are presumably consumed by the dout/ldout logging macros.
8#define dout_context g_ceph_context
9#define dout_subsys ceph_subsys_immutable_obj_cache
10#undef dout_prefix
// Prefix streamed in front of every log line: a fixed class tag, the address
// of the current object and the name of the calling function.
11#define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
12 << __func__ << ": "
13
14namespace ceph {
15namespace immutable_obj_cache {
16
17 CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
18 : m_cct(ceph_ctx), m_io_service_work(m_io_service),
19 m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
20 m_io_thread(nullptr), m_session_work(false), m_writing(false),
21 m_reading(false), m_sequence_id(0) {
22 m_worker_thread_num =
23 m_cct->_conf.get_val<uint64_t>(
24 "immutable_object_cache_client_dedicated_thread_num");
25
26 if (m_worker_thread_num != 0) {
27 m_worker = new boost::asio::io_service();
28 m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
29 for (uint64_t i = 0; i < m_worker_thread_num; i++) {
30 std::thread* thd = new std::thread([this](){m_worker->run();});
31 m_worker_threads.push_back(thd);
32 }
33 }
34 m_bp_header = buffer::create(get_header_size());
35 }
36
37 CacheClient::~CacheClient() {
38 stop();
39 }
40
41 void CacheClient::run() {
42 m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
43 }
44
45 bool CacheClient::is_session_work() {
46 return m_session_work.load() == true;
47 }
48
49 int CacheClient::stop() {
50 m_session_work.store(false);
51 m_io_service.stop();
52
53 if (m_io_thread != nullptr) {
54 m_io_thread->join();
55 }
56 if (m_worker_thread_num != 0) {
57 m_worker->stop();
58 for (auto thd : m_worker_threads) {
59 thd->join();
60 delete thd;
61 }
62 delete m_worker;
63 }
64 return 0;
65 }
66
67 // close domain socket
68 void CacheClient::close() {
69 m_session_work.store(false);
70 boost::system::error_code close_ec;
71 m_dm_socket.close(close_ec);
72 if (close_ec) {
73 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
74 }
75 }
76
77 // sync connect
78 int CacheClient::connect() {
79 int ret = -1;
80 C_SaferCond cond;
81 Context* on_finish = new LambdaContext([&cond, &ret](int err) {
82 ret = err;
83 cond.complete(err);
84 });
85
86 connect(on_finish);
87 cond.wait();
88
89 return ret;
90 }
91
92 // async connect
93 void CacheClient::connect(Context* on_finish) {
94 m_dm_socket.async_connect(m_ep,
95 boost::bind(&CacheClient::handle_connect, this,
96 on_finish, boost::asio::placeholders::error));
97 }
98
99 void CacheClient::handle_connect(Context* on_finish,
100 const boost::system::error_code& err) {
101 if (err) {
102 ldout(m_cct, 20) << "fails to connect to cache server. error : "
103 << err.message() << dendl;
104 fault(ASIO_ERROR_CONNECT, err);
105 on_finish->complete(-1);
106 return;
107 }
108
109 ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
110 on_finish->complete(0);
111 }
112
113 void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
114 uint64_t snap_id, std::string oid,
115 CacheGenContextURef&& on_finish) {
116 ldout(m_cct, 20) << dendl;
117 ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
118 ++m_sequence_id, 0, 0,
119 pool_id, snap_id, oid, pool_nspace);
120 req->process_msg = std::move(on_finish);
121 req->encode();
122
123 {
124 std::lock_guard locker{m_lock};
125 m_outcoming_bl.append(req->get_payload_bufferlist());
126 ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
127 m_seq_to_req[req->seq] = req;
128 }
129
130 // try to send message to server.
131 try_send();
132
133 // try to receive ack from server.
134 try_receive();
135 }
136
137 void CacheClient::try_send() {
138 ldout(m_cct, 20) << dendl;
139 if (!m_writing.load()) {
140 m_writing.store(true);
141 send_message();
142 }
143 }
144
145 void CacheClient::send_message() {
146 ldout(m_cct, 20) << dendl;
147 bufferlist bl;
148 {
149 std::lock_guard locker{m_lock};
150 bl.swap(m_outcoming_bl);
151 ceph_assert(m_outcoming_bl.length() == 0);
152 }
153
154 // send bytes as many as possible.
155 boost::asio::async_write(m_dm_socket,
156 boost::asio::buffer(bl.c_str(), bl.length()),
157 boost::asio::transfer_exactly(bl.length()),
158 [this, bl](const boost::system::error_code& err, size_t cb) {
159 if (err || cb != bl.length()) {
160 fault(ASIO_ERROR_WRITE, err);
161 return;
162 }
163
164 ceph_assert(cb == bl.length());
165
166 {
167 std::lock_guard locker{m_lock};
168 if (m_outcoming_bl.length() == 0) {
169 m_writing.store(false);
170 return;
171 }
172 }
173
174 // still have left bytes, continue to send.
175 send_message();
176 });
177 try_receive();
178 }
179
180 void CacheClient::try_receive() {
181 ldout(m_cct, 20) << dendl;
182 if (!m_reading.load()) {
183 m_reading.store(true);
184 receive_message();
185 }
186 }
187
188 void CacheClient::receive_message() {
189 ldout(m_cct, 20) << dendl;
190 ceph_assert(m_reading.load());
191 read_reply_header();
192 }
193
194 void CacheClient::read_reply_header() {
195 ldout(m_cct, 20) << dendl;
196 /* create new head buffer for every reply */
197 bufferptr bp_head(buffer::create(get_header_size()));
198 auto raw_ptr = bp_head.c_str();
199
200 boost::asio::async_read(m_dm_socket,
201 boost::asio::buffer(raw_ptr, get_header_size()),
202 boost::asio::transfer_exactly(get_header_size()),
203 boost::bind(&CacheClient::handle_reply_header,
204 this, bp_head,
205 boost::asio::placeholders::error,
206 boost::asio::placeholders::bytes_transferred));
207 }
208
209 void CacheClient::handle_reply_header(bufferptr bp_head,
210 const boost::system::error_code& ec,
211 size_t bytes_transferred) {
212 ldout(m_cct, 20) << dendl;
213 if (ec || bytes_transferred != get_header_size()) {
214 fault(ASIO_ERROR_READ, ec);
215 return;
216 }
217
218 ceph_assert(bytes_transferred == bp_head.length());
219
220 uint32_t data_len = get_data_len(bp_head.c_str());
221
222 bufferptr bp_data(buffer::create(data_len));
223 read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
224 }
225
226 void CacheClient::read_reply_data(bufferptr&& bp_head,
227 bufferptr&& bp_data,
228 const uint64_t data_len) {
229 ldout(m_cct, 20) << dendl;
230 auto raw_ptr = bp_data.c_str();
231 boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
232 boost::asio::transfer_exactly(data_len),
233 boost::bind(&CacheClient::handle_reply_data,
234 this, std::move(bp_head), std::move(bp_data), data_len,
235 boost::asio::placeholders::error,
236 boost::asio::placeholders::bytes_transferred));
237 }
238
239 void CacheClient::handle_reply_data(bufferptr bp_head,
240 bufferptr bp_data,
241 const uint64_t data_len,
242 const boost::system::error_code& ec,
243 size_t bytes_transferred) {
244 ldout(m_cct, 20) << dendl;
245 if (ec || bytes_transferred != data_len) {
246 fault(ASIO_ERROR_WRITE, ec);
247 return;
248 }
249 ceph_assert(bp_data.length() == data_len);
250
251 bufferlist data_buffer;
252 data_buffer.append(std::move(bp_head));
253 data_buffer.append(std::move(bp_data));
254
255 ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
256 data_buffer.clear();
257 ceph_assert(data_buffer.length() == 0);
258
259 process(reply, reply->seq);
260
261 {
262 std::lock_guard locker{m_lock};
263 if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
264 m_reading.store(false);
265 return;
266 }
267 }
268 if (is_session_work()) {
269 receive_message();
270 }
271 }
272
273 void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
274 ldout(m_cct, 20) << dendl;
275 ObjectCacheRequest* current_request = nullptr;
276 {
277 std::lock_guard locker{m_lock};
278 ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
279 current_request = m_seq_to_req[seq_id];
280 m_seq_to_req.erase(seq_id);
281 }
282
283 ceph_assert(current_request != nullptr);
284 auto process_reply = new LambdaContext([current_request, reply]
285 (bool dedicated) {
286 if (dedicated) {
287 // dedicated thrad to execute this context.
288 }
289 current_request->process_msg.release()->complete(reply);
290 delete current_request;
291 delete reply;
292 });
293
294 if (m_worker_thread_num != 0) {
295 m_worker->post([process_reply]() {
296 process_reply->complete(true);
297 });
298 } else {
299 process_reply->complete(false);
300 }
301 }
302
303 // if there is one request fails, just execute fault, then shutdown RO.
304 void CacheClient::fault(const int err_type,
305 const boost::system::error_code& ec) {
306 ldout(m_cct, 20) << "fault." << ec.message() << dendl;
307
308 if (err_type == ASIO_ERROR_CONNECT) {
309 ceph_assert(!m_session_work.load());
310 if (ec == boost::asio::error::connection_refused) {
311 ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
312 << ". Immutable-object-cache daemon is down ? "
313 << "Data will be read from ceph cluster " << dendl;
314 } else {
315 ldout(m_cct, 20) << "Connecting RO daemon fails : "
316 << ec.message() << dendl;
317 }
318
319 if (m_dm_socket.is_open()) {
320 // Set to indicate what error occurred, if any.
321 // Note that, even if the function indicates an error,
322 // the underlying descriptor is closed.
323 boost::system::error_code close_ec;
324 m_dm_socket.close(close_ec);
325 if (close_ec) {
326 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
327 }
328 }
329 return;
330 }
331
332 if (!m_session_work.load()) {
333 return;
334 }
335
336 /* when current session don't work, ASIO will don't receive any new request from hook.
337 * On the other hand, for pending request of ASIO, cancle these request,
338 * then call their callback. these request which are cancled by this method,
339 * will be re-dispatched to RADOS layer.
340 * make sure just have one thread to modify execute below code. */
341 m_session_work.store(false);
342
343 if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
344 ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
345 ceph_assert(0);
346 }
347
348 if (err_type == ASIO_ERROR_READ) {
349 ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
350 }
351
352 if (err_type == ASIO_ERROR_WRITE) {
353 ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
354 // CacheClient should not occur this error.
355 ceph_assert(0);
356 }
357
358 // currently, for any asio error, just shutdown RO.
359 close();
360
361 /* all pending request, which have entered into ASIO,
362 * will be re-dispatched to RADOS.*/
363 {
364 std::lock_guard locker{m_lock};
365 for (auto it : m_seq_to_req) {
366 it.second->type = RBDSC_READ_RADOS;
367 it.second->process_msg->complete(it.second);
368 }
369 m_seq_to_req.clear();
370 }
371
372 ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
373 Later all reading will be re-dispatched RADOS layer"
374 << ec.message() << dendl;
375 }
376
377 // TODO : re-implement this method
378 int CacheClient::register_client(Context* on_finish) {
379 ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
f91f0fd5
TL
380 m_sequence_id++,
381 ceph_version_to_str());
9f95a23c
TL
382 reg_req->encode();
383
384 bufferlist bl;
385 bl.append(reg_req->get_payload_bufferlist());
386
387 uint64_t ret;
388 boost::system::error_code ec;
389
390 ret = boost::asio::write(m_dm_socket,
391 boost::asio::buffer(bl.c_str(), bl.length()), ec);
392
393 if (ec || ret != bl.length()) {
394 fault(ASIO_ERROR_WRITE, ec);
395 return -1;
396 }
397 delete reg_req;
398
399 ret = boost::asio::read(m_dm_socket,
400 boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
401 if (ec || ret != get_header_size()) {
402 fault(ASIO_ERROR_READ, ec);
403 return -1;
404 }
405
406 uint64_t data_len = get_data_len(m_bp_header.c_str());
407 bufferptr bp_data(buffer::create(data_len));
408
409 ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
410 data_len), ec);
411 if (ec || ret != data_len) {
412 fault(ASIO_ERROR_READ, ec);
413 return -1;
414 }
415
416 bufferlist data_buffer;
417 data_buffer.append(m_bp_header);
418 data_buffer.append(std::move(bp_data));
419 ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
420 if (req->type == RBDSC_REGISTER_REPLY) {
421 m_session_work.store(true);
422 on_finish->complete(0);
423 } else {
424 on_finish->complete(-1);
425 }
426
427 delete req;
428 return 0;
429 }
430
431} // namespace immutable_obj_cache
432} // namespace ceph