// Source: ceph/src/tools/immutable_object_cache/CacheClient.cc
// (Ceph Pacific 16.2.2; mirrored via git.proxmox.com ceph.git)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <boost/bind/bind.hpp>
5 #include "CacheClient.h"
6 #include "common/Cond.h"
7 #include "common/version.h"
8
9 #define dout_context g_ceph_context
10 #define dout_subsys ceph_subsys_immutable_obj_cache
11 #undef dout_prefix
12 #define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
13 << __func__ << ": "
14
15 namespace ceph {
16 namespace immutable_obj_cache {
17
// Build a client for the immutable-object-cache daemon reachable through
// the UNIX domain socket at `file`.  If the config option
// "immutable_object_cache_client_dedicated_thread_num" is non-zero, a
// dedicated boost::asio::io_service plus that many worker threads is set
// up to run reply callbacks off the ASIO I/O thread (see process()).
// The raw `new`s below are owned by this object and released in stop().
CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
  : m_cct(ceph_ctx), m_io_service_work(m_io_service),
    m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
    m_io_thread(nullptr), m_session_work(false), m_writing(false),
    m_reading(false), m_sequence_id(0) {
  m_worker_thread_num =
    m_cct->_conf.get_val<uint64_t>(
      "immutable_object_cache_client_dedicated_thread_num");

  if (m_worker_thread_num != 0) {
    m_worker = new boost::asio::io_service();
    // The work guard keeps the worker io_service running even while it
    // has no posted handlers.
    m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
    for (uint64_t i = 0; i < m_worker_thread_num; i++) {
      std::thread* thd = new std::thread([this](){m_worker->run();});
      m_worker_threads.push_back(thd);
    }
  }
  // Reusable buffer for reply headers on the synchronous
  // register_client() path.
  m_bp_header = buffer::create(get_header_size());
}
37
// Tear down the session: stop() joins the I/O thread and any worker
// threads and frees the worker io_service allocated in the constructor.
CacheClient::~CacheClient() {
  stop();
}
41
// Start the ASIO event loop on a dedicated thread; all async socket
// operations issued by this client are serviced there until stop().
void CacheClient::run() {
  m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
}
45
46 bool CacheClient::is_session_work() {
47 return m_session_work.load() == true;
48 }
49
50 int CacheClient::stop() {
51 m_session_work.store(false);
52 m_io_service.stop();
53
54 if (m_io_thread != nullptr) {
55 m_io_thread->join();
56 }
57 if (m_worker_thread_num != 0) {
58 m_worker->stop();
59 for (auto thd : m_worker_threads) {
60 thd->join();
61 delete thd;
62 }
63 delete m_worker_io_service_work;
64 delete m_worker;
65 }
66 return 0;
67 }
68
69 // close domain socket
70 void CacheClient::close() {
71 m_session_work.store(false);
72 boost::system::error_code close_ec;
73 m_dm_socket.close(close_ec);
74 if (close_ec) {
75 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
76 }
77 }
78
79 // sync connect
80 int CacheClient::connect() {
81 int ret = -1;
82 C_SaferCond cond;
83 Context* on_finish = new LambdaContext([&cond, &ret](int err) {
84 ret = err;
85 cond.complete(err);
86 });
87
88 connect(on_finish);
89 cond.wait();
90
91 return ret;
92 }
93
94 // async connect
95 void CacheClient::connect(Context* on_finish) {
96 m_dm_socket.async_connect(m_ep,
97 boost::bind(&CacheClient::handle_connect, this,
98 on_finish, boost::asio::placeholders::error));
99 }
100
101 void CacheClient::handle_connect(Context* on_finish,
102 const boost::system::error_code& err) {
103 if (err) {
104 ldout(m_cct, 20) << "fails to connect to cache server. error : "
105 << err.message() << dendl;
106 fault(ASIO_ERROR_CONNECT, err);
107 on_finish->complete(-1);
108 return;
109 }
110
111 ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
112 on_finish->complete(0);
113 }
114
// Issue an asynchronous cache lookup for one RADOS object.  Builds an
// RBDSC_READ request tagged with a fresh sequence id, parks it in
// m_seq_to_req so the matching reply can find it, stages the encoded
// bytes for sending, and kicks the sender/receiver state machines.
// `on_finish` is completed later from process() with the decoded reply.
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
                                uint64_t snap_id, uint64_t object_size,
                                std::string oid,
                                CacheGenContextURef&& on_finish) {
  ldout(m_cct, 20) << dendl;
  ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
                                    ++m_sequence_id, 0, 0, pool_id,
                                    snap_id, object_size, oid, pool_nspace);
  req->process_msg = std::move(on_finish);
  req->encode();

  {
    std::lock_guard locker{m_lock};
    // Stage the wire bytes and remember the request until its reply
    // arrives (or fault() re-dispatches it to RADOS).
    m_outcoming_bl.append(req->get_payload_bufferlist());
    ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
    m_seq_to_req[req->seq] = req;
  }

  // try to send message to server.
  try_send();

  // try to receive ack from server.
  try_receive();
}
139
140 void CacheClient::try_send() {
141 ldout(m_cct, 20) << dendl;
142 if (!m_writing.load()) {
143 m_writing.store(true);
144 send_message();
145 }
146 }
147
// Drain m_outcoming_bl to the socket.  Exactly one send chain runs at a
// time (guarded by m_writing, set in try_send()): each async_write sends
// everything staged so far, and the completion handler either re-arms
// itself when more bytes were queued meanwhile, or clears m_writing —
// under m_lock, so a concurrent append cannot be lost.
void CacheClient::send_message() {
  ldout(m_cct, 20) << dendl;
  bufferlist bl;
  {
    std::lock_guard locker{m_lock};
    // Take ownership of everything queued so far.
    bl.swap(m_outcoming_bl);
    ceph_assert(m_outcoming_bl.length() == 0);
  }

  // send bytes as many as possible.
  // NOTE: `bl` is captured by value so the underlying buffers stay alive
  // for the whole duration of the asynchronous write.
  boost::asio::async_write(m_dm_socket,
      boost::asio::buffer(bl.c_str(), bl.length()),
      boost::asio::transfer_exactly(bl.length()),
      [this, bl](const boost::system::error_code& err, size_t cb) {
      if (err || cb != bl.length()) {
        fault(ASIO_ERROR_WRITE, err);
        return;
      }

      ceph_assert(cb == bl.length());

      {
         std::lock_guard locker{m_lock};
         // Nothing new was queued while we were writing: writer goes idle.
         if (m_outcoming_bl.length() == 0) {
           m_writing.store(false);
           return;
         }
      }

      // still have left bytes, continue to send.
      send_message();
  });
  try_receive();
}
182
183 void CacheClient::try_receive() {
184 ldout(m_cct, 20) << dendl;
185 if (!m_reading.load()) {
186 m_reading.store(true);
187 receive_message();
188 }
189 }
190
// Entry point of the read state machine; requires m_reading to already
// be set (by try_receive()) so only one reply is in flight at a time.
void CacheClient::receive_message() {
  ldout(m_cct, 20) << dendl;
  ceph_assert(m_reading.load());
  read_reply_header();
}
196
197 void CacheClient::read_reply_header() {
198 ldout(m_cct, 20) << dendl;
199 /* create new head buffer for every reply */
200 bufferptr bp_head(buffer::create(get_header_size()));
201 auto raw_ptr = bp_head.c_str();
202
203 boost::asio::async_read(m_dm_socket,
204 boost::asio::buffer(raw_ptr, get_header_size()),
205 boost::asio::transfer_exactly(get_header_size()),
206 boost::bind(&CacheClient::handle_reply_header,
207 this, bp_head,
208 boost::asio::placeholders::error,
209 boost::asio::placeholders::bytes_transferred));
210 }
211
212 void CacheClient::handle_reply_header(bufferptr bp_head,
213 const boost::system::error_code& ec,
214 size_t bytes_transferred) {
215 ldout(m_cct, 20) << dendl;
216 if (ec || bytes_transferred != get_header_size()) {
217 fault(ASIO_ERROR_READ, ec);
218 return;
219 }
220
221 ceph_assert(bytes_transferred == bp_head.length());
222
223 uint32_t data_len = get_data_len(bp_head.c_str());
224
225 bufferptr bp_data(buffer::create(data_len));
226 read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
227 }
228
// Asynchronously read `data_len` payload bytes into bp_data, then hand
// both header and payload to handle_reply_data().
void CacheClient::read_reply_data(bufferptr&& bp_head,
                                  bufferptr&& bp_data,
                                  const uint64_t data_len) {
  ldout(m_cct, 20) << dendl;
  // Grab the raw pointer before the bufferptrs are moved into the bind
  // object; the bind's copies keep the underlying refcounted buffer
  // alive, so raw_ptr stays valid for the whole read.
  auto raw_ptr = bp_data.c_str();
  boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
    boost::asio::transfer_exactly(data_len),
    boost::bind(&CacheClient::handle_reply_data,
      this, std::move(bp_head), std::move(bp_data), data_len,
      boost::asio::placeholders::error,
      boost::asio::placeholders::bytes_transferred));
}
241
242 void CacheClient::handle_reply_data(bufferptr bp_head,
243 bufferptr bp_data,
244 const uint64_t data_len,
245 const boost::system::error_code& ec,
246 size_t bytes_transferred) {
247 ldout(m_cct, 20) << dendl;
248 if (ec || bytes_transferred != data_len) {
249 fault(ASIO_ERROR_WRITE, ec);
250 return;
251 }
252 ceph_assert(bp_data.length() == data_len);
253
254 bufferlist data_buffer;
255 data_buffer.append(std::move(bp_head));
256 data_buffer.append(std::move(bp_data));
257
258 ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
259 data_buffer.clear();
260 ceph_assert(data_buffer.length() == 0);
261
262 process(reply, reply->seq);
263
264 {
265 std::lock_guard locker{m_lock};
266 if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
267 m_reading.store(false);
268 return;
269 }
270 }
271 if (is_session_work()) {
272 receive_message();
273 }
274 }
275
// Complete the pending request whose sequence id matches this reply.
// Ownership: the request is removed from m_seq_to_req here, and both the
// request and the reply are deleted inside the completion context after
// the user callback (process_msg) has run.  With a dedicated worker pool
// the callback is posted there so the ASIO I/O thread is not blocked.
void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
  ldout(m_cct, 20) << dendl;
  ObjectCacheRequest* current_request = nullptr;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
    current_request = m_seq_to_req[seq_id];
    m_seq_to_req.erase(seq_id);
  }

  ceph_assert(current_request != nullptr);
  auto process_reply = new LambdaContext([current_request, reply]
    (bool dedicated) {
     if (dedicated) {
       // dedicated thread executes this context.
     }
     // process_msg.release() hands ownership of the user callback to
     // complete(); the request and reply are freed afterwards.
     current_request->process_msg.release()->complete(reply);
     delete current_request;
     delete reply;
  });

  if (m_worker_thread_num != 0) {
    m_worker->post([process_reply]() {
      process_reply->complete(true);
    });
  } else {
    process_reply->complete(false);
  }
}
305
306 // if there is one request fails, just execute fault, then shutdown RO.
307 void CacheClient::fault(const int err_type,
308 const boost::system::error_code& ec) {
309 ldout(m_cct, 20) << "fault." << ec.message() << dendl;
310
311 if (err_type == ASIO_ERROR_CONNECT) {
312 ceph_assert(!m_session_work.load());
313 if (ec == boost::asio::error::connection_refused) {
314 ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
315 << ". Immutable-object-cache daemon is down ? "
316 << "Data will be read from ceph cluster " << dendl;
317 } else {
318 ldout(m_cct, 20) << "Connecting RO daemon fails : "
319 << ec.message() << dendl;
320 }
321
322 if (m_dm_socket.is_open()) {
323 // Set to indicate what error occurred, if any.
324 // Note that, even if the function indicates an error,
325 // the underlying descriptor is closed.
326 boost::system::error_code close_ec;
327 m_dm_socket.close(close_ec);
328 if (close_ec) {
329 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
330 }
331 }
332 return;
333 }
334
335 if (!m_session_work.load()) {
336 return;
337 }
338
339 /* when current session don't work, ASIO will don't receive any new request from hook.
340 * On the other hand, for pending request of ASIO, cancle these request,
341 * then call their callback. these request which are cancled by this method,
342 * will be re-dispatched to RADOS layer.
343 * make sure just have one thread to modify execute below code. */
344 m_session_work.store(false);
345
346 if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
347 ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
348 ceph_assert(0);
349 }
350
351 if (err_type == ASIO_ERROR_READ) {
352 ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
353 }
354
355 if (err_type == ASIO_ERROR_WRITE) {
356 ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
357 // CacheClient should not occur this error.
358 ceph_assert(0);
359 }
360
361 // currently, for any asio error, just shutdown RO.
362 close();
363
364 /* all pending request, which have entered into ASIO,
365 * will be re-dispatched to RADOS.*/
366 {
367 std::lock_guard locker{m_lock};
368 for (auto it : m_seq_to_req) {
369 it.second->type = RBDSC_READ_RADOS;
370 it.second->process_msg->complete(it.second);
371 }
372 m_seq_to_req.clear();
373 }
374
375 ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
376 Later all reading will be re-dispatched RADOS layer"
377 << ec.message() << dendl;
378 }
379
380 // TODO : re-implement this method
381 int CacheClient::register_client(Context* on_finish) {
382 ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
383 m_sequence_id++,
384 ceph_version_to_str());
385 reg_req->encode();
386
387 bufferlist bl;
388 bl.append(reg_req->get_payload_bufferlist());
389
390 uint64_t ret;
391 boost::system::error_code ec;
392
393 ret = boost::asio::write(m_dm_socket,
394 boost::asio::buffer(bl.c_str(), bl.length()), ec);
395
396 if (ec || ret != bl.length()) {
397 fault(ASIO_ERROR_WRITE, ec);
398 return -1;
399 }
400 delete reg_req;
401
402 ret = boost::asio::read(m_dm_socket,
403 boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
404 if (ec || ret != get_header_size()) {
405 fault(ASIO_ERROR_READ, ec);
406 return -1;
407 }
408
409 uint64_t data_len = get_data_len(m_bp_header.c_str());
410 bufferptr bp_data(buffer::create(data_len));
411
412 ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
413 data_len), ec);
414 if (ec || ret != data_len) {
415 fault(ASIO_ERROR_READ, ec);
416 return -1;
417 }
418
419 bufferlist data_buffer;
420 data_buffer.append(m_bp_header);
421 data_buffer.append(std::move(bp_data));
422 ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
423 if (req->type == RBDSC_REGISTER_REPLY) {
424 m_session_work.store(true);
425 on_finish->complete(0);
426 } else {
427 on_finish->complete(-1);
428 }
429
430 delete req;
431 return 0;
432 }
433
434 } // namespace immutable_obj_cache
435 } // namespace ceph