]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_rgw_amqp.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / test / rgw / test_rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_amqp.h"
5 #include "common/ceph_context.h"
6 #include "amqp_mock.h"
7 #include <gtest/gtest.h>
8 #include <chrono>
9 #include <thread>
10 #include <atomic>
11
12 using namespace rgw;
13
14 const std::chrono::milliseconds wait_time(10);
15 const std::chrono::milliseconds long_wait_time = wait_time*50;
16 const std::chrono::seconds idle_time(35);
17
18
19 class CctCleaner {
20 CephContext* cct;
21 public:
22 CctCleaner(CephContext* _cct) : cct(_cct) {}
23 ~CctCleaner() {
24 #ifdef WITH_SEASTAR
25 delete cct;
26 #else
27 cct->put();
28 #endif
29 }
30 };
31
// CephContext shared by all the tests in this file; it is passed to amqp::init()
auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);

// file-scope guard that releases "cct" when it is destroyed
// it is constructed from "cct", so it has to be defined after it
CctCleaner cleaner(cct);
35
36 class TestAMQP : public ::testing::Test {
37 protected:
38 amqp::connection_id_t conn_id;
39 unsigned current_dequeued = 0U;
40
41 void SetUp() override {
42 ASSERT_TRUE(amqp::init(cct));
43 }
44
45 void TearDown() override {
46 amqp::shutdown();
47 }
48
49 // wait for at least one new (since last drain) message to be dequeueud
50 // and then wait for all pending answers to be received
51 void wait_until_drained() {
52 while (amqp::get_dequeued() == current_dequeued) {
53 std::this_thread::sleep_for(wait_time);
54 }
55 while (amqp::get_inflight() > 0) {
56 std::this_thread::sleep_for(wait_time);
57 }
58 current_dequeued = amqp::get_dequeued();
59 }
60 };
61
// set to true by the single ack/nack callbacks below
std::atomic<bool> callback_invoked{false};

// incremented by the callbacks below that count invocations
std::atomic<int> callbacks_invoked{0};
65
// note: because these callbacks are shared among different "publish" calls
// they should be used on different connections
68
69 void my_callback_expect_ack(int rc) {
70 EXPECT_EQ(0, rc);
71 callback_invoked = true;
72 }
73
74 void my_callback_expect_nack(int rc) {
75 EXPECT_LT(rc, 0);
76 callback_invoked = true;
77 }
78
79 void my_callback_expect_multiple_acks(int rc) {
80 EXPECT_EQ(0, rc);
81 ++callbacks_invoked;
82 }
83
84 class dynamic_callback_wrapper {
85 dynamic_callback_wrapper() = default;
86 public:
87 static dynamic_callback_wrapper* create() {
88 return new dynamic_callback_wrapper;
89 }
90 void callback(int rc) {
91 EXPECT_EQ(0, rc);
92 ++callbacks_invoked;
93 delete this;
94 }
95 };
96
97 void my_callback_expect_close_or_ack(int rc) {
98 // deleting the connection should trigger the callback with -4098
99 // but due to race conditions, some my get an ack
100 EXPECT_TRUE(-4098 == rc || 0 == rc);
101 }
102
103 TEST_F(TestAMQP, ConnectionOK)
104 {
105 const auto connection_number = amqp::get_connection_count();
106 auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
107 EXPECT_TRUE(rc);
108 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
109 rc = amqp::publish(conn_id, "topic", "message");
110 EXPECT_EQ(rc, 0);
111 }
112
113 TEST_F(TestAMQP, SSLConnectionOK)
114 {
115 const int port = 5671;
116 const auto connection_number = amqp::get_connection_count();
117 amqp_mock::set_valid_port(port);
118 auto rc = amqp::connect(conn_id, "amqps://localhost", "ex1", false, false, boost::none);
119 EXPECT_TRUE(rc);
120 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
121 rc = amqp::publish(conn_id, "topic", "message");
122 EXPECT_EQ(rc, 0);
123 amqp_mock::set_valid_port(5672);
124 }
125
126 TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
127 {
128 const int port = 5671;
129 const auto connection_number = amqp::get_connection_count();
130 amqp_mock::set_valid_port(port);
131 amqp::connection_id_t conn_id1;
132 auto rc = amqp::connect(conn_id1, "amqps://localhost", "ex1", false, false, boost::none);
133 EXPECT_TRUE(rc);
134 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
135 rc = amqp::publish(conn_id1, "topic", "message");
136 EXPECT_EQ(rc, 0);
137 EXPECT_EQ(amqp::to_string(conn_id1), "amqps://localhost:5671/?exchange=ex1");
138 amqp_mock::set_valid_port(5672);
139 amqp::connection_id_t conn_id2;
140 rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
141 EXPECT_TRUE(rc);
142 EXPECT_EQ(amqp::to_string(conn_id2), "amqp://localhost:5672/?exchange=ex1");
143 EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
144 rc = amqp::publish(conn_id2, "topic", "message");
145 EXPECT_EQ(rc, 0);
146 }
147
148 TEST_F(TestAMQP, ConnectionReuse)
149 {
150 amqp::connection_id_t conn_id1;
151 auto rc = amqp::connect(conn_id1, "amqp://localhost", "ex1", false, false, boost::none);
152 EXPECT_TRUE(rc);
153 const auto connection_number = amqp::get_connection_count();
154 amqp::connection_id_t conn_id2;
155 rc = amqp::connect(conn_id2, "amqp://localhost", "ex1", false, false, boost::none);
156 EXPECT_TRUE(rc);
157 EXPECT_EQ(amqp::get_connection_count(), connection_number);
158 rc = amqp::publish(conn_id1, "topic", "message");
159 EXPECT_EQ(rc, 0);
160 }
161
162 TEST_F(TestAMQP, NameResolutionFail)
163 {
164 callback_invoked = false;
165 const auto connection_number = amqp::get_connection_count();
166 amqp::connection_id_t conn_id;
167 auto rc = amqp::connect(conn_id, "amqp://kaboom", "ex1", false, false, boost::none);
168 EXPECT_TRUE(rc);
169 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
170 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
171 EXPECT_EQ(rc, 0);
172 wait_until_drained();
173 EXPECT_TRUE(callback_invoked);
174 }
175
176 TEST_F(TestAMQP, InvalidPort)
177 {
178 callback_invoked = false;
179 const auto connection_number = amqp::get_connection_count();
180 amqp::connection_id_t conn_id;
181 auto rc = amqp::connect(conn_id, "amqp://localhost:1234", "ex1", false, false, boost::none);
182 EXPECT_TRUE(rc);
183 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
184 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
185 EXPECT_EQ(rc, 0);
186 wait_until_drained();
187 EXPECT_TRUE(callback_invoked);
188 }
189
190 TEST_F(TestAMQP, InvalidHost)
191 {
192 callback_invoked = false;
193 const auto connection_number = amqp::get_connection_count();
194 amqp::connection_id_t conn_id;
195 auto rc = amqp::connect(conn_id, "amqp://0.0.0.1", "ex1", false, false, boost::none);
196 EXPECT_TRUE(rc);
197 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
198 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
199 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
200 EXPECT_EQ(rc, 0);
201 wait_until_drained();
202 EXPECT_TRUE(callback_invoked);
203 }
204
205 TEST_F(TestAMQP, InvalidVhost)
206 {
207 callback_invoked = false;
208 const auto connection_number = amqp::get_connection_count();
209 amqp::connection_id_t conn_id;
210 auto rc = amqp::connect(conn_id, "amqp://localhost/kaboom", "ex1", false, false, boost::none);
211 EXPECT_TRUE(rc);
212 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
213 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
214 EXPECT_EQ(rc, 0);
215 wait_until_drained();
216 EXPECT_TRUE(callback_invoked);
217 }
218
219 TEST_F(TestAMQP, UserPassword)
220 {
221 amqp_mock::set_valid_host("127.0.0.1");
222 {
223 callback_invoked = false;
224 const auto connection_number = amqp::get_connection_count();
225 amqp::connection_id_t conn_id;
226 auto rc = amqp::connect(conn_id, "amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
227 EXPECT_TRUE(rc);
228 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
229 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
230 EXPECT_EQ(rc, 0);
231 wait_until_drained();
232 EXPECT_TRUE(callback_invoked);
233 }
234 // now try the same connection with default user/password
235 amqp_mock::set_valid_host("127.0.0.2");
236 {
237 callback_invoked = false;
238 const auto connection_number = amqp::get_connection_count();
239 amqp::connection_id_t conn_id;
240 auto rc = amqp::connect(conn_id, "amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
241 EXPECT_TRUE(rc);
242 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
243 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
244 EXPECT_EQ(rc, 0);
245 wait_until_drained();
246 EXPECT_TRUE(callback_invoked);
247 }
248 amqp_mock::set_valid_host("localhost");
249 }
250
251 TEST_F(TestAMQP, URLParseError)
252 {
253 callback_invoked = false;
254 const auto connection_number = amqp::get_connection_count();
255 amqp::connection_id_t conn_id;
256 auto rc = amqp::connect(conn_id, "http://localhost", "ex1", false, false, boost::none);
257 EXPECT_FALSE(rc);
258 EXPECT_EQ(amqp::get_connection_count(), connection_number);
259 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
260 EXPECT_EQ(rc, 0);
261 wait_until_drained();
262 EXPECT_TRUE(callback_invoked);
263 }
264
265 TEST_F(TestAMQP, ExchangeMismatch)
266 {
267 callback_invoked = false;
268 const auto connection_number = amqp::get_connection_count();
269 amqp::connection_id_t conn_id;
270 auto rc = amqp::connect(conn_id, "http://localhost", "ex2", false, false, boost::none);
271 EXPECT_FALSE(rc);
272 EXPECT_EQ(amqp::get_connection_count(), connection_number);
273 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
274 EXPECT_EQ(rc, 0);
275 wait_until_drained();
276 EXPECT_TRUE(callback_invoked);
277 }
278
279 TEST_F(TestAMQP, MaxConnections)
280 {
281 // fill up all connections
282 std::vector<amqp::connection_id_t> connections;
283 auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
284 while (remaining_connections > 0) {
285 const auto host = "127.10.0." + std::to_string(remaining_connections);
286 amqp_mock::set_valid_host(host);
287 amqp::connection_id_t conn_id;
288 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
289 EXPECT_TRUE(rc);
290 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
291 EXPECT_EQ(rc, 0);
292 --remaining_connections;
293 connections.push_back(conn_id);
294 }
295 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
296 wait_until_drained();
297 // try to add another connection
298 {
299 const std::string host = "toomany";
300 amqp_mock::set_valid_host(host);
301 amqp::connection_id_t conn_id;
302 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
303 EXPECT_FALSE(rc);
304 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
305 EXPECT_EQ(rc, 0);
306 wait_until_drained();
307 }
308 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
309 amqp_mock::set_valid_host("localhost");
310 }
311
312
313 TEST_F(TestAMQP, ReceiveAck)
314 {
315 callback_invoked = false;
316 const std::string host("localhost1");
317 amqp_mock::set_valid_host(host);
318 amqp::connection_id_t conn_id;
319 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
320 EXPECT_TRUE(rc);
321 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
322 EXPECT_EQ(rc, 0);
323 wait_until_drained();
324 EXPECT_TRUE(callback_invoked);
325 amqp_mock::set_valid_host("localhost");
326 }
327
328 TEST_F(TestAMQP, ImplicitConnectionClose)
329 {
330 callback_invoked = false;
331 const std::string host("localhost1");
332 amqp_mock::set_valid_host(host);
333 amqp::connection_id_t conn_id;
334 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
335 EXPECT_TRUE(rc);
336 const auto NUMBER_OF_CALLS = 2000;
337 for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
338 auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_close_or_ack);
339 EXPECT_EQ(rc, 0);
340 }
341 wait_until_drained();
342 amqp_mock::set_valid_host("localhost");
343 }
344
345 TEST_F(TestAMQP, ReceiveMultipleAck)
346 {
347 callbacks_invoked = 0;
348 const std::string host("localhost1");
349 amqp_mock::set_valid_host(host);
350 amqp::connection_id_t conn_id;
351 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
352 EXPECT_TRUE(rc);
353 const auto NUMBER_OF_CALLS = 100;
354 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
355 auto rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
356 EXPECT_EQ(rc, 0);
357 }
358 wait_until_drained();
359 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
360 callbacks_invoked = 0;
361 amqp_mock::set_valid_host("localhost");
362 }
363
364 TEST_F(TestAMQP, ReceiveAckForMultiple)
365 {
366 callbacks_invoked = 0;
367 const std::string host("localhost1");
368 amqp_mock::set_valid_host(host);
369 amqp::connection_id_t conn_id;
370 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
371 EXPECT_TRUE(rc);
372 amqp_mock::set_multiple(59);
373 const auto NUMBER_OF_CALLS = 100;
374 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
375 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_multiple_acks);
376 EXPECT_EQ(rc, 0);
377 }
378 wait_until_drained();
379 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
380 callbacks_invoked = 0;
381 amqp_mock::set_valid_host("localhost");
382 }
383
384 TEST_F(TestAMQP, DynamicCallback)
385 {
386 callbacks_invoked = 0;
387 const std::string host("localhost1");
388 amqp_mock::set_valid_host(host);
389 amqp::connection_id_t conn_id;
390 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
391 EXPECT_TRUE(rc);
392 amqp_mock::set_multiple(59);
393 const auto NUMBER_OF_CALLS = 100;
394 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
395 rc = publish_with_confirm(conn_id, "topic", "message",
396 std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
397 EXPECT_EQ(rc, 0);
398 }
399 wait_until_drained();
400 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
401 callbacks_invoked = 0;
402 amqp_mock::set_valid_host("localhost");
403 }
404
405 TEST_F(TestAMQP, ReceiveNack)
406 {
407 callback_invoked = false;
408 amqp_mock::REPLY_ACK = false;
409 const std::string host("localhost2");
410 amqp_mock::set_valid_host(host);
411 amqp::connection_id_t conn_id;
412 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
413 EXPECT_TRUE(rc);
414 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
415 EXPECT_EQ(rc, 0);
416 wait_until_drained();
417 EXPECT_TRUE(callback_invoked);
418 amqp_mock::REPLY_ACK = true;
419 callback_invoked = false;
420 amqp_mock::set_valid_host("localhost");
421 }
422
423 TEST_F(TestAMQP, FailWrite)
424 {
425 callback_invoked = false;
426 amqp_mock::FAIL_NEXT_WRITE = true;
427 const std::string host("localhost2");
428 amqp_mock::set_valid_host(host);
429 amqp::connection_id_t conn_id;
430 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
431 EXPECT_TRUE(rc);
432 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
433 EXPECT_EQ(rc, 0);
434 wait_until_drained();
435 EXPECT_TRUE(callback_invoked);
436 amqp_mock::FAIL_NEXT_WRITE = false;
437 callback_invoked = false;
438 amqp_mock::set_valid_host("localhost");
439 }
440
441 TEST_F(TestAMQP, RetryInvalidHost)
442 {
443 callback_invoked = false;
444 const std::string host = "192.168.0.1";
445 const auto connection_number = amqp::get_connection_count();
446 amqp::connection_id_t conn_id;
447 auto rc = amqp::connect(conn_id, "amqp://"+host, "ex1", false, false, boost::none);
448 EXPECT_TRUE(rc);
449 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
450 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
451 EXPECT_EQ(rc, 0);
452 wait_until_drained();
453 EXPECT_TRUE(callback_invoked);
454 // now next retry should be ok
455 callback_invoked = false;
456 amqp_mock::set_valid_host(host);
457 std::this_thread::sleep_for(long_wait_time);
458 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
459 EXPECT_EQ(rc, 0);
460 wait_until_drained();
461 EXPECT_TRUE(callback_invoked);
462 amqp_mock::set_valid_host("localhost");
463 }
464
465 TEST_F(TestAMQP, RetryInvalidPort)
466 {
467 callback_invoked = false;
468 const int port = 9999;
469 const auto connection_number = amqp::get_connection_count();
470 amqp::connection_id_t conn_id;
471 auto rc = amqp::connect(conn_id, "amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
472 EXPECT_TRUE(rc);
473 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
474 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
475 EXPECT_EQ(rc, 0);
476 wait_until_drained();
477 EXPECT_TRUE(callback_invoked);
478 // now next retry should be ok
479 callback_invoked = false;
480 amqp_mock::set_valid_port(port);
481 std::this_thread::sleep_for(long_wait_time);
482 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
483 EXPECT_EQ(rc, 0);
484 wait_until_drained();
485 EXPECT_TRUE(callback_invoked);
486 amqp_mock::set_valid_port(5672);
487 }
488
489 TEST_F(TestAMQP, RetryFailWrite)
490 {
491 callback_invoked = false;
492 amqp_mock::FAIL_NEXT_WRITE = true;
493 const std::string host("localhost2");
494 amqp_mock::set_valid_host(host);
495 amqp::connection_id_t conn_id;
496 auto rc = amqp::connect(conn_id, "amqp://" + host, "ex1", false, false, boost::none);
497 EXPECT_TRUE(rc);
498 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
499 EXPECT_EQ(rc, 0);
500 wait_until_drained();
501 EXPECT_TRUE(callback_invoked);
502 // now next retry should be ok
503 amqp_mock::FAIL_NEXT_WRITE = false;
504 callback_invoked = false;
505 std::this_thread::sleep_for(long_wait_time);
506 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_ack);
507 EXPECT_EQ(rc, 0);
508 wait_until_drained();
509 EXPECT_TRUE(callback_invoked);
510 amqp_mock::set_valid_host("localhost");
511 }
512
513 TEST_F(TestAMQP, IdleConnection)
514 {
515 // this test is skipped since it takes 30seconds
516 //GTEST_SKIP();
517 const auto connection_number = amqp::get_connection_count();
518 amqp::connection_id_t conn_id;
519 auto rc = amqp::connect(conn_id, "amqp://localhost", "ex1", false, false, boost::none);
520 EXPECT_TRUE(rc);
521 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
522 std::this_thread::sleep_for(idle_time);
523 EXPECT_EQ(amqp::get_connection_count(), connection_number);
524 rc = publish_with_confirm(conn_id, "topic", "message", my_callback_expect_nack);
525 EXPECT_EQ(rc, 0);
526 wait_until_drained();
527 EXPECT_TRUE(callback_invoked);
528 }
529