]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2014 Cloudius Systems | |
20 | */ | |
21 | ||
#include <seastar/core/reactor.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/print.hh>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
29 | ||
30 | using namespace seastar; | |
31 | ||
// Four-byte commands understood by the server and the replies it sends.
// These are protocol constants, so they are immutable.
static const std::string str_ping{"ping"};
static const std::string str_txtx{"txtx"};
static const std::string str_rxrx{"rxrx"};
static const std::string str_pong{"pong"};
static const std::string str_unknow{"unknow cmd"};

// Parameters of the transmit test: the server writes tx_msg_nr chunks of
// tx_msg_size bytes each, tx_msg_total_size bytes in total.
static constexpr int tx_msg_total_size = 100 * 1024 * 1024;
static constexpr int tx_msg_size = 4 * 1024;
static constexpr int tx_msg_nr = tx_msg_total_size / tx_msg_size;

// Number of bytes requested per read during the receive test.
static constexpr int rx_msg_size = 4 * 1024;

// Payload of one transmit-test chunk: tx_msg_size copies of 'X'.
// Parentheses (not braces) are required to select the (count, char) constructor.
static const std::string str_txbuf(tx_msg_size, 'X');

// Protocol switches; assigned from the command line in main().
static bool enable_tcp = false;
static bool enable_sctp = false;
44 | ||
45 | class tcp_server { | |
f67539c2 TL |
46 | std::vector<server_socket> _tcp_listeners; |
47 | std::vector<server_socket> _sctp_listeners; | |
11fdf7f2 TL |
48 | public: |
49 | future<> listen(ipv4_addr addr) { | |
50 | if (enable_tcp) { | |
51 | listen_options lo; | |
52 | lo.proto = transport::TCP; | |
53 | lo.reuse_address = true; | |
f67539c2 | 54 | _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); |
11fdf7f2 TL |
55 | do_accepts(_tcp_listeners); |
56 | } | |
57 | ||
58 | if (enable_sctp) { | |
59 | listen_options lo; | |
60 | lo.proto = transport::SCTP; | |
61 | lo.reuse_address = true; | |
f67539c2 | 62 | _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); |
11fdf7f2 TL |
63 | do_accepts(_sctp_listeners); |
64 | } | |
65 | return make_ready_future<>(); | |
66 | } | |
67 | ||
68 | // FIXME: We should properly tear down the service here. | |
69 | future<> stop() { | |
70 | return make_ready_future<>(); | |
71 | } | |
72 | ||
f67539c2 | 73 | void do_accepts(std::vector<server_socket>& listeners) { |
11fdf7f2 | 74 | int which = listeners.size() - 1; |
9f95a23c TL |
75 | // Accept in the background. |
76 | (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable { | |
77 | connected_socket fd = std::move(ar.connection); | |
78 | socket_address addr = std::move(ar.remote_address); | |
11fdf7f2 | 79 | auto conn = new connection(*this, std::move(fd), addr); |
9f95a23c | 80 | (void)conn->process().then_wrapped([conn] (auto&& f) { |
11fdf7f2 TL |
81 | delete conn; |
82 | try { | |
83 | f.get(); | |
84 | } catch (std::exception& ex) { | |
85 | std::cout << "request error " << ex.what() << "\n"; | |
86 | } | |
87 | }); | |
88 | do_accepts(listeners); | |
89 | }).then_wrapped([] (auto&& f) { | |
90 | try { | |
91 | f.get(); | |
92 | } catch (std::exception& ex) { | |
93 | std::cout << "accept failed: " << ex.what() << "\n"; | |
94 | } | |
95 | }); | |
96 | } | |
97 | class connection { | |
98 | connected_socket _fd; | |
99 | input_stream<char> _read_buf; | |
100 | output_stream<char> _write_buf; | |
101 | public: | |
102 | connection(tcp_server& server, connected_socket&& fd, socket_address addr) | |
103 | : _fd(std::move(fd)) | |
104 | , _read_buf(_fd.input()) | |
105 | , _write_buf(_fd.output()) {} | |
106 | future<> process() { | |
107 | return read(); | |
108 | } | |
109 | future<> read() { | |
110 | if (_read_buf.eof()) { | |
111 | return make_ready_future(); | |
112 | } | |
113 | // Expect 4 bytes cmd from client | |
114 | size_t n = 4; | |
115 | return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) { | |
116 | if (buf.size() == 0) { | |
117 | return make_ready_future(); | |
118 | } | |
119 | auto cmd = std::string(buf.get(), buf.size()); | |
120 | // pingpong test | |
121 | if (cmd == str_ping) { | |
122 | return _write_buf.write(str_pong).then([this] { | |
123 | return _write_buf.flush(); | |
124 | }).then([this] { | |
125 | return this->read(); | |
126 | }); | |
127 | // server tx test | |
128 | } else if (cmd == str_txtx) { | |
129 | return tx_test(); | |
130 | // server tx test | |
131 | } else if (cmd == str_rxrx) { | |
132 | return rx_test(); | |
133 | // unknow test | |
134 | } else { | |
135 | return _write_buf.write(str_unknow).then([this] { | |
136 | return _write_buf.flush(); | |
137 | }).then([] { | |
138 | return make_ready_future(); | |
139 | }); | |
140 | } | |
141 | }); | |
142 | } | |
143 | future<> do_write(int end) { | |
144 | if (end == 0) { | |
145 | return make_ready_future<>(); | |
146 | } | |
147 | return _write_buf.write(str_txbuf).then([this] { | |
148 | return _write_buf.flush(); | |
149 | }).then([this, end] { | |
150 | return do_write(end - 1); | |
151 | }); | |
152 | } | |
153 | future<> tx_test() { | |
154 | return do_write(tx_msg_nr).then([this] { | |
155 | return _write_buf.close(); | |
156 | }).then([] { | |
157 | return make_ready_future<>(); | |
158 | }); | |
159 | } | |
160 | future<> do_read() { | |
161 | return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { | |
162 | if (buf.size() == 0) { | |
163 | return make_ready_future(); | |
164 | } else { | |
165 | return do_read(); | |
166 | } | |
167 | }); | |
168 | } | |
169 | future<> rx_test() { | |
170 | return do_read().then([] { | |
171 | return make_ready_future<>(); | |
172 | }); | |
173 | } | |
174 | }; | |
175 | }; | |
176 | ||
177 | namespace bpo = boost::program_options; | |
178 | ||
179 | int main(int ac, char** av) { | |
180 | app_template app; | |
181 | app.add_options() | |
182 | ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port") | |
183 | ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen") | |
184 | ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ; | |
185 | return app.run_deprecated(ac, av, [&] { | |
186 | auto&& config = app.configuration(); | |
187 | uint16_t port = config["port"].as<uint16_t>(); | |
188 | enable_tcp = config["tcp"].as<std::string>() == "yes"; | |
189 | enable_sctp = config["sctp"].as<std::string>() == "yes"; | |
190 | if (!enable_tcp && !enable_sctp) { | |
20effc67 | 191 | fmt::print(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n"); |
11fdf7f2 TL |
192 | return engine().exit(1); |
193 | } | |
194 | auto server = new distributed<tcp_server>; | |
9f95a23c | 195 | (void)server->start().then([server = std::move(server), port] () mutable { |
11fdf7f2 TL |
196 | engine().at_exit([server] { |
197 | return server->stop(); | |
198 | }); | |
9f95a23c TL |
199 | // Start listening in the background. |
200 | (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port}); | |
11fdf7f2 TL |
201 | }).then([port] { |
202 | std::cout << "Seastar TCP server listening on port " << port << " ...\n"; | |
203 | }); | |
204 | }); | |
205 | } |