]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2016 ScyllaDB | |
20 | */ | |
21 | ||
22 | ||
23 | #include "loopback_socket.hh" | |
24 | #include <seastar/rpc/rpc.hh> | |
25 | #include <seastar/rpc/rpc_types.hh> | |
26 | #include <seastar/rpc/lz4_compressor.hh> | |
27 | #include <seastar/rpc/multi_algo_compressor_factory.hh> | |
28 | #include <seastar/testing/test_case.hh> | |
29 | #include <seastar/testing/thread_test_case.hh> | |
30 | #include <seastar/core/thread.hh> | |
31 | #include <seastar/core/sleep.hh> | |
32 | #include <seastar/util/defer.hh> | |
33 | ||
34 | using namespace seastar; | |
35 | ||
36 | struct serializer { | |
37 | }; | |
38 | ||
39 | template <typename T, typename Output> | |
40 | inline | |
41 | void write_arithmetic_type(Output& out, T v) { | |
42 | static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); | |
43 | return out.write(reinterpret_cast<const char*>(&v), sizeof(T)); | |
44 | } | |
45 | ||
46 | template <typename T, typename Input> | |
47 | inline | |
48 | T read_arithmetic_type(Input& in) { | |
49 | static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); | |
50 | T v; | |
51 | in.read(reinterpret_cast<char*>(&v), sizeof(T)); | |
52 | return v; | |
53 | } | |
54 | ||
55 | template <typename Output> | |
56 | inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } | |
57 | template <typename Output> | |
58 | inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } | |
59 | template <typename Output> | |
60 | inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } | |
61 | template <typename Output> | |
62 | inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } | |
63 | template <typename Output> | |
64 | inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } | |
65 | template <typename Input> | |
66 | inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } | |
67 | template <typename Input> | |
68 | inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } | |
69 | template <typename Input> | |
70 | inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } | |
71 | template <typename Input> | |
72 | inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } | |
73 | template <typename Input> | |
74 | inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } | |
75 | ||
76 | template <typename Output> | |
77 | inline void write(serializer, Output& out, const sstring& v) { | |
78 | write_arithmetic_type(out, uint32_t(v.size())); | |
79 | out.write(v.c_str(), v.size()); | |
80 | } | |
81 | ||
82 | template <typename Input> | |
83 | inline sstring read(serializer, Input& in, rpc::type<sstring>) { | |
84 | auto size = read_arithmetic_type<uint32_t>(in); | |
85 | sstring ret(sstring::initialized_later(), size); | |
86 | in.read(ret.begin(), size); | |
87 | return ret; | |
88 | } | |
89 | ||
90 | using test_rpc_proto = rpc::protocol<serializer>; | |
91 | using make_socket_fn = std::function<seastar::socket ()>; | |
92 | ||
93 | struct rpc_loopback_error_injector : public loopback_error_injector { | |
94 | int _x = 0; | |
95 | bool server_rcv_error() override { | |
96 | return _x++ >= 50; | |
97 | } | |
98 | }; | |
99 | ||
100 | class rpc_socket_impl : public ::net::socket_impl { | |
101 | promise<connected_socket> _p; | |
102 | bool _connect; | |
103 | loopback_socket_impl _socket; | |
104 | rpc_loopback_error_injector _error_injector; | |
105 | public: | |
106 | rpc_socket_impl(loopback_connection_factory& factory, bool connect, bool inject_error) | |
107 | : _connect(connect), | |
108 | _socket(factory, inject_error ? &_error_injector : nullptr) { | |
109 | } | |
110 | virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { | |
111 | return _connect ? _socket.connect(sa, local, proto) : _p.get_future(); | |
112 | } | |
113 | virtual void shutdown() override { | |
114 | if (_connect) { | |
115 | _socket.shutdown(); | |
116 | } else { | |
117 | _p.set_exception(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); | |
118 | } | |
119 | } | |
120 | }; | |
121 | ||
122 | future<> | |
123 | with_rpc_env(rpc::resource_limits resource_limits, rpc::server_options so, bool connect, bool inject_error, | |
124 | std::function<future<> (test_rpc_proto& proto, test_rpc_proto::server& server, make_socket_fn make_socket)> test_fn) { | |
125 | struct state { | |
126 | test_rpc_proto proto{serializer()}; | |
127 | loopback_connection_factory lcf; | |
128 | std::vector<std::unique_ptr<test_rpc_proto::server>> servers; | |
129 | }; | |
130 | return do_with(state(), [=] (state& s) { | |
131 | s.servers.resize(smp::count); | |
132 | return smp::invoke_on_all([=, &s] { | |
133 | s.servers[engine().cpu_id()] = std::make_unique<test_rpc_proto::server>(s.proto, so, s.lcf.get_server_socket(), resource_limits); | |
134 | }).then([=, &s] { | |
135 | auto make_socket = [&s, connect, inject_error] () { | |
136 | return seastar::socket(std::make_unique<rpc_socket_impl>(s.lcf, connect, inject_error)); | |
137 | }; | |
138 | return test_fn(s.proto, *s.servers[0], make_socket).finally([&] { | |
139 | return smp::invoke_on_all([&s] { | |
140 | auto sptr = s.servers[engine().cpu_id()].get(); | |
141 | s.lcf.destroy_shard(engine().cpu_id()); | |
142 | return sptr->stop().finally([p = std::move(s.servers[engine().cpu_id()])] {}); | |
143 | }); | |
144 | }); | |
145 | }); | |
146 | }); | |
147 | } | |
148 | ||
149 | struct cfactory : rpc::compressor::factory { | |
150 | mutable int use_compression = 0; | |
151 | const sstring name; | |
152 | cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {} | |
153 | const sstring& supported() const override { | |
154 | return name; | |
155 | } | |
156 | std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { | |
157 | if (feature == name) { | |
158 | use_compression++; | |
159 | return std::make_unique<rpc::lz4_compressor>(); | |
160 | } else { | |
161 | return nullptr; | |
162 | } | |
163 | } | |
164 | }; | |
165 | #if 1 | |
166 | SEASTAR_TEST_CASE(test_rpc_connect) { | |
167 | std::vector<future<>> fs; | |
168 | ||
169 | for (auto i = 0; i < 2; i++) { | |
170 | for (auto j = 0; j < 4; j++) { | |
171 | auto factory = std::make_unique<cfactory>(); | |
172 | rpc::server_options so; | |
173 | rpc::client_options co; | |
174 | if (i == 1) { | |
175 | so.compressor_factory = factory.get(); | |
176 | } | |
177 | if (j & 1) { | |
178 | co.compressor_factory = factory.get(); | |
179 | } | |
180 | co.send_timeout_data = j & 2; | |
181 | auto f = with_rpc_env({}, so, true, false, [co] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
182 | return seastar::async([&proto, make_socket, co] { | |
183 | test_rpc_proto::client c1(proto, co, make_socket(), ipv4_addr()); | |
184 | auto sum = proto.register_handler(1, [](int a, int b) { | |
185 | return make_ready_future<int>(a+b); | |
186 | }); | |
187 | auto result = sum(c1, 2, 3).get0(); | |
188 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
189 | c1.stop().get(); | |
190 | }); | |
191 | }).handle_exception([] (auto ep) { | |
192 | BOOST_FAIL("No exception expected"); | |
193 | }).finally([factory = std::move(factory), i, j = j & 1] { | |
194 | if (i == 1 && j == 1) { | |
195 | BOOST_REQUIRE_EQUAL(factory->use_compression, 2); | |
196 | } else { | |
197 | BOOST_REQUIRE_EQUAL(factory->use_compression, 0); | |
198 | } | |
199 | }); | |
200 | fs.emplace_back(std::move(f)); | |
201 | } | |
202 | } | |
203 | return when_all(fs.begin(), fs.end()).discard_result(); | |
204 | } | |
205 | ||
206 | SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) { | |
207 | auto factory1 = std::make_unique<cfactory>(); | |
208 | auto factory2 = std::make_unique<cfactory>("LZ4NEW"); | |
209 | rpc::server_options so; | |
210 | rpc::client_options co; | |
211 | static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()}); | |
212 | static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()}); | |
213 | so.compressor_factory = &server; | |
214 | co.compressor_factory = &client; | |
215 | return with_rpc_env({}, so, true, false, [co] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
216 | return seastar::async([&proto, make_socket, co] { | |
217 | test_rpc_proto::client c1(proto, co, make_socket(), ipv4_addr()); | |
218 | auto sum = proto.register_handler(1, [](int a, int b) { | |
219 | return make_ready_future<int>(a+b); | |
220 | }); | |
221 | auto result = sum(c1, 2, 3).get0(); | |
222 | BOOST_REQUIRE_EQUAL(result, 2 + 3); | |
223 | c1.stop().get(); | |
224 | }); | |
225 | }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] { | |
226 | BOOST_REQUIRE_EQUAL(factory1->use_compression, 0); | |
227 | BOOST_REQUIRE_EQUAL(factory2->use_compression, 2); | |
228 | }); | |
229 | } | |
230 | ||
231 | SEASTAR_TEST_CASE(test_rpc_connect_abort) { | |
232 | return with_rpc_env({}, {}, false, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
233 | return seastar::async([&proto, make_socket] { | |
234 | test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr()); | |
235 | auto f = proto.register_handler(1, []() { return make_ready_future<>(); }); | |
236 | c1.stop().get0(); | |
237 | try { | |
238 | f(c1).get0(); | |
239 | BOOST_REQUIRE(false); | |
240 | } catch (...) {} | |
241 | }); | |
242 | }); | |
243 | } | |
244 | ||
245 | SEASTAR_TEST_CASE(test_rpc_cancel) { | |
246 | using namespace std::chrono_literals; | |
247 | return with_rpc_env({}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
248 | return seastar::async([&proto, make_socket] { | |
249 | test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr()); | |
250 | bool rpc_executed = false; | |
251 | int good = 0; | |
252 | promise<> handler_called; | |
253 | future<> f_handler_called = handler_called.get_future(); | |
254 | auto call = proto.register_handler(1, [&rpc_executed, handler_called = std::move(handler_called)] () mutable { | |
255 | handler_called.set_value(); rpc_executed = true; return sleep(1ms); | |
256 | }); | |
257 | rpc::cancellable cancel; | |
258 | auto f = call(c1, cancel); | |
259 | // cancel send side | |
260 | cancel.cancel(); | |
261 | try { | |
262 | f.get(); | |
263 | } catch(rpc::canceled_error&) { | |
264 | good += !rpc_executed; | |
265 | }; | |
266 | f = call(c1, cancel); | |
267 | // cancel wait side | |
268 | f_handler_called.then([&cancel] { | |
269 | cancel.cancel(); | |
270 | }).get(); | |
271 | try { | |
272 | f.get(); | |
273 | } catch(rpc::canceled_error&) { | |
274 | good += 10*rpc_executed; | |
275 | }; | |
276 | c1.stop().get(); | |
277 | BOOST_REQUIRE_EQUAL(good, 11); | |
278 | }); | |
279 | }); | |
280 | } | |
281 | ||
282 | SEASTAR_TEST_CASE(test_message_to_big) { | |
283 | return with_rpc_env({0, 1, 100}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
284 | return seastar::async([&proto, make_socket] { | |
285 | test_rpc_proto::client c(proto, {}, make_socket(), ipv4_addr()); | |
286 | bool good = true; | |
287 | auto call = proto.register_handler(1, [&] (sstring payload) mutable { | |
288 | good = false; | |
289 | }); | |
290 | try { | |
291 | call(c, sstring(sstring::initialized_later(), 101)).get(); | |
292 | good = false; | |
293 | } catch(std::runtime_error& err) { | |
294 | } catch(...) { | |
295 | good = false; | |
296 | } | |
297 | c.stop().get(); | |
298 | BOOST_REQUIRE_EQUAL(good, true); | |
299 | }); | |
300 | }); | |
301 | } | |
302 | #endif | |
303 | ||
304 | struct stream_test_result { | |
305 | bool client_source_closed = false; | |
306 | bool server_source_closed = false; | |
307 | bool sink_exception = false; | |
308 | bool sink_close_exception = false; | |
309 | bool source_done_exception = false; | |
310 | bool server_done_exception = false; | |
311 | bool client_stop_exception = false; | |
312 | int server_sum = 0; | |
313 | }; | |
314 | ||
315 | future<stream_test_result> stream_test_func(test_rpc_proto& proto, make_socket_fn make_socket, bool stop_client) { | |
316 | return seastar::async([&proto, make_socket, stop_client] { | |
317 | stream_test_result r; | |
318 | test_rpc_proto::client c(proto, {}, make_socket(), ipv4_addr()); | |
319 | future<> server_done = make_ready_future(); | |
320 | proto.register_handler(1, [&](int i, rpc::source<int> source) { | |
321 | BOOST_REQUIRE_EQUAL(i, 666); | |
322 | auto sink = source.make_sink<serializer, sstring>(); | |
323 | auto sink_loop = seastar::async([sink] () mutable { | |
324 | for (auto i = 0; i < 100; i++) { | |
325 | sink("seastar").get(); | |
326 | sleep(std::chrono::milliseconds(1)).get(); | |
327 | } | |
328 | sink.close().get(); | |
329 | }); | |
330 | auto source_loop = seastar::async([source, &r] () mutable { | |
331 | while (!r.server_source_closed) { | |
332 | auto data = source().get0(); | |
333 | if (data) { | |
334 | r.server_sum += std::get<0>(*data); | |
335 | } else { | |
336 | r.server_source_closed = true; | |
337 | } | |
338 | } | |
339 | }); | |
340 | server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); | |
341 | return sink; | |
342 | }); | |
343 | auto call = proto.make_client<rpc::source<sstring> (int, rpc::sink<int>)>(1); | |
344 | auto x = [&] { | |
345 | try { | |
346 | return c.make_stream_sink<serializer, int>(make_socket()).get0(); | |
347 | } catch (...) { | |
348 | c.stop().get(); | |
349 | throw; | |
350 | } | |
351 | }; | |
352 | auto sink = x(); | |
353 | auto source = call(c, 666, sink).get0(); | |
354 | auto source_done = seastar::async([&] { | |
355 | while (!r.client_source_closed) { | |
356 | auto data = source().get0(); | |
357 | if (data) { | |
358 | BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar"); | |
359 | } else { | |
360 | r.client_source_closed = true; | |
361 | } | |
362 | } | |
363 | }); | |
364 | auto check_exception = [] (auto f) { | |
365 | try { | |
366 | f.get(); | |
367 | } catch (...) { | |
368 | return true; | |
369 | } | |
370 | return false; | |
371 | }; | |
372 | future<> stop_client_future = make_ready_future(); | |
373 | for (int i = 1; i < 101; i++) { | |
374 | if (stop_client && i == 50) { | |
375 | // stop client while stream is in use | |
376 | stop_client_future = c.stop(); | |
377 | } | |
378 | sleep(std::chrono::milliseconds(1)).get(); | |
379 | r.sink_exception = check_exception(sink(i)); | |
380 | if (r.sink_exception) { | |
381 | break; | |
382 | } | |
383 | } | |
384 | r.sink_close_exception = check_exception(sink.close()); | |
385 | r.source_done_exception = check_exception(std::move(source_done)); | |
386 | r.server_done_exception = check_exception(std::move(server_done)); | |
387 | r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future)); | |
388 | return r; | |
389 | }); | |
390 | } | |
391 | ||
392 | SEASTAR_TEST_CASE(test_stream_simple) { | |
393 | rpc::server_options so; | |
394 | so.streaming_domain = rpc::streaming_domain_type(1); | |
395 | return with_rpc_env({}, so, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
396 | return stream_test_func(proto, make_socket, false).then([] (stream_test_result r) { | |
397 | BOOST_REQUIRE(r.client_source_closed && | |
398 | r.server_source_closed && | |
399 | r.server_sum == 5050 && | |
400 | !r.sink_exception && | |
401 | !r.sink_close_exception && | |
402 | !r.source_done_exception && | |
403 | !r.server_done_exception && | |
404 | !r.client_stop_exception); | |
405 | }); | |
406 | }); | |
407 | } | |
408 | ||
409 | SEASTAR_TEST_CASE(test_stream_stop_client) { | |
410 | rpc::server_options so; | |
411 | so.streaming_domain = rpc::streaming_domain_type(1); | |
412 | return with_rpc_env({}, so, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
413 | return stream_test_func(proto, make_socket, true).then([] (stream_test_result r) { | |
414 | BOOST_REQUIRE(!r.client_source_closed && | |
415 | !r.server_source_closed && | |
416 | r.sink_exception && | |
417 | r.sink_close_exception && | |
418 | r.source_done_exception && | |
419 | r.server_done_exception && | |
420 | !r.client_stop_exception); | |
421 | }); | |
422 | }); | |
423 | } | |
424 | ||
425 | ||
426 | SEASTAR_TEST_CASE(test_stream_connection_error) { | |
427 | rpc::server_options so; | |
428 | so.streaming_domain = rpc::streaming_domain_type(1); | |
429 | return with_rpc_env({}, so, true, true, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
430 | return stream_test_func(proto, make_socket, false).then([] (stream_test_result r) { | |
431 | BOOST_REQUIRE(!r.client_source_closed && | |
432 | !r.server_source_closed && | |
433 | r.sink_exception && | |
434 | r.sink_close_exception && | |
435 | r.source_done_exception && | |
436 | r.server_done_exception && | |
437 | !r.client_stop_exception); | |
438 | }); | |
439 | }); | |
440 | } | |
441 | ||
442 | SEASTAR_TEST_CASE(test_rpc_scheduling) { | |
443 | return with_rpc_env({}, {}, true, false, [] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
444 | return seastar::async([&proto, make_socket] { | |
445 | test_rpc_proto::client c1(proto, {}, make_socket(), ipv4_addr()); | |
446 | auto sg = create_scheduling_group("rpc", 100).get0(); | |
447 | auto call = proto.register_handler(1, sg, [sg] () mutable { | |
448 | BOOST_REQUIRE(sg == current_scheduling_group()); | |
449 | return make_ready_future<>(); | |
450 | }); | |
451 | call(c1).get(); | |
452 | c1.stop().get(); | |
453 | }); | |
454 | }); | |
455 | } | |
456 | ||
457 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { | |
458 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
459 | auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); | |
460 | auto sg2 = create_scheduling_group("sg2", 100).get0(); | |
461 | auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); | |
462 | rpc::resource_limits limits; | |
463 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
464 | auto sg = current_scheduling_group(); | |
465 | if (cookie == "sg1") { | |
466 | sg = sg1; | |
467 | } else if (cookie == "sg2") { | |
468 | sg = sg2; | |
469 | } | |
470 | rpc::isolation_config cfg; | |
471 | cfg.sched_group = sg; | |
472 | return cfg; | |
473 | }; | |
474 | with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
475 | return async([&proto, make_socket, sg1, sg2] { | |
476 | rpc::client_options co1; | |
477 | co1.isolation_cookie = "sg1"; | |
478 | test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr()); | |
479 | rpc::client_options co2; | |
480 | co2.isolation_cookie = "sg2"; | |
481 | test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr()); | |
482 | auto call = proto.register_handler(1, [sg1, sg2] (int which) mutable { | |
483 | scheduling_group expected; | |
484 | if (which == 1) { | |
485 | expected = sg1; | |
486 | } else if (which == 2) { | |
487 | expected = sg2; | |
488 | } | |
489 | BOOST_REQUIRE(current_scheduling_group() == expected); | |
490 | return make_ready_future<>(); | |
491 | }); | |
492 | call(c1, 1).get(); | |
493 | call(c2, 2).get(); | |
494 | c1.stop().get(); | |
495 | c2.stop().get(); | |
496 | }); | |
497 | }).get(); | |
498 | } | |
499 | ||
500 | SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { | |
501 | auto sg1 = create_scheduling_group("sg1", 100).get0(); | |
502 | auto sg1_kill = defer([&] { destroy_scheduling_group(sg1).get(); }); | |
503 | auto sg2 = create_scheduling_group("sg2", 100).get0(); | |
504 | auto sg2_kill = defer([&] { destroy_scheduling_group(sg2).get(); }); | |
505 | rpc::resource_limits limits; | |
506 | limits.isolate_connection = [sg1, sg2] (sstring cookie) { | |
507 | auto sg = current_scheduling_group(); | |
508 | if (cookie == "sg1") { | |
509 | sg = sg1; | |
510 | } else if (cookie == "sg2") { | |
511 | sg = sg2; | |
512 | } | |
513 | rpc::isolation_config cfg; | |
514 | cfg.sched_group = sg; | |
515 | return cfg; | |
516 | }; | |
517 | with_rpc_env(limits, {}, true, false, [sg1, sg2] (test_rpc_proto& proto, test_rpc_proto::server& s, make_socket_fn make_socket) { | |
518 | return async([&proto, make_socket, sg1, sg2] { | |
519 | rpc::client_options co1; | |
520 | co1.isolation_cookie = "sg1"; | |
521 | test_rpc_proto::client c1(proto, co1, make_socket(), ipv4_addr()); | |
522 | rpc::client_options co2; | |
523 | co2.isolation_cookie = "sg2"; | |
524 | test_rpc_proto::client c2(proto, co2, make_socket(), ipv4_addr()); | |
525 | // An old client, that doesn't have an isolation cookie | |
526 | rpc::client_options co3; | |
527 | test_rpc_proto::client c3(proto, co3, make_socket(), ipv4_addr()); | |
528 | // A server that uses sg1 if the client is old | |
529 | auto call = proto.register_handler(1, sg1, [sg1, sg2] (int which) mutable { | |
530 | scheduling_group expected; | |
531 | if (which == 1) { | |
532 | expected = sg1; | |
533 | } else if (which == 2) { | |
534 | expected = sg2; | |
535 | } | |
536 | BOOST_REQUIRE(current_scheduling_group() == expected); | |
537 | return make_ready_future<>(); | |
538 | }); | |
539 | call(c1, 1).get(); | |
540 | call(c2, 2).get(); | |
541 | call(c3, 1).get(); | |
542 | c1.stop().get(); | |
543 | c2.stop().get(); | |
544 | c3.stop().get(); | |
545 | }); | |
546 | }).get(); | |
547 | } |