]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/demos/tcp_sctp_server_demo.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / demos / tcp_sctp_server_demo.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2014 Cloudius Systems
20 */
21
#include <seastar/core/reactor.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/print.hh>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
29
30 using namespace seastar;
31
// Four-byte commands a client may send, and the replies the server writes.
// None of these are modified at runtime, so they are declared const.
static const std::string str_ping{"ping"};
static const std::string str_txtx{"txtx"};
static const std::string str_rxrx{"rxrx"};
static const std::string str_pong{"pong"};
static const std::string str_unknow{"unknow cmd"};

// Transmit test: tx_msg_nr messages of tx_msg_size bytes each,
// tx_msg_total_size bytes in total.
static constexpr int tx_msg_total_size = 100 * 1024 * 1024;
static constexpr int tx_msg_size = 4 * 1024;
static constexpr int tx_msg_nr = tx_msg_total_size / tx_msg_size;

// Receive test: number of bytes requested per read.
static constexpr int rx_msg_size = 4 * 1024;

// Payload written for every message of the transmit test.
static const std::string str_txbuf(tx_msg_size, 'X');

// Protocol switches; assigned in main() from the command line options.
static bool enable_tcp = false;
static bool enable_sctp = false;
44
45 class tcp_server {
46 std::vector<server_socket> _tcp_listeners;
47 std::vector<server_socket> _sctp_listeners;
48 public:
49 future<> listen(ipv4_addr addr) {
50 if (enable_tcp) {
51 listen_options lo;
52 lo.proto = transport::TCP;
53 lo.reuse_address = true;
54 _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo));
55 do_accepts(_tcp_listeners);
56 }
57
58 if (enable_sctp) {
59 listen_options lo;
60 lo.proto = transport::SCTP;
61 lo.reuse_address = true;
62 _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo));
63 do_accepts(_sctp_listeners);
64 }
65 return make_ready_future<>();
66 }
67
68 // FIXME: We should properly tear down the service here.
69 future<> stop() {
70 return make_ready_future<>();
71 }
72
73 void do_accepts(std::vector<server_socket>& listeners) {
74 int which = listeners.size() - 1;
75 // Accept in the background.
76 (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable {
77 connected_socket fd = std::move(ar.connection);
78 socket_address addr = std::move(ar.remote_address);
79 auto conn = new connection(*this, std::move(fd), addr);
80 (void)conn->process().then_wrapped([conn] (auto&& f) {
81 delete conn;
82 try {
83 f.get();
84 } catch (std::exception& ex) {
85 std::cout << "request error " << ex.what() << "\n";
86 }
87 });
88 do_accepts(listeners);
89 }).then_wrapped([] (auto&& f) {
90 try {
91 f.get();
92 } catch (std::exception& ex) {
93 std::cout << "accept failed: " << ex.what() << "\n";
94 }
95 });
96 }
97 class connection {
98 connected_socket _fd;
99 input_stream<char> _read_buf;
100 output_stream<char> _write_buf;
101 public:
102 connection(tcp_server& server, connected_socket&& fd, socket_address addr)
103 : _fd(std::move(fd))
104 , _read_buf(_fd.input())
105 , _write_buf(_fd.output()) {}
106 future<> process() {
107 return read();
108 }
109 future<> read() {
110 if (_read_buf.eof()) {
111 return make_ready_future();
112 }
113 // Expect 4 bytes cmd from client
114 size_t n = 4;
115 return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) {
116 if (buf.size() == 0) {
117 return make_ready_future();
118 }
119 auto cmd = std::string(buf.get(), buf.size());
120 // pingpong test
121 if (cmd == str_ping) {
122 return _write_buf.write(str_pong).then([this] {
123 return _write_buf.flush();
124 }).then([this] {
125 return this->read();
126 });
127 // server tx test
128 } else if (cmd == str_txtx) {
129 return tx_test();
130 // server tx test
131 } else if (cmd == str_rxrx) {
132 return rx_test();
133 // unknow test
134 } else {
135 return _write_buf.write(str_unknow).then([this] {
136 return _write_buf.flush();
137 }).then([] {
138 return make_ready_future();
139 });
140 }
141 });
142 }
143 future<> do_write(int end) {
144 if (end == 0) {
145 return make_ready_future<>();
146 }
147 return _write_buf.write(str_txbuf).then([this] {
148 return _write_buf.flush();
149 }).then([this, end] {
150 return do_write(end - 1);
151 });
152 }
153 future<> tx_test() {
154 return do_write(tx_msg_nr).then([this] {
155 return _write_buf.close();
156 }).then([] {
157 return make_ready_future<>();
158 });
159 }
160 future<> do_read() {
161 return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
162 if (buf.size() == 0) {
163 return make_ready_future();
164 } else {
165 return do_read();
166 }
167 });
168 }
169 future<> rx_test() {
170 return do_read().then([] {
171 return make_ready_future<>();
172 });
173 }
174 };
175 };
176
177 namespace bpo = boost::program_options;
178
179 int main(int ac, char** av) {
180 app_template app;
181 app.add_options()
182 ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port")
183 ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen")
184 ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ;
185 return app.run_deprecated(ac, av, [&] {
186 auto&& config = app.configuration();
187 uint16_t port = config["port"].as<uint16_t>();
188 enable_tcp = config["tcp"].as<std::string>() == "yes";
189 enable_sctp = config["sctp"].as<std::string>() == "yes";
190 if (!enable_tcp && !enable_sctp) {
191 fmt::print(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n");
192 return engine().exit(1);
193 }
194 auto server = new distributed<tcp_server>;
195 (void)server->start().then([server = std::move(server), port] () mutable {
196 engine().at_exit([server] {
197 return server->stop();
198 });
199 // Start listening in the background.
200 (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port});
201 }).then([port] {
202 std::cout << "Seastar TCP server listening on port " << port << " ...\n";
203 });
204 });
205 }