]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/rgw/test_rgw_amqp.cc
import 15.2.4
[ceph.git] / ceph / src / test / rgw / test_rgw_amqp.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "rgw/rgw_amqp.h"
eafe8130 5#include "common/ceph_context.h"
11fdf7f2
TL
6#include "amqp_mock.h"
7#include <gtest/gtest.h>
8#include <chrono>
9#include <thread>
10#include <atomic>
11
12using namespace rgw;
13
eafe8130
TL
// polling granularity used while waiting on the amqp manager
const std::chrono::milliseconds wait_time{10};
// longer pause (50 polling periods) for waits that are not polled
const std::chrono::milliseconds long_wait_time{wait_time * 50};
11fdf7f2 16
eafe8130
TL
17
18class CctCleaner {
19 CephContext* cct;
20public:
21 CctCleaner(CephContext* _cct) : cct(_cct) {}
22 ~CctCleaner() {
23#ifdef WITH_SEASTAR
24 delete cct;
25#else
26 cct->put();
27#endif
28 }
29};
30
31auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
32
33CctCleaner cleaner(cct);
34
35class TestAMQP : public ::testing::Test {
36protected:
37 amqp::connection_ptr_t conn = nullptr;
38 unsigned current_dequeued = 0U;
39
40 void SetUp() override {
41 ASSERT_TRUE(amqp::init(cct));
42 }
43
44 void TearDown() override {
45 amqp::shutdown();
46 }
47
48 // wait for at least one new (since last drain) message to be dequeueud
49 // and then wait for all pending answers to be received
50 void wait_until_drained() {
51 while (amqp::get_dequeued() == current_dequeued) {
52 std::this_thread::sleep_for(wait_time);
53 }
54 while (amqp::get_inflight() > 0) {
55 std::this_thread::sleep_for(wait_time);
56 }
57 current_dequeued = amqp::get_dequeued();
58 }
59};
60
61TEST_F(TestAMQP, ConnectionOK)
11fdf7f2
TL
62{
63 const auto connection_number = amqp::get_connection_count();
e306af50 64 conn = amqp::connect("amqp://localhost", "ex1", false);
11fdf7f2
TL
65 EXPECT_TRUE(conn);
66 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
67 auto rc = amqp::publish(conn, "topic", "message");
68 EXPECT_EQ(rc, 0);
69}
70
eafe8130 71TEST_F(TestAMQP, ConnectionReuse)
11fdf7f2 72{
e306af50 73 amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false);
eafe8130 74 EXPECT_TRUE(conn1);
11fdf7f2 75 const auto connection_number = amqp::get_connection_count();
e306af50 76 amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false);
eafe8130 77 EXPECT_TRUE(conn2);
11fdf7f2 78 EXPECT_EQ(amqp::get_connection_count(), connection_number);
eafe8130 79 auto rc = amqp::publish(conn1, "topic", "message");
11fdf7f2
TL
80 EXPECT_EQ(rc, 0);
81}
82
eafe8130 83TEST_F(TestAMQP, NameResolutionFail)
11fdf7f2
TL
84{
85 const auto connection_number = amqp::get_connection_count();
e306af50 86 conn = amqp::connect("amqp://kaboom", "ex1", false);
11fdf7f2
TL
87 EXPECT_TRUE(conn);
88 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
89 auto rc = amqp::publish(conn, "topic", "message");
90 EXPECT_LT(rc, 0);
91}
92
eafe8130 93TEST_F(TestAMQP, InvalidPort)
11fdf7f2
TL
94{
95 const auto connection_number = amqp::get_connection_count();
e306af50 96 conn = amqp::connect("amqp://localhost:1234", "ex1", false);
11fdf7f2
TL
97 EXPECT_TRUE(conn);
98 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
99 auto rc = amqp::publish(conn, "topic", "message");
100 EXPECT_LT(rc, 0);
101}
102
eafe8130 103TEST_F(TestAMQP, InvalidHost)
11fdf7f2
TL
104{
105 const auto connection_number = amqp::get_connection_count();
e306af50 106 conn = amqp::connect("amqp://0.0.0.1", "ex1", false);
11fdf7f2
TL
107 EXPECT_TRUE(conn);
108 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
109 auto rc = amqp::publish(conn, "topic", "message");
110 EXPECT_LT(rc, 0);
111}
112
eafe8130 113TEST_F(TestAMQP, InvalidVhost)
11fdf7f2
TL
114{
115 const auto connection_number = amqp::get_connection_count();
e306af50 116 conn = amqp::connect("amqp://localhost/kaboom", "ex1", false);
11fdf7f2
TL
117 EXPECT_TRUE(conn);
118 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
119 auto rc = amqp::publish(conn, "topic", "message");
120 EXPECT_LT(rc, 0);
121}
122
eafe8130 123TEST_F(TestAMQP, UserPassword)
11fdf7f2
TL
124{
125 amqp_mock::set_valid_host("127.0.0.1");
126 {
127 const auto connection_number = amqp::get_connection_count();
e306af50 128 conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false);
11fdf7f2
TL
129 EXPECT_TRUE(conn);
130 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
131 auto rc = amqp::publish(conn, "topic", "message");
132 EXPECT_LT(rc, 0);
133 }
134 // now try the same connection with default user/password
135 amqp_mock::set_valid_host("127.0.0.2");
136 {
137 const auto connection_number = amqp::get_connection_count();
e306af50 138 conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false);
11fdf7f2
TL
139 EXPECT_TRUE(conn);
140 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
141 auto rc = amqp::publish(conn, "topic", "message");
142 EXPECT_EQ(rc, 0);
143 }
144 amqp_mock::set_valid_host("localhost");
145}
146
eafe8130 147TEST_F(TestAMQP, URLParseError)
11fdf7f2
TL
148{
149 const auto connection_number = amqp::get_connection_count();
e306af50 150 conn = amqp::connect("http://localhost", "ex1", false);
11fdf7f2
TL
151 EXPECT_FALSE(conn);
152 EXPECT_EQ(amqp::get_connection_count(), connection_number);
153 auto rc = amqp::publish(conn, "topic", "message");
154 EXPECT_LT(rc, 0);
155}
156
eafe8130 157TEST_F(TestAMQP, ExchangeMismatch)
11fdf7f2
TL
158{
159 const auto connection_number = amqp::get_connection_count();
e306af50 160 conn = amqp::connect("http://localhost", "ex2", false);
11fdf7f2
TL
161 EXPECT_FALSE(conn);
162 EXPECT_EQ(amqp::get_connection_count(), connection_number);
163 auto rc = amqp::publish(conn, "topic", "message");
164 EXPECT_LT(rc, 0);
165}
166
eafe8130 167TEST_F(TestAMQP, MaxConnections)
11fdf7f2
TL
168{
169 // fill up all connections
170 std::vector<amqp::connection_ptr_t> connections;
171 auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
172 while (remaining_connections > 0) {
173 const auto host = "127.10.0." + std::to_string(remaining_connections);
174 amqp_mock::set_valid_host(host);
e306af50 175 amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
176 EXPECT_TRUE(conn);
177 auto rc = amqp::publish(conn, "topic", "message");
178 EXPECT_EQ(rc, 0);
179 --remaining_connections;
180 connections.push_back(conn);
181 }
182 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
183 // try to add another connection
184 {
185 const std::string host = "toomany";
186 amqp_mock::set_valid_host(host);
e306af50 187 amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
188 EXPECT_FALSE(conn);
189 auto rc = amqp::publish(conn, "topic", "message");
190 EXPECT_LT(rc, 0);
191 }
192 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
193 amqp_mock::set_valid_host("localhost");
194 // delete connections to make space for new ones
195 for (auto conn : connections) {
196 EXPECT_TRUE(amqp::disconnect(conn));
197 }
198 // wait for them to be deleted
eafe8130 199 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
200 EXPECT_LT(amqp::get_connection_count(), amqp::get_max_connections());
201}
202
// set by the single-shot ack/nack callbacks below
std::atomic<bool> callback_invoked{false};

// incremented by the callbacks that expect several acks
std::atomic<int> callbacks_invoked{0};
206
11fdf7f2
TL
// note: because these callbacks are shared among different "publish" calls,
// they should be used on different connections
209
210void my_callback_expect_ack(int rc) {
211 EXPECT_EQ(0, rc);
212 callback_invoked = true;
213}
214
215void my_callback_expect_nack(int rc) {
216 EXPECT_LT(rc, 0);
217 callback_invoked = true;
218}
219
eafe8130
TL
220void my_callback_expect_multiple_acks(int rc) {
221 EXPECT_EQ(0, rc);
222 ++callbacks_invoked;
223}
224
225class dynamic_callback_wrapper {
226 dynamic_callback_wrapper() = default;
227public:
228 static dynamic_callback_wrapper* create() {
229 return new dynamic_callback_wrapper;
230 }
231 void callback(int rc) {
232 EXPECT_EQ(0, rc);
233 ++callbacks_invoked;
234 delete this;
235 }
236};
237
238void my_callback_expect_close_or_ack(int rc) {
239 // deleting the connection should trigger the callback with -4098
240 // but due to race conditions, some my get an ack
241 EXPECT_TRUE(-4098 == rc || 0 == rc);
242}
243
244TEST_F(TestAMQP, ReceiveAck)
11fdf7f2
TL
245{
246 callback_invoked = false;
247 const std::string host("localhost1");
248 amqp_mock::set_valid_host(host);
e306af50 249 conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
250 EXPECT_TRUE(conn);
251 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
252 EXPECT_EQ(rc, 0);
eafe8130 253 wait_until_drained();
11fdf7f2 254 EXPECT_TRUE(callback_invoked);
eafe8130
TL
255 amqp_mock::set_valid_host("localhost");
256}
257
258TEST_F(TestAMQP, ImplicitConnectionClose)
259{
11fdf7f2 260 callback_invoked = false;
eafe8130
TL
261 const std::string host("localhost1");
262 amqp_mock::set_valid_host(host);
e306af50 263 conn = amqp::connect("amqp://" + host, "ex1", false);
eafe8130
TL
264 EXPECT_TRUE(conn);
265 const auto NUMBER_OF_CALLS = 2000;
266 for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
267 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
268 EXPECT_EQ(rc, 0);
269 }
270 wait_until_drained();
271 // deleting the connection object should close the connection
272 conn.reset(nullptr);
273 amqp_mock::set_valid_host("localhost");
274}
275
276TEST_F(TestAMQP, ReceiveMultipleAck)
277{
278 callbacks_invoked = 0;
279 const std::string host("localhost1");
280 amqp_mock::set_valid_host(host);
e306af50 281 conn = amqp::connect("amqp://" + host, "ex1", false);
eafe8130
TL
282 EXPECT_TRUE(conn);
283 const auto NUMBER_OF_CALLS = 100;
284 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
285 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
286 EXPECT_EQ(rc, 0);
287 }
288 wait_until_drained();
289 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
290 callbacks_invoked = 0;
291 amqp_mock::set_valid_host("localhost");
292}
293
294TEST_F(TestAMQP, ReceiveAckForMultiple)
295{
296 callbacks_invoked = 0;
297 const std::string host("localhost1");
298 amqp_mock::set_valid_host(host);
e306af50 299 conn = amqp::connect("amqp://" + host, "ex1", false);
eafe8130
TL
300 EXPECT_TRUE(conn);
301 amqp_mock::set_multiple(59);
302 const auto NUMBER_OF_CALLS = 100;
303 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
304 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
305 EXPECT_EQ(rc, 0);
306 }
307 wait_until_drained();
308 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
309 callbacks_invoked = 0;
310 amqp_mock::set_valid_host("localhost");
311}
312
313TEST_F(TestAMQP, DynamicCallback)
314{
315 callbacks_invoked = 0;
316 const std::string host("localhost1");
317 amqp_mock::set_valid_host(host);
e306af50 318 conn = amqp::connect("amqp://" + host, "ex1", false);
eafe8130
TL
319 EXPECT_TRUE(conn);
320 amqp_mock::set_multiple(59);
321 const auto NUMBER_OF_CALLS = 100;
322 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
323 auto rc = publish_with_confirm(conn, "topic", "message",
324 std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
325 EXPECT_EQ(rc, 0);
326 }
327 wait_until_drained();
328 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
329 callbacks_invoked = 0;
11fdf7f2
TL
330 amqp_mock::set_valid_host("localhost");
331}
332
eafe8130 333TEST_F(TestAMQP, ReceiveNack)
11fdf7f2
TL
334{
335 callback_invoked = false;
336 amqp_mock::REPLY_ACK = false;
337 const std::string host("localhost2");
338 amqp_mock::set_valid_host(host);
e306af50 339 conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
340 EXPECT_TRUE(conn);
341 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
342 EXPECT_EQ(rc, 0);
eafe8130 343 wait_until_drained();
11fdf7f2
TL
344 EXPECT_TRUE(callback_invoked);
345 amqp_mock::REPLY_ACK = true;
346 callback_invoked = false;
347 amqp_mock::set_valid_host("localhost");
348}
349
eafe8130 350TEST_F(TestAMQP, FailWrite)
11fdf7f2
TL
351{
352 callback_invoked = false;
353 amqp_mock::FAIL_NEXT_WRITE = true;
354 const std::string host("localhost2");
355 amqp_mock::set_valid_host(host);
e306af50 356 conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
357 EXPECT_TRUE(conn);
358 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
359 EXPECT_EQ(rc, 0);
eafe8130 360 wait_until_drained();
11fdf7f2
TL
361 EXPECT_TRUE(callback_invoked);
362 amqp_mock::FAIL_NEXT_WRITE = false;
363 callback_invoked = false;
364 amqp_mock::set_valid_host("localhost");
365}
366
eafe8130 367TEST_F(TestAMQP, ClosedConnection)
11fdf7f2
TL
368{
369 callback_invoked = false;
370 const auto current_connections = amqp::get_connection_count();
371 const std::string host("localhost3");
372 amqp_mock::set_valid_host(host);
e306af50 373 conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
374 EXPECT_TRUE(conn);
375 EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
376 EXPECT_TRUE(amqp::disconnect(conn));
eafe8130 377 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
378 // make sure number of connections decreased back
379 EXPECT_EQ(amqp::get_connection_count(), current_connections);
380 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
381 EXPECT_LT(rc, 0);
eafe8130 382 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
383 EXPECT_FALSE(callback_invoked);
384 callback_invoked = false;
385 amqp_mock::set_valid_host("localhost");
386}
387
eafe8130 388TEST_F(TestAMQP, RetryInvalidHost)
11fdf7f2
TL
389{
390 const std::string host = "192.168.0.1";
391 const auto connection_number = amqp::get_connection_count();
e306af50 392 conn = amqp::connect("amqp://"+host, "ex1", false);
11fdf7f2
TL
393 EXPECT_TRUE(conn);
394 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
395 auto rc = amqp::publish(conn, "topic", "message");
396 EXPECT_LT(rc, 0);
397 // now next retry should be ok
398 amqp_mock::set_valid_host(host);
eafe8130 399 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
400 rc = amqp::publish(conn, "topic", "message");
401 EXPECT_EQ(rc, 0);
402 amqp_mock::set_valid_host("localhost");
403}
404
eafe8130 405TEST_F(TestAMQP, RetryInvalidPort)
11fdf7f2
TL
406{
407 const int port = 9999;
408 const auto connection_number = amqp::get_connection_count();
e306af50 409 conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false);
11fdf7f2
TL
410 EXPECT_TRUE(conn);
411 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
412 auto rc = amqp::publish(conn, "topic", "message");
413 EXPECT_LT(rc, 0);
414 // now next retry should be ok
415 amqp_mock::set_valid_port(port);
eafe8130 416 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
417 rc = amqp::publish(conn, "topic", "message");
418 EXPECT_EQ(rc, 0);
419 amqp_mock::set_valid_port(5672);
420}
421
eafe8130 422TEST_F(TestAMQP, RetryFailWrite)
11fdf7f2
TL
423{
424 callback_invoked = false;
425 amqp_mock::FAIL_NEXT_WRITE = true;
426 const std::string host("localhost4");
427 amqp_mock::set_valid_host(host);
e306af50 428 conn = amqp::connect("amqp://" + host, "ex1", false);
11fdf7f2
TL
429 EXPECT_TRUE(conn);
430 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
431 EXPECT_EQ(rc, 0);
432 // set port to a different one, so that reconnect would fail
433 amqp_mock::set_valid_port(9999);
eafe8130 434 wait_until_drained();
11fdf7f2
TL
435 EXPECT_TRUE(callback_invoked);
436 callback_invoked = false;
437 rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
438 EXPECT_LT(rc, 0);
439 // expect immediate failure, no callback called after sleep
eafe8130 440 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
441 EXPECT_FALSE(callback_invoked);
442 // set port to the right one so that reconnect would succeed
443 amqp_mock::set_valid_port(5672);
444 callback_invoked = false;
445 amqp_mock::FAIL_NEXT_WRITE = false;
446 // give time to reconnect
eafe8130 447 std::this_thread::sleep_for(long_wait_time);
11fdf7f2
TL
448 // retry to publish should succeed now
449 rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
450 EXPECT_EQ(rc, 0);
eafe8130 451 wait_until_drained();
11fdf7f2
TL
452 EXPECT_TRUE(callback_invoked);
453 callback_invoked = false;
454 amqp_mock::set_valid_host("localhost");
455}
456