]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/Socket.h
478f2d630208cc699b4c0aad1a74019f35de6406
[ceph.git] / ceph / src / crimson / net / Socket.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
9
10 #include "include/buffer.h"
11
12 #include "crimson/common/log.h"
13 #include "Errors.h"
14 #include "Fwd.h"
15
16 #ifdef UNIT_TESTS_BUILT
17 #include "Interceptor.h"
18 #endif
19
20 namespace crimson::net {
21
22 class Socket;
23 using SocketRef = std::unique_ptr<Socket>;
24 using SocketFRef = seastar::foreign_ptr<SocketRef>;
25
26 class Socket {
27 struct construct_tag {};
28
29 public:
30 // if acceptor side, peer is using a different port (ephemeral_port)
31 // if connector side, I'm using a different port (ephemeral_port)
32 enum class side_t {
33 acceptor,
34 connector
35 };
36 Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
37
38 ~Socket();
39
40 Socket(Socket&& o) = delete;
41
42 seastar::shard_id get_shard_id() const {
43 return sid;
44 }
45
46 side_t get_side() const {
47 return side;
48 }
49
50 uint16_t get_ephemeral_port() const {
51 return ephemeral_port;
52 }
53
54 seastar::socket_address get_local_address() const {
55 return socket.local_address();
56 }
57
58 bool is_shutdown() const {
59 assert(seastar::this_shard_id() == sid);
60 return socket_is_shutdown;
61 }
62
63 // learn my ephemeral_port as connector.
64 // unfortunately, there's no way to identify which port I'm using as
65 // connector with current seastar interface.
66 void learn_ephemeral_port_as_connector(uint16_t port) {
67 assert(side == side_t::connector &&
68 (ephemeral_port == 0 || ephemeral_port == port));
69 ephemeral_port = port;
70 }
71
72 /// read the requested number of bytes into a bufferlist
73 seastar::future<bufferlist> read(size_t bytes);
74
75 seastar::future<bufferptr> read_exactly(size_t bytes);
76
77 seastar::future<> write(bufferlist);
78
79 seastar::future<> flush();
80
81 seastar::future<> write_flush(bufferlist);
82
83 // preemptively disable further reads or writes, can only be shutdown once.
84 void shutdown();
85
86 /// Socket can only be closed once.
87 seastar::future<> close();
88
89 static seastar::future<SocketRef>
90 connect(const entity_addr_t& peer_addr);
91
92 /*
93 * test interfaces
94 */
95
96 // shutdown for tests
97 void force_shutdown() {
98 assert(seastar::this_shard_id() == sid);
99 socket.shutdown_input();
100 socket.shutdown_output();
101 }
102
103 // shutdown input_stream only, for tests
104 void force_shutdown_in() {
105 assert(seastar::this_shard_id() == sid);
106 socket.shutdown_input();
107 }
108
109 // shutdown output_stream only, for tests
110 void force_shutdown_out() {
111 assert(seastar::this_shard_id() == sid);
112 socket.shutdown_output();
113 }
114
115 private:
116 const seastar::shard_id sid;
117 seastar::connected_socket socket;
118 seastar::input_stream<char> in;
119 seastar::output_stream<char> out;
120 bool socket_is_shutdown;
121 side_t side;
122 uint16_t ephemeral_port;
123
124 #ifndef NDEBUG
125 bool closed = false;
126 #endif
127
128 /// buffer state for read()
129 struct {
130 bufferlist buffer;
131 size_t remaining;
132 } r;
133
134 #ifdef UNIT_TESTS_BUILT
135 public:
136 void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
137
138 private:
139 seastar::future<> try_trap_pre(bp_action_t& trap);
140
141 seastar::future<> try_trap_post(bp_action_t& trap);
142
143 bp_action_t next_trap_read = bp_action_t::CONTINUE;
144 bp_action_t next_trap_write = bp_action_t::CONTINUE;
145 socket_blocker* blocker = nullptr;
146
147 #endif
148 friend class ShardedServerSocket;
149 };
150
151 using listen_ertr = crimson::errorator<
152 crimson::ct_error::address_in_use, // The address is already bound
153 crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
154 >;
155
156 class ShardedServerSocket
157 : public seastar::peering_sharded_service<ShardedServerSocket> {
158 struct construct_tag {};
159
160 public:
161 ShardedServerSocket(
162 seastar::shard_id sid,
163 bool dispatch_only_on_primary_sid,
164 construct_tag);
165
166 ~ShardedServerSocket();
167
168 ShardedServerSocket(ShardedServerSocket&&) = delete;
169 ShardedServerSocket(const ShardedServerSocket&) = delete;
170 ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
171 ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
172
173 bool is_fixed_shard_dispatching() const {
174 return dispatch_only_on_primary_sid;
175 }
176
177 listen_ertr::future<> listen(entity_addr_t addr);
178
179 using accept_func_t =
180 std::function<seastar::future<>(SocketRef, entity_addr_t)>;
181 seastar::future<> accept(accept_func_t &&_fn_accept);
182
183 seastar::future<> shutdown_destroy();
184
185 static seastar::future<ShardedServerSocket*> create(
186 bool dispatch_only_on_this_shard);
187
188 private:
189 const seastar::shard_id primary_sid;
190 /// XXX: Remove once all infrastructure uses multi-core messenger
191 const bool dispatch_only_on_primary_sid;
192 entity_addr_t listen_addr;
193 std::optional<seastar::server_socket> listener;
194 seastar::gate shutdown_gate;
195 accept_func_t fn_accept;
196
197 using sharded_service_t = seastar::sharded<ShardedServerSocket>;
198 std::unique_ptr<sharded_service_t> service;
199 };
200
201 } // namespace crimson::net