git.proxmox.com Git - ceph.git/blob - ceph/src/tools/immutable_object_cache/CacheClient.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / immutable_object_cache / CacheClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_CACHE_CACHE_CLIENT_H
5 #define CEPH_CACHE_CACHE_CLIENT_H
6
7 #include <atomic>
8 #include <boost/asio.hpp>
9 #include <boost/bind.hpp>
10 #include <boost/asio/error.hpp>
11 #include <boost/algorithm/string.hpp>
12
13 #include "include/ceph_assert.h"
14 #include "common/ceph_mutex.h"
15 #include "include/Context.h"
16 #include "Types.h"
17 #include "SocketCommon.h"
18
19
// Shorthand for Boost.Asio's local (UNIX-domain) stream protocol type.
// NOTE(review): this using-declaration is at global scope in a header, so the
// unqualified name `stream_protocol` becomes visible in every translation unit
// that includes this file.
20 using boost::asio::local::stream_protocol;
21
22 namespace ceph {
23 namespace immutable_obj_cache {
24
// CacheClient: judging by its members, an object that talks over a local
// (UNIX-domain) stream socket using a boost::asio io_service, and keeps
// requests indexed by a 64-bit id. Only declarations are visible in this
// header; any statement below about what a method *does* is an assumption
// to be confirmed against the implementation file.
25 class CacheClient {
26 public:
// file: presumably the filesystem path used to build the local endpoint
//       (m_ep) -- confirm in the constructor.
// ceph_ctx: Ceph context pointer; presumably kept in m_cct and not owned
//       by this object -- confirm.
27 CacheClient(const std::string& file, CephContext* ceph_ctx);
28 ~CacheClient();
// Presumably starts the I/O thread (m_io_thread) that drives m_io_service
// -- confirm.
29 void run();
// Presumably reports the m_session_work flag -- confirm.
30 bool is_session_work();
// NOTE(review): the difference between close() and stop() is not visible
// here; stop() additionally returns an int status.
31 void close();
32 int stop();
// Two connect flavours: this one reports its outcome through the int return
// value ...
33 int connect();
// ... and this one reports through on_finish (presumably completed from
// handle_connect() -- confirm).
34 void connect(Context* on_finish);
// Issues a lookup for the object identified by
// (pool_nspace, pool_id, snap_id, oid); the outcome is delivered through
// on_finish. on_finish is an rvalue reference, so callers must pass an
// rvalue (e.g. via std::move); presumably ownership moves into the client
// -- confirm. The two strings are taken by value.
35 void lookup_object(std::string pool_nspace, uint64_t pool_id,
36 uint64_t snap_id, std::string oid,
37 CacheGenContextURef&& on_finish);
// Returns an int status and also takes a completion Context; how the two
// relate is not visible in this header -- TODO confirm.
38 int register_client(Context* on_finish);
39
40 private:
// Send-side helpers. Presumably they drain m_outcoming_bl to the socket,
// with m_writing marking a send in progress -- confirm.
41 void send_message();
42 void try_send();
// Error path: err_type is an integer error category, err the asio error
// code. What it tears down is not visible here.
43 void fault(const int err_type, const boost::system::error_code& err);
// Takes the on_finish of connect(Context*) plus the connect error code;
// presumably the completion handler of the asynchronous connect -- confirm.
44 void handle_connect(Context* on_finish, const boost::system::error_code& err);
// Receive-side helpers. Presumably m_reading marks a receive in progress
// -- confirm.
45 void try_receive();
46 void receive_message();
// Handles one decoded reply together with its sequence id; presumably
// seq_id is the key into m_seq_to_req -- confirm.
47 void process(ObjectCacheRequest* reply, uint64_t seq_id);
// Reply reading is declared in two stages: header first, then data_len
// bytes of payload. The handle_* signatures (error_code, bytes_transferred)
// have the shape of boost::asio read completion handlers; presumably they
// are bound as such -- confirm.
48 void read_reply_header();
49 void handle_reply_header(bufferptr bp_head,
50 const boost::system::error_code& ec,
51 size_t bytes_transferred);
52 void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
53 const uint64_t data_len);
54 void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
55 const uint64_t data_len,
56 const boost::system::error_code& ec,
57 size_t bytes_transferred);
58
59 private:
// NOTE: non-static data members are initialized in declaration order.
// If m_io_service_work and m_dm_socket are constructed from m_io_service
// (the usual asio pattern -- confirm in the constructor), m_io_service must
// stay declared before them.
60 CephContext* m_cct;
61 boost::asio::io_service m_io_service;
62 boost::asio::io_service::work m_io_service_work;
// Local (UNIX-domain) stream socket and the endpoint it connects to.
63 stream_protocol::socket m_dm_socket;
64 stream_protocol::endpoint m_ep;
65 std::shared_ptr<std::thread> m_io_thread;
66 std::atomic<bool> m_session_work;
67
// Second io_service with its own threads and work object, all held through
// raw pointers. Allocation, ownership and cleanup are not visible in this
// header -- check the constructor/destructor.
68 uint64_t m_worker_thread_num;
69 boost::asio::io_service* m_worker;
70 std::vector<std::thread*> m_worker_threads;
71 boost::asio::io_service::work* m_worker_io_service_work;
72
73 std::atomic<bool> m_writing;
74 std::atomic<bool> m_reading;
// Atomic 64-bit counter; presumably the source of the ids used as keys in
// m_seq_to_req -- confirm.
75 std::atomic<uint64_t> m_sequence_id;
// Which members this mutex guards is not stated here; presumably
// m_seq_to_req and m_outcoming_bl -- verify against the implementation.
76 ceph::mutex m_lock =
77 ceph::make_mutex("ceph::cache::cacheclient::m_lock");
// Requests keyed by a uint64_t id; values are raw pointers, ownership not
// visible here.
78 std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
// Presumably the buffer of encoded outgoing messages ("outcoming" [sic])
// -- confirm.
79 bufferlist m_outcoming_bl;
// Presumably the buffer used for the reply header -- confirm.
80 bufferptr m_bp_header;
81 };
82
83 } // namespace immutable_obj_cache
84 } // namespace ceph
85 #endif // CEPH_CACHE_CACHE_CLIENT_H