]>
git.proxmox.com Git - ceph.git/blob - ceph/src/tools/immutable_object_cache/CacheSession.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "common/debug.h"
5 #include "common/ceph_context.h"
6 #include "CacheSession.h"
8 #define dout_context g_ceph_context
9 #define dout_subsys ceph_subsys_immutable_obj_cache
11 #define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
16 namespace immutable_obj_cache
{
// Constructs a session.
//  - m_dm_socket is initialized from the supplied io_service.
//  - processmsg is stored in m_server_process_msg (the callback later
//    invoked by process()).
//  - cct is stored in m_cct, which the ldout() logging calls use.
// The header buffer m_bp_header is then allocated via buffer::create()
// with get_header_size() as its size argument.
// NOTE(review): the line declaring the `cct` parameter and the closing
// brace are missing from this extract.
18 CacheSession::CacheSession(io_service
& io_service
,
19 ProcessMsg processmsg
,
21 : m_dm_socket(io_service
),
22 m_server_process_msg(processmsg
), m_cct(cct
) {
23 m_bp_header
= buffer::create(get_header_size());
// Destructor.
// NOTE(review): the body is not present in this extract, so its cleanup
// behavior cannot be documented from here.
26 CacheSession::~CacheSession() {
// Accessor returning a reference to a stream_protocol::socket.
// NOTE(review): the body is not present in this extract; presumably it
// returns m_dm_socket -- confirm against the full file.
30 stream_protocol::socket
& CacheSession::socket() {
// Closes m_dm_socket when it is currently open.
// The close() overload taking an error_code is used, so the outcome of
// the close is captured in the local close_ec; its message is written to
// the log at debug level 20 with the prefix "close: ".
// NOTE(review): a line between the close() call and the log statement is
// missing from this extract, so it cannot be determined here whether the
// log is emitted unconditionally or only when close_ec is set.
34 void CacheSession::close() {
35 if (m_dm_socket
.is_open()) {
36 boost::system::error_code close_ec
;
37 m_dm_socket
.close(close_ec
);
39 ldout(m_cct
, 20) << "close: " << close_ec
.message() << dendl
;
// Starts the session by calling read_request_header(), which issues the
// asynchronous read of a request header.
44 void CacheSession::start() {
45 read_request_header();
// Issues an asynchronous read of exactly get_header_size() bytes from
// m_dm_socket into the storage of m_bp_header.
// handle_request_header() is bound as the completion handler and receives
// the error code and the number of bytes transferred.
// shared_from_this() is bound as the handler's target object, so the
// bound handler carries a shared_ptr to this session.
48 void CacheSession::read_request_header() {
49 ldout(m_cct
, 20) << dendl
;
50 boost::asio::async_read(m_dm_socket
,
51 boost::asio::buffer(m_bp_header
.c_str(), get_header_size()),
52 boost::asio::transfer_exactly(get_header_size()),
53 boost::bind(&CacheSession::handle_request_header
,
54 shared_from_this(), boost::asio::placeholders::error
,
55 boost::asio::placeholders::bytes_transferred
));
// Completion handler for the header read started by read_request_header().
//   err               - error code reported by the read.
//   bytes_transferred - number of bytes the read delivered.
// A set error code, or a byte count different from get_header_size(),
// enters the failure branch. The visible code then calls
// read_request_data() with the length that get_data_len() returns for the
// contents of m_bp_header.
// NOTE(review): the body of the failure branch is missing from this
// extract; presumably it reports the fault and returns -- confirm.
// NOTE(review): the length is taken from the received header and passed
// on as-is; no upper bound check is visible here -- confirm one exists
// elsewhere if peers are not trusted.
58 void CacheSession::handle_request_header(const boost::system::error_code
& err
,
59 size_t bytes_transferred
) {
60 ldout(m_cct
, 20) << dendl
;
61 if (err
|| bytes_transferred
!= get_header_size()) {
66 read_request_data(get_data_len(m_bp_header
.c_str()));
// Reads the payload that follows a request header.
//   data_len - number of payload bytes to read.
// A new bufferptr (bp_data) is created with data_len as its size, and an
// asynchronous read of exactly data_len bytes from m_dm_socket into it is
// issued. handle_request_data() is bound as the completion handler
// together with shared_from_this(), bp_data and data_len, and additionally
// receives the error code and the number of bytes transferred.
// NOTE(review): data_len is used directly as the allocation size; no
// bound check is visible in this block -- confirm callers validate it.
69 void CacheSession::read_request_data(uint64_t data_len
) {
70 ldout(m_cct
, 20) << dendl
;
71 bufferptr
bp_data(buffer::create(data_len
));
72 boost::asio::async_read(m_dm_socket
,
73 boost::asio::buffer(bp_data
.c_str(), bp_data
.length()),
74 boost::asio::transfer_exactly(data_len
),
75 boost::bind(&CacheSession::handle_request_data
,
76 shared_from_this(), bp_data
, data_len
,
77 boost::asio::placeholders::error
,
78 boost::asio::placeholders::bytes_transferred
));
// Completion handler for the payload read started by read_request_data().
//   bp                - buffer the payload was read into.
//   data_len          - number of payload bytes that were requested.
//   err               - error code reported by the read.
//   bytes_transferred - number of bytes the read delivered.
// A set error code, or a byte count different from data_len, enters the
// failure branch. The visible remainder appends m_bp_header and then bp
// (moved) to bl_data, decodes bl_data into an ObjectCacheRequest via
// decode_object_cache_request(), and finally calls read_request_header()
// again.
// NOTE(review): several lines are missing from this extract: the body of
// the failure branch, the declaration of bl_data, and whatever is done
// with req between decoding and the next header read (presumably it is
// handed to process() -- confirm).
81 void CacheSession::handle_request_data(bufferptr bp
, uint64_t data_len
,
82 const boost::system::error_code
& err
,
83 size_t bytes_transferred
) {
84 ldout(m_cct
, 20) << dendl
;
85 if (err
|| bytes_transferred
!= data_len
) {
92 bl_data
.append(m_bp_header
);
93 bl_data
.append(std::move(bp
));
95 ObjectCacheRequest
* req
= decode_object_cache_request(bl_data
);
99 read_request_header();
// Hands a request to the callback stored in m_server_process_msg.
//   req - the request to process; passed to the callback together with a
//         pointer to this session.
// NOTE(review): ownership of req is not visible from this block.
102 void CacheSession::process(ObjectCacheRequest
* req
) {
103 ldout(m_cct
, 20) << dendl
;
104 m_server_process_msg(this, req
);
// Sends a reply over m_dm_socket.
//   reply - the reply whose payload is written.
// The bufferlist returned by reply->get_payload_bufferlist() is appended
// to bl, and an asynchronous write of exactly bl.length() bytes starting
// at bl.c_str() is issued. The completion lambda captures `this`, a copy
// of bl, and the reply pointer; a set error code or a byte count
// different from bl.length() enters its failure branch.
// NOTE(review): the lambda captures `this` as a raw pointer, whereas the
// read paths in this file bind shared_from_this(). Confirm that something
// else keeps the session alive until the write completes.
// NOTE(review): the declaration of bl, any step preceding the append, and
// the lambda's remaining body (including what happens to reply) are
// missing from this extract.
107 void CacheSession::send(ObjectCacheRequest
* reply
) {
108 ldout(m_cct
, 20) << dendl
;
111 bl
.append(reply
->get_payload_bufferlist());
113 boost::asio::async_write(m_dm_socket
,
114 boost::asio::buffer(bl
.c_str(), bl
.length()),
115 boost::asio::transfer_exactly(bl
.length()),
116 [this, bl
, reply
](const boost::system::error_code
& err
,
117 size_t bytes_transferred
) {
119 if (err
|| bytes_transferred
!= bl
.length()) {
// Reports a session failure.
//   ec - the error code describing the failure.
// The message of ec is written to the log at debug level 20 with the
// prefix "session fault : ".
// NOTE(review): any statements following the log line are missing from
// this extract.
126 void CacheSession::fault(const boost::system::error_code
& ec
) {
127 ldout(m_cct
, 20) << "session fault : " << ec
.message() << dendl
;
130 } // namespace immutable_obj_cache