]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2016 ScyllaDB | |
20 | */ | |
21 | ||
11fdf7f2 TL |
22 | #include "loopback_socket.hh" |
23 | #include <seastar/rpc/rpc.hh> | |
24 | #include <seastar/rpc/rpc_types.hh> | |
25 | #include <seastar/rpc/lz4_compressor.hh> | |
9f95a23c | 26 | #include <seastar/rpc/lz4_fragmented_compressor.hh> |
11fdf7f2 TL |
27 | #include <seastar/rpc/multi_algo_compressor_factory.hh> |
28 | #include <seastar/testing/test_case.hh> | |
29 | #include <seastar/testing/thread_test_case.hh> | |
9f95a23c | 30 | #include <seastar/testing/test_runner.hh> |
11fdf7f2 TL |
31 | #include <seastar/core/thread.hh> |
32 | #include <seastar/core/sleep.hh> | |
9f95a23c | 33 | #include <seastar/core/distributed.hh> |
f67539c2 | 34 | #include <seastar/core/loop.hh> |
11fdf7f2 | 35 | #include <seastar/util/defer.hh> |
f67539c2 | 36 | #include <seastar/util/log.hh> |
11fdf7f2 TL |
37 | |
38 | using namespace seastar; | |
39 | ||
40 | struct serializer { | |
41 | }; | |
42 | ||
43 | template <typename T, typename Output> | |
44 | inline | |
45 | void write_arithmetic_type(Output& out, T v) { | |
46 | static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); | |
47 | return out.write(reinterpret_cast<const char*>(&v), sizeof(T)); | |
48 | } | |
49 | ||
50 | template <typename T, typename Input> | |
51 | inline | |
52 | T read_arithmetic_type(Input& in) { | |
53 | static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); | |
54 | T v; | |
55 | in.read(reinterpret_cast<char*>(&v), sizeof(T)); | |
56 | return v; | |
57 | } | |
58 | ||
59 | template <typename Output> | |
60 | inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } | |
61 | template <typename Output> | |
62 | inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } | |
63 | template <typename Output> | |
64 | inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } | |
65 | template <typename Output> | |
66 | inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } | |
67 | template <typename Output> | |
68 | inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } | |
69 | template <typename Input> | |
70 | inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } | |
71 | template <typename Input> | |
72 | inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } | |
73 | template <typename Input> | |
74 | inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } | |
75 | template <typename Input> | |
76 | inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } | |
77 | template <typename Input> | |
78 | inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } | |
79 | ||
80 | template <typename Output> | |
81 | inline void write(serializer, Output& out, const sstring& v) { | |
82 | write_arithmetic_type(out, uint32_t(v.size())); | |
83 | out.write(v.c_str(), v.size()); | |
84 | } | |
85 | ||
86 | template <typename Input> | |
87 | inline sstring read(serializer, Input& in, rpc::type<sstring>) { | |
88 | auto size = read_arithmetic_type<uint32_t>(in); | |
f67539c2 TL |
89 | sstring ret = uninitialized_string(size); |
90 | in.read(ret.data(), size); | |
11fdf7f2 TL |
91 | return ret; |
92 | } | |
93 | ||
94 | using test_rpc_proto = rpc::protocol<serializer>; | |
95 | using make_socket_fn = std::function<seastar::socket ()>; | |
96 | ||
97 | struct rpc_loopback_error_injector : public loopback_error_injector { | |
98 | int _x = 0; | |
99 | bool server_rcv_error() override { | |
100 | return _x++ >= 50; | |
101 | } | |
102 | }; | |
103 | ||
104 | class rpc_socket_impl : public ::net::socket_impl { | |
105 | promise<connected_socket> _p; | |
106 | bool _connect; | |
107 | loopback_socket_impl _socket; | |
108 | rpc_loopback_error_injector _error_injector; | |
109 | public: | |
110 | rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error) | |
111 | : _connect(connect), | |
112 | _socket(factory, inject_error ? &_error_injector : nullptr) { | |
113 | } | |
114 | virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { | |
115 | return _connect ? _socket.connect(sa, local, proto) : _p.get_future(); | |
116 | } | |
9f95a23c TL |
117 | virtual void set_reuseaddr(bool reuseaddr) override {} |
118 | virtual bool get_reuseaddr() const override { return false; }; | |
11fdf7f2 TL |
119 | virtual void shutdown() override { |
120 | if (_connect) { | |
121 | _socket.shutdown(); | |
122 | } else { | |
123 | _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); | |
124 | } | |
125 | } | |
126 | }; | |
127 | ||
9f95a23c TL |
128 | struct rpc_test_config { |
129 | rpc::resource_limits resource_limits = {}; | |
130 | rpc::server_options server_options = {}; | |
131 | bool connect = true; | |
132 | bool inject_error = false; | |
133 | }; | |
134 | ||
135 | template<typename MsgType = int> | |
136 | class rpc_test_env { | |
137 | struct rpc_test_service { | |
138 | test_rpc_proto _proto; | |
139 | test_rpc_proto::server _server; | |
140 | std::vector<MsgType> _handlers; | |
141 | ||
142 | rpc_test_service() = delete; | |
143 | explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf) | |
144 | : _proto(serializer()) | |
145 | , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits) | |
146 | { } | |
147 | ||
148 | test_rpc_proto& proto() { | |
149 | return _proto; | |
150 | } | |
151 | ||
152 | test_rpc_proto::server& server() { | |
153 | return _server; | |
154 | } | |
155 | ||
156 | future<> stop() { | |
157 | return parallel_for_each(_handlers, [this] (auto t) { | |
158 | return proto().unregister_handler(t); | |
159 | }).finally([this] { | |
160 | return server().stop(); | |
161 | }); | |
162 | } | |
163 | ||
164 | template<typename Func> | |
165 | auto register_handler(MsgType t, scheduling_group sg, Func func) { | |
166 | _handlers.emplace_back(t); | |
167 | return proto().register_handler(t, sg, std::move(func)); | |
168 | } | |
169 | ||
170 | future<> unregister_handler(MsgType t) { | |
171 | auto it = std::find(_handlers.begin(), _handlers.end(), t); | |
172 | assert(it != _handlers.end()); | |
173 | _handlers.erase(it); | |
174 | return proto().unregister_handler(t); | |
175 | } | |
11fdf7f2 | 176 | }; |
9f95a23c TL |
177 | |
178 | rpc_test_config _cfg; | |
179 | loopback_connection_factory _lcf; | |
f67539c2 | 180 | std::unique_ptr<sharded<rpc_test_service>> _service; |
9f95a23c TL |
181 | |
182 | public: | |
183 | rpc_test_env() = delete; | |
184 | explicit rpc_test_env(rpc_test_config cfg) | |
f67539c2 | 185 | : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>()) |
9f95a23c TL |
186 | { |
187 | } | |
188 | ||
189 | using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>; | |
190 | static future<> do_with(rpc_test_config cfg, test_fn&& func) { | |
191 | return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) { | |
192 | return env.start().then([&env, func] { | |
193 | return func(env); | |
194 | }).finally([&env] { | |
195 | return env.stop(); | |
11fdf7f2 TL |
196 | }); |
197 | }); | |
9f95a23c TL |
198 | } |
199 | ||
200 | using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>; | |
201 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) { | |
202 | return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) { | |
203 | return seastar::async([&env, func] { | |
204 | func(env); | |
205 | }); | |
206 | }); | |
207 | } | |
208 | ||
209 | using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>; | |
210 | static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) { | |
211 | return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) { | |
212 | return seastar::async([&env, func, co = std::move(co)] { | |
213 | test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr()); | |
214 | auto stop = defer([&] { cl.stop().get(); }); | |
215 | func(env, cl); | |
216 | }); | |
217 | }); | |
218 | } | |
219 | ||
220 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) { | |
221 | return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func)); | |
222 | } | |
223 | ||
224 | auto make_socket() { | |
225 | return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.connect, _cfg.inject_error)); | |
226 | }; | |
227 | ||
228 | test_rpc_proto& proto() { | |
229 | return local_service().proto(); | |
230 | } | |
231 | ||
232 | test_rpc_proto::server& server() { | |
233 | return local_service().server(); | |
234 | } | |
235 | ||
236 | template<typename Func> | |
237 | future<> register_handler(MsgType t, scheduling_group sg, Func func) { | |
f67539c2 | 238 | return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable { |
9f95a23c TL |
239 | s.register_handler(t, sg, std::move(func)); |
240 | }); | |
241 | } | |
242 | ||
243 | template<typename Func> | |
244 | future<> register_handler(MsgType t, Func func) { | |
245 | return register_handler(t, scheduling_group(), std::move(func)); | |
246 | } | |
247 | ||
248 | future<> unregister_handler(MsgType t) { | |
f67539c2 | 249 | return _service->invoke_on_all([t] (rpc_test_service& s) mutable { |
9f95a23c TL |
250 | return s.unregister_handler(t); |
251 | }); | |
252 | } | |
253 | ||
254 | private: | |
255 | rpc_test_service& local_service() { | |
f67539c2 TL |
256 | return _service->local(); |
257 | ||
9f95a23c TL |
258 | } |
259 | ||
260 | future<> start() { | |
f67539c2 | 261 | return _service->start(std::cref(_cfg), std::ref(_lcf)); |
9f95a23c TL |
262 | } |
263 | ||
264 | future<> stop() { | |
f67539c2 | 265 | return _service->stop().then([this] { |
9f95a23c TL |
266 | return _lcf.destroy_all_shards(); |
267 | }); | |
268 | } | |
269 | }; | |
11fdf7f2 TL |
270 | |
271 | struct cfactory : rpc::compressor::factory { | |
272 | mutable int use_compression = 0; | |
273 | const sstring name; | |
274 | cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {} | |
275 | const sstring& supported() const override { | |
276 | return name; | |
277 | } | |
f67539c2 TL |
278 | class mylz4 : public rpc::lz4_compressor { |
279 | sstring _name; | |
280 | public: | |
281 | mylz4(const sstring& n) : _name(n) {} | |
282 | sstring name() const override { | |
283 | return _name; | |
284 | } | |
285 | }; | |
11fdf7f2 TL |
286 | std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { |
287 | if (feature == name) { | |
288 | use_compression++; | |
f67539c2 | 289 | return std::make_unique<mylz4>(name); |
11fdf7f2 TL |
290 | } else { |
291 | return nullptr; | |
292 | } | |
293 | } | |
294 | }; | |
9f95a23c | 295 | |
11fdf7f2 TL |
296 | SEASTAR_TEST_CASE(test_rpc_connect) { |
297 | std::vector<future<>> fs; | |
298 | ||
299 | for (auto i = 0; i < 2; i++) { | |
300 | for (auto j = 0; j < 4; j++) { | |
301 | auto factory = std::make_unique<cfactory>(); | |
302 | rpc::server_options so; | |
303 | rpc::client_options co; | |
304 | if (i == 1) { | |
305 | so.compressor_factory = factory.get(); | |
306 | } | |
307 | if (j & 1) { | |
308 | co.compressor_factory = factory.get(); | |
309 | } | |
310 | co.send_timeout_data = j & 2; | |
9f95a23c TL |
311 | rpc_test_config cfg; |
312 | cfg.server_options = so; | |
313 | auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
314 | env.register_handler(1, [](int a, int b) { | |
315 | return make_ready_future<int>(a+b); | |
316 | }).get(); | |
317 | auto sum = env.proto().make_client<int (int, int)>(1); | |
318 | auto result = sum(c1, 2, 3).get0(); | |
319 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
320 | }).handle_exception([] (auto ep) { |
321 | BOOST_FAIL("No exception expected"); | |
322 | }).finally([factory = std::move(factory), i, j = j & 1] { | |
323 | if (i == 1 && j == 1) { | |
324 | BOOST_REQUIRE_EQUAL(factory->use_compression, 2); | |
325 | } else { | |
326 | BOOST_REQUIRE_EQUAL(factory->use_compression, 0); | |
327 | } | |
328 | }); | |
329 | fs.emplace_back(std::move(f)); | |
330 | } | |
331 | } | |
332 | return when_all(fs.begin(), fs.end()).discard_result(); | |
333 | } | |
334 | ||
335 | SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) { | |
336 | auto factory1 = std::make_unique<cfactory>(); | |
337 | auto factory2 = std::make_unique<cfactory>("LZ4NEW"); | |
338 | rpc::server_options so; | |
339 | rpc::client_options co; | |
340 | static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()}); | |
341 | static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()}); | |
342 | so.compressor_factory = &server; | |
343 | co.compressor_factory = &client; | |
9f95a23c TL |
344 | rpc_test_config cfg; |
345 | cfg.server_options = so; | |
346 | return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
347 | env.register_handler(1, [](int a, int b) { | |
348 | return make_ready_future<int>(a+b); | |
349 | }).get(); | |
350 | auto sum = env.proto().make_client<int (int, int)>(1); | |
351 | auto result = sum(c1, 2, 3).get0(); | |
352 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
353 | }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] { |
354 | BOOST_REQUIRE_EQUAL(factory1->use_compression, 0); | |
355 | BOOST_REQUIRE_EQUAL(factory2->use_compression, 2); | |
356 | }); | |
357 | } | |
358 | ||
359 | SEASTAR_TEST_CASE(test_rpc_connect_abort) { | |
9f95a23c TL |
360 | rpc_test_config cfg; |
361 | cfg.connect = false; | |
362 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { | |
363 | test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); | |
364 | env.register_handler(1, []() { return make_ready_future<>(); }).get(); | |
365 | auto f = env.proto().make_client<void ()>(1); | |
366 | c1.stop().get0(); | |
367 | try { | |
368 | f(c1).get0(); | |
369 | BOOST_REQUIRE(false); | |
370 | } catch (...) {} | |
11fdf7f2 TL |
371 | }); |
372 | } | |
373 | ||
374 | SEASTAR_TEST_CASE(test_rpc_cancel) { | |
375 | using namespace std::chrono_literals; | |
9f95a23c TL |
376 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
377 | bool rpc_executed = false; | |
378 | int good = 0; | |
379 | promise<> handler_called; | |
380 | future<> f_handler_called = handler_called.get_future(); | |
381 | env.register_handler(1, [&rpc_executed, &handler_called] { | |
382 | handler_called.set_value(); rpc_executed = true; return sleep(1ms); | |
383 | }).get(); | |
384 | auto call = env.proto().make_client<void ()>(1); | |
385 | rpc::cancellable cancel; | |
386 | auto f = call(c1, cancel); | |
387 | // cancel send side | |
388 | cancel.cancel(); | |
389 | try { | |
390 | f.get(); | |
391 | } catch(rpc::canceled_error&) { | |
392 | good += !rpc_executed; | |
393 | }; | |
394 | f = call(c1, cancel); | |
395 | // cancel wait side | |
396 | f_handler_called.then([&cancel] { | |
11fdf7f2 | 397 | cancel.cancel(); |
9f95a23c TL |
398 | }).get(); |
399 | try { | |
400 | f.get(); | |
401 | } catch(rpc::canceled_error&) { | |
402 | good += 10*rpc_executed; | |
403 | }; | |
404 | BOOST_REQUIRE_EQUAL(good, 11); | |
11fdf7f2 TL |
405 | }); |
406 | } | |
407 | ||
408 | SEASTAR_TEST_CASE(test_message_to_big) { | |
9f95a23c TL |
409 | rpc_test_config cfg; |
410 | cfg.resource_limits = {0, 1, 100}; | |
411 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) { | |
412 | bool good = true; | |
413 | env.register_handler(1, [&] (sstring payload) mutable { | |
414 | good = false; | |
415 | }).get(); | |
416 | auto call = env.proto().make_client<void (sstring)>(1); | |
417 | try { | |
f67539c2 | 418 | call(c, uninitialized_string(101)).get(); |
9f95a23c TL |
419 | good = false; |
420 | } catch(std::runtime_error& err) { | |
421 | } catch(...) { | |
422 | good = false; | |
423 | } | |
424 | BOOST_REQUIRE_EQUAL(good, true); | |
11fdf7f2 TL |
425 | }); |
426 | } | |
11fdf7f2 TL |
427 | |
428 | struct stream_test_result { | |
429 | bool client_source_closed = false; | |
430 | bool server_source_closed = false; | |
431 | bool sink_exception = false; | |
432 | bool sink_close_exception = false; | |
433 | bool source_done_exception = false; | |
434 | bool server_done_exception = false; | |
435 | bool client_stop_exception = false; | |
436 | int server_sum = 0; | |
437 | }; | |
438 | ||
f67539c2 TL |
439 | future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) { |
440 | return seastar::async([&env, stop_client, expect_connection_error] { | |
11fdf7f2 | 441 | stream_test_result r; |
9f95a23c | 442 | test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr()); |
11fdf7f2 | 443 | future<> server_done = make_ready_future(); |
9f95a23c | 444 | env.register_handler(1, [&](int i, rpc::source<int> source) { |
11fdf7f2 TL |
445 | BOOST_REQUIRE_EQUAL(i, 666); |
446 | auto sink = source.make_sink<serializer, sstring>(); | |
447 | auto sink_loop = seastar::async([sink] () mutable { | |
448 | for (auto i = 0; i < 100; i++) { | |
449 | sink("seastar").get(); | |
450 | sleep(std::chrono::milliseconds(1)).get(); | |
451 | } | |
f67539c2 TL |
452 | }).finally([sink] () mutable { |
453 | return sink.flush(); | |
454 | }).finally([sink] () mutable { | |
455 | return sink.close(); | |
456 | }).finally([sink] {}); | |
457 | ||
11fdf7f2 TL |
458 | auto source_loop = seastar::async([source, &r] () mutable { |
459 | while (!r.server_source_closed) { | |
460 | auto data = source().get0(); | |
461 | if (data) { | |
462 | r.server_sum += std::get<0>(*data); | |
463 | } else { | |
464 | r.server_source_closed = true; | |
9f95a23c TL |
465 | try { |
466 | // check that reading after eos does not crash | |
467 | // and throws correct exception | |
468 | source().get(); | |
469 | } catch (rpc::stream_closed& ex) { | |
470 | // expected | |
471 | } catch (...) { | |
472 | BOOST_FAIL("wrong exception on reading from a stream after eos"); | |
473 | } | |
11fdf7f2 TL |
474 | } |
475 | } | |
476 | }); | |
477 | server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); | |
478 | return sink; | |
9f95a23c TL |
479 | }).get(); |
480 | auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1); | |
11fdf7f2 TL |
481 | auto x = [&] { |
482 | try { | |
9f95a23c | 483 | return c.make_stream_sink<serializer, int>(env.make_socket()).get0(); |
11fdf7f2 TL |
484 | } catch (...) { |
485 | c.stop().get(); | |
486 | throw; | |
487 | } | |
488 | }; | |
489 | auto sink = x(); | |
490 | auto source = call(c, 666, sink).get0(); | |
491 | auto source_done = seastar::async([&] { | |
492 | while (!r.client_source_closed) { | |
493 | auto data = source().get0(); | |
494 | if (data) { | |
495 | BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar"); | |
496 | } else { | |
497 | r.client_source_closed = true; | |
498 | } | |
499 | } | |
500 | }); | |
501 | auto check_exception = [] (auto f) { | |
502 | try { | |
503 | f.get(); | |
504 | } catch (...) { | |
505 | return true; | |
506 | } | |
507 | return false; | |
508 | }; | |
509 | future<> stop_client_future = make_ready_future(); | |
f67539c2 TL |
510 | // With a connection error sink() will eventually fail, but we |
511 | // cannot guarantee when. | |
512 | int max = expect_connection_error ? std::numeric_limits<int>::max() : 101; | |
513 | for (int i = 1; i < max; i++) { | |
11fdf7f2 TL |
514 | if (stop_client && i == 50) { |
515 | // stop client while stream is in use | |
516 | stop_client_future = c.stop(); | |
517 | } | |
518 | sleep(std::chrono::milliseconds(1)).get(); | |
519 | r.sink_exception = check_exception(sink(i)); | |
520 | if (r.sink_exception) { | |
521 | break; | |
522 | } | |
523 | } | |
524 | r.sink_close_exception = check_exception(sink.close()); | |
525 | r.source_done_exception = check_exception(std::move(source_done)); | |
526 | r.server_done_exception = check_exception(std::move(server_done)); | |
527 | r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future)); | |
528 | return r; | |
529 | }); | |
530 | } | |
531 | ||
532 | SEASTAR_TEST_CASE(test_stream_simple) { | |
533 | rpc::server_options so; | |
534 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
535 | rpc_test_config cfg; |
536 | cfg.server_options = so; | |
537 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
538 | return stream_test_func(env, false).then([] (stream_test_result r) { | |
539 | BOOST_REQUIRE(r.client_source_closed); | |
540 | BOOST_REQUIRE(r.server_source_closed); | |
541 | BOOST_REQUIRE(r.server_sum == 5050); | |
542 | BOOST_REQUIRE(!r.sink_exception); | |
543 | BOOST_REQUIRE(!r.sink_close_exception); | |
544 | BOOST_REQUIRE(!r.source_done_exception); | |
545 | BOOST_REQUIRE(!r.server_done_exception); | |
546 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
547 | }); |
548 | }); | |
549 | } | |
550 | ||
551 | SEASTAR_TEST_CASE(test_stream_stop_client) { | |
552 | rpc::server_options so; | |
553 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
554 | rpc_test_config cfg; |
555 | cfg.server_options = so; | |
556 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
557 | return stream_test_func(env, true).then([] (stream_test_result r) { | |
558 | BOOST_REQUIRE(!r.client_source_closed); | |
559 | BOOST_REQUIRE(!r.server_source_closed); | |
560 | BOOST_REQUIRE(r.sink_exception); | |
561 | BOOST_REQUIRE(r.sink_close_exception); | |
562 | BOOST_REQUIRE(r.source_done_exception); | |
563 | BOOST_REQUIRE(r.server_done_exception); | |
564 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
565 | }); |
566 | }); | |
567 | } | |
568 | ||
569 | ||
570 | SEASTAR_TEST_CASE(test_stream_connection_error) { | |
571 | rpc::server_options so; | |
572 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
573 | rpc_test_config cfg; |
574 | cfg.server_options = so; | |
575 | cfg.inject_error = true; | |
576 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
f67539c2 | 577 | return stream_test_func(env, false, true).then([] (stream_test_result r) { |
9f95a23c TL |
578 | BOOST_REQUIRE(!r.client_source_closed); |
579 | BOOST_REQUIRE(!r.server_source_closed); | |
580 | BOOST_REQUIRE(r.sink_exception); | |
581 | BOOST_REQUIRE(r.sink_close_exception); | |
582 | BOOST_REQUIRE(r.source_done_exception); | |
583 | BOOST_REQUIRE(r.server_done_exception); | |
584 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
585 | }); |
586 | }); | |
587 | } | |
588 | ||
589 | SEASTAR_TEST_CASE(test_rpc_scheduling) { | |
9f95a23c TL |
590 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
591 | auto sg = create_scheduling_group("rpc", 100).get0(); | |
592 | env.register_handler(1, sg, [] () { | |
593 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
594 | }).get(); | |
595 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
596 | auto id = call_sg_id(c1).get0(); | |
597 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg)); | |
11fdf7f2 TL |
598 | }); |
599 | } | |
600 | ||
601 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { | |
602 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
603 | auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); | |
604 | auto sg2 = create_scheduling_group("sg2", 100).get0(); | |
605 | auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); | |
606 | rpc::resource_limits limits; | |
607 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
608 | auto sg = current_scheduling_group(); | |
609 | if (cookie == "sg1") { | |
610 | sg = sg1; | |
611 | } else if (cookie == "sg2") { | |
612 | sg = sg2; | |
613 | } | |
614 | rpc::isolation_config cfg; | |
615 | cfg.sched_group = sg; | |
616 | return cfg; | |
617 | }; | |
9f95a23c TL |
618 | rpc_test_config cfg; |
619 | cfg.resource_limits = limits; | |
620 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
621 | rpc::client_options co1; | |
622 | co1.isolation_cookie = "sg1"; | |
623 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
624 | rpc::client_options co2; | |
625 | co2.isolation_cookie = "sg2"; | |
626 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
627 | env.register_handler(1, [] { | |
628 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
629 | }).get(); | |
630 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
631 | unsigned id; | |
632 | id = call_sg_id(c1).get0(); | |
633 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
634 | id = call_sg_id(c2).get0(); | |
635 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
636 | c1.stop().get(); | |
637 | c2.stop().get(); | |
11fdf7f2 TL |
638 | }).get(); |
639 | } | |
640 | ||
641 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { | |
642 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
643 | auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); | |
644 | auto sg2 = create_scheduling_group("sg2", 100).get0(); | |
645 | auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); | |
646 | rpc::resource_limits limits; | |
647 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
648 | auto sg = current_scheduling_group(); | |
649 | if (cookie == "sg1") { | |
650 | sg = sg1; | |
651 | } else if (cookie == "sg2") { | |
652 | sg = sg2; | |
653 | } | |
654 | rpc::isolation_config cfg; | |
655 | cfg.sched_group = sg; | |
656 | return cfg; | |
657 | }; | |
9f95a23c TL |
658 | rpc_test_config cfg; |
659 | cfg.resource_limits = limits; | |
660 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
661 | rpc::client_options co1; | |
662 | co1.isolation_cookie = "sg1"; | |
663 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
664 | rpc::client_options co2; | |
665 | co2.isolation_cookie = "sg2"; | |
666 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
667 | // An old client, that doesn't have an isolation cookie | |
668 | rpc::client_options co3; | |
669 | test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); | |
670 | // A server that uses sg1 if the client is old | |
671 | env.register_handler(1, sg1, [] () { | |
672 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
673 | }).get(); | |
674 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
675 | unsigned id; | |
676 | id = call_sg_id(c1).get0(); | |
677 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
678 | id = call_sg_id(c2).get0(); | |
679 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
680 | id = call_sg_id(c3).get0(); | |
681 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
682 | c1.stop().get(); | |
683 | c2.stop().get(); | |
684 | c3.stop().get(); | |
685 | }).get(); | |
686 | } | |
687 | ||
688 | void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) { | |
689 | using namespace seastar::rpc; | |
690 | ||
691 | auto linearize = [&] (const auto& buffer) { | |
692 | return seastar::visit(buffer.bufs, | |
693 | [] (const temporary_buffer<char>& buf) { | |
694 | return buf.clone(); | |
695 | }, | |
696 | [&] (const std::vector<temporary_buffer<char>>& bufs) { | |
697 | auto buf = temporary_buffer<char>(buffer.size); | |
698 | auto dst = buf.get_write(); | |
699 | for (auto& b : bufs) { | |
700 | dst = std::copy_n(b.get(), b.size(), dst); | |
701 | } | |
702 | return buf; | |
703 | } | |
704 | ); | |
705 | }; | |
706 | ||
707 | auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) { | |
708 | std::vector<temporary_buffer<char>> bufs; | |
709 | auto src = b.get(); | |
710 | auto n = b.size(); | |
711 | while (n) { | |
712 | auto this_chunk = std::min(chunk_size, n); | |
713 | bufs.emplace_back(this_chunk); | |
714 | std::copy_n(src, this_chunk, bufs.back().get_write()); | |
715 | src += this_chunk; | |
716 | n -= this_chunk; | |
717 | } | |
718 | return bufs; | |
719 | }; | |
720 | ||
721 | auto clone = [&] (const auto& buffer) { | |
722 | auto c = std::decay_t<decltype(buffer)>(); | |
723 | c.size = buffer.size; | |
724 | c.bufs = seastar::visit(buffer.bufs, | |
725 | [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) { | |
726 | return buf.clone(); | |
727 | }, | |
728 | [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) { | |
729 | std::vector<temporary_buffer<char>> c; | |
730 | c.reserve(bufs.size()); | |
731 | for (auto& b : bufs) { | |
732 | c.emplace_back(b.clone()); | |
733 | } | |
f67539c2 | 734 | return c; |
9f95a23c TL |
735 | } |
736 | ); | |
737 | return c; | |
738 | }; | |
739 | ||
740 | auto compressor = compressor_factory(); | |
741 | ||
742 | std::vector<std::tuple<sstring, size_t, snd_buf>> inputs; | |
743 | ||
744 | auto& eng = testing::local_random_engine; | |
745 | auto dist = std::uniform_int_distribution<char>(); | |
746 | ||
747 | auto snd = snd_buf(1); | |
748 | *snd.front().get_write() = 'a'; | |
749 | inputs.emplace_back("one byte, no headroom", 0, std::move(snd)); | |
750 | ||
751 | snd = snd_buf(1); | |
752 | *snd.front().get_write() = 'a'; | |
753 | inputs.emplace_back("one byte, 128k of headroom", 128 * 1024, std::move(snd)); | |
754 | ||
f67539c2 TL |
755 | auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
756 | auto buf = temporary_buffer<char>(s); | |
757 | std::fill_n(buf.get_write(), s, 'a'); | |
9f95a23c | 758 | |
f67539c2 TL |
759 | auto snd = snd_buf(); |
760 | snd.size = s; | |
761 | if (split) { | |
762 | snd.bufs = split_buffer(buf.clone(), *split); | |
763 | } else { | |
764 | snd.bufs = buf.clone(); | |
765 | } | |
766 | inputs.emplace_back(msg, 0, std::move(snd)); | |
767 | }; | |
9f95a23c | 768 | |
f67539c2 | 769 | gen_fill(16 * 1024, "single 16 kB buffer of \'a\'"); |
9f95a23c | 770 | |
f67539c2 TL |
771 | auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
772 | auto buf = temporary_buffer<char>(s); | |
773 | std::generate_n(buf.get_write(), s, [&] { return dist(eng); }); | |
9f95a23c | 774 | |
f67539c2 TL |
775 | auto snd = snd_buf(); |
776 | snd.size = s; | |
777 | if (split) { | |
778 | snd.bufs = split_buffer(buf.clone(), *split); | |
779 | } else { | |
780 | snd.bufs = buf.clone(); | |
781 | } | |
782 | inputs.emplace_back(msg, 0, std::move(snd)); | |
783 | }; | |
9f95a23c | 784 | |
f67539c2 | 785 | gen_rand(16 * 1024, "single 16 kB buffer of random"); |
9f95a23c | 786 | |
f67539c2 TL |
787 | auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
788 | static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. "; | |
9f95a23c | 789 | |
f67539c2 TL |
790 | auto buf = temporary_buffer<char>(s); |
791 | size_t n = 0; | |
792 | while (n < s) { | |
793 | auto rem = std::min(s - n, text.size()); | |
794 | std::copy(text.data(), text.data() + rem, buf.get_write() + n); | |
795 | n += rem; | |
796 | } | |
9f95a23c | 797 | |
f67539c2 TL |
798 | auto snd = snd_buf(); |
799 | snd.size = s; | |
800 | if (split) { | |
801 | snd.bufs = split_buffer(buf.clone(), *split); | |
802 | } else { | |
803 | snd.bufs = buf.clone(); | |
804 | } | |
805 | inputs.emplace_back(msg, 0, std::move(snd)); | |
806 | }; | |
9f95a23c | 807 | |
9f95a23c | 808 | |
f67539c2 TL |
809 | for (auto s : { 1, 4, 8 }) { |
810 | for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) { | |
811 | gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss); | |
812 | gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024); | |
813 | gen_rand(s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024); | |
9f95a23c | 814 | |
f67539c2 TL |
815 | gen_fill(s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024); |
816 | gen_rand(s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024); | |
817 | } | |
9f95a23c | 818 | |
f67539c2 TL |
819 | for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) { |
820 | gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss); | |
821 | gen_rand(s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss); | |
822 | gen_text(s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss); | |
823 | gen_fill(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss); | |
824 | gen_rand(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss); | |
825 | gen_text(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss); | |
826 | } | |
827 | } | |
9f95a23c | 828 | |
f67539c2 TL |
829 | for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) { |
830 | gen_fill(s, format("{} bytes buffer of \'a\'", s)); | |
831 | gen_rand(s, format("{} bytes buffer of random", s)); | |
832 | gen_text(s, format("{} bytes buffer of text", s)); | |
833 | } | |
9f95a23c TL |
834 | |
835 | std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms { | |
836 | { "identity", [] (snd_buf snd) { | |
837 | rcv_buf rcv; | |
838 | rcv.size = snd.size; | |
839 | rcv.bufs = std::move(snd.bufs); | |
840 | return rcv; | |
841 | } }, | |
842 | { "linearized", [&linearize] (snd_buf snd) { | |
843 | rcv_buf rcv; | |
844 | rcv.size = snd.size; | |
845 | rcv.bufs = linearize(snd); | |
846 | return rcv; | |
847 | } }, | |
848 | { "split 1 B", [&] (snd_buf snd) { | |
849 | rcv_buf rcv; | |
850 | rcv.size = snd.size; | |
851 | rcv.bufs = split_buffer(linearize(snd), 1); | |
852 | return rcv; | |
853 | } }, | |
854 | { "split 129 B", [&] (snd_buf snd) { | |
855 | rcv_buf rcv; | |
856 | rcv.size = snd.size; | |
857 | rcv.bufs = split_buffer(linearize(snd), 129); | |
858 | return rcv; | |
859 | } }, | |
860 | { "split 4 kB", [&] (snd_buf snd) { | |
861 | rcv_buf rcv; | |
862 | rcv.size = snd.size; | |
863 | rcv.bufs = split_buffer(linearize(snd), 4096); | |
864 | return rcv; | |
865 | } }, | |
866 | { "split 4 kB - 128", [&] (snd_buf snd) { | |
867 | rcv_buf rcv; | |
868 | rcv.size = snd.size; | |
869 | rcv.bufs = split_buffer(linearize(snd), 4096 - 128); | |
870 | return rcv; | |
871 | } }, | |
872 | }; | |
873 | ||
874 | auto sanity_check = [&] (const auto& buffer) { | |
875 | auto actual_size = seastar::visit(buffer.bufs, | |
876 | [] (const temporary_buffer<char>& buf) { | |
877 | return buf.size(); | |
878 | }, | |
879 | [] (const std::vector<temporary_buffer<char>>& bufs) { | |
880 | return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) { | |
881 | return sz + buf.size(); | |
882 | }); | |
883 | } | |
884 | ); | |
885 | BOOST_CHECK_EQUAL(actual_size, buffer.size); | |
886 | }; | |
887 | ||
888 | for (auto& in : inputs) { | |
889 | BOOST_TEST_MESSAGE("Input: " << std::get<0>(in)); | |
890 | auto headroom = std::get<1>(in); | |
891 | auto compressed = compressor->compress(headroom, clone(std::get<2>(in))); | |
892 | sanity_check(compressed); | |
893 | ||
894 | // Remove headroom | |
895 | BOOST_CHECK_GE(compressed.size, headroom); | |
896 | compressed.size -= headroom; | |
897 | seastar::visit(compressed.bufs, | |
898 | [&] (temporary_buffer<char>& buf) { | |
899 | BOOST_CHECK_GE(buf.size(), headroom); | |
900 | buf.trim_front(headroom); | |
901 | }, | |
902 | [&] (std::vector<temporary_buffer<char>>& bufs) { | |
903 | while (headroom) { | |
904 | BOOST_CHECK(!bufs.empty()); | |
905 | auto to_remove = std::min(bufs.front().size(), headroom); | |
906 | bufs.front().trim_front(to_remove); | |
907 | if (bufs.front().empty() && bufs.size() > 1) { | |
908 | bufs.erase(bufs.begin()); | |
909 | } | |
910 | headroom -= to_remove; | |
11fdf7f2 | 911 | } |
9f95a23c TL |
912 | } |
913 | ); | |
914 | ||
915 | auto in_l = linearize(std::get<2>(in)); | |
916 | ||
917 | for (auto& t : transforms) { | |
918 | BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t)); | |
919 | auto received = std::get<1>(t)(clone(compressed)); | |
920 | ||
921 | auto decompressed = compressor->decompress(std::move(received)); | |
922 | sanity_check(decompressed); | |
923 | ||
924 | BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size); | |
925 | ||
926 | auto out_l = linearize(decompressed); | |
927 | ||
928 | BOOST_CHECK_EQUAL(in_l.size(), out_l.size()); | |
929 | BOOST_CHECK(in_l == out_l); | |
930 | } | |
931 | } | |
932 | } | |
933 | ||
934 | SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) { | |
935 | test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); }); | |
936 | } | |
937 | ||
938 | SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) { | |
939 | test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); }); | |
940 | } | |
941 | ||
942 | // Test reproducing issue #671: If timeout is time_point::max(), translating | |
943 | // it to relative timeout in the sender and then back in the receiver, when | |
944 | // these calculations happen across a millisecond boundary, overflowed the | |
945 | // integer and mislead the receiver to think the requested timeout was | |
946 | // negative, and cause it drop its response, so the RPC call never completed. | |
947 | SEASTAR_TEST_CASE(test_max_absolute_timeout) { | |
948 | // The typical failure of this test is a hang. So we use semaphore to | |
949 | // stop the test either when it succeeds, or after a long enough hang. | |
950 | auto success = make_lw_shared<bool>(false); | |
951 | auto done = make_lw_shared<semaphore>(0); | |
952 | auto abrt = make_lw_shared<abort_source>(); | |
953 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
954 | done->signal(1); | |
955 | }).handle_exception([] (std::exception_ptr) {}); | |
956 | rpc::client_options co; | |
957 | co.send_timeout_data = 1; | |
958 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
959 | env.register_handler(1, [](int a, int b) { | |
960 | return make_ready_future<int>(a+b); | |
961 | }).get(); | |
962 | auto sum = env.proto().make_client<int (int, int)>(1); | |
963 | // The bug only reproduces if the calculation done on the sender | |
964 | // and receiver sides, happened across a millisecond boundary. | |
965 | // We can't control when it happens, so we just need to loop many | |
966 | // times, at least many milliseconds, to increase the probability | |
967 | // that we catch the bug. Experimentally, if we loop for 200ms, we | |
968 | // catch the bug in #671 virtually every time. | |
969 | auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200); | |
970 | while (seastar::lowres_clock::now() <= until) { | |
971 | auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0(); | |
972 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
973 | } | |
974 | }).then([success, done, abrt] { | |
975 | *success = true; | |
976 | abrt->request_abort(); | |
977 | done->signal(); | |
978 | }); | |
979 | return done->wait().then([done, success] { | |
980 | BOOST_REQUIRE(*success); | |
981 | }); | |
982 | } | |
983 | ||
984 | // Similar to the above test: Test that a relative timeout duration::max() | |
985 | // also works, and again doesn't cause the timeout wrapping around to the | |
986 | // past and causing dropped responses. | |
987 | SEASTAR_TEST_CASE(test_max_relative_timeout) { | |
988 | // The typical failure of this test is a hang. So we use semaphore to | |
989 | // stop the test either when it succeeds, or after a long enough hang. | |
990 | auto success = make_lw_shared<bool>(false); | |
991 | auto done = make_lw_shared<semaphore>(0); | |
992 | auto abrt = make_lw_shared<abort_source>(); | |
993 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
994 | done->signal(1); | |
995 | }).handle_exception([] (std::exception_ptr) {}); | |
996 | rpc::client_options co; | |
997 | co.send_timeout_data = 1; | |
998 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
999 | env.register_handler(1, [](int a, int b) { | |
1000 | return make_ready_future<int>(a+b); | |
1001 | }).get(); | |
1002 | auto sum = env.proto().make_client<int (int, int)>(1); | |
1003 | // The following call used to always hang, when max()+now() | |
1004 | // overflowed and appeared to be a negative timeout. | |
1005 | auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0(); | |
1006 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
1007 | }).then([success, done, abrt] { | |
1008 | *success = true; | |
1009 | abrt->request_abort(); | |
1010 | done->signal(); | |
1011 | }); | |
1012 | return done->wait().then([done, success] { | |
1013 | BOOST_REQUIRE(*success); | |
1014 | }); | |
1015 | } | |
1016 | ||
1017 | SEASTAR_TEST_CASE(test_rpc_tuple) { | |
1018 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1019 | env.register_handler(1, [] () { | |
1020 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1021 | }).get(); | |
1022 | auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1); | |
1023 | auto result = f1(c1).get0(); | |
1024 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1025 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1026 | }); | |
1027 | } | |
1028 | ||
1029 | SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) { | |
1030 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1031 | // Server is variadic | |
1032 | env.register_handler(1, [] () { | |
f67539c2 | 1033 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L)); |
9f95a23c TL |
1034 | }).get(); |
1035 | // Client is non-variadic | |
1036 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); | |
1037 | auto result = f1(c1).get0(); | |
1038 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1039 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1040 | }); | |
1041 | } | |
1042 | ||
1043 | SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) { | |
1044 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1045 | // Server is nonvariadic | |
1046 | env.register_handler(1, [] () { | |
1047 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1048 | }).get(); | |
1049 | // Client is variadic | |
f67539c2 TL |
1050 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); |
1051 | auto result = f1(c1).get0(); | |
9f95a23c TL |
1052 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); |
1053 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1054 | }); | |
1055 | } | |
1056 | ||
1057 | SEASTAR_TEST_CASE(test_handler_registration) { | |
1058 | rpc_test_config cfg; | |
1059 | cfg.connect = false; | |
1060 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { | |
1061 | auto& proto = env.proto(); | |
1062 | ||
f67539c2 TL |
1063 | // new proto must be empty |
1064 | BOOST_REQUIRE(!proto.has_handlers()); | |
1065 | ||
9f95a23c TL |
1066 | // non-existing handler should not be found |
1067 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1068 | ||
1069 | // unregistered non-existing handler should return ready future | |
1070 | auto fut = proto.unregister_handler(1); | |
1071 | BOOST_REQUIRE(fut.available() && !fut.failed()); | |
1072 | fut.get(); | |
1073 | ||
1074 | // existing handler should be found | |
1075 | auto handler = [] () { return make_ready_future<>(); }; | |
1076 | proto.register_handler(1, handler); | |
1077 | BOOST_REQUIRE(proto.has_handler(1)); | |
1078 | ||
1079 | // cannot register handler if already registered | |
1080 | BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error); | |
1081 | ||
1082 | // unregistered handler should not be found | |
1083 | proto.unregister_handler(1).get(); | |
1084 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1085 | ||
1086 | // re-registering a handler should succeed | |
1087 | proto.register_handler(1, handler); | |
1088 | BOOST_REQUIRE(proto.has_handler(1)); | |
f67539c2 TL |
1089 | |
1090 | // proto with handlers must not be empty | |
1091 | BOOST_REQUIRE(proto.has_handlers()); | |
9f95a23c TL |
1092 | }); |
1093 | } | |
1094 | ||
1095 | SEASTAR_TEST_CASE(test_unregister_handler) { | |
1096 | using namespace std::chrono_literals; | |
1097 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1098 | promise<> handler_called; | |
1099 | future<> f_handler_called = handler_called.get_future(); | |
1100 | bool rpc_executed = false; | |
1101 | bool rpc_completed = false; | |
1102 | ||
1103 | auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] { | |
1104 | if (f_handler_called.available()) { | |
1105 | f_handler_called.get(); | |
1106 | } | |
1107 | rpc_executed = false; | |
1108 | rpc_completed = false; | |
1109 | }; | |
1110 | ||
1111 | auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] { | |
1112 | return [&handler_called, &rpc_executed, &rpc_completed] { | |
1113 | handler_called.set_value(); | |
1114 | rpc_executed = true; | |
1115 | return sleep(1ms).then([&rpc_completed] { | |
1116 | rpc_completed = true; | |
1117 | }); | |
1118 | }; | |
1119 | }; | |
1120 | ||
1121 | // handler should not run if unregistered before being called | |
1122 | env.register_handler(1, get_handler()).get(); | |
1123 | env.unregister_handler(1).get(); | |
1124 | BOOST_REQUIRE(!f_handler_called.available()); | |
1125 | BOOST_REQUIRE(!rpc_executed); | |
1126 | BOOST_REQUIRE(!rpc_completed); | |
1127 | ||
1128 | // verify normal execution path | |
1129 | env.register_handler(1, get_handler()).get(); | |
1130 | auto call = env.proto().make_client<void ()>(1); | |
1131 | call(c1).get(); | |
1132 | BOOST_REQUIRE(f_handler_called.available()); | |
1133 | BOOST_REQUIRE(rpc_executed); | |
1134 | BOOST_REQUIRE(rpc_completed); | |
1135 | reset_state(); | |
1136 | ||
1137 | // call should fail after handler is unregistered | |
1138 | env.unregister_handler(1).get(); | |
1139 | try { | |
1140 | call(c1).get(); | |
1141 | BOOST_REQUIRE(false); | |
1142 | } catch (rpc::unknown_verb_error&) { | |
1143 | // expected | |
1144 | } catch (...) { | |
1145 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1146 | BOOST_REQUIRE(false); | |
1147 | } | |
1148 | BOOST_REQUIRE(!f_handler_called.available()); | |
1149 | BOOST_REQUIRE(!rpc_executed); | |
1150 | BOOST_REQUIRE(!rpc_completed); | |
1151 | ||
1152 | // unregistered is allowed while call is in flight | |
1153 | auto delayed_unregister = [&env] { | |
1154 | return sleep(500us).then([&env] { | |
1155 | return env.unregister_handler(1); | |
11fdf7f2 | 1156 | }); |
9f95a23c TL |
1157 | }; |
1158 | ||
1159 | env.register_handler(1, get_handler()).get(); | |
1160 | try { | |
1161 | when_all_succeed(call(c1), delayed_unregister()).get(); | |
1162 | reset_state(); | |
1163 | } catch (rpc::unknown_verb_error&) { | |
1164 | // expected | |
1165 | } catch (...) { | |
1166 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1167 | BOOST_REQUIRE(false); | |
1168 | } | |
1169 | }); | |
11fdf7f2 | 1170 | } |
9f95a23c | 1171 | |
f67539c2 TL |
1172 | SEASTAR_TEST_CASE(test_loggers) { |
1173 | static seastar::logger log("dummy"); | |
1174 | log.set_level(log_level::debug); | |
1175 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1176 | socket_address dummy_addr; | |
1177 | auto& proto = env.proto(); | |
1178 | auto& logger = proto.get_logger(); | |
1179 | logger(dummy_addr, "Hello0"); | |
1180 | logger(dummy_addr, log_level::debug, "Hello1"); | |
1181 | proto.set_logger(&log); | |
1182 | logger(dummy_addr, "Hello2"); | |
1183 | logger(dummy_addr, log_level::debug, "Hello3"); | |
1184 | // We *want* to test the deprecated API, don't spam warnings about it. | |
1185 | #pragma GCC diagnostic push | |
1186 | #pragma GCC diagnostic ignored "-Wdeprecated-declarations" | |
1187 | proto.set_logger([] (const sstring& str) { | |
1188 | log.info("Test: {}", str); | |
1189 | }); | |
1190 | #pragma GCC diagnostic pop | |
1191 | logger(dummy_addr, "Hello4"); | |
1192 | logger(dummy_addr, log_level::debug, "Hello5"); | |
1193 | proto.set_logger(nullptr); | |
1194 | logger(dummy_addr, "Hello6"); | |
1195 | logger(dummy_addr, log_level::debug, "Hello7"); | |
1196 | }); | |
1197 | } | |
9f95a23c TL |
1198 | |
1199 | static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working"); |