]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/rpc_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / tests / unit / rpc_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
11fdf7f2
TL
22#include "loopback_socket.hh"
23#include <seastar/rpc/rpc.hh>
24#include <seastar/rpc/rpc_types.hh>
25#include <seastar/rpc/lz4_compressor.hh>
9f95a23c 26#include <seastar/rpc/lz4_fragmented_compressor.hh>
11fdf7f2
TL
27#include <seastar/rpc/multi_algo_compressor_factory.hh>
28#include <seastar/testing/test_case.hh>
29#include <seastar/testing/thread_test_case.hh>
9f95a23c 30#include <seastar/testing/test_runner.hh>
11fdf7f2
TL
31#include <seastar/core/thread.hh>
32#include <seastar/core/sleep.hh>
9f95a23c 33#include <seastar/core/distributed.hh>
f67539c2 34#include <seastar/core/loop.hh>
11fdf7f2 35#include <seastar/util/defer.hh>
f67539c2 36#include <seastar/util/log.hh>
20effc67
TL
37#include <seastar/util/closeable.hh>
38#include <seastar/util/noncopyable_function.hh>
11fdf7f2
TL
39
40using namespace seastar;
41
42struct serializer {
43};
44
45template <typename T, typename Output>
46inline
47void write_arithmetic_type(Output& out, T v) {
48 static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
49 return out.write(reinterpret_cast<const char*>(&v), sizeof(T));
50}
51
52template <typename T, typename Input>
53inline
54T read_arithmetic_type(Input& in) {
55 static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
56 T v;
57 in.read(reinterpret_cast<char*>(&v), sizeof(T));
58 return v;
59}
60
61template <typename Output>
62inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
63template <typename Output>
64inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
65template <typename Output>
66inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
67template <typename Output>
68inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
69template <typename Output>
70inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
71template <typename Input>
72inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
73template <typename Input>
74inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
75template <typename Input>
76inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
77template <typename Input>
78inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
79template <typename Input>
80inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
81
82template <typename Output>
83inline void write(serializer, Output& out, const sstring& v) {
84 write_arithmetic_type(out, uint32_t(v.size()));
85 out.write(v.c_str(), v.size());
86}
87
88template <typename Input>
89inline sstring read(serializer, Input& in, rpc::type<sstring>) {
90 auto size = read_arithmetic_type<uint32_t>(in);
f67539c2
TL
91 sstring ret = uninitialized_string(size);
92 in.read(ret.data(), size);
11fdf7f2
TL
93 return ret;
94}
95
96using test_rpc_proto = rpc::protocol<serializer>;
97using make_socket_fn = std::function<seastar::socket ()>;
98
99struct rpc_loopback_error_injector : public loopback_error_injector {
100 int _x = 0;
101 bool server_rcv_error() override {
102 return _x++ >= 50;
103 }
104};
105
106class rpc_socket_impl : public ::net::socket_impl {
107 promise<connected_socket> _p;
108 bool _connect;
109 loopback_socket_impl _socket;
110 rpc_loopback_error_injector _error_injector;
111public:
112 rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error)
113 : _connect(connect),
114 _socket(factory, inject_error ? &_error_injector : nullptr) {
115 }
116 virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
117 return _connect ? _socket.connect(sa, local, proto) : _p.get_future();
118 }
9f95a23c
TL
119 virtual void set_reuseaddr(bool reuseaddr) override {}
120 virtual bool get_reuseaddr() const override { return false; };
11fdf7f2
TL
121 virtual void shutdown() override {
122 if (_connect) {
123 _socket.shutdown();
124 } else {
125 _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
126 }
127 }
128};
129
9f95a23c
TL
130struct rpc_test_config {
131 rpc::resource_limits resource_limits = {};
132 rpc::server_options server_options = {};
133 bool connect = true;
134 bool inject_error = false;
135};
136
137template<typename MsgType = int>
138class rpc_test_env {
139 struct rpc_test_service {
140 test_rpc_proto _proto;
141 test_rpc_proto::server _server;
142 std::vector<MsgType> _handlers;
143
144 rpc_test_service() = delete;
145 explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf)
146 : _proto(serializer())
147 , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits)
148 { }
149
150 test_rpc_proto& proto() {
151 return _proto;
152 }
153
154 test_rpc_proto::server& server() {
155 return _server;
156 }
157
158 future<> stop() {
159 return parallel_for_each(_handlers, [this] (auto t) {
160 return proto().unregister_handler(t);
161 }).finally([this] {
162 return server().stop();
163 });
164 }
165
166 template<typename Func>
167 auto register_handler(MsgType t, scheduling_group sg, Func func) {
168 _handlers.emplace_back(t);
169 return proto().register_handler(t, sg, std::move(func));
170 }
171
172 future<> unregister_handler(MsgType t) {
173 auto it = std::find(_handlers.begin(), _handlers.end(), t);
174 assert(it != _handlers.end());
175 _handlers.erase(it);
176 return proto().unregister_handler(t);
177 }
11fdf7f2 178 };
9f95a23c
TL
179
180 rpc_test_config _cfg;
181 loopback_connection_factory _lcf;
f67539c2 182 std::unique_ptr<sharded<rpc_test_service>> _service;
9f95a23c
TL
183
184public:
185 rpc_test_env() = delete;
186 explicit rpc_test_env(rpc_test_config cfg)
f67539c2 187 : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>())
9f95a23c
TL
188 {
189 }
190
191 using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>;
192 static future<> do_with(rpc_test_config cfg, test_fn&& func) {
193 return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) {
194 return env.start().then([&env, func] {
195 return func(env);
196 }).finally([&env] {
197 return env.stop();
11fdf7f2
TL
198 });
199 });
9f95a23c
TL
200 }
201
202 using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>;
203 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) {
204 return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) {
205 return seastar::async([&env, func] {
206 func(env);
207 });
208 });
209 }
210
211 using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>;
212 static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) {
213 return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) {
214 return seastar::async([&env, func, co = std::move(co)] {
215 test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr());
20effc67 216 auto stop = deferred_stop(cl);
9f95a23c
TL
217 func(env, cl);
218 });
219 });
220 }
221
222 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) {
223 return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func));
224 }
225
226 auto make_socket() {
227 return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.connect, _cfg.inject_error));
228 };
229
230 test_rpc_proto& proto() {
231 return local_service().proto();
232 }
233
234 test_rpc_proto::server& server() {
235 return local_service().server();
236 }
237
238 template<typename Func>
239 future<> register_handler(MsgType t, scheduling_group sg, Func func) {
f67539c2 240 return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable {
9f95a23c
TL
241 s.register_handler(t, sg, std::move(func));
242 });
243 }
244
245 template<typename Func>
246 future<> register_handler(MsgType t, Func func) {
247 return register_handler(t, scheduling_group(), std::move(func));
248 }
249
250 future<> unregister_handler(MsgType t) {
f67539c2 251 return _service->invoke_on_all([t] (rpc_test_service& s) mutable {
9f95a23c
TL
252 return s.unregister_handler(t);
253 });
254 }
255
256private:
257 rpc_test_service& local_service() {
f67539c2
TL
258 return _service->local();
259
9f95a23c
TL
260 }
261
262 future<> start() {
f67539c2 263 return _service->start(std::cref(_cfg), std::ref(_lcf));
9f95a23c
TL
264 }
265
266 future<> stop() {
f67539c2 267 return _service->stop().then([this] {
9f95a23c
TL
268 return _lcf.destroy_all_shards();
269 });
270 }
271};
11fdf7f2
TL
272
273struct cfactory : rpc::compressor::factory {
274 mutable int use_compression = 0;
275 const sstring name;
276 cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {}
277 const sstring& supported() const override {
278 return name;
279 }
f67539c2
TL
280 class mylz4 : public rpc::lz4_compressor {
281 sstring _name;
282 public:
283 mylz4(const sstring& n) : _name(n) {}
284 sstring name() const override {
285 return _name;
286 }
287 };
11fdf7f2
TL
288 std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
289 if (feature == name) {
290 use_compression++;
f67539c2 291 return std::make_unique<mylz4>(name);
11fdf7f2
TL
292 } else {
293 return nullptr;
294 }
295 }
296};
9f95a23c 297
11fdf7f2
TL
298SEASTAR_TEST_CASE(test_rpc_connect) {
299 std::vector<future<>> fs;
300
301 for (auto i = 0; i < 2; i++) {
302 for (auto j = 0; j < 4; j++) {
303 auto factory = std::make_unique<cfactory>();
304 rpc::server_options so;
305 rpc::client_options co;
306 if (i == 1) {
307 so.compressor_factory = factory.get();
308 }
309 if (j & 1) {
310 co.compressor_factory = factory.get();
311 }
312 co.send_timeout_data = j & 2;
9f95a23c
TL
313 rpc_test_config cfg;
314 cfg.server_options = so;
315 auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
316 env.register_handler(1, [](int a, int b) {
317 return make_ready_future<int>(a+b);
318 }).get();
319 auto sum = env.proto().make_client<int (int, int)>(1);
320 auto result = sum(c1, 2, 3).get0();
321 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
322 }).handle_exception([] (auto ep) {
323 BOOST_FAIL("No exception expected");
324 }).finally([factory = std::move(factory), i, j = j & 1] {
325 if (i == 1 && j == 1) {
326 BOOST_REQUIRE_EQUAL(factory->use_compression, 2);
327 } else {
328 BOOST_REQUIRE_EQUAL(factory->use_compression, 0);
329 }
330 });
331 fs.emplace_back(std::move(f));
332 }
333 }
334 return when_all(fs.begin(), fs.end()).discard_result();
335}
336
337SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) {
338 auto factory1 = std::make_unique<cfactory>();
339 auto factory2 = std::make_unique<cfactory>("LZ4NEW");
340 rpc::server_options so;
341 rpc::client_options co;
342 static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()});
343 static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()});
344 so.compressor_factory = &server;
345 co.compressor_factory = &client;
9f95a23c
TL
346 rpc_test_config cfg;
347 cfg.server_options = so;
348 return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
349 env.register_handler(1, [](int a, int b) {
350 return make_ready_future<int>(a+b);
351 }).get();
352 auto sum = env.proto().make_client<int (int, int)>(1);
353 auto result = sum(c1, 2, 3).get0();
354 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
355 }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] {
356 BOOST_REQUIRE_EQUAL(factory1->use_compression, 0);
357 BOOST_REQUIRE_EQUAL(factory2->use_compression, 2);
358 });
359}
360
361SEASTAR_TEST_CASE(test_rpc_connect_abort) {
9f95a23c
TL
362 rpc_test_config cfg;
363 cfg.connect = false;
364 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
365 test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr());
366 env.register_handler(1, []() { return make_ready_future<>(); }).get();
367 auto f = env.proto().make_client<void ()>(1);
368 c1.stop().get0();
369 try {
370 f(c1).get0();
371 BOOST_REQUIRE(false);
372 } catch (...) {}
11fdf7f2
TL
373 });
374}
375
376SEASTAR_TEST_CASE(test_rpc_cancel) {
377 using namespace std::chrono_literals;
9f95a23c
TL
378 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
379 bool rpc_executed = false;
380 int good = 0;
381 promise<> handler_called;
382 future<> f_handler_called = handler_called.get_future();
383 env.register_handler(1, [&rpc_executed, &handler_called] {
384 handler_called.set_value(); rpc_executed = true; return sleep(1ms);
385 }).get();
386 auto call = env.proto().make_client<void ()>(1);
387 rpc::cancellable cancel;
388 auto f = call(c1, cancel);
389 // cancel send side
390 cancel.cancel();
391 try {
392 f.get();
393 } catch(rpc::canceled_error&) {
394 good += !rpc_executed;
395 };
396 f = call(c1, cancel);
397 // cancel wait side
398 f_handler_called.then([&cancel] {
11fdf7f2 399 cancel.cancel();
9f95a23c
TL
400 }).get();
401 try {
402 f.get();
403 } catch(rpc::canceled_error&) {
404 good += 10*rpc_executed;
405 };
406 BOOST_REQUIRE_EQUAL(good, 11);
11fdf7f2
TL
407 });
408}
409
410SEASTAR_TEST_CASE(test_message_to_big) {
9f95a23c
TL
411 rpc_test_config cfg;
412 cfg.resource_limits = {0, 1, 100};
413 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) {
414 bool good = true;
415 env.register_handler(1, [&] (sstring payload) mutable {
416 good = false;
417 }).get();
418 auto call = env.proto().make_client<void (sstring)>(1);
419 try {
f67539c2 420 call(c, uninitialized_string(101)).get();
9f95a23c
TL
421 good = false;
422 } catch(std::runtime_error& err) {
423 } catch(...) {
424 good = false;
425 }
426 BOOST_REQUIRE_EQUAL(good, true);
11fdf7f2
TL
427 });
428}
11fdf7f2
TL
429
430struct stream_test_result {
431 bool client_source_closed = false;
432 bool server_source_closed = false;
433 bool sink_exception = false;
434 bool sink_close_exception = false;
435 bool source_done_exception = false;
436 bool server_done_exception = false;
437 bool client_stop_exception = false;
438 int server_sum = 0;
439};
440
f67539c2
TL
441future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) {
442 return seastar::async([&env, stop_client, expect_connection_error] {
11fdf7f2 443 stream_test_result r;
9f95a23c 444 test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr());
11fdf7f2 445 future<> server_done = make_ready_future();
9f95a23c 446 env.register_handler(1, [&](int i, rpc::source<int> source) {
11fdf7f2
TL
447 BOOST_REQUIRE_EQUAL(i, 666);
448 auto sink = source.make_sink<serializer, sstring>();
449 auto sink_loop = seastar::async([sink] () mutable {
450 for (auto i = 0; i < 100; i++) {
451 sink("seastar").get();
452 sleep(std::chrono::milliseconds(1)).get();
453 }
f67539c2
TL
454 }).finally([sink] () mutable {
455 return sink.flush();
456 }).finally([sink] () mutable {
457 return sink.close();
458 }).finally([sink] {});
459
11fdf7f2
TL
460 auto source_loop = seastar::async([source, &r] () mutable {
461 while (!r.server_source_closed) {
462 auto data = source().get0();
463 if (data) {
464 r.server_sum += std::get<0>(*data);
465 } else {
466 r.server_source_closed = true;
9f95a23c
TL
467 try {
468 // check that reading after eos does not crash
469 // and throws correct exception
470 source().get();
471 } catch (rpc::stream_closed& ex) {
472 // expected
473 } catch (...) {
474 BOOST_FAIL("wrong exception on reading from a stream after eos");
475 }
11fdf7f2
TL
476 }
477 }
478 });
479 server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result();
480 return sink;
9f95a23c
TL
481 }).get();
482 auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1);
11fdf7f2
TL
483 auto x = [&] {
484 try {
9f95a23c 485 return c.make_stream_sink<serializer, int>(env.make_socket()).get0();
11fdf7f2
TL
486 } catch (...) {
487 c.stop().get();
488 throw;
489 }
490 };
491 auto sink = x();
492 auto source = call(c, 666, sink).get0();
493 auto source_done = seastar::async([&] {
494 while (!r.client_source_closed) {
495 auto data = source().get0();
496 if (data) {
497 BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar");
498 } else {
499 r.client_source_closed = true;
500 }
501 }
502 });
503 auto check_exception = [] (auto f) {
504 try {
505 f.get();
506 } catch (...) {
507 return true;
508 }
509 return false;
510 };
511 future<> stop_client_future = make_ready_future();
f67539c2
TL
512 // With a connection error sink() will eventually fail, but we
513 // cannot guarantee when.
514 int max = expect_connection_error ? std::numeric_limits<int>::max() : 101;
515 for (int i = 1; i < max; i++) {
11fdf7f2
TL
516 if (stop_client && i == 50) {
517 // stop client while stream is in use
518 stop_client_future = c.stop();
519 }
520 sleep(std::chrono::milliseconds(1)).get();
521 r.sink_exception = check_exception(sink(i));
522 if (r.sink_exception) {
523 break;
524 }
525 }
526 r.sink_close_exception = check_exception(sink.close());
527 r.source_done_exception = check_exception(std::move(source_done));
528 r.server_done_exception = check_exception(std::move(server_done));
529 r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future));
530 return r;
531 });
532}
533
534SEASTAR_TEST_CASE(test_stream_simple) {
535 rpc::server_options so;
536 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
537 rpc_test_config cfg;
538 cfg.server_options = so;
539 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
540 return stream_test_func(env, false).then([] (stream_test_result r) {
541 BOOST_REQUIRE(r.client_source_closed);
542 BOOST_REQUIRE(r.server_source_closed);
543 BOOST_REQUIRE(r.server_sum == 5050);
544 BOOST_REQUIRE(!r.sink_exception);
545 BOOST_REQUIRE(!r.sink_close_exception);
546 BOOST_REQUIRE(!r.source_done_exception);
547 BOOST_REQUIRE(!r.server_done_exception);
548 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
549 });
550 });
551}
552
553SEASTAR_TEST_CASE(test_stream_stop_client) {
554 rpc::server_options so;
555 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
556 rpc_test_config cfg;
557 cfg.server_options = so;
558 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
559 return stream_test_func(env, true).then([] (stream_test_result r) {
560 BOOST_REQUIRE(!r.client_source_closed);
561 BOOST_REQUIRE(!r.server_source_closed);
562 BOOST_REQUIRE(r.sink_exception);
563 BOOST_REQUIRE(r.sink_close_exception);
564 BOOST_REQUIRE(r.source_done_exception);
565 BOOST_REQUIRE(r.server_done_exception);
566 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
567 });
568 });
569}
570
571
572SEASTAR_TEST_CASE(test_stream_connection_error) {
573 rpc::server_options so;
574 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
575 rpc_test_config cfg;
576 cfg.server_options = so;
577 cfg.inject_error = true;
578 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
f67539c2 579 return stream_test_func(env, false, true).then([] (stream_test_result r) {
9f95a23c
TL
580 BOOST_REQUIRE(!r.client_source_closed);
581 BOOST_REQUIRE(!r.server_source_closed);
582 BOOST_REQUIRE(r.sink_exception);
583 BOOST_REQUIRE(r.sink_close_exception);
584 BOOST_REQUIRE(r.source_done_exception);
585 BOOST_REQUIRE(r.server_done_exception);
586 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
587 });
588 });
589}
590
591SEASTAR_TEST_CASE(test_rpc_scheduling) {
9f95a23c
TL
592 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
593 auto sg = create_scheduling_group("rpc", 100).get0();
594 env.register_handler(1, sg, [] () {
595 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
596 }).get();
597 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
598 auto id = call_sg_id(c1).get0();
599 BOOST_REQUIRE(id == internal::scheduling_group_index(sg));
11fdf7f2
TL
600 });
601}
602
603SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) {
604 auto sg1 = create_scheduling_group("sg1", 100).get0();
20effc67 605 auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); });
11fdf7f2 606 auto sg2 = create_scheduling_group("sg2", 100).get0();
20effc67 607 auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); });
11fdf7f2
TL
608 rpc::resource_limits limits;
609 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
610 auto sg = current_scheduling_group();
611 if (cookie == "sg1") {
612 sg = sg1;
613 } else if (cookie == "sg2") {
614 sg = sg2;
615 }
616 rpc::isolation_config cfg;
617 cfg.sched_group = sg;
618 return cfg;
619 };
9f95a23c
TL
620 rpc_test_config cfg;
621 cfg.resource_limits = limits;
622 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
623 rpc::client_options co1;
624 co1.isolation_cookie = "sg1";
625 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
626 rpc::client_options co2;
627 co2.isolation_cookie = "sg2";
628 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
629 env.register_handler(1, [] {
630 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
631 }).get();
632 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
633 unsigned id;
634 id = call_sg_id(c1).get0();
635 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
636 id = call_sg_id(c2).get0();
637 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
638 c1.stop().get();
639 c2.stop().get();
11fdf7f2
TL
640 }).get();
641}
642
643SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
644 auto sg1 = create_scheduling_group("sg1", 100).get0();
20effc67 645 auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); });
11fdf7f2 646 auto sg2 = create_scheduling_group("sg2", 100).get0();
20effc67 647 auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); });
11fdf7f2
TL
648 rpc::resource_limits limits;
649 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
650 auto sg = current_scheduling_group();
651 if (cookie == "sg1") {
652 sg = sg1;
653 } else if (cookie == "sg2") {
654 sg = sg2;
655 }
656 rpc::isolation_config cfg;
657 cfg.sched_group = sg;
658 return cfg;
659 };
9f95a23c
TL
660 rpc_test_config cfg;
661 cfg.resource_limits = limits;
662 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
663 rpc::client_options co1;
664 co1.isolation_cookie = "sg1";
665 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
666 rpc::client_options co2;
667 co2.isolation_cookie = "sg2";
668 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
669 // An old client, that doesn't have an isolation cookie
670 rpc::client_options co3;
671 test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr());
672 // A server that uses sg1 if the client is old
673 env.register_handler(1, sg1, [] () {
674 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
675 }).get();
676 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
677 unsigned id;
678 id = call_sg_id(c1).get0();
679 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
680 id = call_sg_id(c2).get0();
681 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
682 id = call_sg_id(c3).get0();
683 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
684 c1.stop().get();
685 c2.stop().get();
686 c3.stop().get();
687 }).get();
688}
689
690void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) {
691 using namespace seastar::rpc;
692
693 auto linearize = [&] (const auto& buffer) {
694 return seastar::visit(buffer.bufs,
695 [] (const temporary_buffer<char>& buf) {
696 return buf.clone();
697 },
698 [&] (const std::vector<temporary_buffer<char>>& bufs) {
699 auto buf = temporary_buffer<char>(buffer.size);
700 auto dst = buf.get_write();
701 for (auto& b : bufs) {
702 dst = std::copy_n(b.get(), b.size(), dst);
703 }
704 return buf;
705 }
706 );
707 };
708
709 auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) {
710 std::vector<temporary_buffer<char>> bufs;
711 auto src = b.get();
712 auto n = b.size();
713 while (n) {
714 auto this_chunk = std::min(chunk_size, n);
715 bufs.emplace_back(this_chunk);
716 std::copy_n(src, this_chunk, bufs.back().get_write());
717 src += this_chunk;
718 n -= this_chunk;
719 }
720 return bufs;
721 };
722
723 auto clone = [&] (const auto& buffer) {
724 auto c = std::decay_t<decltype(buffer)>();
725 c.size = buffer.size;
726 c.bufs = seastar::visit(buffer.bufs,
727 [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) {
728 return buf.clone();
729 },
730 [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) {
731 std::vector<temporary_buffer<char>> c;
732 c.reserve(bufs.size());
733 for (auto& b : bufs) {
734 c.emplace_back(b.clone());
735 }
f67539c2 736 return c;
9f95a23c
TL
737 }
738 );
739 return c;
740 };
741
742 auto compressor = compressor_factory();
743
20effc67
TL
744 std::vector<noncopyable_function<std::tuple<sstring, size_t, snd_buf> ()>> inputs;
745
746 auto add_input = [&] (auto func_returning_tuple) {
747 inputs.emplace_back(std::move(func_returning_tuple));
748 };
9f95a23c
TL
749
750 auto& eng = testing::local_random_engine;
751 auto dist = std::uniform_int_distribution<char>();
752
753 auto snd = snd_buf(1);
754 *snd.front().get_write() = 'a';
20effc67 755 add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, no headroom"), size_t(0), std::move(snd)); });
9f95a23c
TL
756
757 snd = snd_buf(1);
758 *snd.front().get_write() = 'a';
20effc67 759 add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, 128k of headroom"), size_t(128 * 1024), std::move(snd)); });
9f95a23c 760
f67539c2
TL
761 auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
762 auto buf = temporary_buffer<char>(s);
763 std::fill_n(buf.get_write(), s, 'a');
9f95a23c 764
f67539c2
TL
765 auto snd = snd_buf();
766 snd.size = s;
767 if (split) {
768 snd.bufs = split_buffer(buf.clone(), *split);
769 } else {
770 snd.bufs = buf.clone();
771 }
20effc67 772 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 773 };
9f95a23c 774
20effc67
TL
775
776 add_input(std::bind(gen_fill, 16 * 1024, "single 16 kB buffer of \'a\'"));
9f95a23c 777
f67539c2
TL
778 auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
779 auto buf = temporary_buffer<char>(s);
780 std::generate_n(buf.get_write(), s, [&] { return dist(eng); });
9f95a23c 781
f67539c2
TL
782 auto snd = snd_buf();
783 snd.size = s;
784 if (split) {
785 snd.bufs = split_buffer(buf.clone(), *split);
786 } else {
787 snd.bufs = buf.clone();
788 }
20effc67 789 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 790 };
9f95a23c 791
20effc67 792 add_input(std::bind(gen_rand, 16 * 1024, "single 16 kB buffer of random"));
9f95a23c 793
f67539c2
TL
794 auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
795 static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. ";
9f95a23c 796
f67539c2
TL
797 auto buf = temporary_buffer<char>(s);
798 size_t n = 0;
799 while (n < s) {
800 auto rem = std::min(s - n, text.size());
801 std::copy(text.data(), text.data() + rem, buf.get_write() + n);
802 n += rem;
803 }
9f95a23c 804
f67539c2
TL
805 auto snd = snd_buf();
806 snd.size = s;
807 if (split) {
808 snd.bufs = split_buffer(buf.clone(), *split);
809 } else {
810 snd.bufs = buf.clone();
811 }
20effc67 812 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 813 };
9f95a23c 814
9f95a23c 815
20effc67 816 for (auto s : { 1, 4 }) {
f67539c2 817 for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) {
20effc67
TL
818 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss));
819 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024));
820 add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024));
9f95a23c 821
20effc67
TL
822 add_input(std::bind(gen_fill, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024));
823 add_input(std::bind(gen_rand, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024));
f67539c2 824 }
9f95a23c 825
f67539c2 826 for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) {
20effc67
TL
827 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss));
828 add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss));
829 add_input(std::bind(gen_text, s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss));
830 add_input(std::bind(gen_fill, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss));
831 add_input(std::bind(gen_rand, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss));
832 add_input(std::bind(gen_text, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss));
f67539c2
TL
833 }
834 }
9f95a23c 835
f67539c2 836 for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) {
20effc67
TL
837 add_input(std::bind(gen_fill, s, format("{} bytes buffer of \'a\'", s)));
838 add_input(std::bind(gen_rand, s, format("{} bytes buffer of random", s)));
839 add_input(std::bind(gen_text, s, format("{} bytes buffer of text", s)));
f67539c2 840 }
9f95a23c
TL
841
842 std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms {
843 { "identity", [] (snd_buf snd) {
844 rcv_buf rcv;
845 rcv.size = snd.size;
846 rcv.bufs = std::move(snd.bufs);
847 return rcv;
848 } },
849 { "linearized", [&linearize] (snd_buf snd) {
850 rcv_buf rcv;
851 rcv.size = snd.size;
852 rcv.bufs = linearize(snd);
853 return rcv;
854 } },
855 { "split 1 B", [&] (snd_buf snd) {
856 rcv_buf rcv;
857 rcv.size = snd.size;
858 rcv.bufs = split_buffer(linearize(snd), 1);
859 return rcv;
860 } },
861 { "split 129 B", [&] (snd_buf snd) {
862 rcv_buf rcv;
863 rcv.size = snd.size;
864 rcv.bufs = split_buffer(linearize(snd), 129);
865 return rcv;
866 } },
867 { "split 4 kB", [&] (snd_buf snd) {
868 rcv_buf rcv;
869 rcv.size = snd.size;
870 rcv.bufs = split_buffer(linearize(snd), 4096);
871 return rcv;
872 } },
873 { "split 4 kB - 128", [&] (snd_buf snd) {
874 rcv_buf rcv;
875 rcv.size = snd.size;
876 rcv.bufs = split_buffer(linearize(snd), 4096 - 128);
877 return rcv;
878 } },
879 };
880
881 auto sanity_check = [&] (const auto& buffer) {
882 auto actual_size = seastar::visit(buffer.bufs,
883 [] (const temporary_buffer<char>& buf) {
884 return buf.size();
885 },
886 [] (const std::vector<temporary_buffer<char>>& bufs) {
887 return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) {
888 return sz + buf.size();
889 });
890 }
891 );
892 BOOST_CHECK_EQUAL(actual_size, buffer.size);
893 };
894
20effc67
TL
895 for (auto& in_func : inputs) {
896 auto in = in_func();
9f95a23c
TL
897 BOOST_TEST_MESSAGE("Input: " << std::get<0>(in));
898 auto headroom = std::get<1>(in);
899 auto compressed = compressor->compress(headroom, clone(std::get<2>(in)));
900 sanity_check(compressed);
901
902 // Remove headroom
903 BOOST_CHECK_GE(compressed.size, headroom);
904 compressed.size -= headroom;
905 seastar::visit(compressed.bufs,
906 [&] (temporary_buffer<char>& buf) {
907 BOOST_CHECK_GE(buf.size(), headroom);
908 buf.trim_front(headroom);
909 },
910 [&] (std::vector<temporary_buffer<char>>& bufs) {
911 while (headroom) {
912 BOOST_CHECK(!bufs.empty());
913 auto to_remove = std::min(bufs.front().size(), headroom);
914 bufs.front().trim_front(to_remove);
915 if (bufs.front().empty() && bufs.size() > 1) {
916 bufs.erase(bufs.begin());
917 }
918 headroom -= to_remove;
11fdf7f2 919 }
9f95a23c
TL
920 }
921 );
922
923 auto in_l = linearize(std::get<2>(in));
924
925 for (auto& t : transforms) {
926 BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t));
927 auto received = std::get<1>(t)(clone(compressed));
928
929 auto decompressed = compressor->decompress(std::move(received));
930 sanity_check(decompressed);
931
932 BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size);
933
934 auto out_l = linearize(decompressed);
935
936 BOOST_CHECK_EQUAL(in_l.size(), out_l.size());
937 BOOST_CHECK(in_l == out_l);
938 }
939 }
940}
941
942SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) {
943 test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); });
944}
945
946SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) {
947 test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); });
948}
949
950// Test reproducing issue #671: If timeout is time_point::max(), translating
951// it to relative timeout in the sender and then back in the receiver, when
952// these calculations happen across a millisecond boundary, overflowed the
953// integer and mislead the receiver to think the requested timeout was
954// negative, and cause it drop its response, so the RPC call never completed.
955SEASTAR_TEST_CASE(test_max_absolute_timeout) {
956 // The typical failure of this test is a hang. So we use semaphore to
957 // stop the test either when it succeeds, or after a long enough hang.
958 auto success = make_lw_shared<bool>(false);
959 auto done = make_lw_shared<semaphore>(0);
960 auto abrt = make_lw_shared<abort_source>();
961 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
962 done->signal(1);
963 }).handle_exception([] (std::exception_ptr) {});
964 rpc::client_options co;
965 co.send_timeout_data = 1;
966 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
967 env.register_handler(1, [](int a, int b) {
968 return make_ready_future<int>(a+b);
969 }).get();
970 auto sum = env.proto().make_client<int (int, int)>(1);
971 // The bug only reproduces if the calculation done on the sender
972 // and receiver sides, happened across a millisecond boundary.
973 // We can't control when it happens, so we just need to loop many
974 // times, at least many milliseconds, to increase the probability
975 // that we catch the bug. Experimentally, if we loop for 200ms, we
976 // catch the bug in #671 virtually every time.
977 auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200);
978 while (seastar::lowres_clock::now() <= until) {
979 auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0();
980 BOOST_REQUIRE_EQUAL(result, 2 + 3);
981 }
982 }).then([success, done, abrt] {
983 *success = true;
984 abrt->request_abort();
985 done->signal();
986 });
987 return done->wait().then([done, success] {
988 BOOST_REQUIRE(*success);
989 });
990}
991
992// Similar to the above test: Test that a relative timeout duration::max()
993// also works, and again doesn't cause the timeout wrapping around to the
994// past and causing dropped responses.
995SEASTAR_TEST_CASE(test_max_relative_timeout) {
996 // The typical failure of this test is a hang. So we use semaphore to
997 // stop the test either when it succeeds, or after a long enough hang.
998 auto success = make_lw_shared<bool>(false);
999 auto done = make_lw_shared<semaphore>(0);
1000 auto abrt = make_lw_shared<abort_source>();
1001 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
1002 done->signal(1);
1003 }).handle_exception([] (std::exception_ptr) {});
1004 rpc::client_options co;
1005 co.send_timeout_data = 1;
1006 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1007 env.register_handler(1, [](int a, int b) {
1008 return make_ready_future<int>(a+b);
1009 }).get();
1010 auto sum = env.proto().make_client<int (int, int)>(1);
1011 // The following call used to always hang, when max()+now()
1012 // overflowed and appeared to be a negative timeout.
1013 auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0();
1014 BOOST_REQUIRE_EQUAL(result, 2 + 3);
1015 }).then([success, done, abrt] {
1016 *success = true;
1017 abrt->request_abort();
1018 done->signal();
1019 });
1020 return done->wait().then([done, success] {
1021 BOOST_REQUIRE(*success);
1022 });
1023}
1024
1025SEASTAR_TEST_CASE(test_rpc_tuple) {
1026 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1027 env.register_handler(1, [] () {
1028 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1029 }).get();
1030 auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1);
1031 auto result = f1(c1).get0();
1032 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1033 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1034 });
1035}
1036
1037SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) {
1038 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1039 // Server is variadic
1040 env.register_handler(1, [] () {
f67539c2 1041 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L));
9f95a23c
TL
1042 }).get();
1043 // Client is non-variadic
1044 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1045 auto result = f1(c1).get0();
1046 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1047 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1048 });
1049}
1050
1051SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) {
1052 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1053 // Server is nonvariadic
1054 env.register_handler(1, [] () {
1055 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1056 }).get();
1057 // Client is variadic
f67539c2
TL
1058 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1059 auto result = f1(c1).get0();
9f95a23c
TL
1060 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1061 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1062 });
1063}
1064
1065SEASTAR_TEST_CASE(test_handler_registration) {
1066 rpc_test_config cfg;
1067 cfg.connect = false;
1068 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
1069 auto& proto = env.proto();
1070
f67539c2
TL
1071 // new proto must be empty
1072 BOOST_REQUIRE(!proto.has_handlers());
1073
9f95a23c
TL
1074 // non-existing handler should not be found
1075 BOOST_REQUIRE(!proto.has_handler(1));
1076
1077 // unregistered non-existing handler should return ready future
1078 auto fut = proto.unregister_handler(1);
1079 BOOST_REQUIRE(fut.available() && !fut.failed());
1080 fut.get();
1081
1082 // existing handler should be found
1083 auto handler = [] () { return make_ready_future<>(); };
1084 proto.register_handler(1, handler);
1085 BOOST_REQUIRE(proto.has_handler(1));
1086
1087 // cannot register handler if already registered
1088 BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error);
1089
1090 // unregistered handler should not be found
1091 proto.unregister_handler(1).get();
1092 BOOST_REQUIRE(!proto.has_handler(1));
1093
1094 // re-registering a handler should succeed
1095 proto.register_handler(1, handler);
1096 BOOST_REQUIRE(proto.has_handler(1));
f67539c2
TL
1097
1098 // proto with handlers must not be empty
1099 BOOST_REQUIRE(proto.has_handlers());
9f95a23c
TL
1100 });
1101}
1102
1103SEASTAR_TEST_CASE(test_unregister_handler) {
1104 using namespace std::chrono_literals;
1105 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1106 promise<> handler_called;
1107 future<> f_handler_called = handler_called.get_future();
1108 bool rpc_executed = false;
1109 bool rpc_completed = false;
1110
1111 auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] {
1112 if (f_handler_called.available()) {
1113 f_handler_called.get();
1114 }
1115 rpc_executed = false;
1116 rpc_completed = false;
1117 };
1118
1119 auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] {
1120 return [&handler_called, &rpc_executed, &rpc_completed] {
1121 handler_called.set_value();
1122 rpc_executed = true;
1123 return sleep(1ms).then([&rpc_completed] {
1124 rpc_completed = true;
1125 });
1126 };
1127 };
1128
1129 // handler should not run if unregistered before being called
1130 env.register_handler(1, get_handler()).get();
1131 env.unregister_handler(1).get();
1132 BOOST_REQUIRE(!f_handler_called.available());
1133 BOOST_REQUIRE(!rpc_executed);
1134 BOOST_REQUIRE(!rpc_completed);
1135
1136 // verify normal execution path
1137 env.register_handler(1, get_handler()).get();
1138 auto call = env.proto().make_client<void ()>(1);
1139 call(c1).get();
1140 BOOST_REQUIRE(f_handler_called.available());
1141 BOOST_REQUIRE(rpc_executed);
1142 BOOST_REQUIRE(rpc_completed);
1143 reset_state();
1144
1145 // call should fail after handler is unregistered
1146 env.unregister_handler(1).get();
1147 try {
1148 call(c1).get();
1149 BOOST_REQUIRE(false);
1150 } catch (rpc::unknown_verb_error&) {
1151 // expected
1152 } catch (...) {
1153 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1154 BOOST_REQUIRE(false);
1155 }
1156 BOOST_REQUIRE(!f_handler_called.available());
1157 BOOST_REQUIRE(!rpc_executed);
1158 BOOST_REQUIRE(!rpc_completed);
1159
1160 // unregistered is allowed while call is in flight
1161 auto delayed_unregister = [&env] {
1162 return sleep(500us).then([&env] {
1163 return env.unregister_handler(1);
11fdf7f2 1164 });
9f95a23c
TL
1165 };
1166
1167 env.register_handler(1, get_handler()).get();
1168 try {
1169 when_all_succeed(call(c1), delayed_unregister()).get();
1170 reset_state();
1171 } catch (rpc::unknown_verb_error&) {
1172 // expected
1173 } catch (...) {
1174 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1175 BOOST_REQUIRE(false);
1176 }
1177 });
11fdf7f2 1178}
9f95a23c 1179
f67539c2
TL
1180SEASTAR_TEST_CASE(test_loggers) {
1181 static seastar::logger log("dummy");
1182 log.set_level(log_level::debug);
1183 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1184 socket_address dummy_addr;
1185 auto& proto = env.proto();
1186 auto& logger = proto.get_logger();
1187 logger(dummy_addr, "Hello0");
1188 logger(dummy_addr, log_level::debug, "Hello1");
1189 proto.set_logger(&log);
1190 logger(dummy_addr, "Hello2");
1191 logger(dummy_addr, log_level::debug, "Hello3");
1192 // We *want* to test the deprecated API, don't spam warnings about it.
1193#pragma GCC diagnostic push
1194#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1195 proto.set_logger([] (const sstring& str) {
1196 log.info("Test: {}", str);
1197 });
1198#pragma GCC diagnostic pop
1199 logger(dummy_addr, "Hello4");
1200 logger(dummy_addr, log_level::debug, "Hello5");
1201 proto.set_logger(nullptr);
1202 logger(dummy_addr, "Hello6");
1203 logger(dummy_addr, log_level::debug, "Hello7");
1204 });
1205}
9f95a23c
TL
1206
1207static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working");