]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/tests/unit/rpc_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / tests / unit / rpc_test.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 ScyllaDB
20 */
21
22
23#include "loopback_socket.hh"
24#include <seastar/rpc/rpc.hh>
25#include <seastar/rpc/rpc_types.hh>
26#include <seastar/rpc/lz4_compressor.hh>
27#include <seastar/rpc/multi_algo_compressor_factory.hh>
28#include <seastar/testing/test_case.hh>
29#include <seastar/testing/thread_test_case.hh>
30#include <seastar/core/thread.hh>
31#include <seastar/core/sleep.hh>
32#include <seastar/util/defer.hh>
33
34using namespace seastar;
35
36struct serializer {
37};
38
39template <typename T, typename Output>
40inline
41void write_arithmetic_type(Output& out, T v) {
42 static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
43 return out.write(reinterpret_cast<const char*>(&v), sizeof(T));
44}
45
46template <typename T, typename Input>
47inline
48T read_arithmetic_type(Input& in) {
49 static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
50 T v;
51 in.read(reinterpret_cast<char*>(&v), sizeof(T));
52 return v;
53}
54
55template <typename Output>
56inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
57template <typename Output>
58inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
59template <typename Output>
60inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
61template <typename Output>
62inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
63template <typename Output>
64inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
65template <typename Input>
66inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
67template <typename Input>
68inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
69template <typename Input>
70inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
71template <typename Input>
72inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
73template <typename Input>
74inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
75
76template <typename Output>
77inline void write(serializer, Output& out, const sstring& v) {
78 write_arithmetic_type(out, uint32_t(v.size()));
79 out.write(v.c_str(), v.size());
80}
81
82template <typename Input>
83inline sstring read(serializer, Input& in, rpc::type<sstring>) {
84 auto size = read_arithmetic_type<uint32_t>(in);
85 sstring ret(sstring::initialized_later(), size);
86 in.read(ret.begin(), size);
87 return ret;
88}
89
90using test_rpc_proto = rpc::protocol<serializer>;
91using make_socket_fn = std::function<seastar::socket ()>;
92
93struct rpc_loopback_error_injector : public loopback_error_injector {
94 int _x = 0;
95 bool server_rcv_error() override {
96 return _x++ >= 50;
97 }
98};
99
100class rpc_socket_impl : public ::net::socket_impl {
101 promise<connected_socket> _p;
102 bool _connect;
103 loopback_socket_impl _socket;
104 rpc_loopback_error_injector _error_injector;
105public:
106 rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error)
107 : _connect(connect),
108 _socket(factory, inject_error ? &_error_injector : nullptr) {
109 }
110 virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
111 return _connect ? _socket.connect(sa, local, proto) : _p.get_future();
112 }
113 virtual void shutdown() override {
114 if (_connect) {
115 _socket.shutdown();
116 } else {
117 _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
118 }
119 }
120};
121
122future<>
123with_rpc_env(rpc::resource_limits resource_limits, rpc::server_options so, bool connect, bool inject_error,
124 std::function<future<> (test_rpc_proto& proto, test_rpc_proto::server& server, make_socket_fn make_socket)> test_fn) {
125 struct state {
126 test_rpc_proto proto{serializer()};
127 loopback_connection_factory lcf;
128 std::vector<std::unique_ptr<test_rpc_proto::server>> servers;
129 };
130 return do_with(state(), [=] (state& s) {
131 s.servers.resize(smp::count);
132 return smp::invoke_on_all([=, &s] {
133 s.servers[engine().cpu_id()] = std::make_unique<test_rpc_proto::server>(s.proto, so, s.lcf.get_server_socket(), resource_limits);
134 }).then([=, &s] {
135 auto make_socket = [&s, connect, inject_error] () {
136 return seastar::socket(std::make_unique<rpc_socket_impl>(s.lcf, connect, inject_error));
137 };
138 return test_fn(s.proto, *s.servers[0], make_socket).finally([&] {
139 return smp::invoke_on_all([&s] {
140 auto sptr = s.servers[engine().cpu_id()].get();
141 s.lcf.destroy_shard(engine().cpu_id());
142 return sptr->stop().finally([p = std::move(s.servers[engine().cpu_id()])] {});
143 });
144 });
145 });
146 });
147}
148
149struct cfactory : rpc::compressor::factory {
150 mutable int use_compression = 0;
151 const sstring name;
152 cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {}
153 const sstring& supported() const override {
154 return name;
155 }
156 std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
157 if (feature == name) {
158 use_compression++;
159 return std::make_unique<rpc::lz4_compressor>();
160 } else {
161 return nullptr;
162 }
163 }
164};
165#if 1
166SEASTAR_TEST_CASE(test_rpc_connect) {
167 std::vector<future<>> fs;
168
169 for (auto i = 0; i < 2; i++) {
170 for (auto j = 0; j < 4; j++) {
171 auto factory = std::make_unique<cfactory>();
172 rpc::server_options so;
173 rpc::client_options co;
174 if (i == 1) {
175 so.compressor_factory = factory.get();
176 }
177 if (j & 1) {
178 co.compressor_factory = factory.get();
179 }
180 co.send_timeout_data = j & 2;
181 auto f = with_rpc_env({}, so, true, false, [co] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
182 return seastar::async([&proto, make_socket, co] {
183 test_rpc_proto::client c1(proto, co, make_socket(), ipv4_addr());
184 auto sum = proto.register_handler(1, [](int a, int b) {
185 return make_ready_future<int>(a+b);
186 });
187 auto result = sum(c1, 2, 3).get0();
188 BOOST_REQUIRE_EQUAL(result, 2 + 3);
189 c1.stop().get();
190 });
191 }).handle_exception([] (auto ep) {
192 BOOST_FAIL("No exception expected");
193 }).finally([factory = std::move(factory), i, j = j & 1] {
194 if (i == 1 && j == 1) {
195 BOOST_REQUIRE_EQUAL(factory->use_compression, 2);
196 } else {
197 BOOST_REQUIRE_EQUAL(factory->use_compression, 0);
198 }
199 });
200 fs.emplace_back(std::move(f));
201 }
202 }
203 return when_all(fs.begin(), fs.end()).discard_result();
204}
205
206SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) {
207 auto factory1 = std::make_unique<cfactory>();
208 auto factory2 = std::make_unique<cfactory>("LZ4NEW");
209 rpc::server_options so;
210 rpc::client_options co;
211 static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()});
212 static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()});
213 so.compressor_factory = &server;
214 co.compressor_factory = &client;
215 return with_rpc_env({}, so, true, false, [co] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
216 return seastar::async([&proto, make_socket, co] {
217 test_rpc_proto::client c1(proto, co, make_socket(), ipv4_addr());
218 auto sum = proto.register_handler(1, [](int a, int b) {
219 return make_ready_future<int>(a+b);
220 });
221 auto result = sum(c1, 2, 3).get0();
222 BOOST_REQUIRE_EQUAL(result, 2 + 3);
223 c1.stop().get();
224 });
225 }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] {
226 BOOST_REQUIRE_EQUAL(factory1->use_compression, 0);
227 BOOST_REQUIRE_EQUAL(factory2->use_compression, 2);
228 });
229}
230
231SEASTAR_TEST_CASE(test_rpc_connect_abort) {
232 return with_rpc_env({}, {}, false, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
233 return seastar::async([&proto, make_socket] {
234 test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr());
235 auto f = proto.register_handler(1, []() { return make_ready_future<>(); });
236 c1.stop().get0();
237 try {
238 f(c1).get0();
239 BOOST_REQUIRE(false);
240 } catch (...) {}
241 });
242 });
243}
244
245SEASTAR_TEST_CASE(test_rpc_cancel) {
246 using namespace std::chrono_literals;
247 return with_rpc_env({}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
248 return seastar::async([&proto, make_socket] {
249 test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr());
250 bool rpc_executed = false;
251 int good = 0;
252 promise<> handler_called;
253 future<> f_handler_called = handler_called.get_future();
254 auto call = proto.register_handler(1, [&rpc_executed, handler_called = std::move(handler_called)] () mutable {
255 handler_called.set_value(); rpc_executed = true; return sleep(1ms);
256 });
257 rpc::cancellable cancel;
258 auto f = call(c1, cancel);
259 // cancel send side
260 cancel.cancel();
261 try {
262 f.get();
263 } catch(rpc::canceled_error&) {
264 good += !rpc_executed;
265 };
266 f = call(c1, cancel);
267 // cancel wait side
268 f_handler_called.then([&cancel] {
269 cancel.cancel();
270 }).get();
271 try {
272 f.get();
273 } catch(rpc::canceled_error&) {
274 good += 10*rpc_executed;
275 };
276 c1.stop().get();
277 BOOST_REQUIRE_EQUAL(good, 11);
278 });
279 });
280}
281
282SEASTAR_TEST_CASE(test_message_to_big) {
283 return with_rpc_env({0, 1, 100}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
284 return seastar::async([&proto, make_socket] {
285 test_rpc_proto::client c(proto, {}, make_socket(), ipv4_addr());
286 bool good = true;
287 auto call = proto.register_handler(1, [&] (sstring payload) mutable {
288 good = false;
289 });
290 try {
291 call(c, sstring(sstring::initialized_later(), 101)).get();
292 good = false;
293 } catch(std::runtime_error& err) {
294 } catch(...) {
295 good = false;
296 }
297 c.stop().get();
298 BOOST_REQUIRE_EQUAL(good, true);
299 });
300 });
301}
302#endif
303
304struct stream_test_result {
305 bool client_source_closed = false;
306 bool server_source_closed = false;
307 bool sink_exception = false;
308 bool sink_close_exception = false;
309 bool source_done_exception = false;
310 bool server_done_exception = false;
311 bool client_stop_exception = false;
312 int server_sum = 0;
313};
314
315future<stream_test_result> stream_test_func(test_rpc_proto& proto, make_socket_fn make_socket, bool stop_client) {
316 return seastar::async([&proto, make_socket, stop_client] {
317 stream_test_result r;
318 test_rpc_proto::client c(proto, {}, make_socket(), ipv4_addr());
319 future<> server_done = make_ready_future();
320 proto.register_handler(1, [&](int i, rpc::source<int> source) {
321 BOOST_REQUIRE_EQUAL(i, 666);
322 auto sink = source.make_sink<serializer, sstring>();
323 auto sink_loop = seastar::async([sink] () mutable {
324 for (auto i = 0; i < 100; i++) {
325 sink("seastar").get();
326 sleep(std::chrono::milliseconds(1)).get();
327 }
328 sink.close().get();
329 });
330 auto source_loop = seastar::async([source, &r] () mutable {
331 while (!r.server_source_closed) {
332 auto data = source().get0();
333 if (data) {
334 r.server_sum += std::get<0>(*data);
335 } else {
336 r.server_source_closed = true;
337 }
338 }
339 });
340 server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result();
341 return sink;
342 });
343 auto call = proto.make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1);
344 auto x = [&] {
345 try {
346 return c.make_stream_sink<serializer, int>(make_socket()).get0();
347 } catch (...) {
348 c.stop().get();
349 throw;
350 }
351 };
352 auto sink = x();
353 auto source = call(c, 666, sink).get0();
354 auto source_done = seastar::async([&] {
355 while (!r.client_source_closed) {
356 auto data = source().get0();
357 if (data) {
358 BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar");
359 } else {
360 r.client_source_closed = true;
361 }
362 }
363 });
364 auto check_exception = [] (auto f) {
365 try {
366 f.get();
367 } catch (...) {
368 return true;
369 }
370 return false;
371 };
372 future<> stop_client_future = make_ready_future();
373 for (int i = 1; i < 101; i++) {
374 if (stop_client && i == 50) {
375 // stop client while stream is in use
376 stop_client_future = c.stop();
377 }
378 sleep(std::chrono::milliseconds(1)).get();
379 r.sink_exception = check_exception(sink(i));
380 if (r.sink_exception) {
381 break;
382 }
383 }
384 r.sink_close_exception = check_exception(sink.close());
385 r.source_done_exception = check_exception(std::move(source_done));
386 r.server_done_exception = check_exception(std::move(server_done));
387 r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future));
388 return r;
389 });
390}
391
392SEASTAR_TEST_CASE(test_stream_simple) {
393 rpc::server_options so;
394 so.streaming_domain = rpc::streaming_domain_type(1);
395 return with_rpc_env({}, so, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
396 return stream_test_func(proto, make_socket, false).then([] (stream_test_result r) {
397 BOOST_REQUIRE(r.client_source_closed &&
398 r.server_source_closed &&
399 r.server_sum == 5050 &&
400 !r.sink_exception &&
401 !r.sink_close_exception &&
402 !r.source_done_exception &&
403 !r.server_done_exception &&
404 !r.client_stop_exception);
405 });
406 });
407}
408
409SEASTAR_TEST_CASE(test_stream_stop_client) {
410 rpc::server_options so;
411 so.streaming_domain = rpc::streaming_domain_type(1);
412 return with_rpc_env({}, so, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
413 return stream_test_func(proto, make_socket, true).then([] (stream_test_result r) {
414 BOOST_REQUIRE(!r.client_source_closed &&
415 !r.server_source_closed &&
416 r.sink_exception &&
417 r.sink_close_exception &&
418 r.source_done_exception &&
419 r.server_done_exception &&
420 !r.client_stop_exception);
421 });
422 });
423}
424
425
426SEASTAR_TEST_CASE(test_stream_connection_error) {
427 rpc::server_options so;
428 so.streaming_domain = rpc::streaming_domain_type(1);
429 return with_rpc_env({}, so, true, true, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
430 return stream_test_func(proto, make_socket, false).then([] (stream_test_result r) {
431 BOOST_REQUIRE(!r.client_source_closed &&
432 !r.server_source_closed &&
433 r.sink_exception &&
434 r.sink_close_exception &&
435 r.source_done_exception &&
436 r.server_done_exception &&
437 !r.client_stop_exception);
438 });
439 });
440}
441
442SEASTAR_TEST_CASE(test_rpc_scheduling) {
443 return with_rpc_env({}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
444 return seastar::async([&proto, make_socket] {
445 test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr());
446 auto sg = create_scheduling_group("rpc", 100).get0();
447 auto call = proto.register_handler(1, sg, [sg] () mutable {
448 BOOST_REQUIRE(sg == current_scheduling_group());
449 return make_ready_future<>();
450 });
451 call(c1).get();
452 c1.stop().get();
453 });
454 });
455}
456
457SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) {
458 auto sg1 = create_scheduling_group("sg1", 100).get0();
459 auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
460 auto sg2 = create_scheduling_group("sg2", 100).get0();
461 auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });
462 rpc::resource_limits limits;
463 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
464 auto sg = current_scheduling_group();
465 if (cookie == "sg1") {
466 sg = sg1;
467 } else if (cookie == "sg2") {
468 sg = sg2;
469 }
470 rpc::isolation_config cfg;
471 cfg.sched_group = sg;
472 return cfg;
473 };
474 with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
475 return async([&proto, make_socket, sg1, sg2] {
476 rpc::client_options co1;
477 co1.isolation_cookie = "sg1";
478 test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr());
479 rpc::client_options co2;
480 co2.isolation_cookie = "sg2";
481 test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr());
482 auto call = proto.register_handler(1, [sg1, sg2] (int which) mutable {
483 scheduling_group expected;
484 if (which == 1) {
485 expected = sg1;
486 } else if (which == 2) {
487 expected = sg2;
488 }
489 BOOST_REQUIRE(current_scheduling_group() == expected);
490 return make_ready_future<>();
491 });
492 call(c1, 1).get();
493 call(c2, 2).get();
494 c1.stop().get();
495 c2.stop().get();
496 });
497 }).get();
498}
499
500SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
501 auto sg1 = create_scheduling_group("sg1", 100).get0();
502 auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); });
503 auto sg2 = create_scheduling_group("sg2", 100).get0();
504 auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); });
505 rpc::resource_limits limits;
506 limits.isolate_connection = [sg1, sg2] (sstring cookie) {
507 auto sg = current_scheduling_group();
508 if (cookie == "sg1") {
509 sg = sg1;
510 } else if (cookie == "sg2") {
511 sg = sg2;
512 }
513 rpc::isolation_config cfg;
514 cfg.sched_group = sg;
515 return cfg;
516 };
517 with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) {
518 return async([&proto, make_socket, sg1, sg2] {
519 rpc::client_options co1;
520 co1.isolation_cookie = "sg1";
521 test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr());
522 rpc::client_options co2;
523 co2.isolation_cookie = "sg2";
524 test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr());
525 // An old client, that doesn't have an isolation cookie
526 rpc::client_options co3;
527 test_rpc_proto::client c3(proto, co3, make_socket(), ipv4_addr());
528 // A server that uses sg1 if the client is old
529 auto call = proto.register_handler(1, sg1, [sg1, sg2] (int which) mutable {
530 scheduling_group expected;
531 if (which == 1) {
532 expected = sg1;
533 } else if (which == 2) {
534 expected = sg2;
535 }
536 BOOST_REQUIRE(current_scheduling_group() == expected);
537 return make_ready_future<>();
538 });
539 call(c1, 1).get();
540 call(c2, 2).get();
541 call(c3, 1).get();
542 c1.stop().get();
543 c2.stop().get();
544 c3.stop().get();
545 });
546 }).get();
547}