]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/SocketConnection.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / crimson / net / SocketConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #pragma once
16
17 #include <seastar/core/gate.hh>
18 #include <seastar/core/reactor.hh>
19 #include <seastar/core/shared_future.hh>
20 #include <seastar/core/sharded.hh>
21
22 #include "msg/Policy.h"
23 #include "Connection.h"
24 #include "Socket.h"
25 #include "crimson/thread/Throttle.h"
26
27 class AuthAuthorizer;
28 class AuthSessionHandler;
29
30 namespace ceph::net {
31
32 using stop_t = seastar::stop_iteration;
33
34 class SocketMessenger;
35 class SocketConnection;
36 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
37
38 class SocketConnection : public Connection {
39 SocketMessenger& messenger;
40 seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
41 Dispatcher& dispatcher;
42 seastar::gate pending_dispatch;
43
44 // if acceptor side, socket_port is different from peer_addr.get_port();
45 // if connector side, socket_port is different from my_addr.get_port().
46 enum class side_t {
47 none,
48 acceptor,
49 connector
50 };
51 side_t side = side_t::none;
52 uint16_t socket_port = 0;
53
54 enum class state_t {
55 none,
56 accepting,
57 connecting,
58 open,
59 standby,
60 wait,
61 closing
62 };
63 state_t state = state_t::none;
64
65 /// become valid only when state is state_t::closing
66 seastar::shared_future<> close_ready;
67
68 /// state for handshake
69 struct Handshake {
70 ceph_msg_connect connect;
71 ceph_msg_connect_reply reply;
72 std::unique_ptr<AuthAuthorizer> authorizer;
73 std::chrono::milliseconds backoff;
74 uint32_t connect_seq = 0;
75 uint32_t peer_global_seq = 0;
76 uint32_t global_seq;
77 seastar::promise<> promise;
78 } h;
79
80 /// server side of handshake negotiation
81 seastar::future<stop_t> repeat_handle_connect();
82 seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
83 bufferlist&& authorizer_reply);
84 seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
85 bufferlist&& authorizer_reply,
86 bool is_reset_from_peer = false);
87 seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
88 bufferlist&& authorizer_reply = {});
89 seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
90 bufferlist&& authorizer_reply);
91
92 seastar::future<> handle_keepalive2();
93 seastar::future<> handle_keepalive2_ack();
94
95 bool require_auth_feature() const;
96 uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
97 /// client side of handshake negotiation
98 seastar::future<stop_t> repeat_connect();
99 seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
100 void reset_session();
101
102 /// state for an incoming message
103 struct MessageReader {
104 ceph_msg_header header;
105 ceph_msg_footer footer;
106 bufferlist front;
107 bufferlist middle;
108 bufferlist data;
109 } m;
110
111 seastar::future<> maybe_throttle();
112 seastar::future<> handle_tags();
113 seastar::future<> handle_ack();
114
115 /// becomes available when handshake completes, and when all previous messages
116 /// have been sent to the output stream. send() chains new messages as
117 /// continuations to this future to act as a queue
118 seastar::future<> send_ready;
119
120 /// encode/write a message
121 seastar::future<> write_message(MessageRef msg);
122
123 ceph::net::Policy<ceph::thread::Throttle> policy;
124 uint64_t features;
125 void set_features(uint64_t new_features) {
126 features = new_features;
127 }
128
129 /// the seq num of the last transmitted message
130 seq_num_t out_seq = 0;
131 /// the seq num of the last received message
132 seq_num_t in_seq = 0;
133 /// update the seq num of last received message
134 /// @returns true if the @c seq is valid, and @c in_seq is updated,
135 /// false otherwise.
136 bool update_rx_seq(seq_num_t seq);
137
138 seastar::future<> read_message();
139
140 std::unique_ptr<AuthSessionHandler> session_security;
141
142 // messages to be resent after connection gets reset
143 std::queue<MessageRef> out_q;
144 // messages sent, but not yet acked by peer
145 std::queue<MessageRef> sent;
146 static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
147
148 struct Keepalive {
149 struct {
150 const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
151 ceph_timespec stamp;
152 } __attribute__((packed)) req;
153 struct {
154 const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
155 ceph_timespec stamp;
156 } __attribute__((packed)) ack;
157 ceph_timespec ack_stamp;
158 } k;
159
160 seastar::future<> fault();
161
162 void execute_open();
163
164 seastar::future<> do_send(MessageRef msg);
165 seastar::future<> do_keepalive();
166 seastar::future<> do_close();
167
168 public:
169 SocketConnection(SocketMessenger& messenger,
170 Dispatcher& dispatcher);
171 ~SocketConnection();
172
173 Messenger* get_messenger() const override;
174
175 int get_peer_type() const override {
176 return peer_type;
177 }
178
179 seastar::future<bool> is_connected() override;
180
181 seastar::future<> send(MessageRef msg) override;
182
183 seastar::future<> keepalive() override;
184
185 seastar::future<> close() override;
186
187 seastar::shard_id shard_id() const override;
188
189 void print(ostream& out) const override;
190
191 public:
192 /// start a handshake from the client's perspective,
193 /// only call when SocketConnection first construct
194 void start_connect(const entity_addr_t& peer_addr,
195 const entity_type_t& peer_type);
196 /// start a handshake from the server's perspective,
197 /// only call when SocketConnection first construct
198 void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
199 const entity_addr_t& peer_addr);
200
201 /// the number of connections initiated in this session, increment when a
202 /// new connection is established
203 uint32_t connect_seq() const {
204 return h.connect_seq;
205 }
206
207 /// the client side should connect us with a gseq. it will be reset with
208 /// the one of exsting connection if it's greater.
209 uint32_t peer_global_seq() const {
210 return h.peer_global_seq;
211 }
212 seq_num_t rx_seq_num() const {
213 return in_seq;
214 }
215
216 /// current state of connection
217 state_t get_state() const {
218 return state;
219 }
220 bool is_server_side() const {
221 return policy.server;
222 }
223 bool is_lossy() const {
224 return policy.lossy;
225 }
226
227 /// move all messages in the sent list back into the queue
228 void requeue_sent();
229
230 std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
231 return {out_seq, std::move(out_q)};
232 }
233 };
234
235 } // namespace ceph::net