]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/rpc_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / tests / unit / rpc_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
11fdf7f2
TL
22#include "loopback_socket.hh"
23#include <seastar/rpc/rpc.hh>
24#include <seastar/rpc/rpc_types.hh>
25#include <seastar/rpc/lz4_compressor.hh>
9f95a23c 26#include <seastar/rpc/lz4_fragmented_compressor.hh>
11fdf7f2
TL
27#include <seastar/rpc/multi_algo_compressor_factory.hh>
28#include <seastar/testing/test_case.hh>
29#include <seastar/testing/thread_test_case.hh>
9f95a23c 30#include <seastar/testing/test_runner.hh>
11fdf7f2
TL
31#include <seastar/core/thread.hh>
32#include <seastar/core/sleep.hh>
9f95a23c 33#include <seastar/core/distributed.hh>
f67539c2 34#include <seastar/core/loop.hh>
11fdf7f2 35#include <seastar/util/defer.hh>
f67539c2 36#include <seastar/util/log.hh>
11fdf7f2
TL
37
38using namespace seastar;
39
/// Empty tag type. It is passed as the first argument of the free
/// `write`/`read` overloads in this file and is the serializer parameter of
/// `rpc::protocol` for the tests.
struct serializer {};
42
/// Writes the raw object representation of the arithmetic value `v` to `out`.
///
/// `Output` must expose `write(const char*, size_t)`. Exactly `sizeof(T)` bytes
/// are written and no byte-order conversion is applied.
template <typename T, typename Output>
inline
void write_arithmetic_type(Output& out, T v) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    const char* raw_bytes = reinterpret_cast<const char*>(&v);
    return out.write(raw_bytes, sizeof(T));
}
49
/// Reads `sizeof(T)` raw bytes from `in` and returns them reinterpreted as an
/// arithmetic value of type `T`.
///
/// `Input` must expose `read(char*, size_t)`. No byte-order conversion is
/// applied.
template <typename T, typename Input>
inline
T read_arithmetic_type(Input& in) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    T value;
    char* raw_bytes = reinterpret_cast<char*>(&value);
    in.read(raw_bytes, sizeof(T));
    return value;
}
58
59template <typename Output>
60inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
61template <typename Output>
62inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
63template <typename Output>
64inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
65template <typename Output>
66inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
67template <typename Output>
68inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
69template <typename Input>
70inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
71template <typename Input>
72inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
73template <typename Input>
74inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
75template <typename Input>
76inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
77template <typename Input>
78inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
79
80template <typename Output>
81inline void write(serializer, Output& out, const sstring& v) {
82 write_arithmetic_type(out, uint32_t(v.size()));
83 out.write(v.c_str(), v.size());
84}
85
86template <typename Input>
87inline sstring read(serializer, Input& in, rpc::type<sstring>) {
88 auto size = read_arithmetic_type<uint32_t>(in);
f67539c2
TL
89 sstring ret = uninitialized_string(size);
90 in.read(ret.data(), size);
11fdf7f2
TL
91 return ret;
92}
93
94using test_rpc_proto = rpc::protocol<serializer>;
95using make_socket_fn = std::function<seastar::socket ()>;
96
97struct rpc_loopback_error_injector : public loopback_error_injector {
98 int _x = 0;
99 bool server_rcv_error() override {
100 return _x++ >= 50;
101 }
102};
103
104class rpc_socket_impl : public ::net::socket_impl {
105 promise<connected_socket> _p;
106 bool _connect;
107 loopback_socket_impl _socket;
108 rpc_loopback_error_injector _error_injector;
109public:
110 rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error)
111 : _connect(connect),
112 _socket(factory, inject_error ? &_error_injector : nullptr) {
113 }
114 virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
115 return _connect ? _socket.connect(sa, local, proto) : _p.get_future();
116 }
9f95a23c
TL
117 virtual void set_reuseaddr(bool reuseaddr) override {}
118 virtual bool get_reuseaddr() const override { return false; };
11fdf7f2
TL
119 virtual void shutdown() override {
120 if (_connect) {
121 _socket.shutdown();
122 } else {
123 _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
124 }
125 }
126};
127
9f95a23c
TL
128struct rpc_test_config {
129 rpc::resource_limits resource_limits = {};
130 rpc::server_options server_options = {};
131 bool connect = true;
132 bool inject_error = false;
133};
134
135template<typename MsgType = int>
136class rpc_test_env {
137 struct rpc_test_service {
138 test_rpc_proto _proto;
139 test_rpc_proto::server _server;
140 std::vector<MsgType> _handlers;
141
142 rpc_test_service() = delete;
143 explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf)
144 : _proto(serializer())
145 , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits)
146 { }
147
148 test_rpc_proto& proto() {
149 return _proto;
150 }
151
152 test_rpc_proto::server& server() {
153 return _server;
154 }
155
156 future<> stop() {
157 return parallel_for_each(_handlers, [this] (auto t) {
158 return proto().unregister_handler(t);
159 }).finally([this] {
160 return server().stop();
161 });
162 }
163
164 template<typename Func>
165 auto register_handler(MsgType t, scheduling_group sg, Func func) {
166 _handlers.emplace_back(t);
167 return proto().register_handler(t, sg, std::move(func));
168 }
169
170 future<> unregister_handler(MsgType t) {
171 auto it = std::find(_handlers.begin(), _handlers.end(), t);
172 assert(it != _handlers.end());
173 _handlers.erase(it);
174 return proto().unregister_handler(t);
175 }
11fdf7f2 176 };
9f95a23c
TL
177
178 rpc_test_config _cfg;
179 loopback_connection_factory _lcf;
f67539c2 180 std::unique_ptr<sharded<rpc_test_service>> _service;
9f95a23c
TL
181
182public:
183 rpc_test_env() = delete;
184 explicit rpc_test_env(rpc_test_config cfg)
f67539c2 185 : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>())
9f95a23c
TL
186 {
187 }
188
189 using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>;
190 static future<> do_with(rpc_test_config cfg, test_fn&& func) {
191 return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) {
192 return env.start().then([&env, func] {
193 return func(env);
194 }).finally([&env] {
195 return env.stop();
11fdf7f2
TL
196 });
197 });
9f95a23c
TL
198 }
199
200 using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>;
201 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) {
202 return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) {
203 return seastar::async([&env, func] {
204 func(env);
205 });
206 });
207 }
208
209 using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>;
210 static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) {
211 return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) {
212 return seastar::async([&env, func, co = std::move(co)] {
213 test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr());
214 auto stop = defer([&] { cl.stop().get(); });
215 func(env, cl);
216 });
217 });
218 }
219
220 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) {
221 return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func));
222 }
223
224 auto make_socket() {
225 return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.connect, _cfg.inject_error));
226 };
227
228 test_rpc_proto& proto() {
229 return local_service().proto();
230 }
231
232 test_rpc_proto::server& server() {
233 return local_service().server();
234 }
235
236 template<typename Func>
237 future<> register_handler(MsgType t, scheduling_group sg, Func func) {
f67539c2 238 return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable {
9f95a23c
TL
239 s.register_handler(t, sg, std::move(func));
240 });
241 }
242
243 template<typename Func>
244 future<> register_handler(MsgType t, Func func) {
245 return register_handler(t, scheduling_group(), std::move(func));
246 }
247
248 future<> unregister_handler(MsgType t) {
f67539c2 249 return _service->invoke_on_all([t] (rpc_test_service& s) mutable {
9f95a23c
TL
250 return s.unregister_handler(t);
251 });
252 }
253
254private:
255 rpc_test_service& local_service() {
f67539c2
TL
256 return _service->local();
257
9f95a23c
TL
258 }
259
260 future<> start() {
f67539c2 261 return _service->start(std::cref(_cfg), std::ref(_lcf));
9f95a23c
TL
262 }
263
264 future<> stop() {
f67539c2 265 return _service->stop().then([this] {
9f95a23c
TL
266 return _lcf.destroy_all_shards();
267 });
268 }
269};
11fdf7f2
TL
270
271struct cfactory : rpc::compressor::factory {
272 mutable int use_compression = 0;
273 const sstring name;
274 cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {}
275 const sstring& supported() const override {
276 return name;
277 }
f67539c2
TL
278 class mylz4 : public rpc::lz4_compressor {
279 sstring _name;
280 public:
281 mylz4(const sstring& n) : _name(n) {}
282 sstring name() const override {
283 return _name;
284 }
285 };
11fdf7f2
TL
286 std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
287 if (feature == name) {
288 use_compression++;
f67539c2 289 return std::make_unique<mylz4>(name);
11fdf7f2
TL
290 } else {
291 return nullptr;
292 }
293 }
294};
9f95a23c 295
11fdf7f2
TL
296SEASTAR_TEST_CASE(test_rpc_connect) {
297 std::vector<future<>> fs;
298
299 for (auto i = 0; i < 2; i++) {
300 for (auto j = 0; j < 4; j++) {
301 auto factory = std::make_unique<cfactory>();
302 rpc::server_options so;
303 rpc::client_options co;
304 if (i == 1) {
305 so.compressor_factory = factory.get();
306 }
307 if (j & 1) {
308 co.compressor_factory = factory.get();
309 }
310 co.send_timeout_data = j & 2;
9f95a23c
TL
311 rpc_test_config cfg;
312 cfg.server_options = so;
313 auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
314 env.register_handler(1, [](int a, int b) {
315 return make_ready_future<int>(a+b);
316 }).get();
317 auto sum = env.proto().make_client<int (int, int)>(1);
318 auto result = sum(c1, 2, 3).get0();
319 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
320 }).handle_exception([] (auto ep) {
321 BOOST_FAIL("No exception expected");
322 }).finally([factory = std::move(factory), i, j = j & 1] {
323 if (i == 1 && j == 1) {
324 BOOST_REQUIRE_EQUAL(factory->use_compression, 2);
325 } else {
326 BOOST_REQUIRE_EQUAL(factory->use_compression, 0);
327 }
328 });
329 fs.emplace_back(std::move(f));
330 }
331 }
332 return when_all(fs.begin(), fs.end()).discard_result();
333}
334
335SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) {
336 auto factory1 = std::make_unique<cfactory>();
337 auto factory2 = std::make_unique<cfactory>("LZ4NEW");
338 rpc::server_options so;
339 rpc::client_options co;
340 static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()});
341 static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()});
342 so.compressor_factory = &server;
343 co.compressor_factory = &client;
9f95a23c
TL
344 rpc_test_config cfg;
345 cfg.server_options = so;
346 return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
347 env.register_handler(1, [](int a, int b) {
348 return make_ready_future<int>(a+b);
349 }).get();
350 auto sum = env.proto().make_client<int (int, int)>(1);
351 auto result = sum(c1, 2, 3).get0();
352 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
353 }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] {
354 BOOST_REQUIRE_EQUAL(factory1->use_compression, 0);
355 BOOST_REQUIRE_EQUAL(factory2->use_compression, 2);
356 });
357}
358
359SEASTAR_TEST_CASE(test_rpc_connect_abort) {
9f95a23c
TL
360 rpc_test_config cfg;
361 cfg.connect = false;
362 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
363 test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr());
364 env.register_handler(1, []() { return make_ready_future<>(); }).get();
365 auto f = env.proto().make_client<void ()>(1);
366 c1.stop().get0();
367 try {
368 f(c1).get0();
369 BOOST_REQUIRE(false);
370 } catch (...) {}
11fdf7f2
TL
371 });
372}
373
374SEASTAR_TEST_CASE(test_rpc_cancel) {
375 using namespace std::chrono_literals;
9f95a23c
TL
376 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
377 bool rpc_executed = false;
378 int good = 0;
379 promise<> handler_called;
380 future<> f_handler_called = handler_called.get_future();
381 env.register_handler(1, [&rpc_executed, &handler_called] {
382 handler_called.set_value(); rpc_executed = true; return sleep(1ms);
383 }).get();
384 auto call = env.proto().make_client<void ()>(1);
385 rpc::cancellable cancel;
386 auto f = call(c1, cancel);
387 // cancel send side
388 cancel.cancel();
389 try {
390 f.get();
391 } catch(rpc::canceled_error&) {
392 good += !rpc_executed;
393 };
394 f = call(c1, cancel);
395 // cancel wait side
396 f_handler_called.then([&cancel] {
11fdf7f2 397 cancel.cancel();
9f95a23c
TL
398 }).get();
399 try {
400 f.get();
401 } catch(rpc::canceled_error&) {
402 good += 10*rpc_executed;
403 };
404 BOOST_REQUIRE_EQUAL(good, 11);
11fdf7f2
TL
405 });
406}
407
408SEASTAR_TEST_CASE(test_message_to_big) {
9f95a23c
TL
409 rpc_test_config cfg;
410 cfg.resource_limits = {0, 1, 100};
411 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) {
412 bool good = true;
413 env.register_handler(1, [&] (sstring payload) mutable {
414 good = false;
415 }).get();
416 auto call = env.proto().make_client<void (sstring)>(1);
417 try {
f67539c2 418 call(c, uninitialized_string(101)).get();
9f95a23c
TL
419 good = false;
420 } catch(std::runtime_error& err) {
421 } catch(...) {
422 good = false;
423 }
424 BOOST_REQUIRE_EQUAL(good, true);
11fdf7f2
TL
425 });
426}
11fdf7f2
TL
427
/// Outcome flags collected by stream_test_func(); everything starts out
/// false / zero.
struct stream_test_result {
    bool client_source_closed{false};   // client saw end-of-stream
    bool server_source_closed{false};   // server saw end-of-stream
    bool sink_exception{false};         // a client-side sink() call threw
    bool sink_close_exception{false};   // closing the client sink threw
    bool source_done_exception{false};  // client read loop ended with an exception
    bool server_done_exception{false};  // server-side loops ended with an exception
    bool client_stop_exception{false};  // stopping the client threw
    int server_sum{0};                  // sum of the integers the server received
};
438
f67539c2
TL
439future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) {
440 return seastar::async([&env, stop_client, expect_connection_error] {
11fdf7f2 441 stream_test_result r;
9f95a23c 442 test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr());
11fdf7f2 443 future<> server_done = make_ready_future();
9f95a23c 444 env.register_handler(1, [&](int i, rpc::source<int> source) {
11fdf7f2
TL
445 BOOST_REQUIRE_EQUAL(i, 666);
446 auto sink = source.make_sink<serializer, sstring>();
447 auto sink_loop = seastar::async([sink] () mutable {
448 for (auto i = 0; i < 100; i++) {
449 sink("seastar").get();
450 sleep(std::chrono::milliseconds(1)).get();
451 }
f67539c2
TL
452 }).finally([sink] () mutable {
453 return sink.flush();
454 }).finally([sink] () mutable {
455 return sink.close();
456 }).finally([sink] {});
457
11fdf7f2
TL
458 auto source_loop = seastar::async([source, &r] () mutable {
459 while (!r.server_source_closed) {
460 auto data = source().get0();
461 if (data) {
462 r.server_sum += std::get<0>(*data);
463 } else {
464 r.server_source_closed = true;
9f95a23c
TL
465 try {
466 // check that reading after eos does not crash
467 // and throws correct exception
468 source().get();
469 } catch (rpc::stream_closed& ex) {
470 // expected
471 } catch (...) {
472 BOOST_FAIL("wrong exception on reading from a stream after eos");
473 }
11fdf7f2
TL
474 }
475 }
476 });
477 server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result();
478 return sink;
9f95a23c
TL
479 }).get();
480 auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1);
11fdf7f2
TL
481 auto x = [&] {
482 try {
9f95a23c 483 return c.make_stream_sink<serializer, int>(env.make_socket()).get0();
11fdf7f2
TL
484 } catch (...) {
485 c.stop().get();
486 throw;
487 }
488 };
489 auto sink = x();
490 auto source = call(c, 666, sink).get0();
491 auto source_done = seastar::async([&] {
492 while (!r.client_source_closed) {
493 auto data = source().get0();
494 if (data) {
495 BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar");
496 } else {
497 r.client_source_closed = true;
498 }
499 }
500 });
501 auto check_exception = [] (auto f) {
502 try {
503 f.get();
504 } catch (...) {
505 return true;
506 }
507 return false;
508 };
509 future<> stop_client_future = make_ready_future();
f67539c2
TL
510 // With a connection error sink() will eventually fail, but we
511 // cannot guarantee when.
512 int max = expect_connection_error ? std::numeric_limits<int>::max() : 101;
513 for (int i = 1; i < max; i++) {
11fdf7f2
TL
514 if (stop_client && i == 50) {
515 // stop client while stream is in use
516 stop_client_future = c.stop();
517 }
518 sleep(std::chrono::milliseconds(1)).get();
519 r.sink_exception = check_exception(sink(i));
520 if (r.sink_exception) {
521 break;
522 }
523 }
524 r.sink_close_exception = check_exception(sink.close());
525 r.source_done_exception = check_exception(std::move(source_done));
526 r.server_done_exception = check_exception(std::move(server_done));
527 r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future));
528 return r;
529 });
530}
531
532SEASTAR_TEST_CASE(test_stream_simple) {
533 rpc::server_options so;
534 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
535 rpc_test_config cfg;
536 cfg.server_options = so;
537 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
538 return stream_test_func(env, false).then([] (stream_test_result r) {
539 BOOST_REQUIRE(r.client_source_closed);
540 BOOST_REQUIRE(r.server_source_closed);
541 BOOST_REQUIRE(r.server_sum == 5050);
542 BOOST_REQUIRE(!r.sink_exception);
543 BOOST_REQUIRE(!r.sink_close_exception);
544 BOOST_REQUIRE(!r.source_done_exception);
545 BOOST_REQUIRE(!r.server_done_exception);
546 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
547 });
548 });
549}
550
551SEASTAR_TEST_CASE(test_stream_stop_client) {
552 rpc::server_options so;
553 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
554 rpc_test_config cfg;
555 cfg.server_options = so;
556 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
557 return stream_test_func(env, true).then([] (stream_test_result r) {
558 BOOST_REQUIRE(!r.client_source_closed);
559 BOOST_REQUIRE(!r.server_source_closed);
560 BOOST_REQUIRE(r.sink_exception);
561 BOOST_REQUIRE(r.sink_close_exception);
562 BOOST_REQUIRE(r.source_done_exception);
563 BOOST_REQUIRE(r.server_done_exception);
564 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
565 });
566 });
567}
568
569
570SEASTAR_TEST_CASE(test_stream_connection_error) {
571 rpc::server_options so;
572 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
573 rpc_test_config cfg;
574 cfg.server_options = so;
575 cfg.inject_error = true;
576 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
f67539c2 577 return stream_test_func(env, false, true).then([] (stream_test_result r) {
9f95a23c
TL
578 BOOST_REQUIRE(!r.client_source_closed);
579 BOOST_REQUIRE(!r.server_source_closed);
580 BOOST_REQUIRE(r.sink_exception);
581 BOOST_REQUIRE(r.sink_close_exception);
582 BOOST_REQUIRE(r.source_done_exception);
583 BOOST_REQUIRE(r.server_done_exception);
584 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
585 });
586 });
587}
588
589SEASTAR_TEST_CASE(test_rpc_scheduling) {
9f95a23c
TL
590 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
591 auto sg = create_scheduling_group("rpc", 100).get0();
592 env.register_handler(1, sg, [] () {
593 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
594 }).get();
595 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
596 auto id = call_sg_id(c1).get0();
597 BOOST_REQUIRE(id == internal::scheduling_group_index(sg));
11fdf7f2
TL
598 });
599}
600
601SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) {
602 auto sg1 = create_scheduling_group("sg1", 100).get0();
603 auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
604 auto sg2 = create_scheduling_group("sg2", 100).get0();
605 auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });
606 rpc::resource_limits limits;
607 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
608 auto sg = current_scheduling_group();
609 if (cookie == "sg1") {
610 sg = sg1;
611 } else if (cookie == "sg2") {
612 sg = sg2;
613 }
614 rpc::isolation_config cfg;
615 cfg.sched_group = sg;
616 return cfg;
617 };
9f95a23c
TL
618 rpc_test_config cfg;
619 cfg.resource_limits = limits;
620 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
621 rpc::client_options co1;
622 co1.isolation_cookie = "sg1";
623 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
624 rpc::client_options co2;
625 co2.isolation_cookie = "sg2";
626 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
627 env.register_handler(1, [] {
628 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
629 }).get();
630 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
631 unsigned id;
632 id = call_sg_id(c1).get0();
633 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
634 id = call_sg_id(c2).get0();
635 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
636 c1.stop().get();
637 c2.stop().get();
11fdf7f2
TL
638 }).get();
639}
640
641SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
642 auto sg1 = create_scheduling_group("sg1", 100).get0();
643 auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
644 auto sg2 = create_scheduling_group("sg2", 100).get0();
645 auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });
646 rpc::resource_limits limits;
647 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
648 auto sg = current_scheduling_group();
649 if (cookie == "sg1") {
650 sg = sg1;
651 } else if (cookie == "sg2") {
652 sg = sg2;
653 }
654 rpc::isolation_config cfg;
655 cfg.sched_group = sg;
656 return cfg;
657 };
9f95a23c
TL
658 rpc_test_config cfg;
659 cfg.resource_limits = limits;
660 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
661 rpc::client_options co1;
662 co1.isolation_cookie = "sg1";
663 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
664 rpc::client_options co2;
665 co2.isolation_cookie = "sg2";
666 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
667 // An old client, that doesn't have an isolation cookie
668 rpc::client_options co3;
669 test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr());
670 // A server that uses sg1 if the client is old
671 env.register_handler(1, sg1, [] () {
672 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
673 }).get();
674 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
675 unsigned id;
676 id = call_sg_id(c1).get0();
677 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
678 id = call_sg_id(c2).get0();
679 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
680 id = call_sg_id(c3).get0();
681 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
682 c1.stop().get();
683 c2.stop().get();
684 c3.stop().get();
685 }).get();
686}
687
688void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) {
689 using namespace seastar::rpc;
690
691 auto linearize = [&] (const auto& buffer) {
692 return seastar::visit(buffer.bufs,
693 [] (const temporary_buffer<char>& buf) {
694 return buf.clone();
695 },
696 [&] (const std::vector<temporary_buffer<char>>& bufs) {
697 auto buf = temporary_buffer<char>(buffer.size);
698 auto dst = buf.get_write();
699 for (auto& b : bufs) {
700 dst = std::copy_n(b.get(), b.size(), dst);
701 }
702 return buf;
703 }
704 );
705 };
706
707 auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) {
708 std::vector<temporary_buffer<char>> bufs;
709 auto src = b.get();
710 auto n = b.size();
711 while (n) {
712 auto this_chunk = std::min(chunk_size, n);
713 bufs.emplace_back(this_chunk);
714 std::copy_n(src, this_chunk, bufs.back().get_write());
715 src += this_chunk;
716 n -= this_chunk;
717 }
718 return bufs;
719 };
720
721 auto clone = [&] (const auto& buffer) {
722 auto c = std::decay_t<decltype(buffer)>();
723 c.size = buffer.size;
724 c.bufs = seastar::visit(buffer.bufs,
725 [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) {
726 return buf.clone();
727 },
728 [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) {
729 std::vector<temporary_buffer<char>> c;
730 c.reserve(bufs.size());
731 for (auto& b : bufs) {
732 c.emplace_back(b.clone());
733 }
f67539c2 734 return c;
9f95a23c
TL
735 }
736 );
737 return c;
738 };
739
740 auto compressor = compressor_factory();
741
742 std::vector<std::tuple<sstring, size_t, snd_buf>> inputs;
743
744 auto& eng = testing::local_random_engine;
745 auto dist = std::uniform_int_distribution<char>();
746
747 auto snd = snd_buf(1);
748 *snd.front().get_write() = 'a';
749 inputs.emplace_back("one byte, no headroom", 0, std::move(snd));
750
751 snd = snd_buf(1);
752 *snd.front().get_write() = 'a';
753 inputs.emplace_back("one byte, 128k of headroom", 128 * 1024, std::move(snd));
754
f67539c2
TL
755 auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
756 auto buf = temporary_buffer<char>(s);
757 std::fill_n(buf.get_write(), s, 'a');
9f95a23c 758
f67539c2
TL
759 auto snd = snd_buf();
760 snd.size = s;
761 if (split) {
762 snd.bufs = split_buffer(buf.clone(), *split);
763 } else {
764 snd.bufs = buf.clone();
765 }
766 inputs.emplace_back(msg, 0, std::move(snd));
767 };
9f95a23c 768
f67539c2 769 gen_fill(16 * 1024, "single 16 kB buffer of \'a\'");
9f95a23c 770
f67539c2
TL
771 auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
772 auto buf = temporary_buffer<char>(s);
773 std::generate_n(buf.get_write(), s, [&] { return dist(eng); });
9f95a23c 774
f67539c2
TL
775 auto snd = snd_buf();
776 snd.size = s;
777 if (split) {
778 snd.bufs = split_buffer(buf.clone(), *split);
779 } else {
780 snd.bufs = buf.clone();
781 }
782 inputs.emplace_back(msg, 0, std::move(snd));
783 };
9f95a23c 784
f67539c2 785 gen_rand(16 * 1024, "single 16 kB buffer of random");
9f95a23c 786
f67539c2
TL
787 auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
788 static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. ";
9f95a23c 789
f67539c2
TL
790 auto buf = temporary_buffer<char>(s);
791 size_t n = 0;
792 while (n < s) {
793 auto rem = std::min(s - n, text.size());
794 std::copy(text.data(), text.data() + rem, buf.get_write() + n);
795 n += rem;
796 }
9f95a23c 797
f67539c2
TL
798 auto snd = snd_buf();
799 snd.size = s;
800 if (split) {
801 snd.bufs = split_buffer(buf.clone(), *split);
802 } else {
803 snd.bufs = buf.clone();
804 }
805 inputs.emplace_back(msg, 0, std::move(snd));
806 };
9f95a23c 807
9f95a23c 808
f67539c2
TL
809 for (auto s : { 1, 4, 8 }) {
810 for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) {
811 gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss);
812 gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024);
813 gen_rand(s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024);
9f95a23c 814
f67539c2
TL
815 gen_fill(s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024);
816 gen_rand(s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024);
817 }
9f95a23c 818
f67539c2
TL
819 for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) {
820 gen_fill(s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss);
821 gen_rand(s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss);
822 gen_text(s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss);
823 gen_fill(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss);
824 gen_rand(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss);
825 gen_text(s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss);
826 }
827 }
9f95a23c 828
f67539c2
TL
829 for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) {
830 gen_fill(s, format("{} bytes buffer of \'a\'", s));
831 gen_rand(s, format("{} bytes buffer of random", s));
832 gen_text(s, format("{} bytes buffer of text", s));
833 }
9f95a23c
TL
834
835 std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms {
836 { "identity", [] (snd_buf snd) {
837 rcv_buf rcv;
838 rcv.size = snd.size;
839 rcv.bufs = std::move(snd.bufs);
840 return rcv;
841 } },
842 { "linearized", [&linearize] (snd_buf snd) {
843 rcv_buf rcv;
844 rcv.size = snd.size;
845 rcv.bufs = linearize(snd);
846 return rcv;
847 } },
848 { "split 1 B", [&] (snd_buf snd) {
849 rcv_buf rcv;
850 rcv.size = snd.size;
851 rcv.bufs = split_buffer(linearize(snd), 1);
852 return rcv;
853 } },
854 { "split 129 B", [&] (snd_buf snd) {
855 rcv_buf rcv;
856 rcv.size = snd.size;
857 rcv.bufs = split_buffer(linearize(snd), 129);
858 return rcv;
859 } },
860 { "split 4 kB", [&] (snd_buf snd) {
861 rcv_buf rcv;
862 rcv.size = snd.size;
863 rcv.bufs = split_buffer(linearize(snd), 4096);
864 return rcv;
865 } },
866 { "split 4 kB - 128", [&] (snd_buf snd) {
867 rcv_buf rcv;
868 rcv.size = snd.size;
869 rcv.bufs = split_buffer(linearize(snd), 4096 - 128);
870 return rcv;
871 } },
872 };
873
874 auto sanity_check = [&] (const auto& buffer) {
875 auto actual_size = seastar::visit(buffer.bufs,
876 [] (const temporary_buffer<char>& buf) {
877 return buf.size();
878 },
879 [] (const std::vector<temporary_buffer<char>>& bufs) {
880 return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) {
881 return sz + buf.size();
882 });
883 }
884 );
885 BOOST_CHECK_EQUAL(actual_size, buffer.size);
886 };
887
888 for (auto& in : inputs) {
889 BOOST_TEST_MESSAGE("Input: " << std::get<0>(in));
890 auto headroom = std::get<1>(in);
891 auto compressed = compressor->compress(headroom, clone(std::get<2>(in)));
892 sanity_check(compressed);
893
894 // Remove headroom
895 BOOST_CHECK_GE(compressed.size, headroom);
896 compressed.size -= headroom;
897 seastar::visit(compressed.bufs,
898 [&] (temporary_buffer<char>& buf) {
899 BOOST_CHECK_GE(buf.size(), headroom);
900 buf.trim_front(headroom);
901 },
902 [&] (std::vector<temporary_buffer<char>>& bufs) {
903 while (headroom) {
904 BOOST_CHECK(!bufs.empty());
905 auto to_remove = std::min(bufs.front().size(), headroom);
906 bufs.front().trim_front(to_remove);
907 if (bufs.front().empty() && bufs.size() > 1) {
908 bufs.erase(bufs.begin());
909 }
910 headroom -= to_remove;
11fdf7f2 911 }
9f95a23c
TL
912 }
913 );
914
915 auto in_l = linearize(std::get<2>(in));
916
917 for (auto& t : transforms) {
918 BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t));
919 auto received = std::get<1>(t)(clone(compressed));
920
921 auto decompressed = compressor->decompress(std::move(received));
922 sanity_check(decompressed);
923
924 BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size);
925
926 auto out_l = linearize(decompressed);
927
928 BOOST_CHECK_EQUAL(in_l.size(), out_l.size());
929 BOOST_CHECK(in_l == out_l);
930 }
931 }
932}
933
934SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) {
935 test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); });
936}
937
938SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) {
939 test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); });
940}
941
942// Test reproducing issue #671: If timeout is time_point::max(), translating
943// it to relative timeout in the sender and then back in the receiver, when
944// these calculations happen across a millisecond boundary, overflowed the
945// integer and mislead the receiver to think the requested timeout was
946// negative, and cause it drop its response, so the RPC call never completed.
947SEASTAR_TEST_CASE(test_max_absolute_timeout) {
948 // The typical failure of this test is a hang. So we use semaphore to
949 // stop the test either when it succeeds, or after a long enough hang.
950 auto success = make_lw_shared<bool>(false);
951 auto done = make_lw_shared<semaphore>(0);
952 auto abrt = make_lw_shared<abort_source>();
953 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
954 done->signal(1);
955 }).handle_exception([] (std::exception_ptr) {});
956 rpc::client_options co;
957 co.send_timeout_data = 1;
958 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
959 env.register_handler(1, [](int a, int b) {
960 return make_ready_future<int>(a+b);
961 }).get();
962 auto sum = env.proto().make_client<int (int, int)>(1);
963 // The bug only reproduces if the calculation done on the sender
964 // and receiver sides, happened across a millisecond boundary.
965 // We can't control when it happens, so we just need to loop many
966 // times, at least many milliseconds, to increase the probability
967 // that we catch the bug. Experimentally, if we loop for 200ms, we
968 // catch the bug in #671 virtually every time.
969 auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200);
970 while (seastar::lowres_clock::now() <= until) {
971 auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0();
972 BOOST_REQUIRE_EQUAL(result, 2 + 3);
973 }
974 }).then([success, done, abrt] {
975 *success = true;
976 abrt->request_abort();
977 done->signal();
978 });
979 return done->wait().then([done, success] {
980 BOOST_REQUIRE(*success);
981 });
982}
983
984// Similar to the above test: Test that a relative timeout duration::max()
985// also works, and again doesn't cause the timeout wrapping around to the
986// past and causing dropped responses.
987SEASTAR_TEST_CASE(test_max_relative_timeout) {
988 // The typical failure of this test is a hang. So we use semaphore to
989 // stop the test either when it succeeds, or after a long enough hang.
990 auto success = make_lw_shared<bool>(false);
991 auto done = make_lw_shared<semaphore>(0);
992 auto abrt = make_lw_shared<abort_source>();
993 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
994 done->signal(1);
995 }).handle_exception([] (std::exception_ptr) {});
996 rpc::client_options co;
997 co.send_timeout_data = 1;
998 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
999 env.register_handler(1, [](int a, int b) {
1000 return make_ready_future<int>(a+b);
1001 }).get();
1002 auto sum = env.proto().make_client<int (int, int)>(1);
1003 // The following call used to always hang, when max()+now()
1004 // overflowed and appeared to be a negative timeout.
1005 auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0();
1006 BOOST_REQUIRE_EQUAL(result, 2 + 3);
1007 }).then([success, done, abrt] {
1008 *success = true;
1009 abrt->request_abort();
1010 done->signal();
1011 });
1012 return done->wait().then([done, success] {
1013 BOOST_REQUIRE(*success);
1014 });
1015}
1016
1017SEASTAR_TEST_CASE(test_rpc_tuple) {
1018 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1019 env.register_handler(1, [] () {
1020 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1021 }).get();
1022 auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1);
1023 auto result = f1(c1).get0();
1024 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1025 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1026 });
1027}
1028
1029SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) {
1030 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1031 // Server is variadic
1032 env.register_handler(1, [] () {
f67539c2 1033 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L));
9f95a23c
TL
1034 }).get();
1035 // Client is non-variadic
1036 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1037 auto result = f1(c1).get0();
1038 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1039 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1040 });
1041}
1042
1043SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) {
1044 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1045 // Server is nonvariadic
1046 env.register_handler(1, [] () {
1047 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1048 }).get();
1049 // Client is variadic
f67539c2
TL
1050 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1051 auto result = f1(c1).get0();
9f95a23c
TL
1052 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1053 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1054 });
1055}
1056
1057SEASTAR_TEST_CASE(test_handler_registration) {
1058 rpc_test_config cfg;
1059 cfg.connect = false;
1060 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
1061 auto& proto = env.proto();
1062
f67539c2
TL
1063 // new proto must be empty
1064 BOOST_REQUIRE(!proto.has_handlers());
1065
9f95a23c
TL
1066 // non-existing handler should not be found
1067 BOOST_REQUIRE(!proto.has_handler(1));
1068
1069 // unregistered non-existing handler should return ready future
1070 auto fut = proto.unregister_handler(1);
1071 BOOST_REQUIRE(fut.available() && !fut.failed());
1072 fut.get();
1073
1074 // existing handler should be found
1075 auto handler = [] () { return make_ready_future<>(); };
1076 proto.register_handler(1, handler);
1077 BOOST_REQUIRE(proto.has_handler(1));
1078
1079 // cannot register handler if already registered
1080 BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error);
1081
1082 // unregistered handler should not be found
1083 proto.unregister_handler(1).get();
1084 BOOST_REQUIRE(!proto.has_handler(1));
1085
1086 // re-registering a handler should succeed
1087 proto.register_handler(1, handler);
1088 BOOST_REQUIRE(proto.has_handler(1));
f67539c2
TL
1089
1090 // proto with handlers must not be empty
1091 BOOST_REQUIRE(proto.has_handlers());
9f95a23c
TL
1092 });
1093}
1094
1095SEASTAR_TEST_CASE(test_unregister_handler) {
1096 using namespace std::chrono_literals;
1097 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1098 promise<> handler_called;
1099 future<> f_handler_called = handler_called.get_future();
1100 bool rpc_executed = false;
1101 bool rpc_completed = false;
1102
1103 auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] {
1104 if (f_handler_called.available()) {
1105 f_handler_called.get();
1106 }
1107 rpc_executed = false;
1108 rpc_completed = false;
1109 };
1110
1111 auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] {
1112 return [&handler_called, &rpc_executed, &rpc_completed] {
1113 handler_called.set_value();
1114 rpc_executed = true;
1115 return sleep(1ms).then([&rpc_completed] {
1116 rpc_completed = true;
1117 });
1118 };
1119 };
1120
1121 // handler should not run if unregistered before being called
1122 env.register_handler(1, get_handler()).get();
1123 env.unregister_handler(1).get();
1124 BOOST_REQUIRE(!f_handler_called.available());
1125 BOOST_REQUIRE(!rpc_executed);
1126 BOOST_REQUIRE(!rpc_completed);
1127
1128 // verify normal execution path
1129 env.register_handler(1, get_handler()).get();
1130 auto call = env.proto().make_client<void ()>(1);
1131 call(c1).get();
1132 BOOST_REQUIRE(f_handler_called.available());
1133 BOOST_REQUIRE(rpc_executed);
1134 BOOST_REQUIRE(rpc_completed);
1135 reset_state();
1136
1137 // call should fail after handler is unregistered
1138 env.unregister_handler(1).get();
1139 try {
1140 call(c1).get();
1141 BOOST_REQUIRE(false);
1142 } catch (rpc::unknown_verb_error&) {
1143 // expected
1144 } catch (...) {
1145 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1146 BOOST_REQUIRE(false);
1147 }
1148 BOOST_REQUIRE(!f_handler_called.available());
1149 BOOST_REQUIRE(!rpc_executed);
1150 BOOST_REQUIRE(!rpc_completed);
1151
1152 // unregistered is allowed while call is in flight
1153 auto delayed_unregister = [&env] {
1154 return sleep(500us).then([&env] {
1155 return env.unregister_handler(1);
11fdf7f2 1156 });
9f95a23c
TL
1157 };
1158
1159 env.register_handler(1, get_handler()).get();
1160 try {
1161 when_all_succeed(call(c1), delayed_unregister()).get();
1162 reset_state();
1163 } catch (rpc::unknown_verb_error&) {
1164 // expected
1165 } catch (...) {
1166 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1167 BOOST_REQUIRE(false);
1168 }
1169 });
11fdf7f2 1170}
9f95a23c 1171
f67539c2
TL
1172SEASTAR_TEST_CASE(test_loggers) {
1173 static seastar::logger log("dummy");
1174 log.set_level(log_level::debug);
1175 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1176 socket_address dummy_addr;
1177 auto& proto = env.proto();
1178 auto& logger = proto.get_logger();
1179 logger(dummy_addr, "Hello0");
1180 logger(dummy_addr, log_level::debug, "Hello1");
1181 proto.set_logger(&log);
1182 logger(dummy_addr, "Hello2");
1183 logger(dummy_addr, log_level::debug, "Hello3");
1184 // We *want* to test the deprecated API, don't spam warnings about it.
1185#pragma GCC diagnostic push
1186#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1187 proto.set_logger([] (const sstring& str) {
1188 log.info("Test: {}", str);
1189 });
1190#pragma GCC diagnostic pop
1191 logger(dummy_addr, "Hello4");
1192 logger(dummy_addr, log_level::debug, "Hello5");
1193 proto.set_logger(nullptr);
1194 logger(dummy_addr, "Hello6");
1195 logger(dummy_addr, log_level::debug, "Hello7");
1196 });
1197}
9f95a23c
TL
1198
1199static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working");