1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/bind/bind.hpp>
5 #include "CacheClient.h"
6 #include "common/Cond.h"
7 #include "common/version.h"
9 #define dout_context g_ceph_context
10 #define dout_subsys ceph_subsys_immutable_obj_cache
12 #define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
16 namespace immutable_obj_cache
{
// Constructs a client whose socket endpoint (m_ep) is built from `file`.
//
// The initializer list stores `ceph_ctx` in m_cct, attaches
// m_io_service_work and m_dm_socket to m_io_service, and starts with no
// I/O thread, the session/writing/reading flags cleared and m_sequence_id
// at 0.
18 CacheClient::CacheClient(const std::string
& file
, CephContext
* ceph_ctx
)
19 : m_cct(ceph_ctx
), m_io_service_work(m_io_service
),
20 m_dm_socket(m_io_service
), m_ep(stream_protocol::endpoint(file
)),
21 m_io_thread(nullptr), m_session_work(false), m_writing(false),
22 m_reading(false), m_sequence_id(0) {
// Reads the uint64_t config option named below.
// NOTE(review): the assignment target is not visible in this view;
// presumably m_worker_thread_num -- confirm.
24 m_cct
->_conf
.get_val
<uint64_t>(
25 "immutable_object_cache_client_dedicated_thread_num");
// With a non-zero thread count, create a second io_service (m_worker)
// with its own io_service::work object and start that many threads, each
// calling m_worker->run(). The raw std::thread pointers are kept in
// m_worker_threads.
27 if (m_worker_thread_num
!= 0) {
28 m_worker
= new boost::asio::io_service();
29 m_worker_io_service_work
= new boost::asio::io_service::work(*m_worker
);
30 for (uint64_t i
= 0; i
< m_worker_thread_num
; i
++) {
31 std::thread
* thd
= new std::thread([this](){m_worker
->run();});
32 m_worker_threads
.push_back(thd
);
// Allocate the header buffer member, sized by get_header_size().
35 m_bp_header
= buffer::create(get_header_size());
// Destructor. NOTE(review): the body is not visible in this view.
38 CacheClient::~CacheClient() {
// Starts the I/O thread: m_io_thread is reset to own a new std::thread
// whose body calls m_io_service.run().
42 void CacheClient::run() {
43 m_io_thread
.reset(new std::thread([this](){m_io_service
.run(); }));
// Returns true when the m_session_work flag currently reads as true.
46 bool CacheClient::is_session_work() {
47 return m_session_work
.load() == true;
// Stops the client: clears m_session_work, then handles the I/O thread
// when m_io_thread is non-null and, when dedicated worker threads were
// configured (m_worker_thread_num != 0), walks m_worker_threads and
// deletes m_worker_io_service_work.
// NOTE(review): the statements inside the m_io_thread branch, the
// per-thread loop body and the return value are not visible in this view.
50 int CacheClient::stop() {
51 m_session_work
.store(false);
54 if (m_io_thread
!= nullptr) {
57 if (m_worker_thread_num
!= 0) {
59 for (auto thd
: m_worker_threads
) {
63 delete m_worker_io_service_work
;
69 // Close the domain socket.
// Clears m_session_work, then closes m_dm_socket through the overload that
// reports failure via an error_code, and logs that code's message at
// level 20.
// NOTE(review): the log statement may sit behind a guard that is not
// visible in this view.
70 void CacheClient::close() {
71 m_session_work
.store(false);
72 boost::system::error_code close_ec
;
73 m_dm_socket
.close(close_ec
);
75 ldout(m_cct
, 20) << "close: " << close_ec
.message() << dendl
;
// Overload of connect that returns an int.
// Builds a LambdaContext completion that captures the locals `cond` and
// `ret` by reference and receives an int error value.
// NOTE(review): the declarations of `cond`/`ret`, the lambda body and the
// return statement are not visible in this view; presumably this waits on
// the Context-taking connect overload and returns `ret` -- confirm.
80 int CacheClient::connect() {
83 Context
* on_finish
= new LambdaContext([&cond
, &ret
](int err
) {
// Starts an asynchronous connect of m_dm_socket to m_ep.
// The completion handler is handle_connect bound to this object, with
// `on_finish` forwarded and the asio error placeholder as last argument.
95 void CacheClient::connect(Context
* on_finish
) {
96 m_dm_socket
.async_connect(m_ep
,
97 boost::bind(&CacheClient::handle_connect
, this,
98 on_finish
, boost::asio::placeholders::error
));
// Completion handler bound by connect(Context*).
// Failure statements: log the error message, call
// fault(ASIO_ERROR_CONNECT, err) and complete `on_finish` with -1.
// Success statements: log and complete `on_finish` with 0.
// NOTE(review): the condition that separates the two paths (presumably a
// test of `err`) is not visible in this view.
101 void CacheClient::handle_connect(Context
* on_finish
,
102 const boost::system::error_code
& err
) {
104 ldout(m_cct
, 20) << "fails to connect to cache server. error : "
105 << err
.message() << dendl
;
106 fault(ASIO_ERROR_CONNECT
, err
);
107 on_finish
->complete(-1);
111 ldout(m_cct
, 20) << "successfully connected to cache server." << dendl
;
112 on_finish
->complete(0);
// Queues a lookup request for one object.
//
// Builds an ObjectCacheReadData of type RBDSC_READ with a pre-incremented
// m_sequence_id, two zero arguments, then pool_id, snap_id, object_size,
// oid and pool_nspace, and moves `on_finish` into req->process_msg.
// NOTE(review): `oid` is used below but its parameter declaration is not
// visible in this view.
115 void CacheClient::lookup_object(std::string pool_nspace
, uint64_t pool_id
,
116 uint64_t snap_id
, uint64_t object_size
,
118 CacheGenContextURef
&& on_finish
) {
119 ldout(m_cct
, 20) << dendl
;
120 ObjectCacheRequest
* req
= new ObjectCacheReadData(RBDSC_READ
,
121 ++m_sequence_id
, 0, 0, pool_id
,
122 snap_id
, object_size
, oid
, pool_nspace
);
123 req
->process_msg
= std::move(on_finish
);
// Under m_lock: append the request payload to the outgoing buffer and
// record the request in m_seq_to_req keyed by req->seq, asserting that the
// sequence number is not already present.
127 std::lock_guard locker
{m_lock
};
128 m_outcoming_bl
.append(req
->get_payload_bufferlist());
129 ceph_assert(m_seq_to_req
.find(req
->seq
) == m_seq_to_req
.end());
130 m_seq_to_req
[req
->seq
] = req
;
// NOTE(review): the statements that follow the two comments below are not
// visible in this view.
133 // try to send message to server.
136 // try to receive ack from server.
// Sets the m_writing flag when it is not already set.
// NOTE(review): the check (load) and the update (store) are two separate
// operations; if try_send can run on more than one thread at once, both
// callers may pass the check -- confirm callers are serialized. The
// statement that follows the store is not visible in this view.
140 void CacheClient::try_send() {
141 ldout(m_cct
, 20) << dendl
;
142 if (!m_writing
.load()) {
143 m_writing
.store(true);
// Writes everything currently queued in m_outcoming_bl to the socket.
148 void CacheClient::send_message() {
149 ldout(m_cct
, 20) << dendl
;
// Under m_lock, swap the queued bytes into the local `bl`; the shared
// buffer is asserted to be empty afterwards.
152 std::lock_guard locker
{m_lock
};
153 bl
.swap(m_outcoming_bl
);
154 ceph_assert(m_outcoming_bl
.length() == 0);
157 // Send as many bytes as possible.
// The handler captures `bl` by value.
// NOTE(review): presumably this keeps the memory handed to async_write
// alive until the handler runs -- confirm against bufferlist copy
// semantics.
158 boost::asio::async_write(m_dm_socket
,
159 boost::asio::buffer(bl
.c_str(), bl
.length()),
160 boost::asio::transfer_exactly(bl
.length()),
161 [this, bl
](const boost::system::error_code
& err
, size_t cb
) {
// An error or a short write is reported through fault().
162 if (err
|| cb
!= bl
.length()) {
163 fault(ASIO_ERROR_WRITE
, err
);
167 ceph_assert(cb
== bl
.length());
// Under m_lock: when nothing new was queued meanwhile, clear m_writing.
170 std::lock_guard locker
{m_lock
};
171 if (m_outcoming_bl
.length() == 0) {
172 m_writing
.store(false);
177 // Bytes are still queued, continue to send.
// Sets the m_reading flag when it is not already set.
// NOTE(review): the check (load) and the update (store) are two separate
// operations; if try_receive can run on more than one thread at once, both
// callers may pass the check -- confirm callers are serialized. The
// statement that follows the store is not visible in this view.
183 void CacheClient::try_receive() {
184 ldout(m_cct
, 20) << dendl
;
185 if (!m_reading
.load()) {
186 m_reading
.store(true);
// Asserts that m_reading is already set before receiving.
// NOTE(review): the statements after the assertion are not visible in this
// view.
191 void CacheClient::receive_message() {
192 ldout(m_cct
, 20) << dendl
;
193 ceph_assert(m_reading
.load());
// Starts an asynchronous read of one reply header: exactly
// get_header_size() bytes into a freshly allocated buffer, with
// handle_reply_header bound as the completion handler.
197 void CacheClient::read_reply_header() {
198 ldout(m_cct
, 20) << dendl
;
199 /* create new head buffer for every reply */
200 bufferptr
bp_head(buffer::create(get_header_size()));
201 auto raw_ptr
= bp_head
.c_str();
// NOTE(review): one line of bind arguments is not visible in this view;
// presumably it passes `this` and `bp_head` -- confirm.
203 boost::asio::async_read(m_dm_socket
,
204 boost::asio::buffer(raw_ptr
, get_header_size()),
205 boost::asio::transfer_exactly(get_header_size()),
206 boost::bind(&CacheClient::handle_reply_header
,
208 boost::asio::placeholders::error
,
209 boost::asio::placeholders::bytes_transferred
));
// Completion handler for the header read.
// An error or a byte count other than get_header_size() is reported via
// fault(ASIO_ERROR_READ, ec). Otherwise the payload length is taken from
// the header with get_data_len(), a buffer of that size is allocated, and
// both buffers are handed to read_reply_data().
212 void CacheClient::handle_reply_header(bufferptr bp_head
,
213 const boost::system::error_code
& ec
,
214 size_t bytes_transferred
) {
215 ldout(m_cct
, 20) << dendl
;
216 if (ec
|| bytes_transferred
!= get_header_size()) {
217 fault(ASIO_ERROR_READ
, ec
);
221 ceph_assert(bytes_transferred
== bp_head
.length());
223 uint32_t data_len
= get_data_len(bp_head
.c_str());
225 bufferptr
bp_data(buffer::create(data_len
));
226 read_reply_data(std::move(bp_head
), std::move(bp_data
), data_len
);
// Starts an asynchronous read of exactly `data_len` payload bytes into
// bp_data, with handle_reply_data bound as the completion handler and both
// buffers moved into the bound arguments.
// NOTE(review): the declaration of the `bp_data` parameter is not visible
// in this view.
229 void CacheClient::read_reply_data(bufferptr
&& bp_head
,
231 const uint64_t data_len
) {
232 ldout(m_cct
, 20) << dendl
;
// The raw pointer is taken before bp_data is moved into the bind object.
// NOTE(review): presumably moving a bufferptr leaves the underlying
// storage in place so raw_ptr stays valid -- confirm.
233 auto raw_ptr
= bp_data
.c_str();
234 boost::asio::async_read(m_dm_socket
, boost::asio::buffer(raw_ptr
, data_len
),
235 boost::asio::transfer_exactly(data_len
),
236 boost::bind(&CacheClient::handle_reply_data
,
237 this, std::move(bp_head
), std::move(bp_data
), data_len
,
238 boost::asio::placeholders::error
,
239 boost::asio::placeholders::bytes_transferred
));
// Completion handler for the payload read: reassembles header + payload,
// decodes the reply and dispatches it to process().
// NOTE(review): the declaration of the `bp_data` parameter is not visible
// in this view.
242 void CacheClient::handle_reply_data(bufferptr bp_head
,
244 const uint64_t data_len
,
245 const boost::system::error_code
& ec
,
246 size_t bytes_transferred
) {
247 ldout(m_cct
, 20) << dendl
;
// NOTE(review): this is a failed *read*, yet it is reported as
// ASIO_ERROR_WRITE, while handle_reply_header reports ASIO_ERROR_READ for
// the same kind of failure. fault() logs a write-failure message for this
// code -- confirm whether ASIO_ERROR_READ was intended.
248 if (ec
|| bytes_transferred
!= data_len
) {
249 fault(ASIO_ERROR_WRITE
, ec
);
252 ceph_assert(bp_data
.length() == data_len
);
// Header first, payload second, then decode the combined buffer.
254 bufferlist data_buffer
;
255 data_buffer
.append(std::move(bp_head
));
256 data_buffer
.append(std::move(bp_data
));
258 ObjectCacheRequest
* reply
= decode_object_cache_request(data_buffer
);
// NOTE(review): the statement that empties data_buffer before this
// assertion is not visible in this view.
260 ceph_assert(data_buffer
.length() == 0);
262 process(reply
, reply
->seq
);
// Under m_lock: decide whether to stop reading.
// NOTE(review): m_reading is cleared only when no request is pending AND
// m_outcoming_bl is non-empty; confirm that `length() == 0` was not
// intended.
265 std::lock_guard locker
{m_lock
};
266 if (m_seq_to_req
.size() == 0 && m_outcoming_bl
.length()) {
267 m_reading
.store(false);
// NOTE(review): the body of this branch is not visible in this view.
271 if (is_session_work()) {
// Matches a decoded reply with its pending request and runs the request's
// completion.
276 void CacheClient::process(ObjectCacheRequest
* reply
, uint64_t seq_id
) {
277 ldout(m_cct
, 20) << dendl
;
278 ObjectCacheRequest
* current_request
= nullptr;
// Under m_lock: the request for seq_id must exist; take it out of
// m_seq_to_req.
280 std::lock_guard locker
{m_lock
};
281 ceph_assert(m_seq_to_req
.find(seq_id
) != m_seq_to_req
.end());
282 current_request
= m_seq_to_req
[seq_id
];
283 m_seq_to_req
.erase(seq_id
);
286 ceph_assert(current_request
!= nullptr);
// The context releases the request's process_msg, completes it with
// `reply`, and deletes the request.
// NOTE(review): part of the lambda header is not visible in this view, and
// no delete of `reply` is visible -- confirm who owns it.
287 auto process_reply
= new LambdaContext([current_request
, reply
]
290 // dedicated thread to execute this context.
292 current_request
->process_msg
.release()->complete(reply
);
293 delete current_request
;
// With dedicated workers the context is posted to m_worker and completed
// with true; the other visible call completes it with false.
// NOTE(review): presumably the second call is the else branch, run on the
// calling thread -- the `else` itself is not visible in this view.
297 if (m_worker_thread_num
!= 0) {
298 m_worker
->post([process_reply
]() {
299 process_reply
->complete(true);
302 process_reply
->complete(false);
306 // If any request fails, execute fault(), which shuts down RO.
// `err_type` selects the handling (ASIO_ERROR_CONNECT,
// ASIO_ERROR_MSG_INCOMPLETE, ASIO_ERROR_READ, ASIO_ERROR_WRITE are tested
// below); `ec` is only used for log messages and the connection_refused
// comparison.
// NOTE(review): several closing braces, `else` branches and early returns
// are not visible in this view, so the exact nesting below is uncertain.
307 void CacheClient::fault(const int err_type
,
308 const boost::system::error_code
& ec
) {
309 ldout(m_cct
, 20) << "fault." << ec
.message() << dendl
;
// Connect failures: the session must not be marked working yet.
// NOTE(review): the first log string below spells "daenmon"; left
// untouched here because it is runtime text.
311 if (err_type
== ASIO_ERROR_CONNECT
) {
312 ceph_assert(!m_session_work
.load());
313 if (ec
== boost::asio::error::connection_refused
) {
314 ldout(m_cct
, 20) << "Connecting RO daenmon fails : "<< ec
.message()
315 << ". Immutable-object-cache daemon is down ? "
316 << "Data will be read from ceph cluster " << dendl
;
318 ldout(m_cct
, 20) << "Connecting RO daemon fails : "
319 << ec
.message() << dendl
;
322 if (m_dm_socket
.is_open()) {
323 // Set to indicate what error occurred, if any.
324 // Note that, even if the function indicates an error,
325 // the underlying descriptor is closed.
326 boost::system::error_code close_ec
;
327 m_dm_socket
.close(close_ec
);
329 ldout(m_cct
, 20) << "close: " << close_ec
.message() << dendl
;
// NOTE(review): the body of this branch is not visible in this view.
335 if (!m_session_work
.load()) {
339 /* When the current session does not work, ASIO will not receive any new request from the hook.
340 * On the other hand, pending ASIO requests are cancelled and
341 * their callbacks are called. Requests cancelled by this method
342 * will be re-dispatched to the RADOS layer.
343 * Make sure only one thread executes the code below. */
344 m_session_work
.store(false);
346 if (err_type
== ASIO_ERROR_MSG_INCOMPLETE
) {
347 ldout(m_cct
, 20) << "ASIO In-complete message." << ec
.message() << dendl
;
351 if (err_type
== ASIO_ERROR_READ
) {
352 ldout(m_cct
, 20) << "ASIO async read fails : " << ec
.message() << dendl
;
// NOTE(review): the log string below spells "asyn"; left untouched here
// because it is runtime text.
355 if (err_type
== ASIO_ERROR_WRITE
) {
356 ldout(m_cct
, 20) << "ASIO asyn write fails : " << ec
.message() << dendl
;
357 // CacheClient should not hit this error.
361 // Currently, any asio error just shuts down RO.
364 /* All pending requests which have entered ASIO
365 * will be re-dispatched to RADOS. */
// Under m_lock: mark every pending request RBDSC_READ_RADOS, complete its
// process_msg with the request itself, then empty the map.
// NOTE(review): `auto it` copies each map entry per iteration, and no
// delete of the request objects is visible before clear() -- confirm who
// owns them after complete().
367 std::lock_guard locker
{m_lock
};
368 for (auto it
: m_seq_to_req
) {
369 it
.second
->type
= RBDSC_READ_RADOS
;
370 it
.second
->process_msg
->complete(it
.second
);
372 m_seq_to_req
.clear();
375 ldout(m_cct
, 20) << "Because ASIO domain socket fails, just shutdown RO.\
376 Later all reading will be re-dispatched RADOS layer"
377 << ec
.message() << dendl
;
380 // TODO : re-implement this method
// Registers this client with the server using blocking socket calls:
// writes an RBDSC_REGISTER request, reads the reply header and payload,
// and completes `on_finish` according to the decoded reply type.
// NOTE(review): the declarations of `bl`/`ret`, the early exits after each
// fault() call, the return value, and any delete of `reg_req`/`req` are
// not visible in this view -- confirm ownership and that `on_finish` is
// handled on the failure paths.
381 int CacheClient::register_client(Context
* on_finish
) {
382 ObjectCacheRequest
* reg_req
= new ObjectCacheRegData(RBDSC_REGISTER
,
384 ceph_version_to_str());
388 bl
.append(reg_req
->get_payload_bufferlist());
391 boost::system::error_code ec
;
// Blocking write of the whole request; an error or a short write is
// reported via fault(ASIO_ERROR_WRITE, ec).
393 ret
= boost::asio::write(m_dm_socket
,
394 boost::asio::buffer(bl
.c_str(), bl
.length()), ec
);
396 if (ec
|| ret
!= bl
.length()) {
397 fault(ASIO_ERROR_WRITE
, ec
);
// Blocking read of the reply header into the m_bp_header member.
402 ret
= boost::asio::read(m_dm_socket
,
403 boost::asio::buffer(m_bp_header
.c_str(), get_header_size()), ec
);
404 if (ec
|| ret
!= get_header_size()) {
405 fault(ASIO_ERROR_READ
, ec
);
// Payload length comes from the header; read that many bytes next.
409 uint64_t data_len
= get_data_len(m_bp_header
.c_str());
410 bufferptr
bp_data(buffer::create(data_len
));
412 ret
= boost::asio::read(m_dm_socket
, boost::asio::buffer(bp_data
.c_str(),
414 if (ec
|| ret
!= data_len
) {
415 fault(ASIO_ERROR_READ
, ec
);
// Reassemble header + payload and decode the reply.
419 bufferlist data_buffer
;
420 data_buffer
.append(m_bp_header
);
421 data_buffer
.append(std::move(bp_data
));
422 ObjectCacheRequest
* req
= decode_object_cache_request(data_buffer
);
// An RBDSC_REGISTER_REPLY marks the session as working and completes
// `on_finish` with 0; the other visible call completes it with -1.
423 if (req
->type
== RBDSC_REGISTER_REPLY
) {
424 m_session_work
.store(true);
425 on_finish
->complete(0);
427 on_finish
->complete(-1);
434 } // namespace immutable_obj_cache