]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2016 ScyllaDB | |
20 | */ | |
21 | ||
11fdf7f2 TL |
22 | #include "loopback_socket.hh" |
23 | #include <seastar/rpc/rpc.hh> | |
24 | #include <seastar/rpc/rpc_types.hh> | |
25 | #include <seastar/rpc/lz4_compressor.hh> | |
9f95a23c | 26 | #include <seastar/rpc/lz4_fragmented_compressor.hh> |
11fdf7f2 TL |
27 | #include <seastar/rpc/multi_algo_compressor_factory.hh> |
28 | #include <seastar/testing/test_case.hh> | |
29 | #include <seastar/testing/thread_test_case.hh> | |
9f95a23c | 30 | #include <seastar/testing/test_runner.hh> |
11fdf7f2 TL |
31 | #include <seastar/core/thread.hh> |
32 | #include <seastar/core/sleep.hh> | |
9f95a23c | 33 | #include <seastar/core/distributed.hh> |
f67539c2 | 34 | #include <seastar/core/loop.hh> |
11fdf7f2 | 35 | #include <seastar/util/defer.hh> |
f67539c2 | 36 | #include <seastar/util/log.hh> |
20effc67 TL |
37 | #include <seastar/util/closeable.hh> |
38 | #include <seastar/util/noncopyable_function.hh> | |
11fdf7f2 TL |
39 | |
40 | using namespace seastar; | |
41 | ||
// Stateless tag type. It is the first argument of every write()/read()
// overload in this file and the template argument of test_rpc_proto.
struct serializer {};
44 | ||
// Writes the in-memory bytes of arithmetic value `v` (sizeof(T) of them)
// to `out` through out.write(const char*, size). No byte-order conversion
// is performed.
template <typename T, typename Output>
inline void write_arithmetic_type(Output& out, T v) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    const char* raw = reinterpret_cast<const char*>(&v);
    return out.write(raw, sizeof(T));
}
51 | ||
// Reads sizeof(T) bytes from `in` via in.read(char*, size) directly into a
// value of arithmetic type T and returns it. The result of in.read() is
// not inspected.
template <typename T, typename Input>
inline T read_arithmetic_type(Input& in) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    T value;
    char* raw = reinterpret_cast<char*>(&value);
    in.read(raw, sizeof(T));
    return value;
}
60 | ||
61 | template <typename Output> | |
62 | inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } | |
63 | template <typename Output> | |
64 | inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } | |
65 | template <typename Output> | |
66 | inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } | |
67 | template <typename Output> | |
68 | inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } | |
69 | template <typename Output> | |
70 | inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } | |
71 | template <typename Input> | |
72 | inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } | |
73 | template <typename Input> | |
74 | inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } | |
75 | template <typename Input> | |
76 | inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } | |
77 | template <typename Input> | |
78 | inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } | |
79 | template <typename Input> | |
80 | inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } | |
81 | ||
82 | template <typename Output> | |
83 | inline void write(serializer, Output& out, const sstring& v) { | |
84 | write_arithmetic_type(out, uint32_t(v.size())); | |
85 | out.write(v.c_str(), v.size()); | |
86 | } | |
87 | ||
88 | template <typename Input> | |
89 | inline sstring read(serializer, Input& in, rpc::type<sstring>) { | |
90 | auto size = read_arithmetic_type<uint32_t>(in); | |
f67539c2 TL |
91 | sstring ret = uninitialized_string(size); |
92 | in.read(ret.data(), size); | |
11fdf7f2 TL |
93 | return ret; |
94 | } | |
95 | ||
96 | using test_rpc_proto = rpc::protocol<serializer>; | |
97 | using make_socket_fn = std::function<seastar::socket ()>; | |
98 | ||
99 | struct rpc_loopback_error_injector : public loopback_error_injector { | |
100 | int _x = 0; | |
101 | bool server_rcv_error() override { | |
102 | return _x++ >= 50; | |
103 | } | |
104 | }; | |
105 | ||
106 | class rpc_socket_impl : public ::net::socket_impl { | |
107 | promise<connected_socket> _p; | |
108 | bool _connect; | |
109 | loopback_socket_impl _socket; | |
110 | rpc_loopback_error_injector _error_injector; | |
111 | public: | |
112 | rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error) | |
113 | : _connect(connect), | |
114 | _socket(factory, inject_error ? &_error_injector : nullptr) { | |
115 | } | |
116 | virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { | |
117 | return _connect ? _socket.connect(sa, local, proto) : _p.get_future(); | |
118 | } | |
9f95a23c TL |
119 | virtual void set_reuseaddr(bool reuseaddr) override {} |
120 | virtual bool get_reuseaddr() const override { return false; }; | |
11fdf7f2 TL |
121 | virtual void shutdown() override { |
122 | if (_connect) { | |
123 | _socket.shutdown(); | |
124 | } else { | |
125 | _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); | |
126 | } | |
127 | } | |
128 | }; | |
129 | ||
9f95a23c TL |
130 | struct rpc_test_config { |
131 | rpc::resource_limits resource_limits = {}; | |
132 | rpc::server_options server_options = {}; | |
133 | bool connect = true; | |
134 | bool inject_error = false; | |
135 | }; | |
136 | ||
137 | template<typename MsgType = int> | |
138 | class rpc_test_env { | |
139 | struct rpc_test_service { | |
140 | test_rpc_proto _proto; | |
141 | test_rpc_proto::server _server; | |
142 | std::vector<MsgType> _handlers; | |
143 | ||
144 | rpc_test_service() = delete; | |
145 | explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf) | |
146 | : _proto(serializer()) | |
147 | , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits) | |
148 | { } | |
149 | ||
150 | test_rpc_proto& proto() { | |
151 | return _proto; | |
152 | } | |
153 | ||
154 | test_rpc_proto::server& server() { | |
155 | return _server; | |
156 | } | |
157 | ||
158 | future<> stop() { | |
159 | return parallel_for_each(_handlers, [this] (auto t) { | |
160 | return proto().unregister_handler(t); | |
161 | }).finally([this] { | |
162 | return server().stop(); | |
163 | }); | |
164 | } | |
165 | ||
166 | template<typename Func> | |
167 | auto register_handler(MsgType t, scheduling_group sg, Func func) { | |
168 | _handlers.emplace_back(t); | |
169 | return proto().register_handler(t, sg, std::move(func)); | |
170 | } | |
171 | ||
172 | future<> unregister_handler(MsgType t) { | |
173 | auto it = std::find(_handlers.begin(), _handlers.end(), t); | |
174 | assert(it != _handlers.end()); | |
175 | _handlers.erase(it); | |
176 | return proto().unregister_handler(t); | |
177 | } | |
11fdf7f2 | 178 | }; |
9f95a23c TL |
179 | |
180 | rpc_test_config _cfg; | |
181 | loopback_connection_factory _lcf; | |
f67539c2 | 182 | std::unique_ptr<sharded<rpc_test_service>> _service; |
9f95a23c TL |
183 | |
184 | public: | |
185 | rpc_test_env() = delete; | |
186 | explicit rpc_test_env(rpc_test_config cfg) | |
f67539c2 | 187 | : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>()) |
9f95a23c TL |
188 | { |
189 | } | |
190 | ||
191 | using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>; | |
192 | static future<> do_with(rpc_test_config cfg, test_fn&& func) { | |
193 | return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) { | |
194 | return env.start().then([&env, func] { | |
195 | return func(env); | |
196 | }).finally([&env] { | |
197 | return env.stop(); | |
11fdf7f2 TL |
198 | }); |
199 | }); | |
9f95a23c TL |
200 | } |
201 | ||
202 | using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>; | |
203 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) { | |
204 | return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) { | |
205 | return seastar::async([&env, func] { | |
206 | func(env); | |
207 | }); | |
208 | }); | |
209 | } | |
210 | ||
211 | using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>; | |
212 | static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) { | |
213 | return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) { | |
214 | return seastar::async([&env, func, co = std::move(co)] { | |
215 | test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr()); | |
20effc67 | 216 | auto stop = deferred_stop(cl); |
9f95a23c TL |
217 | func(env, cl); |
218 | }); | |
219 | }); | |
220 | } | |
221 | ||
222 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) { | |
223 | return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func)); | |
224 | } | |
225 | ||
226 | auto make_socket() { | |
227 | return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.connect, _cfg.inject_error)); | |
228 | }; | |
229 | ||
230 | test_rpc_proto& proto() { | |
231 | return local_service().proto(); | |
232 | } | |
233 | ||
234 | test_rpc_proto::server& server() { | |
235 | return local_service().server(); | |
236 | } | |
237 | ||
238 | template<typename Func> | |
239 | future<> register_handler(MsgType t, scheduling_group sg, Func func) { | |
f67539c2 | 240 | return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable { |
9f95a23c TL |
241 | s.register_handler(t, sg, std::move(func)); |
242 | }); | |
243 | } | |
244 | ||
245 | template<typename Func> | |
246 | future<> register_handler(MsgType t, Func func) { | |
247 | return register_handler(t, scheduling_group(), std::move(func)); | |
248 | } | |
249 | ||
250 | future<> unregister_handler(MsgType t) { | |
f67539c2 | 251 | return _service->invoke_on_all([t] (rpc_test_service& s) mutable { |
9f95a23c TL |
252 | return s.unregister_handler(t); |
253 | }); | |
254 | } | |
255 | ||
256 | private: | |
257 | rpc_test_service& local_service() { | |
f67539c2 TL |
258 | return _service->local(); |
259 | ||
9f95a23c TL |
260 | } |
261 | ||
262 | future<> start() { | |
f67539c2 | 263 | return _service->start(std::cref(_cfg), std::ref(_lcf)); |
9f95a23c TL |
264 | } |
265 | ||
266 | future<> stop() { | |
f67539c2 | 267 | return _service->stop().then([this] { |
9f95a23c TL |
268 | return _lcf.destroy_all_shards(); |
269 | }); | |
270 | } | |
271 | }; | |
11fdf7f2 TL |
272 | |
273 | struct cfactory : rpc::compressor::factory { | |
274 | mutable int use_compression = 0; | |
275 | const sstring name; | |
276 | cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {} | |
277 | const sstring& supported() const override { | |
278 | return name; | |
279 | } | |
f67539c2 TL |
280 | class mylz4 : public rpc::lz4_compressor { |
281 | sstring _name; | |
282 | public: | |
283 | mylz4(const sstring& n) : _name(n) {} | |
284 | sstring name() const override { | |
285 | return _name; | |
286 | } | |
287 | }; | |
11fdf7f2 TL |
288 | std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { |
289 | if (feature == name) { | |
290 | use_compression++; | |
f67539c2 | 291 | return std::make_unique<mylz4>(name); |
11fdf7f2 TL |
292 | } else { |
293 | return nullptr; | |
294 | } | |
295 | } | |
296 | }; | |
9f95a23c | 297 | |
11fdf7f2 TL |
298 | SEASTAR_TEST_CASE(test_rpc_connect) { |
299 | std::vector<future<>> fs; | |
300 | ||
301 | for (auto i = 0; i < 2; i++) { | |
302 | for (auto j = 0; j < 4; j++) { | |
303 | auto factory = std::make_unique<cfactory>(); | |
304 | rpc::server_options so; | |
305 | rpc::client_options co; | |
306 | if (i == 1) { | |
307 | so.compressor_factory = factory.get(); | |
308 | } | |
309 | if (j & 1) { | |
310 | co.compressor_factory = factory.get(); | |
311 | } | |
312 | co.send_timeout_data = j & 2; | |
9f95a23c TL |
313 | rpc_test_config cfg; |
314 | cfg.server_options = so; | |
315 | auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
316 | env.register_handler(1, [](int a, int b) { | |
317 | return make_ready_future<int>(a+b); | |
318 | }).get(); | |
319 | auto sum = env.proto().make_client<int (int, int)>(1); | |
320 | auto result = sum(c1, 2, 3).get0(); | |
321 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
322 | }).handle_exception([] (auto ep) { |
323 | BOOST_FAIL("No exception expected"); | |
324 | }).finally([factory = std::move(factory), i, j = j & 1] { | |
325 | if (i == 1 && j == 1) { | |
326 | BOOST_REQUIRE_EQUAL(factory->use_compression, 2); | |
327 | } else { | |
328 | BOOST_REQUIRE_EQUAL(factory->use_compression, 0); | |
329 | } | |
330 | }); | |
331 | fs.emplace_back(std::move(f)); | |
332 | } | |
333 | } | |
334 | return when_all(fs.begin(), fs.end()).discard_result(); | |
335 | } | |
336 | ||
337 | SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) { | |
338 | auto factory1 = std::make_unique<cfactory>(); | |
339 | auto factory2 = std::make_unique<cfactory>("LZ4NEW"); | |
340 | rpc::server_options so; | |
341 | rpc::client_options co; | |
342 | static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()}); | |
343 | static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()}); | |
344 | so.compressor_factory = &server; | |
345 | co.compressor_factory = &client; | |
9f95a23c TL |
346 | rpc_test_config cfg; |
347 | cfg.server_options = so; | |
348 | return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
349 | env.register_handler(1, [](int a, int b) { | |
350 | return make_ready_future<int>(a+b); | |
351 | }).get(); | |
352 | auto sum = env.proto().make_client<int (int, int)>(1); | |
353 | auto result = sum(c1, 2, 3).get0(); | |
354 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
355 | }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] { |
356 | BOOST_REQUIRE_EQUAL(factory1->use_compression, 0); | |
357 | BOOST_REQUIRE_EQUAL(factory2->use_compression, 2); | |
358 | }); | |
359 | } | |
360 | ||
361 | SEASTAR_TEST_CASE(test_rpc_connect_abort) { | |
9f95a23c TL |
362 | rpc_test_config cfg; |
363 | cfg.connect = false; | |
364 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { | |
365 | test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); | |
366 | env.register_handler(1, []() { return make_ready_future<>(); }).get(); | |
367 | auto f = env.proto().make_client<void ()>(1); | |
368 | c1.stop().get0(); | |
369 | try { | |
370 | f(c1).get0(); | |
371 | BOOST_REQUIRE(false); | |
372 | } catch (...) {} | |
11fdf7f2 TL |
373 | }); |
374 | } | |
375 | ||
376 | SEASTAR_TEST_CASE(test_rpc_cancel) { | |
377 | using namespace std::chrono_literals; | |
9f95a23c TL |
378 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
379 | bool rpc_executed = false; | |
380 | int good = 0; | |
381 | promise<> handler_called; | |
382 | future<> f_handler_called = handler_called.get_future(); | |
383 | env.register_handler(1, [&rpc_executed, &handler_called] { | |
384 | handler_called.set_value(); rpc_executed = true; return sleep(1ms); | |
385 | }).get(); | |
386 | auto call = env.proto().make_client<void ()>(1); | |
387 | rpc::cancellable cancel; | |
388 | auto f = call(c1, cancel); | |
389 | // cancel send side | |
390 | cancel.cancel(); | |
391 | try { | |
392 | f.get(); | |
393 | } catch(rpc::canceled_error&) { | |
394 | good += !rpc_executed; | |
395 | }; | |
396 | f = call(c1, cancel); | |
397 | // cancel wait side | |
398 | f_handler_called.then([&cancel] { | |
11fdf7f2 | 399 | cancel.cancel(); |
9f95a23c TL |
400 | }).get(); |
401 | try { | |
402 | f.get(); | |
403 | } catch(rpc::canceled_error&) { | |
404 | good += 10*rpc_executed; | |
405 | }; | |
406 | BOOST_REQUIRE_EQUAL(good, 11); | |
11fdf7f2 TL |
407 | }); |
408 | } | |
409 | ||
410 | SEASTAR_TEST_CASE(test_message_to_big) { | |
9f95a23c TL |
411 | rpc_test_config cfg; |
412 | cfg.resource_limits = {0, 1, 100}; | |
413 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) { | |
414 | bool good = true; | |
415 | env.register_handler(1, [&] (sstring payload) mutable { | |
416 | good = false; | |
417 | }).get(); | |
418 | auto call = env.proto().make_client<void (sstring)>(1); | |
419 | try { | |
f67539c2 | 420 | call(c, uninitialized_string(101)).get(); |
9f95a23c TL |
421 | good = false; |
422 | } catch(std::runtime_error& err) { | |
423 | } catch(...) { | |
424 | good = false; | |
425 | } | |
426 | BOOST_REQUIRE_EQUAL(good, true); | |
11fdf7f2 TL |
427 | }); |
428 | } | |
11fdf7f2 TL |
429 | |
// Outcome of one stream_test_func() run. All flags start false and the sum
// starts at zero.
struct stream_test_result {
    // Set when the client's source loop reads end-of-stream.
    bool client_source_closed = false;
    // Set when the server's source loop reads end-of-stream.
    bool server_source_closed = false;
    // True if the last client-side sink(i) call threw.
    bool sink_exception = false;
    // True if the client-side sink.close() threw.
    bool sink_close_exception = false;
    // True if the client's source loop finished with an exception.
    bool source_done_exception = false;
    // True if the server-side loops finished with an exception.
    bool server_done_exception = false;
    // True if stopping the client threw.
    bool client_stop_exception = false;
    // Sum of all integers the server read from its source.
    int server_sum = 0;
};
440 | ||
f67539c2 TL |
441 | future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) { |
442 | return seastar::async([&env, stop_client, expect_connection_error] { | |
11fdf7f2 | 443 | stream_test_result r; |
9f95a23c | 444 | test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr()); |
11fdf7f2 | 445 | future<> server_done = make_ready_future(); |
9f95a23c | 446 | env.register_handler(1, [&](int i, rpc::source<int> source) { |
11fdf7f2 TL |
447 | BOOST_REQUIRE_EQUAL(i, 666); |
448 | auto sink = source.make_sink<serializer, sstring>(); | |
449 | auto sink_loop = seastar::async([sink] () mutable { | |
450 | for (auto i = 0; i < 100; i++) { | |
451 | sink("seastar").get(); | |
452 | sleep(std::chrono::milliseconds(1)).get(); | |
453 | } | |
f67539c2 TL |
454 | }).finally([sink] () mutable { |
455 | return sink.flush(); | |
456 | }).finally([sink] () mutable { | |
457 | return sink.close(); | |
458 | }).finally([sink] {}); | |
459 | ||
11fdf7f2 TL |
460 | auto source_loop = seastar::async([source, &r] () mutable { |
461 | while (!r.server_source_closed) { | |
462 | auto data = source().get0(); | |
463 | if (data) { | |
464 | r.server_sum += std::get<0>(*data); | |
465 | } else { | |
466 | r.server_source_closed = true; | |
9f95a23c TL |
467 | try { |
468 | // check that reading after eos does not crash | |
469 | // and throws correct exception | |
470 | source().get(); | |
471 | } catch (rpc::stream_closed& ex) { | |
472 | // expected | |
473 | } catch (...) { | |
474 | BOOST_FAIL("wrong exception on reading from a stream after eos"); | |
475 | } | |
11fdf7f2 TL |
476 | } |
477 | } | |
478 | }); | |
479 | server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); | |
480 | return sink; | |
9f95a23c TL |
481 | }).get(); |
482 | auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1); | |
11fdf7f2 TL |
483 | auto x = [&] { |
484 | try { | |
9f95a23c | 485 | return c.make_stream_sink<serializer, int>(env.make_socket()).get0(); |
11fdf7f2 TL |
486 | } catch (...) { |
487 | c.stop().get(); | |
488 | throw; | |
489 | } | |
490 | }; | |
491 | auto sink = x(); | |
492 | auto source = call(c, 666, sink).get0(); | |
493 | auto source_done = seastar::async([&] { | |
494 | while (!r.client_source_closed) { | |
495 | auto data = source().get0(); | |
496 | if (data) { | |
497 | BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar"); | |
498 | } else { | |
499 | r.client_source_closed = true; | |
500 | } | |
501 | } | |
502 | }); | |
503 | auto check_exception = [] (auto f) { | |
504 | try { | |
505 | f.get(); | |
506 | } catch (...) { | |
507 | return true; | |
508 | } | |
509 | return false; | |
510 | }; | |
511 | future<> stop_client_future = make_ready_future(); | |
f67539c2 TL |
512 | // With a connection error sink() will eventually fail, but we |
513 | // cannot guarantee when. | |
514 | int max = expect_connection_error ? std::numeric_limits<int>::max() : 101; | |
515 | for (int i = 1; i < max; i++) { | |
11fdf7f2 TL |
516 | if (stop_client && i == 50) { |
517 | // stop client while stream is in use | |
518 | stop_client_future = c.stop(); | |
519 | } | |
520 | sleep(std::chrono::milliseconds(1)).get(); | |
521 | r.sink_exception = check_exception(sink(i)); | |
522 | if (r.sink_exception) { | |
523 | break; | |
524 | } | |
525 | } | |
526 | r.sink_close_exception = check_exception(sink.close()); | |
527 | r.source_done_exception = check_exception(std::move(source_done)); | |
528 | r.server_done_exception = check_exception(std::move(server_done)); | |
529 | r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future)); | |
530 | return r; | |
531 | }); | |
532 | } | |
533 | ||
534 | SEASTAR_TEST_CASE(test_stream_simple) { | |
535 | rpc::server_options so; | |
536 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
537 | rpc_test_config cfg; |
538 | cfg.server_options = so; | |
539 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
540 | return stream_test_func(env, false).then([] (stream_test_result r) { | |
541 | BOOST_REQUIRE(r.client_source_closed); | |
542 | BOOST_REQUIRE(r.server_source_closed); | |
543 | BOOST_REQUIRE(r.server_sum == 5050); | |
544 | BOOST_REQUIRE(!r.sink_exception); | |
545 | BOOST_REQUIRE(!r.sink_close_exception); | |
546 | BOOST_REQUIRE(!r.source_done_exception); | |
547 | BOOST_REQUIRE(!r.server_done_exception); | |
548 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
549 | }); |
550 | }); | |
551 | } | |
552 | ||
553 | SEASTAR_TEST_CASE(test_stream_stop_client) { | |
554 | rpc::server_options so; | |
555 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
556 | rpc_test_config cfg; |
557 | cfg.server_options = so; | |
558 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
559 | return stream_test_func(env, true).then([] (stream_test_result r) { | |
560 | BOOST_REQUIRE(!r.client_source_closed); | |
561 | BOOST_REQUIRE(!r.server_source_closed); | |
562 | BOOST_REQUIRE(r.sink_exception); | |
563 | BOOST_REQUIRE(r.sink_close_exception); | |
564 | BOOST_REQUIRE(r.source_done_exception); | |
565 | BOOST_REQUIRE(r.server_done_exception); | |
566 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
567 | }); |
568 | }); | |
569 | } | |
570 | ||
571 | ||
572 | SEASTAR_TEST_CASE(test_stream_connection_error) { | |
573 | rpc::server_options so; | |
574 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
575 | rpc_test_config cfg; |
576 | cfg.server_options = so; | |
577 | cfg.inject_error = true; | |
578 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
f67539c2 | 579 | return stream_test_func(env, false, true).then([] (stream_test_result r) { |
9f95a23c TL |
580 | BOOST_REQUIRE(!r.client_source_closed); |
581 | BOOST_REQUIRE(!r.server_source_closed); | |
582 | BOOST_REQUIRE(r.sink_exception); | |
583 | BOOST_REQUIRE(r.sink_close_exception); | |
584 | BOOST_REQUIRE(r.source_done_exception); | |
585 | BOOST_REQUIRE(r.server_done_exception); | |
586 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
587 | }); |
588 | }); | |
589 | } | |
590 | ||
591 | SEASTAR_TEST_CASE(test_rpc_scheduling) { | |
9f95a23c TL |
592 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
593 | auto sg = create_scheduling_group("rpc", 100).get0(); | |
594 | env.register_handler(1, sg, [] () { | |
595 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
596 | }).get(); | |
597 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
598 | auto id = call_sg_id(c1).get0(); | |
599 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg)); | |
11fdf7f2 TL |
600 | }); |
601 | } | |
602 | ||
603 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { | |
604 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
20effc67 | 605 | auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); |
11fdf7f2 | 606 | auto sg2 = create_scheduling_group("sg2", 100).get0(); |
20effc67 | 607 | auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); |
11fdf7f2 TL |
608 | rpc::resource_limits limits; |
609 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
610 | auto sg = current_scheduling_group(); | |
611 | if (cookie == "sg1") { | |
612 | sg = sg1; | |
613 | } else if (cookie == "sg2") { | |
614 | sg = sg2; | |
615 | } | |
616 | rpc::isolation_config cfg; | |
617 | cfg.sched_group = sg; | |
618 | return cfg; | |
619 | }; | |
9f95a23c TL |
620 | rpc_test_config cfg; |
621 | cfg.resource_limits = limits; | |
622 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
623 | rpc::client_options co1; | |
624 | co1.isolation_cookie = "sg1"; | |
625 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
626 | rpc::client_options co2; | |
627 | co2.isolation_cookie = "sg2"; | |
628 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
629 | env.register_handler(1, [] { | |
630 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
631 | }).get(); | |
632 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
633 | unsigned id; | |
634 | id = call_sg_id(c1).get0(); | |
635 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
636 | id = call_sg_id(c2).get0(); | |
637 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
638 | c1.stop().get(); | |
639 | c2.stop().get(); | |
11fdf7f2 TL |
640 | }).get(); |
641 | } | |
642 | ||
643 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { | |
644 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
20effc67 | 645 | auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); |
11fdf7f2 | 646 | auto sg2 = create_scheduling_group("sg2", 100).get0(); |
20effc67 | 647 | auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); |
11fdf7f2 TL |
648 | rpc::resource_limits limits; |
649 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
650 | auto sg = current_scheduling_group(); | |
651 | if (cookie == "sg1") { | |
652 | sg = sg1; | |
653 | } else if (cookie == "sg2") { | |
654 | sg = sg2; | |
655 | } | |
656 | rpc::isolation_config cfg; | |
657 | cfg.sched_group = sg; | |
658 | return cfg; | |
659 | }; | |
9f95a23c TL |
660 | rpc_test_config cfg; |
661 | cfg.resource_limits = limits; | |
662 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
663 | rpc::client_options co1; | |
664 | co1.isolation_cookie = "sg1"; | |
665 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
666 | rpc::client_options co2; | |
667 | co2.isolation_cookie = "sg2"; | |
668 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
669 | // An old client, that doesn't have an isolation cookie | |
670 | rpc::client_options co3; | |
671 | test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); | |
672 | // A server that uses sg1 if the client is old | |
673 | env.register_handler(1, sg1, [] () { | |
674 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
675 | }).get(); | |
676 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
677 | unsigned id; | |
678 | id = call_sg_id(c1).get0(); | |
679 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
680 | id = call_sg_id(c2).get0(); | |
681 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
682 | id = call_sg_id(c3).get0(); | |
683 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
684 | c1.stop().get(); | |
685 | c2.stop().get(); | |
686 | c3.stop().get(); | |
687 | }).get(); | |
688 | } | |
689 | ||
690 | void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) { | |
691 | using namespace seastar::rpc; | |
692 | ||
693 | auto linearize = [&] (const auto& buffer) { | |
694 | return seastar::visit(buffer.bufs, | |
695 | [] (const temporary_buffer<char>& buf) { | |
696 | return buf.clone(); | |
697 | }, | |
698 | [&] (const std::vector<temporary_buffer<char>>& bufs) { | |
699 | auto buf = temporary_buffer<char>(buffer.size); | |
700 | auto dst = buf.get_write(); | |
701 | for (auto& b : bufs) { | |
702 | dst = std::copy_n(b.get(), b.size(), dst); | |
703 | } | |
704 | return buf; | |
705 | } | |
706 | ); | |
707 | }; | |
708 | ||
709 | auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) { | |
710 | std::vector<temporary_buffer<char>> bufs; | |
711 | auto src = b.get(); | |
712 | auto n = b.size(); | |
713 | while (n) { | |
714 | auto this_chunk = std::min(chunk_size, n); | |
715 | bufs.emplace_back(this_chunk); | |
716 | std::copy_n(src, this_chunk, bufs.back().get_write()); | |
717 | src += this_chunk; | |
718 | n -= this_chunk; | |
719 | } | |
720 | return bufs; | |
721 | }; | |
722 | ||
723 | auto clone = [&] (const auto& buffer) { | |
724 | auto c = std::decay_t<decltype(buffer)>(); | |
725 | c.size = buffer.size; | |
726 | c.bufs = seastar::visit(buffer.bufs, | |
727 | [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) { | |
728 | return buf.clone(); | |
729 | }, | |
730 | [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) { | |
731 | std::vector<temporary_buffer<char>> c; | |
732 | c.reserve(bufs.size()); | |
733 | for (auto& b : bufs) { | |
734 | c.emplace_back(b.clone()); | |
735 | } | |
f67539c2 | 736 | return c; |
9f95a23c TL |
737 | } |
738 | ); | |
739 | return c; | |
740 | }; | |
741 | ||
742 | auto compressor = compressor_factory(); | |
743 | ||
20effc67 TL |
744 | std::vector<noncopyable_function<std::tuple<sstring, size_t, snd_buf> ()>> inputs; |
745 | ||
746 | auto add_input = [&] (auto func_returning_tuple) { | |
747 | inputs.emplace_back(std::move(func_returning_tuple)); | |
748 | }; | |
9f95a23c TL |
749 | |
750 | auto& eng = testing::local_random_engine; | |
751 | auto dist = std::uniform_int_distribution<char>(); | |
752 | ||
753 | auto snd = snd_buf(1); | |
754 | *snd.front().get_write() = 'a'; | |
20effc67 | 755 | add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, no headroom"), size_t(0), std::move(snd)); }); |
9f95a23c TL |
756 | |
757 | snd = snd_buf(1); | |
758 | *snd.front().get_write() = 'a'; | |
20effc67 | 759 | add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, 128k of headroom"), size_t(128 * 1024), std::move(snd)); }); |
9f95a23c | 760 | |
f67539c2 TL |
761 | auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
762 | auto buf = temporary_buffer<char>(s); | |
763 | std::fill_n(buf.get_write(), s, 'a'); | |
9f95a23c | 764 | |
f67539c2 TL |
765 | auto snd = snd_buf(); |
766 | snd.size = s; | |
767 | if (split) { | |
768 | snd.bufs = split_buffer(buf.clone(), *split); | |
769 | } else { | |
770 | snd.bufs = buf.clone(); | |
771 | } | |
20effc67 | 772 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 773 | }; |
9f95a23c | 774 | |
20effc67 TL |
775 | |
776 | add_input(std::bind(gen_fill, 16 * 1024, "single 16 kB buffer of \'a\'")); | |
9f95a23c | 777 | |
f67539c2 TL |
778 | auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
779 | auto buf = temporary_buffer<char>(s); | |
780 | std::generate_n(buf.get_write(), s, [&] { return dist(eng); }); | |
9f95a23c | 781 | |
f67539c2 TL |
782 | auto snd = snd_buf(); |
783 | snd.size = s; | |
784 | if (split) { | |
785 | snd.bufs = split_buffer(buf.clone(), *split); | |
786 | } else { | |
787 | snd.bufs = buf.clone(); | |
788 | } | |
20effc67 | 789 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 790 | }; |
9f95a23c | 791 | |
20effc67 | 792 | add_input(std::bind(gen_rand, 16 * 1024, "single 16 kB buffer of random")); |
9f95a23c | 793 | |
f67539c2 TL |
794 | auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
795 | static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. "; | |
9f95a23c | 796 | |
f67539c2 TL |
797 | auto buf = temporary_buffer<char>(s); |
798 | size_t n = 0; | |
799 | while (n < s) { | |
800 | auto rem = std::min(s - n, text.size()); | |
801 | std::copy(text.data(), text.data() + rem, buf.get_write() + n); | |
802 | n += rem; | |
803 | } | |
9f95a23c | 804 | |
f67539c2 TL |
805 | auto snd = snd_buf(); |
806 | snd.size = s; | |
807 | if (split) { | |
808 | snd.bufs = split_buffer(buf.clone(), *split); | |
809 | } else { | |
810 | snd.bufs = buf.clone(); | |
811 | } | |
20effc67 | 812 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 813 | }; |
9f95a23c | 814 | |
9f95a23c | 815 | |
20effc67 | 816 | for (auto s : { 1, 4 }) { |
f67539c2 | 817 | for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) { |
20effc67 TL |
818 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss)); |
819 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024)); | |
820 | add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024)); | |
9f95a23c | 821 | |
20effc67 TL |
822 | add_input(std::bind(gen_fill, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024)); |
823 | add_input(std::bind(gen_rand, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024)); | |
f67539c2 | 824 | } |
9f95a23c | 825 | |
f67539c2 | 826 | for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) { |
20effc67 TL |
827 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss)); |
828 | add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss)); | |
829 | add_input(std::bind(gen_text, s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss)); | |
830 | add_input(std::bind(gen_fill, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss)); | |
831 | add_input(std::bind(gen_rand, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); | |
832 | add_input(std::bind(gen_text, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); | |
f67539c2 TL |
833 | } |
834 | } | |
9f95a23c | 835 | |
f67539c2 | 836 | for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) { |
20effc67 TL |
837 | add_input(std::bind(gen_fill, s, format("{} bytes buffer of \'a\'", s))); |
838 | add_input(std::bind(gen_rand, s, format("{} bytes buffer of random", s))); | |
839 | add_input(std::bind(gen_text, s, format("{} bytes buffer of text", s))); | |
f67539c2 | 840 | } |
9f95a23c TL |
841 | |
842 | std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms { | |
843 | { "identity", [] (snd_buf snd) { | |
844 | rcv_buf rcv; | |
845 | rcv.size = snd.size; | |
846 | rcv.bufs = std::move(snd.bufs); | |
847 | return rcv; | |
848 | } }, | |
849 | { "linearized", [&linearize] (snd_buf snd) { | |
850 | rcv_buf rcv; | |
851 | rcv.size = snd.size; | |
852 | rcv.bufs = linearize(snd); | |
853 | return rcv; | |
854 | } }, | |
855 | { "split 1 B", [&] (snd_buf snd) { | |
856 | rcv_buf rcv; | |
857 | rcv.size = snd.size; | |
858 | rcv.bufs = split_buffer(linearize(snd), 1); | |
859 | return rcv; | |
860 | } }, | |
861 | { "split 129 B", [&] (snd_buf snd) { | |
862 | rcv_buf rcv; | |
863 | rcv.size = snd.size; | |
864 | rcv.bufs = split_buffer(linearize(snd), 129); | |
865 | return rcv; | |
866 | } }, | |
867 | { "split 4 kB", [&] (snd_buf snd) { | |
868 | rcv_buf rcv; | |
869 | rcv.size = snd.size; | |
870 | rcv.bufs = split_buffer(linearize(snd), 4096); | |
871 | return rcv; | |
872 | } }, | |
873 | { "split 4 kB - 128", [&] (snd_buf snd) { | |
874 | rcv_buf rcv; | |
875 | rcv.size = snd.size; | |
876 | rcv.bufs = split_buffer(linearize(snd), 4096 - 128); | |
877 | return rcv; | |
878 | } }, | |
879 | }; | |
880 | ||
881 | auto sanity_check = [&] (const auto& buffer) { | |
882 | auto actual_size = seastar::visit(buffer.bufs, | |
883 | [] (const temporary_buffer<char>& buf) { | |
884 | return buf.size(); | |
885 | }, | |
886 | [] (const std::vector<temporary_buffer<char>>& bufs) { | |
887 | return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) { | |
888 | return sz + buf.size(); | |
889 | }); | |
890 | } | |
891 | ); | |
892 | BOOST_CHECK_EQUAL(actual_size, buffer.size); | |
893 | }; | |
894 | ||
20effc67 TL |
895 | for (auto& in_func : inputs) { |
896 | auto in = in_func(); | |
9f95a23c TL |
897 | BOOST_TEST_MESSAGE("Input: " << std::get<0>(in)); |
898 | auto headroom = std::get<1>(in); | |
899 | auto compressed = compressor->compress(headroom, clone(std::get<2>(in))); | |
900 | sanity_check(compressed); | |
901 | ||
902 | // Remove headroom | |
903 | BOOST_CHECK_GE(compressed.size, headroom); | |
904 | compressed.size -= headroom; | |
905 | seastar::visit(compressed.bufs, | |
906 | [&] (temporary_buffer<char>& buf) { | |
907 | BOOST_CHECK_GE(buf.size(), headroom); | |
908 | buf.trim_front(headroom); | |
909 | }, | |
910 | [&] (std::vector<temporary_buffer<char>>& bufs) { | |
911 | while (headroom) { | |
912 | BOOST_CHECK(!bufs.empty()); | |
913 | auto to_remove = std::min(bufs.front().size(), headroom); | |
914 | bufs.front().trim_front(to_remove); | |
915 | if (bufs.front().empty() && bufs.size() > 1) { | |
916 | bufs.erase(bufs.begin()); | |
917 | } | |
918 | headroom -= to_remove; | |
11fdf7f2 | 919 | } |
9f95a23c TL |
920 | } |
921 | ); | |
922 | ||
923 | auto in_l = linearize(std::get<2>(in)); | |
924 | ||
925 | for (auto& t : transforms) { | |
926 | BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t)); | |
927 | auto received = std::get<1>(t)(clone(compressed)); | |
928 | ||
929 | auto decompressed = compressor->decompress(std::move(received)); | |
930 | sanity_check(decompressed); | |
931 | ||
932 | BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size); | |
933 | ||
934 | auto out_l = linearize(decompressed); | |
935 | ||
936 | BOOST_CHECK_EQUAL(in_l.size(), out_l.size()); | |
937 | BOOST_CHECK(in_l == out_l); | |
938 | } | |
939 | } | |
940 | } | |
941 | ||
942 | SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) { | |
943 | test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); }); | |
944 | } | |
945 | ||
946 | SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) { | |
947 | test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); }); | |
948 | } | |
949 | ||
950 | // Test reproducing issue #671: If timeout is time_point::max(), translating | |
951 | // it to relative timeout in the sender and then back in the receiver, when | |
952 | // these calculations happen across a millisecond boundary, overflowed the | |
953 | // integer and mislead the receiver to think the requested timeout was | |
954 | // negative, and cause it drop its response, so the RPC call never completed. | |
955 | SEASTAR_TEST_CASE(test_max_absolute_timeout) { | |
956 | // The typical failure of this test is a hang. So we use semaphore to | |
957 | // stop the test either when it succeeds, or after a long enough hang. | |
958 | auto success = make_lw_shared<bool>(false); | |
959 | auto done = make_lw_shared<semaphore>(0); | |
960 | auto abrt = make_lw_shared<abort_source>(); | |
961 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
962 | done->signal(1); | |
963 | }).handle_exception([] (std::exception_ptr) {}); | |
964 | rpc::client_options co; | |
965 | co.send_timeout_data = 1; | |
966 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
967 | env.register_handler(1, [](int a, int b) { | |
968 | return make_ready_future<int>(a+b); | |
969 | }).get(); | |
970 | auto sum = env.proto().make_client<int (int, int)>(1); | |
971 | // The bug only reproduces if the calculation done on the sender | |
972 | // and receiver sides, happened across a millisecond boundary. | |
973 | // We can't control when it happens, so we just need to loop many | |
974 | // times, at least many milliseconds, to increase the probability | |
975 | // that we catch the bug. Experimentally, if we loop for 200ms, we | |
976 | // catch the bug in #671 virtually every time. | |
977 | auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200); | |
978 | while (seastar::lowres_clock::now() <= until) { | |
979 | auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0(); | |
980 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
981 | } | |
982 | }).then([success, done, abrt] { | |
983 | *success = true; | |
984 | abrt->request_abort(); | |
985 | done->signal(); | |
986 | }); | |
987 | return done->wait().then([done, success] { | |
988 | BOOST_REQUIRE(*success); | |
989 | }); | |
990 | } | |
991 | ||
992 | // Similar to the above test: Test that a relative timeout duration::max() | |
993 | // also works, and again doesn't cause the timeout wrapping around to the | |
994 | // past and causing dropped responses. | |
995 | SEASTAR_TEST_CASE(test_max_relative_timeout) { | |
996 | // The typical failure of this test is a hang. So we use semaphore to | |
997 | // stop the test either when it succeeds, or after a long enough hang. | |
998 | auto success = make_lw_shared<bool>(false); | |
999 | auto done = make_lw_shared<semaphore>(0); | |
1000 | auto abrt = make_lw_shared<abort_source>(); | |
1001 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
1002 | done->signal(1); | |
1003 | }).handle_exception([] (std::exception_ptr) {}); | |
1004 | rpc::client_options co; | |
1005 | co.send_timeout_data = 1; | |
1006 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1007 | env.register_handler(1, [](int a, int b) { | |
1008 | return make_ready_future<int>(a+b); | |
1009 | }).get(); | |
1010 | auto sum = env.proto().make_client<int (int, int)>(1); | |
1011 | // The following call used to always hang, when max()+now() | |
1012 | // overflowed and appeared to be a negative timeout. | |
1013 | auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0(); | |
1014 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
1015 | }).then([success, done, abrt] { | |
1016 | *success = true; | |
1017 | abrt->request_abort(); | |
1018 | done->signal(); | |
1019 | }); | |
1020 | return done->wait().then([done, success] { | |
1021 | BOOST_REQUIRE(*success); | |
1022 | }); | |
1023 | } | |
1024 | ||
1025 | SEASTAR_TEST_CASE(test_rpc_tuple) { | |
1026 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1027 | env.register_handler(1, [] () { | |
1028 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1029 | }).get(); | |
1030 | auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1); | |
1031 | auto result = f1(c1).get0(); | |
1032 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1033 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1034 | }); | |
1035 | } | |
1036 | ||
1037 | SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) { | |
1038 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1039 | // Server is variadic | |
1040 | env.register_handler(1, [] () { | |
f67539c2 | 1041 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L)); |
9f95a23c TL |
1042 | }).get(); |
1043 | // Client is non-variadic | |
1044 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); | |
1045 | auto result = f1(c1).get0(); | |
1046 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1047 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1048 | }); | |
1049 | } | |
1050 | ||
1051 | SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) { | |
1052 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1053 | // Server is nonvariadic | |
1054 | env.register_handler(1, [] () { | |
1055 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1056 | }).get(); | |
1057 | // Client is variadic | |
f67539c2 TL |
1058 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); |
1059 | auto result = f1(c1).get0(); | |
9f95a23c TL |
1060 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); |
1061 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1062 | }); | |
1063 | } | |
1064 | ||
1065 | SEASTAR_TEST_CASE(test_handler_registration) { | |
1066 | rpc_test_config cfg; | |
1067 | cfg.connect = false; | |
1068 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { | |
1069 | auto& proto = env.proto(); | |
1070 | ||
f67539c2 TL |
1071 | // new proto must be empty |
1072 | BOOST_REQUIRE(!proto.has_handlers()); | |
1073 | ||
9f95a23c TL |
1074 | // non-existing handler should not be found |
1075 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1076 | ||
1077 | // unregistered non-existing handler should return ready future | |
1078 | auto fut = proto.unregister_handler(1); | |
1079 | BOOST_REQUIRE(fut.available() && !fut.failed()); | |
1080 | fut.get(); | |
1081 | ||
1082 | // existing handler should be found | |
1083 | auto handler = [] () { return make_ready_future<>(); }; | |
1084 | proto.register_handler(1, handler); | |
1085 | BOOST_REQUIRE(proto.has_handler(1)); | |
1086 | ||
1087 | // cannot register handler if already registered | |
1088 | BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error); | |
1089 | ||
1090 | // unregistered handler should not be found | |
1091 | proto.unregister_handler(1).get(); | |
1092 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1093 | ||
1094 | // re-registering a handler should succeed | |
1095 | proto.register_handler(1, handler); | |
1096 | BOOST_REQUIRE(proto.has_handler(1)); | |
f67539c2 TL |
1097 | |
1098 | // proto with handlers must not be empty | |
1099 | BOOST_REQUIRE(proto.has_handlers()); | |
9f95a23c TL |
1100 | }); |
1101 | } | |
1102 | ||
1103 | SEASTAR_TEST_CASE(test_unregister_handler) { | |
1104 | using namespace std::chrono_literals; | |
1105 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1106 | promise<> handler_called; | |
1107 | future<> f_handler_called = handler_called.get_future(); | |
1108 | bool rpc_executed = false; | |
1109 | bool rpc_completed = false; | |
1110 | ||
1111 | auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] { | |
1112 | if (f_handler_called.available()) { | |
1113 | f_handler_called.get(); | |
1114 | } | |
1115 | rpc_executed = false; | |
1116 | rpc_completed = false; | |
1117 | }; | |
1118 | ||
1119 | auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] { | |
1120 | return [&handler_called, &rpc_executed, &rpc_completed] { | |
1121 | handler_called.set_value(); | |
1122 | rpc_executed = true; | |
1123 | return sleep(1ms).then([&rpc_completed] { | |
1124 | rpc_completed = true; | |
1125 | }); | |
1126 | }; | |
1127 | }; | |
1128 | ||
1129 | // handler should not run if unregistered before being called | |
1130 | env.register_handler(1, get_handler()).get(); | |
1131 | env.unregister_handler(1).get(); | |
1132 | BOOST_REQUIRE(!f_handler_called.available()); | |
1133 | BOOST_REQUIRE(!rpc_executed); | |
1134 | BOOST_REQUIRE(!rpc_completed); | |
1135 | ||
1136 | // verify normal execution path | |
1137 | env.register_handler(1, get_handler()).get(); | |
1138 | auto call = env.proto().make_client<void ()>(1); | |
1139 | call(c1).get(); | |
1140 | BOOST_REQUIRE(f_handler_called.available()); | |
1141 | BOOST_REQUIRE(rpc_executed); | |
1142 | BOOST_REQUIRE(rpc_completed); | |
1143 | reset_state(); | |
1144 | ||
1145 | // call should fail after handler is unregistered | |
1146 | env.unregister_handler(1).get(); | |
1147 | try { | |
1148 | call(c1).get(); | |
1149 | BOOST_REQUIRE(false); | |
1150 | } catch (rpc::unknown_verb_error&) { | |
1151 | // expected | |
1152 | } catch (...) { | |
1153 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1154 | BOOST_REQUIRE(false); | |
1155 | } | |
1156 | BOOST_REQUIRE(!f_handler_called.available()); | |
1157 | BOOST_REQUIRE(!rpc_executed); | |
1158 | BOOST_REQUIRE(!rpc_completed); | |
1159 | ||
1160 | // unregistered is allowed while call is in flight | |
1161 | auto delayed_unregister = [&env] { | |
1162 | return sleep(500us).then([&env] { | |
1163 | return env.unregister_handler(1); | |
11fdf7f2 | 1164 | }); |
9f95a23c TL |
1165 | }; |
1166 | ||
1167 | env.register_handler(1, get_handler()).get(); | |
1168 | try { | |
1169 | when_all_succeed(call(c1), delayed_unregister()).get(); | |
1170 | reset_state(); | |
1171 | } catch (rpc::unknown_verb_error&) { | |
1172 | // expected | |
1173 | } catch (...) { | |
1174 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1175 | BOOST_REQUIRE(false); | |
1176 | } | |
1177 | }); | |
11fdf7f2 | 1178 | } |
9f95a23c | 1179 | |
f67539c2 TL |
1180 | SEASTAR_TEST_CASE(test_loggers) { |
1181 | static seastar::logger log("dummy"); | |
1182 | log.set_level(log_level::debug); | |
1183 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1184 | socket_address dummy_addr; | |
1185 | auto& proto = env.proto(); | |
1186 | auto& logger = proto.get_logger(); | |
1187 | logger(dummy_addr, "Hello0"); | |
1188 | logger(dummy_addr, log_level::debug, "Hello1"); | |
1189 | proto.set_logger(&log); | |
1190 | logger(dummy_addr, "Hello2"); | |
1191 | logger(dummy_addr, log_level::debug, "Hello3"); | |
1192 | // We *want* to test the deprecated API, don't spam warnings about it. | |
1193 | #pragma GCC diagnostic push | |
1194 | #pragma GCC diagnostic ignored "-Wdeprecated-declarations" | |
1195 | proto.set_logger([] (const sstring& str) { | |
1196 | log.info("Test: {}", str); | |
1197 | }); | |
1198 | #pragma GCC diagnostic pop | |
1199 | logger(dummy_addr, "Hello4"); | |
1200 | logger(dummy_addr, log_level::debug, "Hello5"); | |
1201 | proto.set_logger(nullptr); | |
1202 | logger(dummy_addr, "Hello6"); | |
1203 | logger(dummy_addr, log_level::debug, "Hello7"); | |
1204 | }); | |
1205 | } | |
9f95a23c TL |
1206 | |
1207 | static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working"); |