1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <seastar/core/gate.hh>
18 #include <seastar/core/reactor.hh>
19 #include <seastar/core/shared_future.hh>
20 #include <seastar/core/sharded.hh>
22 #include "msg/Policy.h"
23 #include "Connection.h"
25 #include "crimson/thread/Throttle.h"
28 class AuthSessionHandler
;
/// Alias for seastar::stop_iteration. It is the value type of the futures
/// returned by the handshake steps declared in SocketConnection, such as
/// repeat_handle_connect() and repeat_connect().
32 using stop_t
= seastar::stop_iteration
;
34 class SocketMessenger
;
35 class SocketConnection
;
/// Shorthand for a seastar::shared_ptr to a SocketConnection. The server-side
/// handshake helpers handle_connect_with_existing() and replace_existing()
/// take the already-existing connection through this type.
36 using SocketConnectionRef
= seastar::shared_ptr
<SocketConnection
>;
38 class SocketConnection
: public Connection
{
39 SocketMessenger
& messenger
;
40 seastar::foreign_ptr
<std::unique_ptr
<Socket
>> socket
;
41 Dispatcher
& dispatcher
;
42 seastar::gate pending_dispatch
;
44 // if acceptor side, socket_port is different from peer_addr.get_port();
45 // if connector side, socket_port is different from my_addr.get_port().
51 side_t side
= side_t::none
;
52 uint16_t socket_port
= 0;
63 state_t state
= state_t::none
;
65 /// becomes valid only when state is state_t::closing
66 seastar::shared_future
<> close_ready
;
68 /// state for handshake
70 ceph_msg_connect connect
;
71 ceph_msg_connect_reply reply
;
72 std::unique_ptr
<AuthAuthorizer
> authorizer
;
73 std::chrono::milliseconds backoff
;
74 uint32_t connect_seq
= 0;
75 uint32_t peer_global_seq
= 0;
77 seastar::promise
<> promise
;
80 /// server side of handshake negotiation
81 seastar::future
<stop_t
> repeat_handle_connect();
82 seastar::future
<stop_t
> handle_connect_with_existing(SocketConnectionRef existing
,
83 bufferlist
&& authorizer_reply
);
84 seastar::future
<stop_t
> replace_existing(SocketConnectionRef existing
,
85 bufferlist
&& authorizer_reply
,
86 bool is_reset_from_peer
= false);
87 seastar::future
<stop_t
> send_connect_reply(ceph::net::msgr_tag_t tag
,
88 bufferlist
&& authorizer_reply
= {});
89 seastar::future
<stop_t
> send_connect_reply_ready(ceph::net::msgr_tag_t tag
,
90 bufferlist
&& authorizer_reply
);
92 seastar::future
<> handle_keepalive2();
93 seastar::future
<> handle_keepalive2_ack();
95 bool require_auth_feature() const;
96 uint32_t get_proto_version(entity_type_t peer_type
, bool connec
) const;
97 /// client side of handshake negotiation
98 seastar::future
<stop_t
> repeat_connect();
99 seastar::future
<stop_t
> handle_connect_reply(ceph::net::msgr_tag_t tag
);
100 void reset_session();
102 /// state for an incoming message
103 struct MessageReader
{
104 ceph_msg_header header
;
105 ceph_msg_footer footer
;
111 seastar::future
<> maybe_throttle();
112 seastar::future
<> handle_tags();
113 seastar::future
<> handle_ack();
115 /// becomes available when handshake completes, and when all previous messages
116 /// have been sent to the output stream. send() chains new messages as
117 /// continuations to this future to act as a queue
118 seastar::future
<> send_ready
;
120 /// encode/write a message
121 seastar::future
<> write_message(MessageRef msg
);
123 ceph::net::Policy
<ceph::thread::Throttle
> policy
;
125 void set_features(uint64_t new_features
) {
126 features
= new_features
;
129 /// the seq num of the last transmitted message
130 seq_num_t out_seq
= 0;
131 /// the seq num of the last received message
132 seq_num_t in_seq
= 0;
133 /// update the seq num of last received message
134 /// @returns true if the @c seq is valid, and @c in_seq is updated,
136 bool update_rx_seq(seq_num_t seq
);
138 seastar::future
<> read_message();
140 std::unique_ptr
<AuthSessionHandler
> session_security
;
142 // messages to be resent after connection gets reset
143 std::queue
<MessageRef
> out_q
;
144 // messages sent, but not yet acked by peer
145 std::queue
<MessageRef
> sent
;
146 static void discard_up_to(std::queue
<MessageRef
>*, seq_num_t
);
150 const char tag
= CEPH_MSGR_TAG_KEEPALIVE2
;
152 } __attribute__((packed
)) req
;
154 const char tag
= CEPH_MSGR_TAG_KEEPALIVE2_ACK
;
156 } __attribute__((packed
)) ack
;
157 ceph_timespec ack_stamp
;
160 seastar::future
<> fault();
164 seastar::future
<> do_send(MessageRef msg
);
165 seastar::future
<> do_keepalive();
166 seastar::future
<> do_close();
169 SocketConnection(SocketMessenger
& messenger
,
170 Dispatcher
& dispatcher
);
173 Messenger
* get_messenger() const override
;
175 int get_peer_type() const override
{
179 seastar::future
<bool> is_connected() override
;
181 seastar::future
<> send(MessageRef msg
) override
;
183 seastar::future
<> keepalive() override
;
185 seastar::future
<> close() override
;
187 seastar::shard_id
shard_id() const override
;
189 void print(ostream
& out
) const override
;
192 /// start a handshake from the client's perspective,
193 /// only call when the SocketConnection is first constructed
194 void start_connect(const entity_addr_t
& peer_addr
,
195 const entity_type_t
& peer_type
);
196 /// start a handshake from the server's perspective,
197 /// only call when the SocketConnection is first constructed
198 void start_accept(seastar::foreign_ptr
<std::unique_ptr
<Socket
>>&& socket
,
199 const entity_addr_t
& peer_addr
);
201 /// the number of connections initiated in this session, incremented when a
202 /// new connection is established
203 uint32_t connect_seq() const {
204 return h
.connect_seq
;
207 /// the client side should connect to us with a gseq. it will be reset with
208 /// the one of the existing connection if it's greater.
209 uint32_t peer_global_seq() const {
210 return h
.peer_global_seq
;
212 seq_num_t
rx_seq_num() const {
216 /// current state of connection
217 state_t
get_state() const {
220 bool is_server_side() const {
221 return policy
.server
;
223 bool is_lossy() const {
227 /// move all messages in the sent list back into the queue
230 std::tuple
<seq_num_t
, std::queue
<MessageRef
>> get_out_queue() {
231 return {out_seq
, std::move(out_q
)};
235 } // namespace ceph::net