]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2016 ScyllaDB | |
20 | */ | |
21 | ||
11fdf7f2 TL |
22 | #include "loopback_socket.hh" |
23 | #include <seastar/rpc/rpc.hh> | |
24 | #include <seastar/rpc/rpc_types.hh> | |
25 | #include <seastar/rpc/lz4_compressor.hh> | |
9f95a23c | 26 | #include <seastar/rpc/lz4_fragmented_compressor.hh> |
11fdf7f2 TL |
27 | #include <seastar/rpc/multi_algo_compressor_factory.hh> |
28 | #include <seastar/testing/test_case.hh> | |
29 | #include <seastar/testing/thread_test_case.hh> | |
9f95a23c | 30 | #include <seastar/testing/test_runner.hh> |
11fdf7f2 TL |
31 | #include <seastar/core/thread.hh> |
32 | #include <seastar/core/sleep.hh> | |
9f95a23c | 33 | #include <seastar/core/distributed.hh> |
f67539c2 | 34 | #include <seastar/core/loop.hh> |
11fdf7f2 | 35 | #include <seastar/util/defer.hh> |
f67539c2 | 36 | #include <seastar/util/log.hh> |
20effc67 TL |
37 | #include <seastar/util/closeable.hh> |
38 | #include <seastar/util/noncopyable_function.hh> | |
11fdf7f2 TL |
39 | |
40 | using namespace seastar; | |
41 | ||
// Stateless tag type. It is passed as the first argument of the free
// read()/write() overloads in this file so that they are selected by
// overload resolution.
struct serializer {};
44 | ||
/// Writes the object representation of an arithmetic value to `out`.
///
/// `out` must expose `write(const char*, size_t)`. Exactly `sizeof(T)` bytes
/// are written, copied straight from memory (no byte-order conversion).
template <typename T, typename Output>
inline void write_arithmetic_type(Output& out, T v) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    const char* raw_bytes = reinterpret_cast<const char*>(&v);
    out.write(raw_bytes, sizeof(T));
}
51 | ||
/// Reads `sizeof(T)` bytes from `in` and returns them reinterpreted as a `T`.
///
/// `in` must expose `read(char*, size_t)`. The bytes are copied directly into
/// the result's storage (no byte-order conversion).
template <typename T, typename Input>
inline T read_arithmetic_type(Input& in) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    T value;
    char* raw_bytes = reinterpret_cast<char*>(&value);
    in.read(raw_bytes, sizeof(T));
    return value;
}
60 | ||
61 | template <typename Output> | |
62 | inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } | |
63 | template <typename Output> | |
64 | inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } | |
65 | template <typename Output> | |
66 | inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } | |
67 | template <typename Output> | |
68 | inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } | |
69 | template <typename Output> | |
70 | inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } | |
71 | template <typename Input> | |
72 | inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } | |
73 | template <typename Input> | |
74 | inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } | |
75 | template <typename Input> | |
76 | inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } | |
77 | template <typename Input> | |
78 | inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } | |
79 | template <typename Input> | |
80 | inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } | |
81 | ||
82 | template <typename Output> | |
83 | inline void write(serializer, Output& out, const sstring& v) { | |
84 | write_arithmetic_type(out, uint32_t(v.size())); | |
85 | out.write(v.c_str(), v.size()); | |
86 | } | |
87 | ||
88 | template <typename Input> | |
89 | inline sstring read(serializer, Input& in, rpc::type<sstring>) { | |
90 | auto size = read_arithmetic_type<uint32_t>(in); | |
f67539c2 TL |
91 | sstring ret = uninitialized_string(size); |
92 | in.read(ret.data(), size); | |
11fdf7f2 TL |
93 | return ret; |
94 | } | |
95 | ||
96 | using test_rpc_proto = rpc::protocol<serializer>; | |
97 | using make_socket_fn = std::function<seastar::socket ()>; | |
98 | ||
1e59de90 TL |
99 | class rpc_loopback_error_injector : public loopback_error_injector { |
100 | public: | |
101 | struct config { | |
102 | struct { | |
103 | int limit = 0; | |
104 | error kind = error::none; | |
105 | private: | |
106 | friend class rpc_loopback_error_injector; | |
107 | int _x = 0; | |
108 | error inject() { | |
109 | return _x++ >= limit ? kind : error::none; | |
110 | } | |
111 | } server_rcv = {}, server_snd = {}, client_rcv = {}, client_snd = {}; | |
112 | error connect_kind = error::none; | |
113 | }; | |
114 | private: | |
115 | config _cfg; | |
116 | public: | |
117 | rpc_loopback_error_injector(config cfg) : _cfg(std::move(cfg)) {} | |
118 | ||
119 | error server_rcv_error() override { | |
120 | return _cfg.server_rcv.inject(); | |
121 | } | |
122 | ||
123 | error server_snd_error() override { | |
124 | return _cfg.server_snd.inject(); | |
125 | } | |
126 | ||
127 | error client_rcv_error() override { | |
128 | return _cfg.client_rcv.inject(); | |
129 | } | |
130 | ||
131 | error client_snd_error() override { | |
132 | return _cfg.client_snd.inject(); | |
133 | } | |
134 | ||
135 | error connect_error() override { | |
136 | return _cfg.connect_kind; | |
11fdf7f2 TL |
137 | } |
138 | }; | |
139 | ||
140 | class rpc_socket_impl : public ::net::socket_impl { | |
11fdf7f2 | 141 | rpc_loopback_error_injector _error_injector; |
1e59de90 | 142 | loopback_socket_impl _socket; |
11fdf7f2 | 143 | public: |
1e59de90 TL |
144 | rpc_socket_impl(loopback_connection_factory& factory, std::optional<rpc_loopback_error_injector::config> inject_error) |
145 | : | |
146 | _error_injector(inject_error.value_or(rpc_loopback_error_injector::config{})), | |
11fdf7f2 TL |
147 | _socket(factory, inject_error ? &_error_injector : nullptr) { |
148 | } | |
149 | virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { | |
1e59de90 | 150 | return _socket.connect(sa, local, proto); |
11fdf7f2 | 151 | } |
9f95a23c TL |
152 | virtual void set_reuseaddr(bool reuseaddr) override {} |
153 | virtual bool get_reuseaddr() const override { return false; }; | |
11fdf7f2 | 154 | virtual void shutdown() override { |
1e59de90 | 155 | _socket.shutdown(); |
11fdf7f2 TL |
156 | } |
157 | }; | |
158 | ||
9f95a23c TL |
159 | struct rpc_test_config { |
160 | rpc::resource_limits resource_limits = {}; | |
161 | rpc::server_options server_options = {}; | |
1e59de90 | 162 | std::optional<rpc_loopback_error_injector::config> inject_error; |
9f95a23c TL |
163 | }; |
164 | ||
165 | template<typename MsgType = int> | |
166 | class rpc_test_env { | |
167 | struct rpc_test_service { | |
168 | test_rpc_proto _proto; | |
169 | test_rpc_proto::server _server; | |
170 | std::vector<MsgType> _handlers; | |
171 | ||
172 | rpc_test_service() = delete; | |
173 | explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf) | |
174 | : _proto(serializer()) | |
175 | , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits) | |
176 | { } | |
177 | ||
178 | test_rpc_proto& proto() { | |
179 | return _proto; | |
180 | } | |
181 | ||
182 | test_rpc_proto::server& server() { | |
183 | return _server; | |
184 | } | |
185 | ||
186 | future<> stop() { | |
187 | return parallel_for_each(_handlers, [this] (auto t) { | |
188 | return proto().unregister_handler(t); | |
189 | }).finally([this] { | |
190 | return server().stop(); | |
191 | }); | |
192 | } | |
193 | ||
194 | template<typename Func> | |
195 | auto register_handler(MsgType t, scheduling_group sg, Func func) { | |
196 | _handlers.emplace_back(t); | |
197 | return proto().register_handler(t, sg, std::move(func)); | |
198 | } | |
199 | ||
200 | future<> unregister_handler(MsgType t) { | |
201 | auto it = std::find(_handlers.begin(), _handlers.end(), t); | |
202 | assert(it != _handlers.end()); | |
203 | _handlers.erase(it); | |
204 | return proto().unregister_handler(t); | |
205 | } | |
11fdf7f2 | 206 | }; |
9f95a23c TL |
207 | |
208 | rpc_test_config _cfg; | |
209 | loopback_connection_factory _lcf; | |
f67539c2 | 210 | std::unique_ptr<sharded<rpc_test_service>> _service; |
9f95a23c TL |
211 | |
212 | public: | |
213 | rpc_test_env() = delete; | |
214 | explicit rpc_test_env(rpc_test_config cfg) | |
f67539c2 | 215 | : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>()) |
9f95a23c TL |
216 | { |
217 | } | |
218 | ||
219 | using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>; | |
220 | static future<> do_with(rpc_test_config cfg, test_fn&& func) { | |
221 | return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) { | |
222 | return env.start().then([&env, func] { | |
223 | return func(env); | |
224 | }).finally([&env] { | |
225 | return env.stop(); | |
11fdf7f2 TL |
226 | }); |
227 | }); | |
9f95a23c TL |
228 | } |
229 | ||
230 | using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>; | |
231 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) { | |
232 | return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) { | |
233 | return seastar::async([&env, func] { | |
234 | func(env); | |
235 | }); | |
236 | }); | |
237 | } | |
238 | ||
239 | using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>; | |
240 | static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) { | |
241 | return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) { | |
242 | return seastar::async([&env, func, co = std::move(co)] { | |
243 | test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr()); | |
20effc67 | 244 | auto stop = deferred_stop(cl); |
9f95a23c TL |
245 | func(env, cl); |
246 | }); | |
247 | }); | |
248 | } | |
249 | ||
250 | static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) { | |
251 | return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func)); | |
252 | } | |
253 | ||
254 | auto make_socket() { | |
1e59de90 | 255 | return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.inject_error)); |
9f95a23c TL |
256 | }; |
257 | ||
258 | test_rpc_proto& proto() { | |
259 | return local_service().proto(); | |
260 | } | |
261 | ||
262 | test_rpc_proto::server& server() { | |
263 | return local_service().server(); | |
264 | } | |
265 | ||
266 | template<typename Func> | |
267 | future<> register_handler(MsgType t, scheduling_group sg, Func func) { | |
f67539c2 | 268 | return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable { |
9f95a23c TL |
269 | s.register_handler(t, sg, std::move(func)); |
270 | }); | |
271 | } | |
272 | ||
273 | template<typename Func> | |
274 | future<> register_handler(MsgType t, Func func) { | |
275 | return register_handler(t, scheduling_group(), std::move(func)); | |
276 | } | |
277 | ||
278 | future<> unregister_handler(MsgType t) { | |
f67539c2 | 279 | return _service->invoke_on_all([t] (rpc_test_service& s) mutable { |
9f95a23c TL |
280 | return s.unregister_handler(t); |
281 | }); | |
282 | } | |
283 | ||
284 | private: | |
285 | rpc_test_service& local_service() { | |
f67539c2 TL |
286 | return _service->local(); |
287 | ||
9f95a23c TL |
288 | } |
289 | ||
290 | future<> start() { | |
f67539c2 | 291 | return _service->start(std::cref(_cfg), std::ref(_lcf)); |
9f95a23c TL |
292 | } |
293 | ||
294 | future<> stop() { | |
f67539c2 | 295 | return _service->stop().then([this] { |
9f95a23c TL |
296 | return _lcf.destroy_all_shards(); |
297 | }); | |
298 | } | |
299 | }; | |
11fdf7f2 TL |
300 | |
301 | struct cfactory : rpc::compressor::factory { | |
302 | mutable int use_compression = 0; | |
303 | const sstring name; | |
304 | cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {} | |
305 | const sstring& supported() const override { | |
306 | return name; | |
307 | } | |
f67539c2 TL |
308 | class mylz4 : public rpc::lz4_compressor { |
309 | sstring _name; | |
310 | public: | |
311 | mylz4(const sstring& n) : _name(n) {} | |
312 | sstring name() const override { | |
313 | return _name; | |
314 | } | |
315 | }; | |
11fdf7f2 TL |
316 | std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { |
317 | if (feature == name) { | |
318 | use_compression++; | |
f67539c2 | 319 | return std::make_unique<mylz4>(name); |
11fdf7f2 TL |
320 | } else { |
321 | return nullptr; | |
322 | } | |
323 | } | |
324 | }; | |
9f95a23c | 325 | |
11fdf7f2 TL |
326 | SEASTAR_TEST_CASE(test_rpc_connect) { |
327 | std::vector<future<>> fs; | |
328 | ||
329 | for (auto i = 0; i < 2; i++) { | |
330 | for (auto j = 0; j < 4; j++) { | |
331 | auto factory = std::make_unique<cfactory>(); | |
332 | rpc::server_options so; | |
333 | rpc::client_options co; | |
334 | if (i == 1) { | |
335 | so.compressor_factory = factory.get(); | |
336 | } | |
337 | if (j & 1) { | |
338 | co.compressor_factory = factory.get(); | |
339 | } | |
340 | co.send_timeout_data = j & 2; | |
9f95a23c TL |
341 | rpc_test_config cfg; |
342 | cfg.server_options = so; | |
343 | auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
344 | env.register_handler(1, [](int a, int b) { | |
345 | return make_ready_future<int>(a+b); | |
346 | }).get(); | |
347 | auto sum = env.proto().make_client<int (int, int)>(1); | |
348 | auto result = sum(c1, 2, 3).get0(); | |
349 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
350 | }).handle_exception([] (auto ep) { |
351 | BOOST_FAIL("No exception expected"); | |
352 | }).finally([factory = std::move(factory), i, j = j & 1] { | |
353 | if (i == 1 && j == 1) { | |
354 | BOOST_REQUIRE_EQUAL(factory->use_compression, 2); | |
355 | } else { | |
356 | BOOST_REQUIRE_EQUAL(factory->use_compression, 0); | |
357 | } | |
358 | }); | |
359 | fs.emplace_back(std::move(f)); | |
360 | } | |
361 | } | |
362 | return when_all(fs.begin(), fs.end()).discard_result(); | |
363 | } | |
364 | ||
365 | SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) { | |
366 | auto factory1 = std::make_unique<cfactory>(); | |
367 | auto factory2 = std::make_unique<cfactory>("LZ4NEW"); | |
368 | rpc::server_options so; | |
369 | rpc::client_options co; | |
370 | static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()}); | |
371 | static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()}); | |
372 | so.compressor_factory = &server; | |
373 | co.compressor_factory = &client; | |
9f95a23c TL |
374 | rpc_test_config cfg; |
375 | cfg.server_options = so; | |
376 | return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
377 | env.register_handler(1, [](int a, int b) { | |
378 | return make_ready_future<int>(a+b); | |
379 | }).get(); | |
380 | auto sum = env.proto().make_client<int (int, int)>(1); | |
381 | auto result = sum(c1, 2, 3).get0(); | |
382 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
11fdf7f2 TL |
383 | }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] { |
384 | BOOST_REQUIRE_EQUAL(factory1->use_compression, 0); | |
385 | BOOST_REQUIRE_EQUAL(factory2->use_compression, 2); | |
386 | }); | |
387 | } | |
388 | ||
389 | SEASTAR_TEST_CASE(test_rpc_connect_abort) { | |
9f95a23c | 390 | rpc_test_config cfg; |
1e59de90 TL |
391 | rpc_loopback_error_injector::config ecfg; |
392 | ecfg.connect_kind = loopback_error_injector::error::abort; | |
393 | cfg.inject_error = ecfg; | |
9f95a23c TL |
394 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { |
395 | test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); | |
396 | env.register_handler(1, []() { return make_ready_future<>(); }).get(); | |
397 | auto f = env.proto().make_client<void ()>(1); | |
1e59de90 | 398 | auto fut = f(c1); |
9f95a23c | 399 | c1.stop().get0(); |
1e59de90 TL |
400 | try { |
401 | fut.get0(); | |
402 | BOOST_REQUIRE(false); | |
403 | } catch (...) {} | |
9f95a23c TL |
404 | try { |
405 | f(c1).get0(); | |
406 | BOOST_REQUIRE(false); | |
407 | } catch (...) {} | |
11fdf7f2 TL |
408 | }); |
409 | } | |
410 | ||
411 | SEASTAR_TEST_CASE(test_rpc_cancel) { | |
412 | using namespace std::chrono_literals; | |
9f95a23c TL |
413 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
414 | bool rpc_executed = false; | |
415 | int good = 0; | |
416 | promise<> handler_called; | |
417 | future<> f_handler_called = handler_called.get_future(); | |
418 | env.register_handler(1, [&rpc_executed, &handler_called] { | |
419 | handler_called.set_value(); rpc_executed = true; return sleep(1ms); | |
420 | }).get(); | |
421 | auto call = env.proto().make_client<void ()>(1); | |
1e59de90 | 422 | promise<> cont; |
9f95a23c | 423 | rpc::cancellable cancel; |
1e59de90 | 424 | c1.suspend_for_testing(cont); |
9f95a23c TL |
425 | auto f = call(c1, cancel); |
426 | // cancel send side | |
427 | cancel.cancel(); | |
1e59de90 | 428 | cont.set_value(); |
9f95a23c TL |
429 | try { |
430 | f.get(); | |
431 | } catch(rpc::canceled_error&) { | |
432 | good += !rpc_executed; | |
433 | }; | |
434 | f = call(c1, cancel); | |
435 | // cancel wait side | |
436 | f_handler_called.then([&cancel] { | |
11fdf7f2 | 437 | cancel.cancel(); |
9f95a23c TL |
438 | }).get(); |
439 | try { | |
440 | f.get(); | |
441 | } catch(rpc::canceled_error&) { | |
442 | good += 10*rpc_executed; | |
443 | }; | |
444 | BOOST_REQUIRE_EQUAL(good, 11); | |
11fdf7f2 TL |
445 | }); |
446 | } | |
447 | ||
448 | SEASTAR_TEST_CASE(test_message_to_big) { | |
9f95a23c TL |
449 | rpc_test_config cfg; |
450 | cfg.resource_limits = {0, 1, 100}; | |
451 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) { | |
452 | bool good = true; | |
453 | env.register_handler(1, [&] (sstring payload) mutable { | |
454 | good = false; | |
455 | }).get(); | |
456 | auto call = env.proto().make_client<void (sstring)>(1); | |
457 | try { | |
f67539c2 | 458 | call(c, uninitialized_string(101)).get(); |
9f95a23c TL |
459 | good = false; |
460 | } catch(std::runtime_error& err) { | |
461 | } catch(...) { | |
462 | good = false; | |
463 | } | |
464 | BOOST_REQUIRE_EQUAL(good, true); | |
11fdf7f2 TL |
465 | }); |
466 | } | |
11fdf7f2 | 467 | |
1e59de90 TL |
468 | SEASTAR_TEST_CASE(test_rpc_remote_verb_error) { |
469 | rpc_test_config cfg; | |
470 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { | |
471 | test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); | |
472 | env.register_handler(1, []() { throw std::runtime_error("error"); }).get(); | |
473 | auto f = env.proto().make_client<void ()>(1); | |
474 | BOOST_REQUIRE_THROW(f(c1).get0(), rpc::remote_verb_error); | |
475 | c1.stop().get0(); | |
476 | }); | |
477 | } | |
478 | ||
11fdf7f2 TL |
// Outcome flags collected by stream_test_func(). All flags start out false
// and the sum starts at zero.
struct stream_test_result {
    // The client-side source returned end-of-stream.
    bool client_source_closed{false};
    // The server-side source returned end-of-stream.
    bool server_source_closed{false};
    // Writing to the client sink failed.
    bool sink_exception{false};
    // Closing the client sink failed.
    bool sink_close_exception{false};
    // The client-side reading fiber finished with an exception.
    bool source_done_exception{false};
    // The server-side fibers finished with an exception.
    bool server_done_exception{false};
    // Stopping the client failed.
    bool client_stop_exception{false};
    // Sum of all integers the server received over the stream.
    int server_sum{0};
    // The stream sink could not be created in the first place.
    bool exception_while_creating_sink{false};
};
490 | ||
f67539c2 TL |
491 | future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) { |
492 | return seastar::async([&env, stop_client, expect_connection_error] { | |
11fdf7f2 | 493 | stream_test_result r; |
9f95a23c | 494 | test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr()); |
11fdf7f2 | 495 | future<> server_done = make_ready_future(); |
9f95a23c | 496 | env.register_handler(1, [&](int i, rpc::source<int> source) { |
11fdf7f2 TL |
497 | BOOST_REQUIRE_EQUAL(i, 666); |
498 | auto sink = source.make_sink<serializer, sstring>(); | |
499 | auto sink_loop = seastar::async([sink] () mutable { | |
500 | for (auto i = 0; i < 100; i++) { | |
501 | sink("seastar").get(); | |
502 | sleep(std::chrono::milliseconds(1)).get(); | |
503 | } | |
f67539c2 TL |
504 | }).finally([sink] () mutable { |
505 | return sink.flush(); | |
506 | }).finally([sink] () mutable { | |
507 | return sink.close(); | |
508 | }).finally([sink] {}); | |
509 | ||
11fdf7f2 TL |
510 | auto source_loop = seastar::async([source, &r] () mutable { |
511 | while (!r.server_source_closed) { | |
512 | auto data = source().get0(); | |
513 | if (data) { | |
514 | r.server_sum += std::get<0>(*data); | |
515 | } else { | |
516 | r.server_source_closed = true; | |
9f95a23c TL |
517 | try { |
518 | // check that reading after eos does not crash | |
519 | // and throws correct exception | |
520 | source().get(); | |
521 | } catch (rpc::stream_closed& ex) { | |
522 | // expected | |
523 | } catch (...) { | |
524 | BOOST_FAIL("wrong exception on reading from a stream after eos"); | |
525 | } | |
11fdf7f2 TL |
526 | } |
527 | } | |
528 | }); | |
529 | server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); | |
530 | return sink; | |
9f95a23c TL |
531 | }).get(); |
532 | auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1); | |
1e59de90 | 533 | struct failed_to_create_sync{}; |
11fdf7f2 TL |
534 | auto x = [&] { |
535 | try { | |
9f95a23c | 536 | return c.make_stream_sink<serializer, int>(env.make_socket()).get0(); |
11fdf7f2 TL |
537 | } catch (...) { |
538 | c.stop().get(); | |
1e59de90 | 539 | throw failed_to_create_sync{}; |
11fdf7f2 TL |
540 | } |
541 | }; | |
1e59de90 TL |
542 | try { |
543 | auto sink = x(); | |
544 | auto source = call(c, 666, sink).get0(); | |
545 | auto source_done = seastar::async([&] { | |
546 | while (!r.client_source_closed) { | |
547 | auto data = source().get0(); | |
548 | if (data) { | |
549 | BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar"); | |
550 | } else { | |
551 | r.client_source_closed = true; | |
552 | } | |
553 | } | |
554 | }); | |
555 | auto check_exception = [] (auto f) { | |
556 | try { | |
557 | f.get(); | |
558 | } catch (...) { | |
559 | return true; | |
560 | } | |
561 | return false; | |
562 | }; | |
563 | future<> stop_client_future = make_ready_future(); | |
564 | // With a connection error sink() will eventually fail, but we | |
565 | // cannot guarantee when. | |
566 | int max = expect_connection_error ? std::numeric_limits<int>::max() : 101; | |
567 | for (int i = 1; i < max; i++) { | |
568 | if (stop_client && i == 50) { | |
569 | // stop client while stream is in use | |
570 | stop_client_future = c.stop(); | |
571 | } | |
572 | sleep(std::chrono::milliseconds(1)).get(); | |
573 | r.sink_exception = check_exception(sink(i)); | |
574 | if (r.sink_exception) { | |
575 | break; | |
11fdf7f2 TL |
576 | } |
577 | } | |
1e59de90 TL |
578 | r.sink_close_exception = check_exception(sink.close()); |
579 | r.source_done_exception = check_exception(std::move(source_done)); | |
580 | r.server_done_exception = check_exception(std::move(server_done)); | |
581 | r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future)); | |
582 | return r; | |
583 | } catch(failed_to_create_sync&) { | |
584 | r.exception_while_creating_sink = true; | |
585 | return r; | |
11fdf7f2 | 586 | } |
11fdf7f2 TL |
587 | }); |
588 | } | |
589 | ||
590 | SEASTAR_TEST_CASE(test_stream_simple) { | |
591 | rpc::server_options so; | |
592 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
593 | rpc_test_config cfg; |
594 | cfg.server_options = so; | |
595 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
596 | return stream_test_func(env, false).then([] (stream_test_result r) { | |
597 | BOOST_REQUIRE(r.client_source_closed); | |
598 | BOOST_REQUIRE(r.server_source_closed); | |
599 | BOOST_REQUIRE(r.server_sum == 5050); | |
600 | BOOST_REQUIRE(!r.sink_exception); | |
601 | BOOST_REQUIRE(!r.sink_close_exception); | |
602 | BOOST_REQUIRE(!r.source_done_exception); | |
603 | BOOST_REQUIRE(!r.server_done_exception); | |
604 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
605 | }); |
606 | }); | |
607 | } | |
608 | ||
609 | SEASTAR_TEST_CASE(test_stream_stop_client) { | |
610 | rpc::server_options so; | |
611 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
612 | rpc_test_config cfg; |
613 | cfg.server_options = so; | |
614 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
615 | return stream_test_func(env, true).then([] (stream_test_result r) { | |
616 | BOOST_REQUIRE(!r.client_source_closed); | |
617 | BOOST_REQUIRE(!r.server_source_closed); | |
618 | BOOST_REQUIRE(r.sink_exception); | |
619 | BOOST_REQUIRE(r.sink_close_exception); | |
620 | BOOST_REQUIRE(r.source_done_exception); | |
621 | BOOST_REQUIRE(r.server_done_exception); | |
622 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
623 | }); |
624 | }); | |
625 | } | |
626 | ||
627 | ||
628 | SEASTAR_TEST_CASE(test_stream_connection_error) { | |
629 | rpc::server_options so; | |
630 | so.streaming_domain = rpc::streaming_domain_type(1); | |
9f95a23c TL |
631 | rpc_test_config cfg; |
632 | cfg.server_options = so; | |
1e59de90 TL |
633 | rpc_loopback_error_injector::config ecfg; |
634 | ecfg.server_rcv.limit = 50; | |
635 | ecfg.server_rcv.kind = loopback_error_injector::error::abort; | |
636 | cfg.inject_error = ecfg; | |
9f95a23c | 637 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { |
f67539c2 | 638 | return stream_test_func(env, false, true).then([] (stream_test_result r) { |
9f95a23c TL |
639 | BOOST_REQUIRE(!r.client_source_closed); |
640 | BOOST_REQUIRE(!r.server_source_closed); | |
641 | BOOST_REQUIRE(r.sink_exception); | |
642 | BOOST_REQUIRE(r.sink_close_exception); | |
643 | BOOST_REQUIRE(r.source_done_exception); | |
644 | BOOST_REQUIRE(r.server_done_exception); | |
645 | BOOST_REQUIRE(!r.client_stop_exception); | |
11fdf7f2 TL |
646 | }); |
647 | }); | |
648 | } | |
649 | ||
1e59de90 TL |
650 | SEASTAR_TEST_CASE(test_stream_negotiation_error) { |
651 | rpc::server_options so; | |
652 | so.streaming_domain = rpc::streaming_domain_type(1); | |
653 | rpc_test_config cfg; | |
654 | cfg.server_options = so; | |
655 | rpc_loopback_error_injector::config ecfg; | |
656 | ecfg.server_rcv.limit = 0; | |
657 | ecfg.server_rcv.kind = loopback_error_injector::error::abort; | |
658 | cfg.inject_error = ecfg; | |
659 | return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { | |
660 | return stream_test_func(env, false, true).then([] (stream_test_result r) { | |
661 | BOOST_REQUIRE(r.exception_while_creating_sink); | |
662 | }); | |
663 | }); | |
664 | } | |
665 | ||
11fdf7f2 | 666 | SEASTAR_TEST_CASE(test_rpc_scheduling) { |
9f95a23c TL |
667 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { |
668 | auto sg = create_scheduling_group("rpc", 100).get0(); | |
669 | env.register_handler(1, sg, [] () { | |
670 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
671 | }).get(); | |
672 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
673 | auto id = call_sg_id(c1).get0(); | |
674 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg)); | |
11fdf7f2 TL |
675 | }); |
676 | } | |
677 | ||
678 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { | |
679 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
20effc67 | 680 | auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); |
11fdf7f2 | 681 | auto sg2 = create_scheduling_group("sg2", 100).get0(); |
20effc67 | 682 | auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); |
11fdf7f2 TL |
683 | rpc::resource_limits limits; |
684 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
685 | auto sg = current_scheduling_group(); | |
686 | if (cookie == "sg1") { | |
687 | sg = sg1; | |
688 | } else if (cookie == "sg2") { | |
689 | sg = sg2; | |
690 | } | |
691 | rpc::isolation_config cfg; | |
692 | cfg.sched_group = sg; | |
693 | return cfg; | |
694 | }; | |
9f95a23c TL |
695 | rpc_test_config cfg; |
696 | cfg.resource_limits = limits; | |
697 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
698 | rpc::client_options co1; | |
699 | co1.isolation_cookie = "sg1"; | |
700 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
701 | rpc::client_options co2; | |
702 | co2.isolation_cookie = "sg2"; | |
703 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
704 | env.register_handler(1, [] { | |
705 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
706 | }).get(); | |
707 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
708 | unsigned id; | |
709 | id = call_sg_id(c1).get0(); | |
710 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
711 | id = call_sg_id(c2).get0(); | |
712 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
713 | c1.stop().get(); | |
714 | c2.stop().get(); | |
11fdf7f2 TL |
715 | }).get(); |
716 | } | |
717 | ||
718 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { | |
719 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
20effc67 | 720 | auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); |
11fdf7f2 | 721 | auto sg2 = create_scheduling_group("sg2", 100).get0(); |
20effc67 | 722 | auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); |
11fdf7f2 TL |
723 | rpc::resource_limits limits; |
724 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
725 | auto sg = current_scheduling_group(); | |
726 | if (cookie == "sg1") { | |
727 | sg = sg1; | |
728 | } else if (cookie == "sg2") { | |
729 | sg = sg2; | |
730 | } | |
731 | rpc::isolation_config cfg; | |
732 | cfg.sched_group = sg; | |
733 | return cfg; | |
734 | }; | |
9f95a23c TL |
735 | rpc_test_config cfg; |
736 | cfg.resource_limits = limits; | |
737 | rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { | |
738 | rpc::client_options co1; | |
739 | co1.isolation_cookie = "sg1"; | |
740 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
741 | rpc::client_options co2; | |
742 | co2.isolation_cookie = "sg2"; | |
743 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
744 | // An old client, that doesn't have an isolation cookie | |
745 | rpc::client_options co3; | |
746 | test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); | |
747 | // A server that uses sg1 if the client is old | |
748 | env.register_handler(1, sg1, [] () { | |
749 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
750 | }).get(); | |
751 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
752 | unsigned id; | |
753 | id = call_sg_id(c1).get0(); | |
754 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
755 | id = call_sg_id(c2).get0(); | |
756 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
757 | id = call_sg_id(c3).get0(); | |
758 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
759 | c1.stop().get(); | |
760 | c2.stop().get(); | |
761 | c3.stop().get(); | |
762 | }).get(); | |
763 | } | |
764 | ||
1e59de90 TL |
765 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_async) { |
766 | scheduling_group sg1 = default_scheduling_group(); | |
767 | scheduling_group sg2 = default_scheduling_group(); | |
768 | auto sg1_kill = defer([&] () noexcept { | |
769 | if (sg1 != default_scheduling_group()) { | |
770 | destroy_scheduling_group(sg1).get(); | |
771 | } | |
772 | }); | |
773 | auto sg2_kill = defer([&] () noexcept { | |
774 | if (sg2 != default_scheduling_group()) { | |
775 | destroy_scheduling_group(sg2).get(); | |
776 | } | |
777 | }); | |
778 | rpc::resource_limits limits; | |
779 | limits.isolate_connection = [&sg1, &sg2] (sstring cookie) { | |
780 | future<seastar::scheduling_group> get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] { | |
781 | if (cookie == "sg1") { | |
782 | if (sg1 == default_scheduling_group()) { | |
783 | return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) { | |
784 | sg1 = sg; | |
785 | return sg; | |
786 | }); | |
787 | } else { | |
788 | return make_ready_future<seastar::scheduling_group>(sg1); | |
789 | } | |
790 | } else if (cookie == "sg2") { | |
791 | if (sg2 == default_scheduling_group()) { | |
792 | return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) { | |
793 | sg2 = sg; | |
794 | return sg; | |
795 | }); | |
796 | } else { | |
797 | return make_ready_future<seastar::scheduling_group>(sg2); | |
798 | } | |
799 | } | |
800 | return make_ready_future<seastar::scheduling_group>(current_scheduling_group()); | |
801 | }); | |
802 | ||
803 | return get_scheduling_group.then([] (scheduling_group sg) { | |
804 | rpc::isolation_config cfg; | |
805 | cfg.sched_group = sg; | |
806 | return cfg; | |
807 | }); | |
808 | }; | |
809 | rpc_test_config cfg; | |
810 | cfg.resource_limits = limits; | |
811 | rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2] (rpc_test_env<>& env) { | |
812 | rpc::client_options co1; | |
813 | co1.isolation_cookie = "sg1"; | |
814 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
815 | rpc::client_options co2; | |
816 | co2.isolation_cookie = "sg2"; | |
817 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
818 | env.register_handler(1, [] { | |
819 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
820 | }).get(); | |
821 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
822 | unsigned id; | |
823 | id = call_sg_id(c1).get0(); | |
824 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
825 | id = call_sg_id(c2).get0(); | |
826 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
827 | c1.stop().get(); | |
828 | c2.stop().get(); | |
829 | }).get(); | |
830 | } | |
831 | ||
832 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility_async) { | |
833 | scheduling_group sg1 = default_scheduling_group(); | |
834 | scheduling_group sg2 = default_scheduling_group(); | |
835 | scheduling_group sg3 = create_scheduling_group("sg3", 100).get0(); | |
836 | auto sg1_kill = defer([&] () noexcept { | |
837 | if (sg1 != default_scheduling_group()) { | |
838 | destroy_scheduling_group(sg1).get(); | |
839 | } | |
840 | }); | |
841 | auto sg2_kill = defer([&] () noexcept { | |
842 | if (sg2 != default_scheduling_group()) { | |
843 | destroy_scheduling_group(sg2).get(); | |
844 | } | |
845 | }); | |
846 | auto sg3_kill = defer([&] () noexcept { destroy_scheduling_group(sg3).get(); }); | |
847 | rpc::resource_limits limits; | |
848 | limits.isolate_connection = [&sg1, &sg2] (sstring cookie) { | |
849 | future<seastar::scheduling_group> get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] { | |
850 | if (cookie == "sg1") { | |
851 | if (sg1 == default_scheduling_group()) { | |
852 | return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) { | |
853 | sg1 = sg; | |
854 | return sg; | |
855 | }); | |
856 | } else { | |
857 | return make_ready_future<seastar::scheduling_group>(sg1); | |
858 | } | |
859 | } else if (cookie == "sg2") { | |
860 | if (sg2 == default_scheduling_group()) { | |
861 | return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) { | |
862 | sg2 = sg; | |
863 | return sg; | |
864 | }); | |
865 | } else { | |
866 | return make_ready_future<seastar::scheduling_group>(sg2); | |
867 | } | |
868 | } | |
869 | return make_ready_future<seastar::scheduling_group>(current_scheduling_group()); | |
870 | }); | |
871 | ||
872 | return get_scheduling_group.then([] (scheduling_group sg) { | |
873 | rpc::isolation_config cfg; | |
874 | cfg.sched_group = sg; | |
875 | return cfg; | |
876 | }); | |
877 | }; | |
878 | rpc_test_config cfg; | |
879 | cfg.resource_limits = limits; | |
880 | rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2, &sg3] (rpc_test_env<>& env) { | |
881 | rpc::client_options co1; | |
882 | co1.isolation_cookie = "sg1"; | |
883 | test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); | |
884 | rpc::client_options co2; | |
885 | co2.isolation_cookie = "sg2"; | |
886 | test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); | |
887 | // An old client, that doesn't have an isolation cookie | |
888 | rpc::client_options co3; | |
889 | test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); | |
890 | // A server that uses sg3 if the client is old | |
891 | env.register_handler(1, sg3, [] () { | |
892 | return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group())); | |
893 | }).get(); | |
894 | auto call_sg_id = env.proto().make_client<unsigned ()>(1); | |
895 | unsigned id; | |
896 | id = call_sg_id(c1).get0(); | |
897 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); | |
898 | id = call_sg_id(c2).get0(); | |
899 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); | |
900 | id = call_sg_id(c3).get0(); | |
901 | BOOST_REQUIRE(id == internal::scheduling_group_index(sg3)); | |
902 | c1.stop().get(); | |
903 | c2.stop().get(); | |
904 | c3.stop().get(); | |
905 | }).get(); | |
906 | } | |
907 | ||
9f95a23c TL |
908 | void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) { |
909 | using namespace seastar::rpc; | |
910 | ||
911 | auto linearize = [&] (const auto& buffer) { | |
912 | return seastar::visit(buffer.bufs, | |
913 | [] (const temporary_buffer<char>& buf) { | |
914 | return buf.clone(); | |
915 | }, | |
916 | [&] (const std::vector<temporary_buffer<char>>& bufs) { | |
917 | auto buf = temporary_buffer<char>(buffer.size); | |
918 | auto dst = buf.get_write(); | |
919 | for (auto& b : bufs) { | |
920 | dst = std::copy_n(b.get(), b.size(), dst); | |
921 | } | |
922 | return buf; | |
923 | } | |
924 | ); | |
925 | }; | |
926 | ||
927 | auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) { | |
928 | std::vector<temporary_buffer<char>> bufs; | |
929 | auto src = b.get(); | |
930 | auto n = b.size(); | |
931 | while (n) { | |
932 | auto this_chunk = std::min(chunk_size, n); | |
933 | bufs.emplace_back(this_chunk); | |
934 | std::copy_n(src, this_chunk, bufs.back().get_write()); | |
935 | src += this_chunk; | |
936 | n -= this_chunk; | |
937 | } | |
938 | return bufs; | |
939 | }; | |
940 | ||
941 | auto clone = [&] (const auto& buffer) { | |
942 | auto c = std::decay_t<decltype(buffer)>(); | |
943 | c.size = buffer.size; | |
944 | c.bufs = seastar::visit(buffer.bufs, | |
945 | [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) { | |
946 | return buf.clone(); | |
947 | }, | |
948 | [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) { | |
949 | std::vector<temporary_buffer<char>> c; | |
950 | c.reserve(bufs.size()); | |
951 | for (auto& b : bufs) { | |
952 | c.emplace_back(b.clone()); | |
953 | } | |
f67539c2 | 954 | return c; |
9f95a23c TL |
955 | } |
956 | ); | |
957 | return c; | |
958 | }; | |
959 | ||
960 | auto compressor = compressor_factory(); | |
961 | ||
20effc67 TL |
962 | std::vector<noncopyable_function<std::tuple<sstring, size_t, snd_buf> ()>> inputs; |
963 | ||
964 | auto add_input = [&] (auto func_returning_tuple) { | |
965 | inputs.emplace_back(std::move(func_returning_tuple)); | |
966 | }; | |
9f95a23c TL |
967 | |
968 | auto& eng = testing::local_random_engine; | |
1e59de90 | 969 | auto dist = std::uniform_int_distribution<int>(0, std::numeric_limits<char>::max()); |
9f95a23c TL |
970 | |
971 | auto snd = snd_buf(1); | |
972 | *snd.front().get_write() = 'a'; | |
20effc67 | 973 | add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, no headroom"), size_t(0), std::move(snd)); }); |
9f95a23c TL |
974 | |
975 | snd = snd_buf(1); | |
976 | *snd.front().get_write() = 'a'; | |
20effc67 | 977 | add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, 128k of headroom"), size_t(128 * 1024), std::move(snd)); }); |
9f95a23c | 978 | |
f67539c2 TL |
979 | auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
980 | auto buf = temporary_buffer<char>(s); | |
981 | std::fill_n(buf.get_write(), s, 'a'); | |
9f95a23c | 982 | |
f67539c2 TL |
983 | auto snd = snd_buf(); |
984 | snd.size = s; | |
985 | if (split) { | |
986 | snd.bufs = split_buffer(buf.clone(), *split); | |
987 | } else { | |
988 | snd.bufs = buf.clone(); | |
989 | } | |
20effc67 | 990 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 991 | }; |
9f95a23c | 992 | |
20effc67 TL |
993 | |
994 | add_input(std::bind(gen_fill, 16 * 1024, "single 16 kB buffer of \'a\'")); | |
9f95a23c | 995 | |
f67539c2 TL |
996 | auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
997 | auto buf = temporary_buffer<char>(s); | |
998 | std::generate_n(buf.get_write(), s, [&] { return dist(eng); }); | |
9f95a23c | 999 | |
f67539c2 TL |
1000 | auto snd = snd_buf(); |
1001 | snd.size = s; | |
1002 | if (split) { | |
1003 | snd.bufs = split_buffer(buf.clone(), *split); | |
1004 | } else { | |
1005 | snd.bufs = buf.clone(); | |
1006 | } | |
20effc67 | 1007 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 1008 | }; |
9f95a23c | 1009 | |
20effc67 | 1010 | add_input(std::bind(gen_rand, 16 * 1024, "single 16 kB buffer of random")); |
9f95a23c | 1011 | |
f67539c2 TL |
1012 | auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) { |
1013 | static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. "; | |
9f95a23c | 1014 | |
f67539c2 TL |
1015 | auto buf = temporary_buffer<char>(s); |
1016 | size_t n = 0; | |
1017 | while (n < s) { | |
1018 | auto rem = std::min(s - n, text.size()); | |
1019 | std::copy(text.data(), text.data() + rem, buf.get_write() + n); | |
1020 | n += rem; | |
1021 | } | |
9f95a23c | 1022 | |
f67539c2 TL |
1023 | auto snd = snd_buf(); |
1024 | snd.size = s; | |
1025 | if (split) { | |
1026 | snd.bufs = split_buffer(buf.clone(), *split); | |
1027 | } else { | |
1028 | snd.bufs = buf.clone(); | |
1029 | } | |
20effc67 | 1030 | return std::tuple(msg, size_t(0), std::move(snd)); |
f67539c2 | 1031 | }; |
9f95a23c | 1032 | |
1e59de90 TL |
1033 | #ifndef SEASTAR_DEBUG |
1034 | auto buffer_sizes = { 1, 4 }; | |
1035 | #else | |
1036 | auto buffer_sizes = { 1 }; | |
1037 | #endif | |
9f95a23c | 1038 | |
1e59de90 | 1039 | for (auto s : buffer_sizes) { |
f67539c2 | 1040 | for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) { |
20effc67 TL |
1041 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss)); |
1042 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024)); | |
1043 | add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024)); | |
9f95a23c | 1044 | |
20effc67 TL |
1045 | add_input(std::bind(gen_fill, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024)); |
1046 | add_input(std::bind(gen_rand, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024)); | |
f67539c2 | 1047 | } |
9f95a23c | 1048 | |
f67539c2 | 1049 | for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) { |
20effc67 TL |
1050 | add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss)); |
1051 | add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss)); | |
1052 | add_input(std::bind(gen_text, s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss)); | |
1053 | add_input(std::bind(gen_fill, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss)); | |
1054 | add_input(std::bind(gen_rand, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); | |
1055 | add_input(std::bind(gen_text, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); | |
f67539c2 TL |
1056 | } |
1057 | } | |
9f95a23c | 1058 | |
f67539c2 | 1059 | for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) { |
20effc67 TL |
1060 | add_input(std::bind(gen_fill, s, format("{} bytes buffer of \'a\'", s))); |
1061 | add_input(std::bind(gen_rand, s, format("{} bytes buffer of random", s))); | |
1062 | add_input(std::bind(gen_text, s, format("{} bytes buffer of text", s))); | |
f67539c2 | 1063 | } |
9f95a23c TL |
1064 | |
1065 | std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms { | |
1066 | { "identity", [] (snd_buf snd) { | |
1067 | rcv_buf rcv; | |
1068 | rcv.size = snd.size; | |
1069 | rcv.bufs = std::move(snd.bufs); | |
1070 | return rcv; | |
1071 | } }, | |
1072 | { "linearized", [&linearize] (snd_buf snd) { | |
1073 | rcv_buf rcv; | |
1074 | rcv.size = snd.size; | |
1075 | rcv.bufs = linearize(snd); | |
1076 | return rcv; | |
1077 | } }, | |
1078 | { "split 1 B", [&] (snd_buf snd) { | |
1079 | rcv_buf rcv; | |
1080 | rcv.size = snd.size; | |
1081 | rcv.bufs = split_buffer(linearize(snd), 1); | |
1082 | return rcv; | |
1083 | } }, | |
1084 | { "split 129 B", [&] (snd_buf snd) { | |
1085 | rcv_buf rcv; | |
1086 | rcv.size = snd.size; | |
1087 | rcv.bufs = split_buffer(linearize(snd), 129); | |
1088 | return rcv; | |
1089 | } }, | |
1090 | { "split 4 kB", [&] (snd_buf snd) { | |
1091 | rcv_buf rcv; | |
1092 | rcv.size = snd.size; | |
1093 | rcv.bufs = split_buffer(linearize(snd), 4096); | |
1094 | return rcv; | |
1095 | } }, | |
1096 | { "split 4 kB - 128", [&] (snd_buf snd) { | |
1097 | rcv_buf rcv; | |
1098 | rcv.size = snd.size; | |
1099 | rcv.bufs = split_buffer(linearize(snd), 4096 - 128); | |
1100 | return rcv; | |
1101 | } }, | |
1102 | }; | |
1103 | ||
1104 | auto sanity_check = [&] (const auto& buffer) { | |
1105 | auto actual_size = seastar::visit(buffer.bufs, | |
1106 | [] (const temporary_buffer<char>& buf) { | |
1107 | return buf.size(); | |
1108 | }, | |
1109 | [] (const std::vector<temporary_buffer<char>>& bufs) { | |
1110 | return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) { | |
1111 | return sz + buf.size(); | |
1112 | }); | |
1113 | } | |
1114 | ); | |
1115 | BOOST_CHECK_EQUAL(actual_size, buffer.size); | |
1116 | }; | |
1117 | ||
20effc67 TL |
1118 | for (auto& in_func : inputs) { |
1119 | auto in = in_func(); | |
9f95a23c TL |
1120 | BOOST_TEST_MESSAGE("Input: " << std::get<0>(in)); |
1121 | auto headroom = std::get<1>(in); | |
1122 | auto compressed = compressor->compress(headroom, clone(std::get<2>(in))); | |
1123 | sanity_check(compressed); | |
1124 | ||
1125 | // Remove headroom | |
1126 | BOOST_CHECK_GE(compressed.size, headroom); | |
1127 | compressed.size -= headroom; | |
1128 | seastar::visit(compressed.bufs, | |
1129 | [&] (temporary_buffer<char>& buf) { | |
1130 | BOOST_CHECK_GE(buf.size(), headroom); | |
1131 | buf.trim_front(headroom); | |
1132 | }, | |
1133 | [&] (std::vector<temporary_buffer<char>>& bufs) { | |
1134 | while (headroom) { | |
1135 | BOOST_CHECK(!bufs.empty()); | |
1136 | auto to_remove = std::min(bufs.front().size(), headroom); | |
1137 | bufs.front().trim_front(to_remove); | |
1138 | if (bufs.front().empty() && bufs.size() > 1) { | |
1139 | bufs.erase(bufs.begin()); | |
1140 | } | |
1141 | headroom -= to_remove; | |
11fdf7f2 | 1142 | } |
9f95a23c TL |
1143 | } |
1144 | ); | |
1145 | ||
1146 | auto in_l = linearize(std::get<2>(in)); | |
1147 | ||
1148 | for (auto& t : transforms) { | |
1149 | BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t)); | |
1150 | auto received = std::get<1>(t)(clone(compressed)); | |
1151 | ||
1152 | auto decompressed = compressor->decompress(std::move(received)); | |
1153 | sanity_check(decompressed); | |
1154 | ||
1155 | BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size); | |
1156 | ||
1157 | auto out_l = linearize(decompressed); | |
1158 | ||
1159 | BOOST_CHECK_EQUAL(in_l.size(), out_l.size()); | |
1160 | BOOST_CHECK(in_l == out_l); | |
1161 | } | |
1162 | } | |
1163 | } | |
1164 | ||
1165 | SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) { | |
1166 | test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); }); | |
1167 | } | |
1168 | ||
1169 | SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) { | |
1170 | test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); }); | |
1171 | } | |
1172 | ||
1173 | // Test reproducing issue #671: If timeout is time_point::max(), translating | |
1174 | // it to relative timeout in the sender and then back in the receiver, when | |
1175 | // these calculations happen across a millisecond boundary, overflowed the | |
1176 | // integer and mislead the receiver to think the requested timeout was | |
1177 | // negative, and cause it drop its response, so the RPC call never completed. | |
1178 | SEASTAR_TEST_CASE(test_max_absolute_timeout) { | |
1179 | // The typical failure of this test is a hang. So we use semaphore to | |
1180 | // stop the test either when it succeeds, or after a long enough hang. | |
1181 | auto success = make_lw_shared<bool>(false); | |
1182 | auto done = make_lw_shared<semaphore>(0); | |
1183 | auto abrt = make_lw_shared<abort_source>(); | |
1184 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
1185 | done->signal(1); | |
1186 | }).handle_exception([] (std::exception_ptr) {}); | |
1187 | rpc::client_options co; | |
1188 | co.send_timeout_data = 1; | |
1189 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1190 | env.register_handler(1, [](int a, int b) { | |
1191 | return make_ready_future<int>(a+b); | |
1192 | }).get(); | |
1193 | auto sum = env.proto().make_client<int (int, int)>(1); | |
1194 | // The bug only reproduces if the calculation done on the sender | |
1195 | // and receiver sides, happened across a millisecond boundary. | |
1196 | // We can't control when it happens, so we just need to loop many | |
1197 | // times, at least many milliseconds, to increase the probability | |
1198 | // that we catch the bug. Experimentally, if we loop for 200ms, we | |
1199 | // catch the bug in #671 virtually every time. | |
1200 | auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200); | |
1201 | while (seastar::lowres_clock::now() <= until) { | |
1202 | auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0(); | |
1203 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
1204 | } | |
1205 | }).then([success, done, abrt] { | |
1206 | *success = true; | |
1207 | abrt->request_abort(); | |
1208 | done->signal(); | |
1209 | }); | |
1210 | return done->wait().then([done, success] { | |
1211 | BOOST_REQUIRE(*success); | |
1212 | }); | |
1213 | } | |
1214 | ||
1215 | // Similar to the above test: Test that a relative timeout duration::max() | |
1216 | // also works, and again doesn't cause the timeout wrapping around to the | |
1217 | // past and causing dropped responses. | |
1218 | SEASTAR_TEST_CASE(test_max_relative_timeout) { | |
1219 | // The typical failure of this test is a hang. So we use semaphore to | |
1220 | // stop the test either when it succeeds, or after a long enough hang. | |
1221 | auto success = make_lw_shared<bool>(false); | |
1222 | auto done = make_lw_shared<semaphore>(0); | |
1223 | auto abrt = make_lw_shared<abort_source>(); | |
1224 | (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { | |
1225 | done->signal(1); | |
1226 | }).handle_exception([] (std::exception_ptr) {}); | |
1227 | rpc::client_options co; | |
1228 | co.send_timeout_data = 1; | |
1229 | (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1230 | env.register_handler(1, [](int a, int b) { | |
1231 | return make_ready_future<int>(a+b); | |
1232 | }).get(); | |
1233 | auto sum = env.proto().make_client<int (int, int)>(1); | |
1234 | // The following call used to always hang, when max()+now() | |
1235 | // overflowed and appeared to be a negative timeout. | |
1236 | auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0(); | |
1237 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
1238 | }).then([success, done, abrt] { | |
1239 | *success = true; | |
1240 | abrt->request_abort(); | |
1241 | done->signal(); | |
1242 | }); | |
1243 | return done->wait().then([done, success] { | |
1244 | BOOST_REQUIRE(*success); | |
1245 | }); | |
1246 | } | |
1247 | ||
1248 | SEASTAR_TEST_CASE(test_rpc_tuple) { | |
1249 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1250 | env.register_handler(1, [] () { | |
1251 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1252 | }).get(); | |
1253 | auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1); | |
1254 | auto result = f1(c1).get0(); | |
1255 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1256 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1257 | }); | |
1258 | } | |
1259 | ||
1260 | SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) { | |
1261 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1262 | // Server is variadic | |
1263 | env.register_handler(1, [] () { | |
f67539c2 | 1264 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L)); |
9f95a23c TL |
1265 | }).get(); |
1266 | // Client is non-variadic | |
1267 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); | |
1268 | auto result = f1(c1).get0(); | |
1269 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); | |
1270 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1271 | }); | |
1272 | } | |
1273 | ||
1274 | SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) { | |
1275 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1276 | // Server is nonvariadic | |
1277 | env.register_handler(1, [] () { | |
1278 | return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L)); | |
1279 | }).get(); | |
1280 | // Client is variadic | |
f67539c2 TL |
1281 | auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1); |
1282 | auto result = f1(c1).get0(); | |
9f95a23c TL |
1283 | BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); |
1284 | BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); | |
1285 | }); | |
1286 | } | |
1287 | ||
1288 | SEASTAR_TEST_CASE(test_handler_registration) { | |
1289 | rpc_test_config cfg; | |
1e59de90 TL |
1290 | rpc_loopback_error_injector::config ecfg; |
1291 | ecfg.connect_kind = loopback_error_injector::error::abort; | |
1292 | cfg.inject_error = ecfg; | |
9f95a23c TL |
1293 | return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { |
1294 | auto& proto = env.proto(); | |
1295 | ||
f67539c2 TL |
1296 | // new proto must be empty |
1297 | BOOST_REQUIRE(!proto.has_handlers()); | |
1298 | ||
9f95a23c TL |
1299 | // non-existing handler should not be found |
1300 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1301 | ||
1302 | // unregistered non-existing handler should return ready future | |
1303 | auto fut = proto.unregister_handler(1); | |
1304 | BOOST_REQUIRE(fut.available() && !fut.failed()); | |
1305 | fut.get(); | |
1306 | ||
1307 | // existing handler should be found | |
1308 | auto handler = [] () { return make_ready_future<>(); }; | |
1309 | proto.register_handler(1, handler); | |
1310 | BOOST_REQUIRE(proto.has_handler(1)); | |
1311 | ||
1312 | // cannot register handler if already registered | |
1313 | BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error); | |
1314 | ||
1315 | // unregistered handler should not be found | |
1316 | proto.unregister_handler(1).get(); | |
1317 | BOOST_REQUIRE(!proto.has_handler(1)); | |
1318 | ||
1319 | // re-registering a handler should succeed | |
1320 | proto.register_handler(1, handler); | |
1321 | BOOST_REQUIRE(proto.has_handler(1)); | |
f67539c2 TL |
1322 | |
1323 | // proto with handlers must not be empty | |
1324 | BOOST_REQUIRE(proto.has_handlers()); | |
9f95a23c TL |
1325 | }); |
1326 | } | |
1327 | ||
1328 | SEASTAR_TEST_CASE(test_unregister_handler) { | |
1329 | using namespace std::chrono_literals; | |
1330 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1331 | promise<> handler_called; | |
1332 | future<> f_handler_called = handler_called.get_future(); | |
1333 | bool rpc_executed = false; | |
1334 | bool rpc_completed = false; | |
1335 | ||
1336 | auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] { | |
1337 | if (f_handler_called.available()) { | |
1338 | f_handler_called.get(); | |
1339 | } | |
1340 | rpc_executed = false; | |
1341 | rpc_completed = false; | |
1342 | }; | |
1343 | ||
1344 | auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] { | |
1345 | return [&handler_called, &rpc_executed, &rpc_completed] { | |
1346 | handler_called.set_value(); | |
1347 | rpc_executed = true; | |
1348 | return sleep(1ms).then([&rpc_completed] { | |
1349 | rpc_completed = true; | |
1350 | }); | |
1351 | }; | |
1352 | }; | |
1353 | ||
1354 | // handler should not run if unregistered before being called | |
1355 | env.register_handler(1, get_handler()).get(); | |
1356 | env.unregister_handler(1).get(); | |
1357 | BOOST_REQUIRE(!f_handler_called.available()); | |
1358 | BOOST_REQUIRE(!rpc_executed); | |
1359 | BOOST_REQUIRE(!rpc_completed); | |
1360 | ||
1361 | // verify normal execution path | |
1362 | env.register_handler(1, get_handler()).get(); | |
1363 | auto call = env.proto().make_client<void ()>(1); | |
1364 | call(c1).get(); | |
1365 | BOOST_REQUIRE(f_handler_called.available()); | |
1366 | BOOST_REQUIRE(rpc_executed); | |
1367 | BOOST_REQUIRE(rpc_completed); | |
1368 | reset_state(); | |
1369 | ||
1370 | // call should fail after handler is unregistered | |
1371 | env.unregister_handler(1).get(); | |
1372 | try { | |
1373 | call(c1).get(); | |
1374 | BOOST_REQUIRE(false); | |
1375 | } catch (rpc::unknown_verb_error&) { | |
1376 | // expected | |
1377 | } catch (...) { | |
1378 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1379 | BOOST_REQUIRE(false); | |
1380 | } | |
1381 | BOOST_REQUIRE(!f_handler_called.available()); | |
1382 | BOOST_REQUIRE(!rpc_executed); | |
1383 | BOOST_REQUIRE(!rpc_completed); | |
1384 | ||
1385 | // unregistered is allowed while call is in flight | |
1386 | auto delayed_unregister = [&env] { | |
1387 | return sleep(500us).then([&env] { | |
1388 | return env.unregister_handler(1); | |
11fdf7f2 | 1389 | }); |
9f95a23c TL |
1390 | }; |
1391 | ||
1392 | env.register_handler(1, get_handler()).get(); | |
1393 | try { | |
1394 | when_all_succeed(call(c1), delayed_unregister()).get(); | |
1395 | reset_state(); | |
1396 | } catch (rpc::unknown_verb_error&) { | |
1397 | // expected | |
1398 | } catch (...) { | |
1399 | std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; | |
1400 | BOOST_REQUIRE(false); | |
1401 | } | |
1402 | }); | |
11fdf7f2 | 1403 | } |
9f95a23c | 1404 | |
f67539c2 TL |
1405 | SEASTAR_TEST_CASE(test_loggers) { |
1406 | static seastar::logger log("dummy"); | |
1407 | log.set_level(log_level::debug); | |
1408 | return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { | |
1409 | socket_address dummy_addr; | |
1410 | auto& proto = env.proto(); | |
1411 | auto& logger = proto.get_logger(); | |
1412 | logger(dummy_addr, "Hello0"); | |
1413 | logger(dummy_addr, log_level::debug, "Hello1"); | |
1414 | proto.set_logger(&log); | |
1415 | logger(dummy_addr, "Hello2"); | |
1416 | logger(dummy_addr, log_level::debug, "Hello3"); | |
1417 | // We *want* to test the deprecated API, don't spam warnings about it. | |
1418 | #pragma GCC diagnostic push | |
1419 | #pragma GCC diagnostic ignored "-Wdeprecated-declarations" | |
1420 | proto.set_logger([] (const sstring& str) { | |
1421 | log.info("Test: {}", str); | |
1422 | }); | |
1423 | #pragma GCC diagnostic pop | |
1424 | logger(dummy_addr, "Hello4"); | |
1425 | logger(dummy_addr, log_level::debug, "Hello5"); | |
1426 | proto.set_logger(nullptr); | |
1427 | logger(dummy_addr, "Hello6"); | |
1428 | logger(dummy_addr, log_level::debug, "Hello7"); | |
1429 | }); | |
1430 | } | |
9f95a23c | 1431 | |
1e59de90 TL |
1432 | SEASTAR_TEST_CASE(test_connection_id_format) { |
1433 | rpc::connection_id cid = rpc::connection_id::make_id(0x123, 1); | |
1434 | std::string res = format("{}", cid); | |
1435 | BOOST_REQUIRE_EQUAL(res, "1230001"); | |
1436 | return make_ready_future<>(); | |
1437 | } | |
1438 | ||
9f95a23c | 1439 | static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working"); |
1e59de90 TL |
1440 | |
1441 | SEASTAR_TEST_CASE(test_client_info) { | |
1442 | rpc::client_info info; | |
1443 | const rpc::client_info& const_info = *const_cast<rpc::client_info*>(&info); | |
1444 | ||
1445 | info.attach_auxiliary("key", 0); | |
1446 | BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary<int>("key"), 0); | |
1447 | info.retrieve_auxiliary<int>("key") = 1; | |
1448 | BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary<int>("key"), 1); | |
1449 | ||
1450 | BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt<int>("key"), &info.retrieve_auxiliary<int>("key")); | |
1451 | BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt<int>("key"), &const_info.retrieve_auxiliary<int>("key")); | |
1452 | ||
1453 | BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt<int>("missing"), nullptr); | |
1454 | BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt<int>("missing"), nullptr); | |
1455 | ||
1456 | return make_ready_future<>(); | |
1457 | } |