1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_CACHE_CACHE_CLIENT_H
5 #define CEPH_CACHE_CACHE_CLIENT_H
8 #include <boost/asio.hpp>
9 #include <boost/bind.hpp>
10 #include <boost/asio/error.hpp>
11 #include <boost/algorithm/string.hpp>
13 #include "include/ceph_assert.h"
14 #include "common/ceph_mutex.h"
15 #include "include/Context.h"
17 #include "SocketCommon.h"
20 using boost::asio::local::stream_protocol
;
// Declarations for the immutable object cache client (CacheClient).
// Only declarations are visible here; all behaviour lives in the
// corresponding implementation file.
// NOTE(review): no enclosing `class CacheClient { ... };` (nor any access
// specifier) is visible in this view, yet the declarations below read as
// class members (constructor, m_-prefixed fields) — confirm against the
// complete header before relying on this layout.
23 namespace immutable_obj_cache
{
// Constructor: takes a string `file` by const reference and a CephContext
// pointer.
// NOTE(review): `file` is presumably the path of the local socket that m_ep
// refers to — confirm in the implementation.
27 CacheClient(const std::string
& file
, CephContext
* ceph_ctx
);
// Returns a bool session indicator.
// NOTE(review): presumably reports the m_session_work flag — confirm.
30 bool is_session_work();
// Connect overload that takes a Context* completion, `on_finish`.
// NOTE(review): presumably asynchronous, with the outcome delivered through
// on_finish via handle_connect() — confirm.
34 void connect(Context
* on_finish
);
// Looks up an object identified by pool namespace, pool id, snapshot id and
// object id. The strings are taken by value; the completion is taken as an
// rvalue reference to a CacheGenContextURef.
// NOTE(review): how and when on_finish is invoked is not visible here.
35 void lookup_object(std::string pool_nspace
, uint64_t pool_id
,
36 uint64_t snap_id
, std::string oid
,
37 CacheGenContextURef
&& on_finish
);
// Registers this client; returns an int status and takes a Context*
// completion.
// NOTE(review): the meaning of the return value (0 / negative errno?) is not
// visible here — confirm.
38 int register_client(Context
* on_finish
);
// Error hook: receives an integer error-type code and the
// boost::system::error_code that triggered it.
// NOTE(review): the set of valid err_type values is defined elsewhere.
43 void fault(const int err_type
, const boost::system::error_code
& err
);
// Handler taking the caller's completion (on_finish) and the
// boost::system::error_code of the connect attempt.
44 void handle_connect(Context
* on_finish
, const boost::system::error_code
& err
);
// Receive-path entry point; takes no arguments.
// NOTE(review): presumably kicks off read_reply_header() — confirm.
46 void receive_message();
// Processes one decoded reply together with its sequence id.
// NOTE(review): seq_id presumably indexes m_seq_to_req — confirm.
47 void process(ObjectCacheRequest
* reply
, uint64_t seq_id
);
// Starts reading a reply header; takes no arguments.
48 void read_reply_header();
// Handler for a header read: receives the header buffer (by value), the
// error code and the number of bytes transferred.
49 void handle_reply_header(bufferptr bp_head
,
50 const boost::system::error_code
& ec
,
51 size_t bytes_transferred
);
// Starts reading a reply payload of data_len bytes. Both buffers are taken
// as rvalue references.
52 void read_reply_data(bufferptr
&& bp_head
, bufferptr
&& bp_data
,
53 const uint64_t data_len
);
// Handler for a payload read: receives the header and data buffers (by
// value), the expected payload length, the error code and the number of
// bytes transferred.
54 void handle_reply_data(bufferptr bp_head
, bufferptr bp_data
,
55 const uint64_t data_len
,
56 const boost::system::error_code
& ec
,
57 size_t bytes_transferred
);
// --- Session I/O state ---
// io_service instance held by value.
61 boost::asio::io_service m_io_service
;
// Boost.Asio work object; while one exists, the io_service it is bound to
// does not run out of work.
// NOTE(review): presumably bound to m_io_service in the constructor.
62 boost::asio::io_service::work m_io_service_work
;
// Local (boost::asio::local::stream_protocol) stream socket.
63 stream_protocol::socket m_dm_socket
;
// Endpoint for the local stream protocol.
64 stream_protocol::endpoint m_ep
;
// Shared handle to a std::thread.
// NOTE(review): presumably the thread that runs m_io_service — confirm.
65 std::shared_ptr
<std::thread
> m_io_thread
;
// Atomic session flag.
66 std::atomic
<bool> m_session_work
;
// --- Worker pool state ---
// NOTE(review): the raw pointers below carry no visible ownership
// information; check where they are allocated and released.
68 uint64_t m_worker_thread_num
;
69 boost::asio::io_service
* m_worker
;
70 std::vector
<std::thread
*> m_worker_threads
;
71 boost::asio::io_service::work
* m_worker_io_service_work
;
// --- Send / receive bookkeeping ---
// Atomic flags and counter.
// NOTE(review): presumably "a write/read is in flight" markers and the next
// request sequence number — confirm.
73 std::atomic
<bool> m_writing
;
74 std::atomic
<bool> m_reading
;
75 std::atomic
<uint64_t> m_sequence_id
;
// NOTE(review): this line has no declarator; it looks like the initializer
// half of a mutex member (e.g. `ceph::mutex m_lock = ...`) whose first half
// is not visible in this view — confirm against the complete header.
77 ceph::make_mutex("ceph::cache::cacheclient::m_lock");
// Map from a uint64_t key to a raw ObjectCacheRequest pointer.
// NOTE(review): presumably keyed by sequence id and guarded by the lock
// above — confirm.
78 std::map
<uint64_t, ObjectCacheRequest
*> m_seq_to_req
;
// NOTE(review): presumably the buffer of bytes queued for sending.
79 bufferlist m_outcoming_bl
;
// NOTE(review): presumably the buffer used while reading a reply header.
80 bufferptr m_bp_header
;
83 } // namespace immutable_obj_cache
85 #endif // CEPH_CACHE_CACHE_CLIENT_H