]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/immutable_object_cache/CacheSession.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / immutable_object_cache / CacheSession.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/debug.h"
5 #include "common/ceph_context.h"
6 #include "CacheSession.h"
7
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_immutable_obj_cache
10 #undef dout_prefix
11 #define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
12 << __func__ << ": "
13
14
15 namespace ceph {
16 namespace immutable_obj_cache {
17
18 CacheSession::CacheSession(io_service& io_service,
19 ProcessMsg processmsg,
20 CephContext* cct)
21 : m_dm_socket(io_service),
22 m_server_process_msg(processmsg), m_cct(cct) {
23 m_bp_header = buffer::create(get_header_size());
24 }
25
26 CacheSession::~CacheSession() {
27 close();
28 }
29
30 stream_protocol::socket& CacheSession::socket() {
31 return m_dm_socket;
32 }
33
34 void CacheSession::close() {
35 if (m_dm_socket.is_open()) {
36 boost::system::error_code close_ec;
37 m_dm_socket.close(close_ec);
38 if (close_ec) {
39 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
40 }
41 }
42 }
43
44 void CacheSession::start() {
45 read_request_header();
46 }
47
48 void CacheSession::read_request_header() {
49 ldout(m_cct, 20) << dendl;
50 boost::asio::async_read(m_dm_socket,
51 boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
52 boost::asio::transfer_exactly(get_header_size()),
53 boost::bind(&CacheSession::handle_request_header,
54 shared_from_this(), boost::asio::placeholders::error,
55 boost::asio::placeholders::bytes_transferred));
56 }
57
58 void CacheSession::handle_request_header(const boost::system::error_code& err,
59 size_t bytes_transferred) {
60 ldout(m_cct, 20) << dendl;
61 if (err || bytes_transferred != get_header_size()) {
62 fault(err);
63 return;
64 }
65
66 read_request_data(get_data_len(m_bp_header.c_str()));
67 }
68
69 void CacheSession::read_request_data(uint64_t data_len) {
70 ldout(m_cct, 20) << dendl;
71 bufferptr bp_data(buffer::create(data_len));
72 boost::asio::async_read(m_dm_socket,
73 boost::asio::buffer(bp_data.c_str(), bp_data.length()),
74 boost::asio::transfer_exactly(data_len),
75 boost::bind(&CacheSession::handle_request_data,
76 shared_from_this(), bp_data, data_len,
77 boost::asio::placeholders::error,
78 boost::asio::placeholders::bytes_transferred));
79 }
80
81 void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
82 const boost::system::error_code& err,
83 size_t bytes_transferred) {
84 ldout(m_cct, 20) << dendl;
85 if (err || bytes_transferred != data_len) {
86 fault(err);
87 return;
88 }
89
90 bufferlist bl_data;
91
92 bl_data.append(m_bp_header);
93 bl_data.append(std::move(bp));
94
95 ObjectCacheRequest* req = decode_object_cache_request(bl_data);
96
97 process(req);
98 delete req;
99 read_request_header();
100 }
101
102 void CacheSession::process(ObjectCacheRequest* req) {
103 ldout(m_cct, 20) << dendl;
104 m_server_process_msg(this, req);
105 }
106
107 void CacheSession::send(ObjectCacheRequest* reply) {
108 ldout(m_cct, 20) << dendl;
109 bufferlist bl;
110 reply->encode();
111 bl.append(reply->get_payload_bufferlist());
112
113 boost::asio::async_write(m_dm_socket,
114 boost::asio::buffer(bl.c_str(), bl.length()),
115 boost::asio::transfer_exactly(bl.length()),
116 [this, bl, reply](const boost::system::error_code& err,
117 size_t bytes_transferred) {
118 delete reply;
119 if (err || bytes_transferred != bl.length()) {
120 fault(err);
121 return;
122 }
123 });
124 }
125
126 void CacheSession::fault(const boost::system::error_code& ec) {
127 ldout(m_cct, 20) << "session fault : " << ec.message() << dendl;
128 }
129
130 } // namespace immutable_obj_cache
131 } // namespace ceph