1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/reactor.hh>
8 #include <seastar/core/sharded.hh>
10 #include "include/buffer.h"
12 #include "crimson/common/log.h"
16 #ifdef UNIT_TESTS_BUILT
17 #include "Interceptor.h"
20 namespace crimson::net
{
23 using SocketRef
= std::unique_ptr
<Socket
>;
24 using SocketFRef
= seastar::foreign_ptr
<SocketRef
>;
27 struct construct_tag
{};
30 // if acceptor side, peer is using a different port (ephemeral_port)
31 // if connector side, I'm using a different port (ephemeral_port)
36 Socket(seastar::connected_socket
&&, side_t
, uint16_t e_port
, construct_tag
);
40 Socket(Socket
&& o
) = delete;
42 seastar::shard_id
get_shard_id() const {
46 side_t
get_side() const {
50 uint16_t get_ephemeral_port() const {
51 return ephemeral_port
;
54 seastar::socket_address
get_local_address() const {
55 return socket
.local_address();
58 bool is_shutdown() const {
59 assert(seastar::this_shard_id() == sid
);
60 return socket_is_shutdown
;
63 // learn my ephemeral_port as connector.
64 // unfortunately, there's no way to identify which port I'm using as
65 // connector with current seastar interface.
66 void learn_ephemeral_port_as_connector(uint16_t port
) {
67 assert(side
== side_t::connector
&&
68 (ephemeral_port
== 0 || ephemeral_port
== port
));
69 ephemeral_port
= port
;
72 /// read the requested number of bytes into a bufferlist
73 seastar::future
<bufferlist
> read(size_t bytes
);
75 seastar::future
<bufferptr
> read_exactly(size_t bytes
);
77 seastar::future
<> write(bufferlist
);
79 seastar::future
<> flush();
81 seastar::future
<> write_flush(bufferlist
);
83 // preemptively disable further reads or writes, can only be shutdown once.
86 /// Socket can only be closed once.
87 seastar::future
<> close();
89 static seastar::future
<SocketRef
>
90 connect(const entity_addr_t
& peer_addr
);
97 void force_shutdown() {
98 assert(seastar::this_shard_id() == sid
);
99 socket
.shutdown_input();
100 socket
.shutdown_output();
103 // shutdown input_stream only, for tests
104 void force_shutdown_in() {
105 assert(seastar::this_shard_id() == sid
);
106 socket
.shutdown_input();
109 // shutdown output_stream only, for tests
110 void force_shutdown_out() {
111 assert(seastar::this_shard_id() == sid
);
112 socket
.shutdown_output();
116 const seastar::shard_id sid
;
117 seastar::connected_socket socket
;
118 seastar::input_stream
<char> in
;
119 seastar::output_stream
<char> out
;
120 bool socket_is_shutdown
;
122 uint16_t ephemeral_port
;
128 /// buffer state for read()
134 #ifdef UNIT_TESTS_BUILT
136 void set_trap(bp_type_t type
, bp_action_t action
, socket_blocker
* blocker_
);
139 seastar::future
<> try_trap_pre(bp_action_t
& trap
);
141 seastar::future
<> try_trap_post(bp_action_t
& trap
);
143 bp_action_t next_trap_read
= bp_action_t::CONTINUE
;
144 bp_action_t next_trap_write
= bp_action_t::CONTINUE
;
145 socket_blocker
* blocker
= nullptr;
148 friend class ShardedServerSocket
;
151 using listen_ertr
= crimson::errorator
<
152 crimson::ct_error::address_in_use
, // The address is already bound
153 crimson::ct_error::address_not_available
// https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
156 class ShardedServerSocket
157 : public seastar::peering_sharded_service
<ShardedServerSocket
> {
158 struct construct_tag
{};
162 seastar::shard_id sid
,
163 bool dispatch_only_on_primary_sid
,
166 ~ShardedServerSocket();
168 ShardedServerSocket(ShardedServerSocket
&&) = delete;
169 ShardedServerSocket(const ShardedServerSocket
&) = delete;
170 ShardedServerSocket
& operator=(ShardedServerSocket
&&) = delete;
171 ShardedServerSocket
& operator=(const ShardedServerSocket
&) = delete;
173 bool is_fixed_shard_dispatching() const {
174 return dispatch_only_on_primary_sid
;
177 listen_ertr::future
<> listen(entity_addr_t addr
);
179 using accept_func_t
=
180 std::function
<seastar::future
<>(SocketRef
, entity_addr_t
)>;
181 seastar::future
<> accept(accept_func_t
&&_fn_accept
);
183 seastar::future
<> shutdown_destroy();
185 static seastar::future
<ShardedServerSocket
*> create(
186 bool dispatch_only_on_this_shard
);
189 const seastar::shard_id primary_sid
;
190 /// XXX: Remove once all infrastructure uses multi-core messenger
191 const bool dispatch_only_on_primary_sid
;
192 entity_addr_t listen_addr
;
193 std::optional
<seastar::server_socket
> listener
;
194 seastar::gate shutdown_gate
;
195 accept_func_t fn_accept
;
197 using sharded_service_t
= seastar::sharded
<ShardedServerSocket
>;
198 std::unique_ptr
<sharded_service_t
> service
;
201 } // namespace crimson::net