]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/test_rgw_amqp.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / test / rgw / test_rgw_amqp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_amqp.h"
5 #include "common/ceph_context.h"
6 #include "amqp_mock.h"
7 #include <gtest/gtest.h>
8 #include <chrono>
9 #include <thread>
10 #include <atomic>
11
12 using namespace rgw;
13
// polling interval used while waiting for the amqp manager to make progress
const std::chrono::milliseconds wait_time{10};
// longer pause (50 polling intervals) used by the retry tests
const std::chrono::milliseconds long_wait_time = 50 * wait_time;
16
17
18 class CctCleaner {
19 CephContext* cct;
20 public:
21 CctCleaner(CephContext* _cct) : cct(_cct) {}
22 ~CctCleaner() {
23 #ifdef WITH_SEASTAR
24 delete cct;
25 #else
26 cct->put();
27 #endif
28 }
29 };
30
31 auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
32
33 CctCleaner cleaner(cct);
34
35 class TestAMQP : public ::testing::Test {
36 protected:
37 amqp::connection_ptr_t conn = nullptr;
38 unsigned current_dequeued = 0U;
39
40 void SetUp() override {
41 ASSERT_TRUE(amqp::init(cct));
42 }
43
44 void TearDown() override {
45 amqp::shutdown();
46 }
47
48 // wait for at least one new (since last drain) message to be dequeueud
49 // and then wait for all pending answers to be received
50 void wait_until_drained() {
51 while (amqp::get_dequeued() == current_dequeued) {
52 std::this_thread::sleep_for(wait_time);
53 }
54 while (amqp::get_inflight() > 0) {
55 std::this_thread::sleep_for(wait_time);
56 }
57 current_dequeued = amqp::get_dequeued();
58 }
59 };
60
61 TEST_F(TestAMQP, ConnectionOK)
62 {
63 const auto connection_number = amqp::get_connection_count();
64 conn = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
65 EXPECT_TRUE(conn);
66 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
67 auto rc = amqp::publish(conn, "topic", "message");
68 EXPECT_EQ(rc, 0);
69 }
70
71 TEST_F(TestAMQP, SSLConnectionOK)
72 {
73 const int port = 5671;
74 const auto connection_number = amqp::get_connection_count();
75 amqp_mock::set_valid_port(port);
76 conn = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
77 EXPECT_TRUE(conn);
78 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
79 auto rc = amqp::publish(conn, "topic", "message");
80 EXPECT_EQ(rc, 0);
81 amqp_mock::set_valid_port(5672);
82 }
83
84 TEST_F(TestAMQP, PlainAndSSLConnectionsOK)
85 {
86 const int port = 5671;
87 const auto connection_number = amqp::get_connection_count();
88 amqp_mock::set_valid_port(port);
89 amqp::connection_ptr_t conn1 = amqp::connect("amqps://localhost", "ex1", false, false, boost::none);
90 EXPECT_TRUE(conn1);
91 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
92 auto rc = amqp::publish(conn1, "topic", "message");
93 EXPECT_EQ(rc, 0);
94 amqp_mock::set_valid_port(5672);
95 amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
96 EXPECT_TRUE(conn2);
97 EXPECT_EQ(amqp::get_connection_count(), connection_number + 2);
98 rc = amqp::publish(conn2, "topic", "message");
99 EXPECT_EQ(rc, 0);
100 }
101
102 TEST_F(TestAMQP, ConnectionReuse)
103 {
104 amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
105 EXPECT_TRUE(conn1);
106 const auto connection_number = amqp::get_connection_count();
107 amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false, false, boost::none);
108 EXPECT_TRUE(conn2);
109 EXPECT_EQ(amqp::get_connection_count(), connection_number);
110 auto rc = amqp::publish(conn1, "topic", "message");
111 EXPECT_EQ(rc, 0);
112 }
113
114 TEST_F(TestAMQP, NameResolutionFail)
115 {
116 const auto connection_number = amqp::get_connection_count();
117 conn = amqp::connect("amqp://kaboom", "ex1", false, false, boost::none);
118 EXPECT_TRUE(conn);
119 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
120 auto rc = amqp::publish(conn, "topic", "message");
121 EXPECT_LT(rc, 0);
122 }
123
124 TEST_F(TestAMQP, InvalidPort)
125 {
126 const auto connection_number = amqp::get_connection_count();
127 conn = amqp::connect("amqp://localhost:1234", "ex1", false, false, boost::none);
128 EXPECT_TRUE(conn);
129 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
130 auto rc = amqp::publish(conn, "topic", "message");
131 EXPECT_LT(rc, 0);
132 }
133
134 TEST_F(TestAMQP, InvalidHost)
135 {
136 const auto connection_number = amqp::get_connection_count();
137 conn = amqp::connect("amqp://0.0.0.1", "ex1", false, false, boost::none);
138 EXPECT_TRUE(conn);
139 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
140 auto rc = amqp::publish(conn, "topic", "message");
141 EXPECT_LT(rc, 0);
142 }
143
144 TEST_F(TestAMQP, InvalidVhost)
145 {
146 const auto connection_number = amqp::get_connection_count();
147 conn = amqp::connect("amqp://localhost/kaboom", "ex1", false, false, boost::none);
148 EXPECT_TRUE(conn);
149 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
150 auto rc = amqp::publish(conn, "topic", "message");
151 EXPECT_LT(rc, 0);
152 }
153
154 TEST_F(TestAMQP, UserPassword)
155 {
156 amqp_mock::set_valid_host("127.0.0.1");
157 {
158 const auto connection_number = amqp::get_connection_count();
159 conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false, false, boost::none);
160 EXPECT_TRUE(conn);
161 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
162 auto rc = amqp::publish(conn, "topic", "message");
163 EXPECT_LT(rc, 0);
164 }
165 // now try the same connection with default user/password
166 amqp_mock::set_valid_host("127.0.0.2");
167 {
168 const auto connection_number = amqp::get_connection_count();
169 conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false, false, boost::none);
170 EXPECT_TRUE(conn);
171 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
172 auto rc = amqp::publish(conn, "topic", "message");
173 EXPECT_EQ(rc, 0);
174 }
175 amqp_mock::set_valid_host("localhost");
176 }
177
178 TEST_F(TestAMQP, URLParseError)
179 {
180 const auto connection_number = amqp::get_connection_count();
181 conn = amqp::connect("http://localhost", "ex1", false, false, boost::none);
182 EXPECT_FALSE(conn);
183 EXPECT_EQ(amqp::get_connection_count(), connection_number);
184 auto rc = amqp::publish(conn, "topic", "message");
185 EXPECT_LT(rc, 0);
186 }
187
188 TEST_F(TestAMQP, ExchangeMismatch)
189 {
190 const auto connection_number = amqp::get_connection_count();
191 conn = amqp::connect("http://localhost", "ex2", false, false, boost::none);
192 EXPECT_FALSE(conn);
193 EXPECT_EQ(amqp::get_connection_count(), connection_number);
194 auto rc = amqp::publish(conn, "topic", "message");
195 EXPECT_LT(rc, 0);
196 }
197
198 TEST_F(TestAMQP, MaxConnections)
199 {
200 // fill up all connections
201 std::vector<amqp::connection_ptr_t> connections;
202 auto remaining_connections = amqp::get_max_connections() - amqp::get_connection_count();
203 while (remaining_connections > 0) {
204 const auto host = "127.10.0." + std::to_string(remaining_connections);
205 amqp_mock::set_valid_host(host);
206 amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
207 EXPECT_TRUE(conn);
208 auto rc = amqp::publish(conn, "topic", "message");
209 EXPECT_EQ(rc, 0);
210 --remaining_connections;
211 connections.push_back(conn);
212 }
213 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
214 // try to add another connection
215 {
216 const std::string host = "toomany";
217 amqp_mock::set_valid_host(host);
218 amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
219 EXPECT_FALSE(conn);
220 auto rc = amqp::publish(conn, "topic", "message");
221 EXPECT_LT(rc, 0);
222 }
223 EXPECT_EQ(amqp::get_connection_count(), amqp::get_max_connections());
224 amqp_mock::set_valid_host("localhost");
225 }
226
// set by the single-shot callbacks, reset by the tests that use them
std::atomic<bool> callback_invoked{false};

// incremented once per invocation by the counting callbacks
std::atomic<int> callbacks_invoked{0};
230
231 // note: because these callbacks are shared among different "publish" calls
232 // they should be used on different connections
233
234 void my_callback_expect_ack(int rc) {
235 EXPECT_EQ(0, rc);
236 callback_invoked = true;
237 }
238
239 void my_callback_expect_nack(int rc) {
240 EXPECT_LT(rc, 0);
241 callback_invoked = true;
242 }
243
244 void my_callback_expect_multiple_acks(int rc) {
245 EXPECT_EQ(0, rc);
246 ++callbacks_invoked;
247 }
248
249 class dynamic_callback_wrapper {
250 dynamic_callback_wrapper() = default;
251 public:
252 static dynamic_callback_wrapper* create() {
253 return new dynamic_callback_wrapper;
254 }
255 void callback(int rc) {
256 EXPECT_EQ(0, rc);
257 ++callbacks_invoked;
258 delete this;
259 }
260 };
261
262 void my_callback_expect_close_or_ack(int rc) {
263 // deleting the connection should trigger the callback with -4098
264 // but due to race conditions, some my get an ack
265 EXPECT_TRUE(-4098 == rc || 0 == rc);
266 }
267
268 TEST_F(TestAMQP, ReceiveAck)
269 {
270 callback_invoked = false;
271 const std::string host("localhost1");
272 amqp_mock::set_valid_host(host);
273 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
274 EXPECT_TRUE(conn);
275 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
276 EXPECT_EQ(rc, 0);
277 wait_until_drained();
278 EXPECT_TRUE(callback_invoked);
279 amqp_mock::set_valid_host("localhost");
280 }
281
282 TEST_F(TestAMQP, ImplicitConnectionClose)
283 {
284 callback_invoked = false;
285 const std::string host("localhost1");
286 amqp_mock::set_valid_host(host);
287 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
288 EXPECT_TRUE(conn);
289 const auto NUMBER_OF_CALLS = 2000;
290 for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
291 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_close_or_ack);
292 EXPECT_EQ(rc, 0);
293 }
294 wait_until_drained();
295 // deleting the connection object should close the connection
296 conn.reset(nullptr);
297 amqp_mock::set_valid_host("localhost");
298 }
299
300 TEST_F(TestAMQP, ReceiveMultipleAck)
301 {
302 callbacks_invoked = 0;
303 const std::string host("localhost1");
304 amqp_mock::set_valid_host(host);
305 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
306 EXPECT_TRUE(conn);
307 const auto NUMBER_OF_CALLS = 100;
308 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
309 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
310 EXPECT_EQ(rc, 0);
311 }
312 wait_until_drained();
313 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
314 callbacks_invoked = 0;
315 amqp_mock::set_valid_host("localhost");
316 }
317
318 TEST_F(TestAMQP, ReceiveAckForMultiple)
319 {
320 callbacks_invoked = 0;
321 const std::string host("localhost1");
322 amqp_mock::set_valid_host(host);
323 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
324 EXPECT_TRUE(conn);
325 amqp_mock::set_multiple(59);
326 const auto NUMBER_OF_CALLS = 100;
327 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
328 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
329 EXPECT_EQ(rc, 0);
330 }
331 wait_until_drained();
332 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
333 callbacks_invoked = 0;
334 amqp_mock::set_valid_host("localhost");
335 }
336
337 TEST_F(TestAMQP, DynamicCallback)
338 {
339 callbacks_invoked = 0;
340 const std::string host("localhost1");
341 amqp_mock::set_valid_host(host);
342 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
343 EXPECT_TRUE(conn);
344 amqp_mock::set_multiple(59);
345 const auto NUMBER_OF_CALLS = 100;
346 for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
347 auto rc = publish_with_confirm(conn, "topic", "message",
348 std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
349 EXPECT_EQ(rc, 0);
350 }
351 wait_until_drained();
352 EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
353 callbacks_invoked = 0;
354 amqp_mock::set_valid_host("localhost");
355 }
356
357 TEST_F(TestAMQP, ReceiveNack)
358 {
359 callback_invoked = false;
360 amqp_mock::REPLY_ACK = false;
361 const std::string host("localhost2");
362 amqp_mock::set_valid_host(host);
363 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
364 EXPECT_TRUE(conn);
365 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
366 EXPECT_EQ(rc, 0);
367 wait_until_drained();
368 EXPECT_TRUE(callback_invoked);
369 amqp_mock::REPLY_ACK = true;
370 callback_invoked = false;
371 amqp_mock::set_valid_host("localhost");
372 }
373
374 TEST_F(TestAMQP, FailWrite)
375 {
376 callback_invoked = false;
377 amqp_mock::FAIL_NEXT_WRITE = true;
378 const std::string host("localhost2");
379 amqp_mock::set_valid_host(host);
380 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
381 EXPECT_TRUE(conn);
382 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
383 EXPECT_EQ(rc, 0);
384 wait_until_drained();
385 EXPECT_TRUE(callback_invoked);
386 amqp_mock::FAIL_NEXT_WRITE = false;
387 callback_invoked = false;
388 amqp_mock::set_valid_host("localhost");
389 }
390
391 TEST_F(TestAMQP, RetryInvalidHost)
392 {
393 const std::string host = "192.168.0.1";
394 const auto connection_number = amqp::get_connection_count();
395 conn = amqp::connect("amqp://"+host, "ex1", false, false, boost::none);
396 EXPECT_TRUE(conn);
397 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
398 auto rc = amqp::publish(conn, "topic", "message");
399 EXPECT_LT(rc, 0);
400 // now next retry should be ok
401 amqp_mock::set_valid_host(host);
402 std::this_thread::sleep_for(long_wait_time);
403 rc = amqp::publish(conn, "topic", "message");
404 EXPECT_EQ(rc, 0);
405 amqp_mock::set_valid_host("localhost");
406 }
407
408 TEST_F(TestAMQP, RetryInvalidPort)
409 {
410 const int port = 9999;
411 const auto connection_number = amqp::get_connection_count();
412 conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false, false, boost::none);
413 EXPECT_TRUE(conn);
414 EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
415 auto rc = amqp::publish(conn, "topic", "message");
416 EXPECT_LT(rc, 0);
417 // now next retry should be ok
418 amqp_mock::set_valid_port(port);
419 std::this_thread::sleep_for(long_wait_time);
420 rc = amqp::publish(conn, "topic", "message");
421 EXPECT_EQ(rc, 0);
422 amqp_mock::set_valid_port(5672);
423 }
424
425 TEST_F(TestAMQP, RetryFailWrite)
426 {
427 callback_invoked = false;
428 amqp_mock::FAIL_NEXT_WRITE = true;
429 const std::string host("localhost4");
430 amqp_mock::set_valid_host(host);
431 conn = amqp::connect("amqp://" + host, "ex1", false, false, boost::none);
432 EXPECT_TRUE(conn);
433 auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
434 EXPECT_EQ(rc, 0);
435 // set port to a different one, so that reconnect would fail
436 amqp_mock::set_valid_port(9999);
437 wait_until_drained();
438 EXPECT_TRUE(callback_invoked);
439 callback_invoked = false;
440 rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
441 EXPECT_LT(rc, 0);
442 // expect immediate failure, no callback called after sleep
443 std::this_thread::sleep_for(long_wait_time);
444 EXPECT_FALSE(callback_invoked);
445 // set port to the right one so that reconnect would succeed
446 amqp_mock::set_valid_port(5672);
447 callback_invoked = false;
448 amqp_mock::FAIL_NEXT_WRITE = false;
449 // give time to reconnect
450 std::this_thread::sleep_for(long_wait_time);
451 // retry to publish should succeed now
452 rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
453 EXPECT_EQ(rc, 0);
454 wait_until_drained();
455 EXPECT_TRUE(callback_invoked);
456 callback_invoked = false;
457 amqp_mock::set_valid_host("localhost");
458 }
459