]>
git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/demos/tcp_sctp_server_demo.cc
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2014 Cloudius Systems
 */
22 #include <seastar/core/reactor.hh>
23 #include <seastar/core/app-template.hh>
24 #include <seastar/core/temporary_buffer.hh>
25 #include <seastar/core/distributed.hh>
26 #include <seastar/core/print.hh>
30 using namespace seastar
;
// Command words the connection handler compares a client request against.
// The handler expects a 4-byte command from the client.
static std::string str_ping{"ping"};
static std::string str_txtx{"txtx"};
static std::string str_rxrx{"rxrx"};

// Reply written back for a "ping" command.
static std::string str_pong{"pong"};

// Reply written back when the command matches none of the words above.
// The text is sent on the wire as-is, so it is kept byte-for-byte.
static std::string str_unknow{"unknow cmd"};

// Transmit test sizing: the total number of bytes to send, the size of one
// message, and the resulting number of messages (the initial count passed to
// do_write()).
static int tx_msg_total_size = 100 * 1024 * 1024;
static int tx_msg_size = 4 * 1024;
static int tx_msg_nr = tx_msg_total_size / tx_msg_size;

// Number of bytes requested per read_exactly() call in the receive test.
static int rx_msg_size = 4 * 1024;

// Payload of one transmit message: tx_msg_size copies of 'X'.
// Parentheses are deliberate: brace-init would pick the initializer_list
// constructor and build a two-character string instead.
static std::string str_txbuf(tx_msg_size, 'X');

// Protocol switches. Both start disabled; main() assigns them from the
// "--tcp" and "--sctp" command-line options.
static bool enable_tcp = false;
static bool enable_sctp = false;
46 std::vector
<server_socket
> _tcp_listeners
;
47 std::vector
<server_socket
> _sctp_listeners
;
49 future
<> listen(ipv4_addr addr
) {
52 lo
.proto
= transport::TCP
;
53 lo
.reuse_address
= true;
54 _tcp_listeners
.push_back(seastar::listen(make_ipv4_address(addr
), lo
));
55 do_accepts(_tcp_listeners
);
60 lo
.proto
= transport::SCTP
;
61 lo
.reuse_address
= true;
62 _sctp_listeners
.push_back(seastar::listen(make_ipv4_address(addr
), lo
));
63 do_accepts(_sctp_listeners
);
65 return make_ready_future
<>();
68 // FIXME: We should properly tear down the service here.
70 return make_ready_future
<>();
73 void do_accepts(std::vector
<server_socket
>& listeners
) {
74 int which
= listeners
.size() - 1;
75 // Accept in the background.
76 (void)listeners
[which
].accept().then([this, &listeners
] (accept_result ar
) mutable {
77 connected_socket fd
= std::move(ar
.connection
);
78 socket_address addr
= std::move(ar
.remote_address
);
79 auto conn
= new connection(*this, std::move(fd
), addr
);
80 (void)conn
->process().then_wrapped([conn
] (auto&& f
) {
84 } catch (std::exception
& ex
) {
85 std::cout
<< "request error " << ex
.what() << "\n";
88 do_accepts(listeners
);
89 }).then_wrapped([] (auto&& f
) {
92 } catch (std::exception
& ex
) {
93 std::cout
<< "accept failed: " << ex
.what() << "\n";
99 input_stream
<char> _read_buf
;
100 output_stream
<char> _write_buf
;
102 connection(tcp_server
& server
, connected_socket
&& fd
, socket_address addr
)
104 , _read_buf(_fd
.input())
105 , _write_buf(_fd
.output()) {}
110 if (_read_buf
.eof()) {
111 return make_ready_future();
113 // Expect 4 bytes cmd from client
115 return _read_buf
.read_exactly(n
).then([this] (temporary_buffer
<char> buf
) {
116 if (buf
.size() == 0) {
117 return make_ready_future();
119 auto cmd
= std::string(buf
.get(), buf
.size());
121 if (cmd
== str_ping
) {
122 return _write_buf
.write(str_pong
).then([this] {
123 return _write_buf
.flush();
128 } else if (cmd
== str_txtx
) {
131 } else if (cmd
== str_rxrx
) {
135 return _write_buf
.write(str_unknow
).then([this] {
136 return _write_buf
.flush();
138 return make_ready_future();
143 future
<> do_write(int end
) {
145 return make_ready_future
<>();
147 return _write_buf
.write(str_txbuf
).then([this] {
148 return _write_buf
.flush();
149 }).then([this, end
] {
150 return do_write(end
- 1);
154 return do_write(tx_msg_nr
).then([this] {
155 return _write_buf
.close();
157 return make_ready_future
<>();
161 return _read_buf
.read_exactly(rx_msg_size
).then([this] (temporary_buffer
<char> buf
) {
162 if (buf
.size() == 0) {
163 return make_ready_future();
170 return do_read().then([] {
171 return make_ready_future
<>();
177 namespace bpo
= boost::program_options
;
179 int main(int ac
, char** av
) {
182 ("port", bpo::value
<uint16_t>()->default_value(10000), "TCP server port")
183 ("tcp", bpo::value
<std::string
>()->default_value("yes"), "tcp listen")
184 ("sctp", bpo::value
<std::string
>()->default_value("no"), "sctp listen") ;
185 return app
.run_deprecated(ac
, av
, [&] {
186 auto&& config
= app
.configuration();
187 uint16_t port
= config
["port"].as
<uint16_t>();
188 enable_tcp
= config
["tcp"].as
<std::string
>() == "yes";
189 enable_sctp
= config
["sctp"].as
<std::string
>() == "yes";
190 if (!enable_tcp
&& !enable_sctp
) {
191 fmt::print(std::cerr
, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n");
192 return engine().exit(1);
194 auto server
= new distributed
<tcp_server
>;
195 (void)server
->start().then([server
= std::move(server
), port
] () mutable {
196 engine().at_exit([server
] {
197 return server
->stop();
199 // Start listening in the background.
200 (void)server
->invoke_on_all(&tcp_server::listen
, ipv4_addr
{port
});
202 std::cout
<< "Seastar TCP server listening on port " << port
<< " ...\n";