]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/rpc_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / tests / unit / rpc_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
11fdf7f2
TL
22#include "loopback_socket.hh"
23#include <seastar/rpc/rpc.hh>
24#include <seastar/rpc/rpc_types.hh>
25#include <seastar/rpc/lz4_compressor.hh>
9f95a23c 26#include <seastar/rpc/lz4_fragmented_compressor.hh>
11fdf7f2
TL
27#include <seastar/rpc/multi_algo_compressor_factory.hh>
28#include <seastar/testing/test_case.hh>
29#include <seastar/testing/thread_test_case.hh>
9f95a23c 30#include <seastar/testing/test_runner.hh>
11fdf7f2
TL
31#include <seastar/core/thread.hh>
32#include <seastar/core/sleep.hh>
9f95a23c 33#include <seastar/core/distributed.hh>
f67539c2 34#include <seastar/core/loop.hh>
11fdf7f2 35#include <seastar/util/defer.hh>
f67539c2 36#include <seastar/util/log.hh>
20effc67
TL
37#include <seastar/util/closeable.hh>
38#include <seastar/util/noncopyable_function.hh>
11fdf7f2
TL
39
40using namespace seastar;
41
// Empty tag type. It is passed as the first argument of the free
// write()/read() overloads in this file and is the template argument of the
// rpc::protocol instantiation used by the tests.
struct serializer {
};
44
// Writes the in-memory representation of an arithmetic value to `out`.
//
// T      - arithmetic type to serialize (enforced by static_assert).
// Output - any type providing write(const char*, size_t).
// out    - destination; exactly sizeof(T) bytes are passed to out.write().
// v      - value whose bytes are written. The bytes are emitted as they are
//          laid out in memory; no byte-order conversion is done here.
template <typename T, typename Output>
inline
void write_arithmetic_type(Output& out, T v) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    return out.write(reinterpret_cast<const char*>(&v), sizeof(T));
}
51
// Reads sizeof(T) bytes from `in` and returns them reinterpreted as a T.
//
// T     - arithmetic type to deserialize (enforced by static_assert).
// Input - any type providing read(char*, size_t).
// in    - source the bytes are pulled from via in.read().
//
// The bytes are copied straight into the object representation of the result;
// no byte-order conversion is done here.
template <typename T, typename Input>
inline
T read_arithmetic_type(Input& in) {
    static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
    T v;
    in.read(reinterpret_cast<char*>(&v), sizeof(T));
    return v;
}
60
61template <typename Output>
62inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
63template <typename Output>
64inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
65template <typename Output>
66inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
67template <typename Output>
68inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
69template <typename Output>
70inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
71template <typename Input>
72inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
73template <typename Input>
74inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
75template <typename Input>
76inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
77template <typename Input>
78inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
79template <typename Input>
80inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
81
82template <typename Output>
83inline void write(serializer, Output& out, const sstring& v) {
84 write_arithmetic_type(out, uint32_t(v.size()));
85 out.write(v.c_str(), v.size());
86}
87
88template <typename Input>
89inline sstring read(serializer, Input& in, rpc::type<sstring>) {
90 auto size = read_arithmetic_type<uint32_t>(in);
f67539c2
TL
91 sstring ret = uninitialized_string(size);
92 in.read(ret.data(), size);
11fdf7f2
TL
93 return ret;
94}
95
96using test_rpc_proto = rpc::protocol<serializer>;
97using make_socket_fn = std::function<seastar::socket ()>;
98
1e59de90
TL
99class rpc_loopback_error_injector : public loopback_error_injector {
100public:
101 struct config {
102 struct {
103 int limit = 0;
104 error kind = error::none;
105 private:
106 friend class rpc_loopback_error_injector;
107 int _x = 0;
108 error inject() {
109 return _x++ >= limit ? kind : error::none;
110 }
111 } server_rcv = {}, server_snd = {}, client_rcv = {}, client_snd = {};
112 error connect_kind = error::none;
113 };
114private:
115 config _cfg;
116public:
117 rpc_loopback_error_injector(config cfg) : _cfg(std::move(cfg)) {}
118
119 error server_rcv_error() override {
120 return _cfg.server_rcv.inject();
121 }
122
123 error server_snd_error() override {
124 return _cfg.server_snd.inject();
125 }
126
127 error client_rcv_error() override {
128 return _cfg.client_rcv.inject();
129 }
130
131 error client_snd_error() override {
132 return _cfg.client_snd.inject();
133 }
134
135 error connect_error() override {
136 return _cfg.connect_kind;
11fdf7f2
TL
137 }
138};
139
140class rpc_socket_impl : public ::net::socket_impl {
11fdf7f2 141 rpc_loopback_error_injector _error_injector;
1e59de90 142 loopback_socket_impl _socket;
11fdf7f2 143public:
1e59de90
TL
144 rpc_socket_impl(loopback_connection_factory& factory, std::optional<rpc_loopback_error_injector::config> inject_error)
145 :
146 _error_injector(inject_error.value_or(rpc_loopback_error_injector::config{})),
11fdf7f2
TL
147 _socket(factory, inject_error ? &_error_injector : nullptr) {
148 }
149 virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
1e59de90 150 return _socket.connect(sa, local, proto);
11fdf7f2 151 }
9f95a23c
TL
152 virtual void set_reuseaddr(bool reuseaddr) override {}
153 virtual bool get_reuseaddr() const override { return false; };
11fdf7f2 154 virtual void shutdown() override {
1e59de90 155 _socket.shutdown();
11fdf7f2
TL
156 }
157};
158
9f95a23c
TL
159struct rpc_test_config {
160 rpc::resource_limits resource_limits = {};
161 rpc::server_options server_options = {};
1e59de90 162 std::optional<rpc_loopback_error_injector::config> inject_error;
9f95a23c
TL
163};
164
165template<typename MsgType = int>
166class rpc_test_env {
167 struct rpc_test_service {
168 test_rpc_proto _proto;
169 test_rpc_proto::server _server;
170 std::vector<MsgType> _handlers;
171
172 rpc_test_service() = delete;
173 explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf)
174 : _proto(serializer())
175 , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits)
176 { }
177
178 test_rpc_proto& proto() {
179 return _proto;
180 }
181
182 test_rpc_proto::server& server() {
183 return _server;
184 }
185
186 future<> stop() {
187 return parallel_for_each(_handlers, [this] (auto t) {
188 return proto().unregister_handler(t);
189 }).finally([this] {
190 return server().stop();
191 });
192 }
193
194 template<typename Func>
195 auto register_handler(MsgType t, scheduling_group sg, Func func) {
196 _handlers.emplace_back(t);
197 return proto().register_handler(t, sg, std::move(func));
198 }
199
200 future<> unregister_handler(MsgType t) {
201 auto it = std::find(_handlers.begin(), _handlers.end(), t);
202 assert(it != _handlers.end());
203 _handlers.erase(it);
204 return proto().unregister_handler(t);
205 }
11fdf7f2 206 };
9f95a23c
TL
207
208 rpc_test_config _cfg;
209 loopback_connection_factory _lcf;
f67539c2 210 std::unique_ptr<sharded<rpc_test_service>> _service;
9f95a23c
TL
211
212public:
213 rpc_test_env() = delete;
214 explicit rpc_test_env(rpc_test_config cfg)
f67539c2 215 : _cfg(cfg), _service(std::make_unique<sharded<rpc_test_service>>())
9f95a23c
TL
216 {
217 }
218
219 using test_fn = std::function<future<> (rpc_test_env<MsgType>& env)>;
220 static future<> do_with(rpc_test_config cfg, test_fn&& func) {
221 return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env<MsgType>& env) {
222 return env.start().then([&env, func] {
223 return func(env);
224 }).finally([&env] {
225 return env.stop();
11fdf7f2
TL
226 });
227 });
9f95a23c
TL
228 }
229
230 using thread_test_fn = std::function<void (rpc_test_env<MsgType>& env)>;
231 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) {
232 return do_with(std::move(cfg), [func] (rpc_test_env<MsgType>& env) {
233 return seastar::async([&env, func] {
234 func(env);
235 });
236 });
237 }
238
239 using thread_test_fn_with_client = std::function<void (rpc_test_env<MsgType>& env, test_rpc_proto::client& cl)>;
240 static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) {
241 return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env<MsgType>& env) {
242 return seastar::async([&env, func, co = std::move(co)] {
243 test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr());
20effc67 244 auto stop = deferred_stop(cl);
9f95a23c
TL
245 func(env, cl);
246 });
247 });
248 }
249
250 static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) {
251 return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func));
252 }
253
254 auto make_socket() {
1e59de90 255 return seastar::socket(std::make_unique<rpc_socket_impl>(_lcf, _cfg.inject_error));
9f95a23c
TL
256 };
257
258 test_rpc_proto& proto() {
259 return local_service().proto();
260 }
261
262 test_rpc_proto::server& server() {
263 return local_service().server();
264 }
265
266 template<typename Func>
267 future<> register_handler(MsgType t, scheduling_group sg, Func func) {
f67539c2 268 return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable {
9f95a23c
TL
269 s.register_handler(t, sg, std::move(func));
270 });
271 }
272
273 template<typename Func>
274 future<> register_handler(MsgType t, Func func) {
275 return register_handler(t, scheduling_group(), std::move(func));
276 }
277
278 future<> unregister_handler(MsgType t) {
f67539c2 279 return _service->invoke_on_all([t] (rpc_test_service& s) mutable {
9f95a23c
TL
280 return s.unregister_handler(t);
281 });
282 }
283
284private:
285 rpc_test_service& local_service() {
f67539c2
TL
286 return _service->local();
287
9f95a23c
TL
288 }
289
290 future<> start() {
f67539c2 291 return _service->start(std::cref(_cfg), std::ref(_lcf));
9f95a23c
TL
292 }
293
294 future<> stop() {
f67539c2 295 return _service->stop().then([this] {
9f95a23c
TL
296 return _lcf.destroy_all_shards();
297 });
298 }
299};
11fdf7f2
TL
300
301struct cfactory : rpc::compressor::factory {
302 mutable int use_compression = 0;
303 const sstring name;
304 cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {}
305 const sstring& supported() const override {
306 return name;
307 }
f67539c2
TL
308 class mylz4 : public rpc::lz4_compressor {
309 sstring _name;
310 public:
311 mylz4(const sstring& n) : _name(n) {}
312 sstring name() const override {
313 return _name;
314 }
315 };
11fdf7f2
TL
316 std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
317 if (feature == name) {
318 use_compression++;
f67539c2 319 return std::make_unique<mylz4>(name);
11fdf7f2
TL
320 } else {
321 return nullptr;
322 }
323 }
324};
9f95a23c 325
11fdf7f2
TL
326SEASTAR_TEST_CASE(test_rpc_connect) {
327 std::vector<future<>> fs;
328
329 for (auto i = 0; i < 2; i++) {
330 for (auto j = 0; j < 4; j++) {
331 auto factory = std::make_unique<cfactory>();
332 rpc::server_options so;
333 rpc::client_options co;
334 if (i == 1) {
335 so.compressor_factory = factory.get();
336 }
337 if (j & 1) {
338 co.compressor_factory = factory.get();
339 }
340 co.send_timeout_data = j & 2;
9f95a23c
TL
341 rpc_test_config cfg;
342 cfg.server_options = so;
343 auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
344 env.register_handler(1, [](int a, int b) {
345 return make_ready_future<int>(a+b);
346 }).get();
347 auto sum = env.proto().make_client<int (int, int)>(1);
348 auto result = sum(c1, 2, 3).get0();
349 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
350 }).handle_exception([] (auto ep) {
351 BOOST_FAIL("No exception expected");
352 }).finally([factory = std::move(factory), i, j = j & 1] {
353 if (i == 1 && j == 1) {
354 BOOST_REQUIRE_EQUAL(factory->use_compression, 2);
355 } else {
356 BOOST_REQUIRE_EQUAL(factory->use_compression, 0);
357 }
358 });
359 fs.emplace_back(std::move(f));
360 }
361 }
362 return when_all(fs.begin(), fs.end()).discard_result();
363}
364
365SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) {
366 auto factory1 = std::make_unique<cfactory>();
367 auto factory2 = std::make_unique<cfactory>("LZ4NEW");
368 rpc::server_options so;
369 rpc::client_options co;
370 static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()});
371 static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()});
372 so.compressor_factory = &server;
373 co.compressor_factory = &client;
9f95a23c
TL
374 rpc_test_config cfg;
375 cfg.server_options = so;
376 return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
377 env.register_handler(1, [](int a, int b) {
378 return make_ready_future<int>(a+b);
379 }).get();
380 auto sum = env.proto().make_client<int (int, int)>(1);
381 auto result = sum(c1, 2, 3).get0();
382 BOOST_REQUIRE_EQUAL(result, 2 + 3);
11fdf7f2
TL
383 }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] {
384 BOOST_REQUIRE_EQUAL(factory1->use_compression, 0);
385 BOOST_REQUIRE_EQUAL(factory2->use_compression, 2);
386 });
387}
388
389SEASTAR_TEST_CASE(test_rpc_connect_abort) {
9f95a23c 390 rpc_test_config cfg;
1e59de90
TL
391 rpc_loopback_error_injector::config ecfg;
392 ecfg.connect_kind = loopback_error_injector::error::abort;
393 cfg.inject_error = ecfg;
9f95a23c
TL
394 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
395 test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr());
396 env.register_handler(1, []() { return make_ready_future<>(); }).get();
397 auto f = env.proto().make_client<void ()>(1);
1e59de90 398 auto fut = f(c1);
9f95a23c 399 c1.stop().get0();
1e59de90
TL
400 try {
401 fut.get0();
402 BOOST_REQUIRE(false);
403 } catch (...) {}
9f95a23c
TL
404 try {
405 f(c1).get0();
406 BOOST_REQUIRE(false);
407 } catch (...) {}
11fdf7f2
TL
408 });
409}
410
411SEASTAR_TEST_CASE(test_rpc_cancel) {
412 using namespace std::chrono_literals;
9f95a23c
TL
413 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
414 bool rpc_executed = false;
415 int good = 0;
416 promise<> handler_called;
417 future<> f_handler_called = handler_called.get_future();
418 env.register_handler(1, [&rpc_executed, &handler_called] {
419 handler_called.set_value(); rpc_executed = true; return sleep(1ms);
420 }).get();
421 auto call = env.proto().make_client<void ()>(1);
1e59de90 422 promise<> cont;
9f95a23c 423 rpc::cancellable cancel;
1e59de90 424 c1.suspend_for_testing(cont);
9f95a23c
TL
425 auto f = call(c1, cancel);
426 // cancel send side
427 cancel.cancel();
1e59de90 428 cont.set_value();
9f95a23c
TL
429 try {
430 f.get();
431 } catch(rpc::canceled_error&) {
432 good += !rpc_executed;
433 };
434 f = call(c1, cancel);
435 // cancel wait side
436 f_handler_called.then([&cancel] {
11fdf7f2 437 cancel.cancel();
9f95a23c
TL
438 }).get();
439 try {
440 f.get();
441 } catch(rpc::canceled_error&) {
442 good += 10*rpc_executed;
443 };
444 BOOST_REQUIRE_EQUAL(good, 11);
11fdf7f2
TL
445 });
446}
447
448SEASTAR_TEST_CASE(test_message_to_big) {
9f95a23c
TL
449 rpc_test_config cfg;
450 cfg.resource_limits = {0, 1, 100};
451 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) {
452 bool good = true;
453 env.register_handler(1, [&] (sstring payload) mutable {
454 good = false;
455 }).get();
456 auto call = env.proto().make_client<void (sstring)>(1);
457 try {
f67539c2 458 call(c, uninitialized_string(101)).get();
9f95a23c
TL
459 good = false;
460 } catch(std::runtime_error& err) {
461 } catch(...) {
462 good = false;
463 }
464 BOOST_REQUIRE_EQUAL(good, true);
11fdf7f2
TL
465 });
466}
11fdf7f2 467
1e59de90
TL
468SEASTAR_TEST_CASE(test_rpc_remote_verb_error) {
469 rpc_test_config cfg;
470 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
471 test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr());
472 env.register_handler(1, []() { throw std::runtime_error("error"); }).get();
473 auto f = env.proto().make_client<void ()>(1);
474 BOOST_REQUIRE_THROW(f(c1).get0(), rpc::remote_verb_error);
475 c1.stop().get0();
476 });
477}
478
11fdf7f2
TL
// Outcome of one stream_test_func() run. Every flag starts out false and is
// set by stream_test_func() when the corresponding event was observed.
struct stream_test_result {
    // Client-side source reported end of stream.
    bool client_source_closed = false;
    // Server-side source reported end of stream.
    bool server_source_closed = false;
    // Sending an item through the client sink failed.
    bool sink_exception = false;
    // Closing the client sink failed.
    bool sink_close_exception = false;
    // The client-side reader loop finished with an exception.
    bool source_done_exception = false;
    // The server-side sink/source loops finished with an exception.
    bool server_done_exception = false;
    // Stopping the client failed.
    bool client_stop_exception = false;
    // Sum of all integers the server received over the stream.
    int server_sum = 0;
    // Creating the client stream sink failed.
    bool exception_while_creating_sink = false;
};
490
f67539c2
TL
491future<stream_test_result> stream_test_func(rpc_test_env<>& env, bool stop_client, bool expect_connection_error = false) {
492 return seastar::async([&env, stop_client, expect_connection_error] {
11fdf7f2 493 stream_test_result r;
9f95a23c 494 test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr());
11fdf7f2 495 future<> server_done = make_ready_future();
9f95a23c 496 env.register_handler(1, [&](int i, rpc::source<int> source) {
11fdf7f2
TL
497 BOOST_REQUIRE_EQUAL(i, 666);
498 auto sink = source.make_sink<serializer, sstring>();
499 auto sink_loop = seastar::async([sink] () mutable {
500 for (auto i = 0; i < 100; i++) {
501 sink("seastar").get();
502 sleep(std::chrono::milliseconds(1)).get();
503 }
f67539c2
TL
504 }).finally([sink] () mutable {
505 return sink.flush();
506 }).finally([sink] () mutable {
507 return sink.close();
508 }).finally([sink] {});
509
11fdf7f2
TL
510 auto source_loop = seastar::async([source, &r] () mutable {
511 while (!r.server_source_closed) {
512 auto data = source().get0();
513 if (data) {
514 r.server_sum += std::get<0>(*data);
515 } else {
516 r.server_source_closed = true;
9f95a23c
TL
517 try {
518 // check that reading after eos does not crash
519 // and throws correct exception
520 source().get();
521 } catch (rpc::stream_closed& ex) {
522 // expected
523 } catch (...) {
524 BOOST_FAIL("wrong exception on reading from a stream after eos");
525 }
11fdf7f2
TL
526 }
527 }
528 });
529 server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result();
530 return sink;
9f95a23c
TL
531 }).get();
532 auto call = env.proto().make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1);
1e59de90 533 struct failed_to_create_sync{};
11fdf7f2
TL
534 auto x = [&] {
535 try {
9f95a23c 536 return c.make_stream_sink<serializer, int>(env.make_socket()).get0();
11fdf7f2
TL
537 } catch (...) {
538 c.stop().get();
1e59de90 539 throw failed_to_create_sync{};
11fdf7f2
TL
540 }
541 };
1e59de90
TL
542 try {
543 auto sink = x();
544 auto source = call(c, 666, sink).get0();
545 auto source_done = seastar::async([&] {
546 while (!r.client_source_closed) {
547 auto data = source().get0();
548 if (data) {
549 BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar");
550 } else {
551 r.client_source_closed = true;
552 }
553 }
554 });
555 auto check_exception = [] (auto f) {
556 try {
557 f.get();
558 } catch (...) {
559 return true;
560 }
561 return false;
562 };
563 future<> stop_client_future = make_ready_future();
564 // With a connection error sink() will eventually fail, but we
565 // cannot guarantee when.
566 int max = expect_connection_error ? std::numeric_limits<int>::max() : 101;
567 for (int i = 1; i < max; i++) {
568 if (stop_client && i == 50) {
569 // stop client while stream is in use
570 stop_client_future = c.stop();
571 }
572 sleep(std::chrono::milliseconds(1)).get();
573 r.sink_exception = check_exception(sink(i));
574 if (r.sink_exception) {
575 break;
11fdf7f2
TL
576 }
577 }
1e59de90
TL
578 r.sink_close_exception = check_exception(sink.close());
579 r.source_done_exception = check_exception(std::move(source_done));
580 r.server_done_exception = check_exception(std::move(server_done));
581 r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future));
582 return r;
583 } catch(failed_to_create_sync&) {
584 r.exception_while_creating_sink = true;
585 return r;
11fdf7f2 586 }
11fdf7f2
TL
587 });
588}
589
590SEASTAR_TEST_CASE(test_stream_simple) {
591 rpc::server_options so;
592 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
593 rpc_test_config cfg;
594 cfg.server_options = so;
595 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
596 return stream_test_func(env, false).then([] (stream_test_result r) {
597 BOOST_REQUIRE(r.client_source_closed);
598 BOOST_REQUIRE(r.server_source_closed);
599 BOOST_REQUIRE(r.server_sum == 5050);
600 BOOST_REQUIRE(!r.sink_exception);
601 BOOST_REQUIRE(!r.sink_close_exception);
602 BOOST_REQUIRE(!r.source_done_exception);
603 BOOST_REQUIRE(!r.server_done_exception);
604 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
605 });
606 });
607}
608
609SEASTAR_TEST_CASE(test_stream_stop_client) {
610 rpc::server_options so;
611 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
612 rpc_test_config cfg;
613 cfg.server_options = so;
614 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
615 return stream_test_func(env, true).then([] (stream_test_result r) {
616 BOOST_REQUIRE(!r.client_source_closed);
617 BOOST_REQUIRE(!r.server_source_closed);
618 BOOST_REQUIRE(r.sink_exception);
619 BOOST_REQUIRE(r.sink_close_exception);
620 BOOST_REQUIRE(r.source_done_exception);
621 BOOST_REQUIRE(r.server_done_exception);
622 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
623 });
624 });
625}
626
627
628SEASTAR_TEST_CASE(test_stream_connection_error) {
629 rpc::server_options so;
630 so.streaming_domain = rpc::streaming_domain_type(1);
9f95a23c
TL
631 rpc_test_config cfg;
632 cfg.server_options = so;
1e59de90
TL
633 rpc_loopback_error_injector::config ecfg;
634 ecfg.server_rcv.limit = 50;
635 ecfg.server_rcv.kind = loopback_error_injector::error::abort;
636 cfg.inject_error = ecfg;
9f95a23c 637 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
f67539c2 638 return stream_test_func(env, false, true).then([] (stream_test_result r) {
9f95a23c
TL
639 BOOST_REQUIRE(!r.client_source_closed);
640 BOOST_REQUIRE(!r.server_source_closed);
641 BOOST_REQUIRE(r.sink_exception);
642 BOOST_REQUIRE(r.sink_close_exception);
643 BOOST_REQUIRE(r.source_done_exception);
644 BOOST_REQUIRE(r.server_done_exception);
645 BOOST_REQUIRE(!r.client_stop_exception);
11fdf7f2
TL
646 });
647 });
648}
649
1e59de90
TL
650SEASTAR_TEST_CASE(test_stream_negotiation_error) {
651 rpc::server_options so;
652 so.streaming_domain = rpc::streaming_domain_type(1);
653 rpc_test_config cfg;
654 cfg.server_options = so;
655 rpc_loopback_error_injector::config ecfg;
656 ecfg.server_rcv.limit = 0;
657 ecfg.server_rcv.kind = loopback_error_injector::error::abort;
658 cfg.inject_error = ecfg;
659 return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) {
660 return stream_test_func(env, false, true).then([] (stream_test_result r) {
661 BOOST_REQUIRE(r.exception_while_creating_sink);
662 });
663 });
664}
665
11fdf7f2 666SEASTAR_TEST_CASE(test_rpc_scheduling) {
9f95a23c
TL
667 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
668 auto sg = create_scheduling_group("rpc", 100).get0();
669 env.register_handler(1, sg, [] () {
670 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
671 }).get();
672 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
673 auto id = call_sg_id(c1).get0();
674 BOOST_REQUIRE(id == internal::scheduling_group_index(sg));
11fdf7f2
TL
675 });
676}
677
678SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) {
679 auto sg1 = create_scheduling_group("sg1", 100).get0();
20effc67 680 auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); });
11fdf7f2 681 auto sg2 = create_scheduling_group("sg2", 100).get0();
20effc67 682 auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); });
11fdf7f2
TL
683 rpc::resource_limits limits;
684 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
685 auto sg = current_scheduling_group();
686 if (cookie == "sg1") {
687 sg = sg1;
688 } else if (cookie == "sg2") {
689 sg = sg2;
690 }
691 rpc::isolation_config cfg;
692 cfg.sched_group = sg;
693 return cfg;
694 };
9f95a23c
TL
695 rpc_test_config cfg;
696 cfg.resource_limits = limits;
697 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
698 rpc::client_options co1;
699 co1.isolation_cookie = "sg1";
700 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
701 rpc::client_options co2;
702 co2.isolation_cookie = "sg2";
703 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
704 env.register_handler(1, [] {
705 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
706 }).get();
707 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
708 unsigned id;
709 id = call_sg_id(c1).get0();
710 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
711 id = call_sg_id(c2).get0();
712 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
713 c1.stop().get();
714 c2.stop().get();
11fdf7f2
TL
715 }).get();
716}
717
718SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
719 auto sg1 = create_scheduling_group("sg1", 100).get0();
20effc67 720 auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); });
11fdf7f2 721 auto sg2 = create_scheduling_group("sg2", 100).get0();
20effc67 722 auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); });
11fdf7f2
TL
723 rpc::resource_limits limits;
724 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
725 auto sg = current_scheduling_group();
726 if (cookie == "sg1") {
727 sg = sg1;
728 } else if (cookie == "sg2") {
729 sg = sg2;
730 }
731 rpc::isolation_config cfg;
732 cfg.sched_group = sg;
733 return cfg;
734 };
9f95a23c
TL
735 rpc_test_config cfg;
736 cfg.resource_limits = limits;
737 rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) {
738 rpc::client_options co1;
739 co1.isolation_cookie = "sg1";
740 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
741 rpc::client_options co2;
742 co2.isolation_cookie = "sg2";
743 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
744 // An old client, that doesn't have an isolation cookie
745 rpc::client_options co3;
746 test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr());
747 // A server that uses sg1 if the client is old
748 env.register_handler(1, sg1, [] () {
749 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
750 }).get();
751 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
752 unsigned id;
753 id = call_sg_id(c1).get0();
754 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
755 id = call_sg_id(c2).get0();
756 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
757 id = call_sg_id(c3).get0();
758 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
759 c1.stop().get();
760 c2.stop().get();
761 c3.stop().get();
762 }).get();
763}
764
1e59de90
TL
765SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_async) {
766 scheduling_group sg1 = default_scheduling_group();
767 scheduling_group sg2 = default_scheduling_group();
768 auto sg1_kill = defer([&] () noexcept {
769 if (sg1 != default_scheduling_group()) {
770 destroy_scheduling_group(sg1).get();
771 }
772 });
773 auto sg2_kill = defer([&] () noexcept {
774 if (sg2 != default_scheduling_group()) {
775 destroy_scheduling_group(sg2).get();
776 }
777 });
778 rpc::resource_limits limits;
779 limits.isolate_connection = [&sg1, &sg2] (sstring cookie) {
780 future<seastar::scheduling_group> get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] {
781 if (cookie == "sg1") {
782 if (sg1 == default_scheduling_group()) {
783 return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) {
784 sg1 = sg;
785 return sg;
786 });
787 } else {
788 return make_ready_future<seastar::scheduling_group>(sg1);
789 }
790 } else if (cookie == "sg2") {
791 if (sg2 == default_scheduling_group()) {
792 return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) {
793 sg2 = sg;
794 return sg;
795 });
796 } else {
797 return make_ready_future<seastar::scheduling_group>(sg2);
798 }
799 }
800 return make_ready_future<seastar::scheduling_group>(current_scheduling_group());
801 });
802
803 return get_scheduling_group.then([] (scheduling_group sg) {
804 rpc::isolation_config cfg;
805 cfg.sched_group = sg;
806 return cfg;
807 });
808 };
809 rpc_test_config cfg;
810 cfg.resource_limits = limits;
811 rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2] (rpc_test_env<>& env) {
812 rpc::client_options co1;
813 co1.isolation_cookie = "sg1";
814 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
815 rpc::client_options co2;
816 co2.isolation_cookie = "sg2";
817 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
818 env.register_handler(1, [] {
819 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
820 }).get();
821 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
822 unsigned id;
823 id = call_sg_id(c1).get0();
824 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
825 id = call_sg_id(c2).get0();
826 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
827 c1.stop().get();
828 c2.stop().get();
829 }).get();
830}
831
832SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility_async) {
833 scheduling_group sg1 = default_scheduling_group();
834 scheduling_group sg2 = default_scheduling_group();
835 scheduling_group sg3 = create_scheduling_group("sg3", 100).get0();
836 auto sg1_kill = defer([&] () noexcept {
837 if (sg1 != default_scheduling_group()) {
838 destroy_scheduling_group(sg1).get();
839 }
840 });
841 auto sg2_kill = defer([&] () noexcept {
842 if (sg2 != default_scheduling_group()) {
843 destroy_scheduling_group(sg2).get();
844 }
845 });
846 auto sg3_kill = defer([&] () noexcept { destroy_scheduling_group(sg3).get(); });
847 rpc::resource_limits limits;
848 limits.isolate_connection = [&sg1, &sg2] (sstring cookie) {
849 future<seastar::scheduling_group> get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] {
850 if (cookie == "sg1") {
851 if (sg1 == default_scheduling_group()) {
852 return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) {
853 sg1 = sg;
854 return sg;
855 });
856 } else {
857 return make_ready_future<seastar::scheduling_group>(sg1);
858 }
859 } else if (cookie == "sg2") {
860 if (sg2 == default_scheduling_group()) {
861 return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) {
862 sg2 = sg;
863 return sg;
864 });
865 } else {
866 return make_ready_future<seastar::scheduling_group>(sg2);
867 }
868 }
869 return make_ready_future<seastar::scheduling_group>(current_scheduling_group());
870 });
871
872 return get_scheduling_group.then([] (scheduling_group sg) {
873 rpc::isolation_config cfg;
874 cfg.sched_group = sg;
875 return cfg;
876 });
877 };
878 rpc_test_config cfg;
879 cfg.resource_limits = limits;
880 rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2, &sg3] (rpc_test_env<>& env) {
881 rpc::client_options co1;
882 co1.isolation_cookie = "sg1";
883 test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr());
884 rpc::client_options co2;
885 co2.isolation_cookie = "sg2";
886 test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr());
887 // An old client, that doesn't have an isolation cookie
888 rpc::client_options co3;
889 test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr());
890 // A server that uses sg3 if the client is old
891 env.register_handler(1, sg3, [] () {
892 return make_ready_future<unsigned>(internal::scheduling_group_index(current_scheduling_group()));
893 }).get();
894 auto call_sg_id = env.proto().make_client<unsigned ()>(1);
895 unsigned id;
896 id = call_sg_id(c1).get0();
897 BOOST_REQUIRE(id == internal::scheduling_group_index(sg1));
898 id = call_sg_id(c2).get0();
899 BOOST_REQUIRE(id == internal::scheduling_group_index(sg2));
900 id = call_sg_id(c3).get0();
901 BOOST_REQUIRE(id == internal::scheduling_group_index(sg3));
902 c1.stop().get();
903 c2.stop().get();
904 c3.stop().get();
905 }).get();
906}
907
9f95a23c
TL
908void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) {
909 using namespace seastar::rpc;
910
911 auto linearize = [&] (const auto& buffer) {
912 return seastar::visit(buffer.bufs,
913 [] (const temporary_buffer<char>& buf) {
914 return buf.clone();
915 },
916 [&] (const std::vector<temporary_buffer<char>>& bufs) {
917 auto buf = temporary_buffer<char>(buffer.size);
918 auto dst = buf.get_write();
919 for (auto& b : bufs) {
920 dst = std::copy_n(b.get(), b.size(), dst);
921 }
922 return buf;
923 }
924 );
925 };
926
927 auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) {
928 std::vector<temporary_buffer<char>> bufs;
929 auto src = b.get();
930 auto n = b.size();
931 while (n) {
932 auto this_chunk = std::min(chunk_size, n);
933 bufs.emplace_back(this_chunk);
934 std::copy_n(src, this_chunk, bufs.back().get_write());
935 src += this_chunk;
936 n -= this_chunk;
937 }
938 return bufs;
939 };
940
941 auto clone = [&] (const auto& buffer) {
942 auto c = std::decay_t<decltype(buffer)>();
943 c.size = buffer.size;
944 c.bufs = seastar::visit(buffer.bufs,
945 [] (const temporary_buffer<char>& buf) -> decltype(c.bufs) {
946 return buf.clone();
947 },
948 [] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) {
949 std::vector<temporary_buffer<char>> c;
950 c.reserve(bufs.size());
951 for (auto& b : bufs) {
952 c.emplace_back(b.clone());
953 }
f67539c2 954 return c;
9f95a23c
TL
955 }
956 );
957 return c;
958 };
959
960 auto compressor = compressor_factory();
961
20effc67
TL
962 std::vector<noncopyable_function<std::tuple<sstring, size_t, snd_buf> ()>> inputs;
963
964 auto add_input = [&] (auto func_returning_tuple) {
965 inputs.emplace_back(std::move(func_returning_tuple));
966 };
9f95a23c
TL
967
968 auto& eng = testing::local_random_engine;
1e59de90 969 auto dist = std::uniform_int_distribution<int>(0, std::numeric_limits<char>::max());
9f95a23c
TL
970
971 auto snd = snd_buf(1);
972 *snd.front().get_write() = 'a';
20effc67 973 add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, no headroom"), size_t(0), std::move(snd)); });
9f95a23c
TL
974
975 snd = snd_buf(1);
976 *snd.front().get_write() = 'a';
20effc67 977 add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, 128k of headroom"), size_t(128 * 1024), std::move(snd)); });
9f95a23c 978
f67539c2
TL
979 auto gen_fill = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
980 auto buf = temporary_buffer<char>(s);
981 std::fill_n(buf.get_write(), s, 'a');
9f95a23c 982
f67539c2
TL
983 auto snd = snd_buf();
984 snd.size = s;
985 if (split) {
986 snd.bufs = split_buffer(buf.clone(), *split);
987 } else {
988 snd.bufs = buf.clone();
989 }
20effc67 990 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 991 };
9f95a23c 992
20effc67
TL
993
994 add_input(std::bind(gen_fill, 16 * 1024, "single 16 kB buffer of \'a\'"));
9f95a23c 995
f67539c2
TL
996 auto gen_rand = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
997 auto buf = temporary_buffer<char>(s);
998 std::generate_n(buf.get_write(), s, [&] { return dist(eng); });
9f95a23c 999
f67539c2
TL
1000 auto snd = snd_buf();
1001 snd.size = s;
1002 if (split) {
1003 snd.bufs = split_buffer(buf.clone(), *split);
1004 } else {
1005 snd.bufs = buf.clone();
1006 }
20effc67 1007 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 1008 };
9f95a23c 1009
20effc67 1010 add_input(std::bind(gen_rand, 16 * 1024, "single 16 kB buffer of random"));
9f95a23c 1011
f67539c2
TL
1012 auto gen_text = [&](size_t s, sstring msg, std::optional<size_t> split = {}) {
1013 static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. ";
9f95a23c 1014
f67539c2
TL
1015 auto buf = temporary_buffer<char>(s);
1016 size_t n = 0;
1017 while (n < s) {
1018 auto rem = std::min(s - n, text.size());
1019 std::copy(text.data(), text.data() + rem, buf.get_write() + n);
1020 n += rem;
1021 }
9f95a23c 1022
f67539c2
TL
1023 auto snd = snd_buf();
1024 snd.size = s;
1025 if (split) {
1026 snd.bufs = split_buffer(buf.clone(), *split);
1027 } else {
1028 snd.bufs = buf.clone();
1029 }
20effc67 1030 return std::tuple(msg, size_t(0), std::move(snd));
f67539c2 1031 };
9f95a23c 1032
1e59de90
TL
1033#ifndef SEASTAR_DEBUG
1034 auto buffer_sizes = { 1, 4 };
1035#else
1036 auto buffer_sizes = { 1 };
1037#endif
9f95a23c 1038
1e59de90 1039 for (auto s : buffer_sizes) {
f67539c2 1040 for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) {
20effc67
TL
1041 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss));
1042 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024));
1043 add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024));
9f95a23c 1044
20effc67
TL
1045 add_input(std::bind(gen_fill, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024));
1046 add_input(std::bind(gen_rand, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024));
f67539c2 1047 }
9f95a23c 1048
f67539c2 1049 for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) {
20effc67
TL
1050 add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss));
1051 add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss));
1052 add_input(std::bind(gen_text, s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss));
1053 add_input(std::bind(gen_fill, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss));
1054 add_input(std::bind(gen_rand, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss));
1055 add_input(std::bind(gen_text, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss));
f67539c2
TL
1056 }
1057 }
9f95a23c 1058
f67539c2 1059 for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) {
20effc67
TL
1060 add_input(std::bind(gen_fill, s, format("{} bytes buffer of \'a\'", s)));
1061 add_input(std::bind(gen_rand, s, format("{} bytes buffer of random", s)));
1062 add_input(std::bind(gen_text, s, format("{} bytes buffer of text", s)));
f67539c2 1063 }
9f95a23c
TL
1064
1065 std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms {
1066 { "identity", [] (snd_buf snd) {
1067 rcv_buf rcv;
1068 rcv.size = snd.size;
1069 rcv.bufs = std::move(snd.bufs);
1070 return rcv;
1071 } },
1072 { "linearized", [&linearize] (snd_buf snd) {
1073 rcv_buf rcv;
1074 rcv.size = snd.size;
1075 rcv.bufs = linearize(snd);
1076 return rcv;
1077 } },
1078 { "split 1 B", [&] (snd_buf snd) {
1079 rcv_buf rcv;
1080 rcv.size = snd.size;
1081 rcv.bufs = split_buffer(linearize(snd), 1);
1082 return rcv;
1083 } },
1084 { "split 129 B", [&] (snd_buf snd) {
1085 rcv_buf rcv;
1086 rcv.size = snd.size;
1087 rcv.bufs = split_buffer(linearize(snd), 129);
1088 return rcv;
1089 } },
1090 { "split 4 kB", [&] (snd_buf snd) {
1091 rcv_buf rcv;
1092 rcv.size = snd.size;
1093 rcv.bufs = split_buffer(linearize(snd), 4096);
1094 return rcv;
1095 } },
1096 { "split 4 kB - 128", [&] (snd_buf snd) {
1097 rcv_buf rcv;
1098 rcv.size = snd.size;
1099 rcv.bufs = split_buffer(linearize(snd), 4096 - 128);
1100 return rcv;
1101 } },
1102 };
1103
1104 auto sanity_check = [&] (const auto& buffer) {
1105 auto actual_size = seastar::visit(buffer.bufs,
1106 [] (const temporary_buffer<char>& buf) {
1107 return buf.size();
1108 },
1109 [] (const std::vector<temporary_buffer<char>>& bufs) {
1110 return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) {
1111 return sz + buf.size();
1112 });
1113 }
1114 );
1115 BOOST_CHECK_EQUAL(actual_size, buffer.size);
1116 };
1117
20effc67
TL
1118 for (auto& in_func : inputs) {
1119 auto in = in_func();
9f95a23c
TL
1120 BOOST_TEST_MESSAGE("Input: " << std::get<0>(in));
1121 auto headroom = std::get<1>(in);
1122 auto compressed = compressor->compress(headroom, clone(std::get<2>(in)));
1123 sanity_check(compressed);
1124
1125 // Remove headroom
1126 BOOST_CHECK_GE(compressed.size, headroom);
1127 compressed.size -= headroom;
1128 seastar::visit(compressed.bufs,
1129 [&] (temporary_buffer<char>& buf) {
1130 BOOST_CHECK_GE(buf.size(), headroom);
1131 buf.trim_front(headroom);
1132 },
1133 [&] (std::vector<temporary_buffer<char>>& bufs) {
1134 while (headroom) {
1135 BOOST_CHECK(!bufs.empty());
1136 auto to_remove = std::min(bufs.front().size(), headroom);
1137 bufs.front().trim_front(to_remove);
1138 if (bufs.front().empty() && bufs.size() > 1) {
1139 bufs.erase(bufs.begin());
1140 }
1141 headroom -= to_remove;
11fdf7f2 1142 }
9f95a23c
TL
1143 }
1144 );
1145
1146 auto in_l = linearize(std::get<2>(in));
1147
1148 for (auto& t : transforms) {
1149 BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t));
1150 auto received = std::get<1>(t)(clone(compressed));
1151
1152 auto decompressed = compressor->decompress(std::move(received));
1153 sanity_check(decompressed);
1154
1155 BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size);
1156
1157 auto out_l = linearize(decompressed);
1158
1159 BOOST_CHECK_EQUAL(in_l.size(), out_l.size());
1160 BOOST_CHECK(in_l == out_l);
1161 }
1162 }
1163}
1164
1165SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) {
1166 test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); });
1167}
1168
1169SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) {
1170 test_compressor([] { return std::make_unique<rpc::lz4_fragmented_compressor>(); });
1171}
1172
1173// Test reproducing issue #671: If timeout is time_point::max(), translating
1174// it to relative timeout in the sender and then back in the receiver, when
1175// these calculations happen across a millisecond boundary, overflowed the
1176// integer and mislead the receiver to think the requested timeout was
1177// negative, and cause it drop its response, so the RPC call never completed.
1178SEASTAR_TEST_CASE(test_max_absolute_timeout) {
1179 // The typical failure of this test is a hang. So we use semaphore to
1180 // stop the test either when it succeeds, or after a long enough hang.
1181 auto success = make_lw_shared<bool>(false);
1182 auto done = make_lw_shared<semaphore>(0);
1183 auto abrt = make_lw_shared<abort_source>();
1184 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
1185 done->signal(1);
1186 }).handle_exception([] (std::exception_ptr) {});
1187 rpc::client_options co;
1188 co.send_timeout_data = 1;
1189 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1190 env.register_handler(1, [](int a, int b) {
1191 return make_ready_future<int>(a+b);
1192 }).get();
1193 auto sum = env.proto().make_client<int (int, int)>(1);
1194 // The bug only reproduces if the calculation done on the sender
1195 // and receiver sides, happened across a millisecond boundary.
1196 // We can't control when it happens, so we just need to loop many
1197 // times, at least many milliseconds, to increase the probability
1198 // that we catch the bug. Experimentally, if we loop for 200ms, we
1199 // catch the bug in #671 virtually every time.
1200 auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200);
1201 while (seastar::lowres_clock::now() <= until) {
1202 auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get0();
1203 BOOST_REQUIRE_EQUAL(result, 2 + 3);
1204 }
1205 }).then([success, done, abrt] {
1206 *success = true;
1207 abrt->request_abort();
1208 done->signal();
1209 });
1210 return done->wait().then([done, success] {
1211 BOOST_REQUIRE(*success);
1212 });
1213}
1214
1215// Similar to the above test: Test that a relative timeout duration::max()
1216// also works, and again doesn't cause the timeout wrapping around to the
1217// past and causing dropped responses.
1218SEASTAR_TEST_CASE(test_max_relative_timeout) {
1219 // The typical failure of this test is a hang. So we use semaphore to
1220 // stop the test either when it succeeds, or after a long enough hang.
1221 auto success = make_lw_shared<bool>(false);
1222 auto done = make_lw_shared<semaphore>(0);
1223 auto abrt = make_lw_shared<abort_source>();
1224 (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] {
1225 done->signal(1);
1226 }).handle_exception([] (std::exception_ptr) {});
1227 rpc::client_options co;
1228 co.send_timeout_data = 1;
1229 (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1230 env.register_handler(1, [](int a, int b) {
1231 return make_ready_future<int>(a+b);
1232 }).get();
1233 auto sum = env.proto().make_client<int (int, int)>(1);
1234 // The following call used to always hang, when max()+now()
1235 // overflowed and appeared to be a negative timeout.
1236 auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get0();
1237 BOOST_REQUIRE_EQUAL(result, 2 + 3);
1238 }).then([success, done, abrt] {
1239 *success = true;
1240 abrt->request_abort();
1241 done->signal();
1242 });
1243 return done->wait().then([done, success] {
1244 BOOST_REQUIRE(*success);
1245 });
1246}
1247
1248SEASTAR_TEST_CASE(test_rpc_tuple) {
1249 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1250 env.register_handler(1, [] () {
1251 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1252 }).get();
1253 auto f1 = env.proto().make_client<rpc::tuple<int, long> ()>(1);
1254 auto result = f1(c1).get0();
1255 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1256 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1257 });
1258}
1259
1260SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) {
1261 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1262 // Server is variadic
1263 env.register_handler(1, [] () {
f67539c2 1264 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple(1, 0x7'0000'0000L));
9f95a23c
TL
1265 }).get();
1266 // Client is non-variadic
1267 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1268 auto result = f1(c1).get0();
1269 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1270 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1271 });
1272}
1273
1274SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) {
1275 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1276 // Server is nonvariadic
1277 env.register_handler(1, [] () {
1278 return make_ready_future<rpc::tuple<int, long>>(rpc::tuple<int, long>(1, 0x7'0000'0000L));
1279 }).get();
1280 // Client is variadic
f67539c2
TL
1281 auto f1 = env.proto().make_client<future<rpc::tuple<int, long>> ()>(1);
1282 auto result = f1(c1).get0();
9f95a23c
TL
1283 BOOST_REQUIRE_EQUAL(std::get<0>(result), 1);
1284 BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L);
1285 });
1286}
1287
1288SEASTAR_TEST_CASE(test_handler_registration) {
1289 rpc_test_config cfg;
1e59de90
TL
1290 rpc_loopback_error_injector::config ecfg;
1291 ecfg.connect_kind = loopback_error_injector::error::abort;
1292 cfg.inject_error = ecfg;
9f95a23c
TL
1293 return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) {
1294 auto& proto = env.proto();
1295
f67539c2
TL
1296 // new proto must be empty
1297 BOOST_REQUIRE(!proto.has_handlers());
1298
9f95a23c
TL
1299 // non-existing handler should not be found
1300 BOOST_REQUIRE(!proto.has_handler(1));
1301
1302 // unregistered non-existing handler should return ready future
1303 auto fut = proto.unregister_handler(1);
1304 BOOST_REQUIRE(fut.available() && !fut.failed());
1305 fut.get();
1306
1307 // existing handler should be found
1308 auto handler = [] () { return make_ready_future<>(); };
1309 proto.register_handler(1, handler);
1310 BOOST_REQUIRE(proto.has_handler(1));
1311
1312 // cannot register handler if already registered
1313 BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error);
1314
1315 // unregistered handler should not be found
1316 proto.unregister_handler(1).get();
1317 BOOST_REQUIRE(!proto.has_handler(1));
1318
1319 // re-registering a handler should succeed
1320 proto.register_handler(1, handler);
1321 BOOST_REQUIRE(proto.has_handler(1));
f67539c2
TL
1322
1323 // proto with handlers must not be empty
1324 BOOST_REQUIRE(proto.has_handlers());
9f95a23c
TL
1325 });
1326}
1327
1328SEASTAR_TEST_CASE(test_unregister_handler) {
1329 using namespace std::chrono_literals;
1330 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1331 promise<> handler_called;
1332 future<> f_handler_called = handler_called.get_future();
1333 bool rpc_executed = false;
1334 bool rpc_completed = false;
1335
1336 auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] {
1337 if (f_handler_called.available()) {
1338 f_handler_called.get();
1339 }
1340 rpc_executed = false;
1341 rpc_completed = false;
1342 };
1343
1344 auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] {
1345 return [&handler_called, &rpc_executed, &rpc_completed] {
1346 handler_called.set_value();
1347 rpc_executed = true;
1348 return sleep(1ms).then([&rpc_completed] {
1349 rpc_completed = true;
1350 });
1351 };
1352 };
1353
1354 // handler should not run if unregistered before being called
1355 env.register_handler(1, get_handler()).get();
1356 env.unregister_handler(1).get();
1357 BOOST_REQUIRE(!f_handler_called.available());
1358 BOOST_REQUIRE(!rpc_executed);
1359 BOOST_REQUIRE(!rpc_completed);
1360
1361 // verify normal execution path
1362 env.register_handler(1, get_handler()).get();
1363 auto call = env.proto().make_client<void ()>(1);
1364 call(c1).get();
1365 BOOST_REQUIRE(f_handler_called.available());
1366 BOOST_REQUIRE(rpc_executed);
1367 BOOST_REQUIRE(rpc_completed);
1368 reset_state();
1369
1370 // call should fail after handler is unregistered
1371 env.unregister_handler(1).get();
1372 try {
1373 call(c1).get();
1374 BOOST_REQUIRE(false);
1375 } catch (rpc::unknown_verb_error&) {
1376 // expected
1377 } catch (...) {
1378 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1379 BOOST_REQUIRE(false);
1380 }
1381 BOOST_REQUIRE(!f_handler_called.available());
1382 BOOST_REQUIRE(!rpc_executed);
1383 BOOST_REQUIRE(!rpc_completed);
1384
1385 // unregistered is allowed while call is in flight
1386 auto delayed_unregister = [&env] {
1387 return sleep(500us).then([&env] {
1388 return env.unregister_handler(1);
11fdf7f2 1389 });
9f95a23c
TL
1390 };
1391
1392 env.register_handler(1, get_handler()).get();
1393 try {
1394 when_all_succeed(call(c1), delayed_unregister()).get();
1395 reset_state();
1396 } catch (rpc::unknown_verb_error&) {
1397 // expected
1398 } catch (...) {
1399 std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl;
1400 BOOST_REQUIRE(false);
1401 }
1402 });
11fdf7f2 1403}
9f95a23c 1404
f67539c2
TL
1405SEASTAR_TEST_CASE(test_loggers) {
1406 static seastar::logger log("dummy");
1407 log.set_level(log_level::debug);
1408 return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
1409 socket_address dummy_addr;
1410 auto& proto = env.proto();
1411 auto& logger = proto.get_logger();
1412 logger(dummy_addr, "Hello0");
1413 logger(dummy_addr, log_level::debug, "Hello1");
1414 proto.set_logger(&log);
1415 logger(dummy_addr, "Hello2");
1416 logger(dummy_addr, log_level::debug, "Hello3");
1417 // We *want* to test the deprecated API, don't spam warnings about it.
1418#pragma GCC diagnostic push
1419#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1420 proto.set_logger([] (const sstring& str) {
1421 log.info("Test: {}", str);
1422 });
1423#pragma GCC diagnostic pop
1424 logger(dummy_addr, "Hello4");
1425 logger(dummy_addr, log_level::debug, "Hello5");
1426 proto.set_logger(nullptr);
1427 logger(dummy_addr, "Hello6");
1428 logger(dummy_addr, log_level::debug, "Hello7");
1429 });
1430}
9f95a23c 1431
1e59de90
TL
1432SEASTAR_TEST_CASE(test_connection_id_format) {
1433 rpc::connection_id cid = rpc::connection_id::make_id(0x123, 1);
1434 std::string res = format("{}", cid);
1435 BOOST_REQUIRE_EQUAL(res, "1230001");
1436 return make_ready_future<>();
1437}
1438
9f95a23c 1439static_assert(std::is_same_v<decltype(rpc::tuple(1U, 1L)), rpc::tuple<unsigned, long>>, "rpc::tuple deduction guid not working");
1e59de90
TL
1440
1441SEASTAR_TEST_CASE(test_client_info) {
1442 rpc::client_info info;
1443 const rpc::client_info& const_info = *const_cast<rpc::client_info*>(&info);
1444
1445 info.attach_auxiliary("key", 0);
1446 BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary<int>("key"), 0);
1447 info.retrieve_auxiliary<int>("key") = 1;
1448 BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary<int>("key"), 1);
1449
1450 BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt<int>("key"), &info.retrieve_auxiliary<int>("key"));
1451 BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt<int>("key"), &const_info.retrieve_auxiliary<int>("key"));
1452
1453 BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt<int>("missing"), nullptr);
1454 BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt<int>("missing"), nullptr);
1455
1456 return make_ready_future<>();
1457}