]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/amqp_mock.cc
import ceph 14.2.5
[ceph.git] / ceph / src / test / rgw / amqp_mock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "amqp_mock.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdarg.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <boost/lockfree/queue.hpp>
11
namespace amqp_mock {

// serializes access to the VALID_* connection parameters below;
// every set_valid_*() setter takes it before writing
std::mutex set_valid_lock;
int VALID_PORT(5672);
std::string VALID_HOST("localhost");
std::string VALID_VHOST("/");
std::string VALID_USER("guest");
std::string VALID_PASSWORD("guest");

// set the only port that the mock accepts in amqp_socket_open()
void set_valid_port(int port) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_PORT = port;
}

// set the only host that the mock accepts in amqp_socket_open()
void set_valid_host(const std::string& host) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_HOST = host;
}

// set the only vhost that the mock accepts in amqp_login()
void set_valid_vhost(const std::string& vhost) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_VHOST = vhost;
}

// set the only user/password pair that the mock accepts in amqp_login()
void set_valid_user(const std::string& user, const std::string& password) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_USER = user;
  VALID_PASSWORD = password;
}

// number of queued replies that are collapsed into a single "multiple" reply
// direct-initialized: copy-initialization of std::atomic only compiles from C++17 on
std::atomic<unsigned> g_tag_skip{0};
// non-zero when the next reply should be sent with the "multiple" flag set
std::atomic<int> g_multiple{0};

// request that the next reply covers "tag_skip" pending replies at once
void set_multiple(unsigned tag_skip) {
  // publish the count before raising the flag, so that a reader observing
  // g_multiple != 0 can never pair it with a stale g_tag_skip
  g_tag_skip = tag_skip;
  g_multiple = 1;
}

// go back to replying to every message individually
void reset_multiple() {
  // lower the flag first, mirroring the publication order in set_multiple()
  g_multiple = 0;
  g_tag_skip = 0;
}

// when true, amqp_basic_publish() reports a closed connection
bool FAIL_NEXT_WRITE(false);
// when true, amqp_simple_wait_frame_noblock() reports a closed connection
bool FAIL_NEXT_READ(false);
// when true published messages are acked, otherwise they are nacked
bool REPLY_ACK(true);
}
59
60 using namespace amqp_mock;
61
62 struct amqp_connection_state_t_ {
63 amqp_socket_t* socket;
64 amqp_channel_open_ok_t* channel1;
65 amqp_channel_open_ok_t* channel2;
66 amqp_exchange_declare_ok_t* exchange;
67 amqp_queue_declare_ok_t* queue;
68 amqp_confirm_select_ok_t* confirm;
69 amqp_basic_consume_ok_t* consume;
70 bool login_called;
71 boost::lockfree::queue<amqp_basic_ack_t> ack_list;
72 boost::lockfree::queue<amqp_basic_nack_t> nack_list;
73 std::atomic<uint64_t> delivery_tag;
74 amqp_rpc_reply_t reply;
75 amqp_basic_ack_t ack;
76 amqp_basic_nack_t nack;
77 // ctor
78 amqp_connection_state_t_() :
79 socket(nullptr),
80 channel1(nullptr),
81 channel2(nullptr),
82 exchange(nullptr),
83 queue(nullptr),
84 confirm(nullptr),
85 consume(nullptr),
86 login_called(false),
87 ack_list(1024),
88 nack_list(1024),
89 delivery_tag(1) {
90 reply.reply_type = AMQP_RESPONSE_NONE;
91 }
92 };
93
// mock socket: only records whether amqp_socket_open() succeeded on it
struct amqp_socket_t_ {
  // a new socket always starts out as not opened
  bool open_called = false;
};
100
101 amqp_connection_state_t AMQP_CALL amqp_new_connection(void) {
102 auto s = new amqp_connection_state_t_;
103 return s;
104 }
105
106 int amqp_destroy_connection(amqp_connection_state_t state) {
107 delete state->socket;
108 delete state->channel1;
109 delete state->channel2;
110 delete state->exchange;
111 delete state->queue;
112 delete state->confirm;
113 delete state->consume;
114 delete state;
115 return 0;
116 }
117
118 amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) {
119 state->socket = new amqp_socket_t;
120 return state->socket;
121 }
122
123 int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
124 if (!self) {
125 return -1;
126 }
127 {
128 std::lock_guard<std::mutex> lock(set_valid_lock);
129 if (std::string(host) != VALID_HOST) {
130 return -2;
131 }
132 if (port != VALID_PORT) {
133 return -3;
134 }
135 }
136 self->open_called = true;
137 return 0;
138 }
139
140 amqp_rpc_reply_t amqp_login(
141 amqp_connection_state_t state,
142 char const *vhost,
143 int channel_max,
144 int frame_max,
145 int heartbeat,
146 amqp_sasl_method_enum sasl_method, ...) {
147 state->reply.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
148 state->reply.library_error = 0;
149 state->reply.reply.decoded = nullptr;
150 state->reply.reply.id = 0;
151 if (std::string(vhost) != VALID_VHOST) {
152 return state->reply;
153 }
154 if (sasl_method != AMQP_SASL_METHOD_PLAIN) {
155 return state->reply;
156 }
157 va_list args;
158 va_start(args, sasl_method);
159 char* user = va_arg(args, char*);
160 char* password = va_arg(args, char*);
161 va_end(args);
162 if (std::string(user) != VALID_USER) {
163 return state->reply;
164 }
165 if (std::string(password) != VALID_PASSWORD) {
166 return state->reply;
167 }
168 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
169 state->login_called = true;
170 return state->reply;
171 }
172
173 amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) {
174 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
175 if (state->channel1 == nullptr) {
176 state->channel1 = new amqp_channel_open_ok_t;
177 return state->channel1;
178 }
179
180 state->channel2 = new amqp_channel_open_ok_t;
181 return state->channel2;
182 }
183
184 amqp_exchange_declare_ok_t* amqp_exchange_declare(
185 amqp_connection_state_t state,
186 amqp_channel_t channel,
187 amqp_bytes_t exchange,
188 amqp_bytes_t type,
189 amqp_boolean_t passive,
190 amqp_boolean_t durable,
191 amqp_boolean_t auto_delete,
192 amqp_boolean_t internal,
193 amqp_table_t arguments) {
194 state->exchange = new amqp_exchange_declare_ok_t;
195 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
196 return state->exchange;
197 }
198
199 amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
200 return state->reply;
201 }
202
203 int amqp_basic_publish(
204 amqp_connection_state_t state,
205 amqp_channel_t channel,
206 amqp_bytes_t exchange,
207 amqp_bytes_t routing_key,
208 amqp_boolean_t mandatory,
209 amqp_boolean_t immediate,
210 struct amqp_basic_properties_t_ const *properties,
211 amqp_bytes_t body) {
212 // make sure that all calls happened before publish
213 if (state->socket && state->socket->open_called &&
214 state->login_called && state->channel1 && state->channel2 && state->exchange &&
215 !FAIL_NEXT_WRITE) {
216 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
217 if (properties) {
218 if (REPLY_ACK) {
219 state->ack_list.push(amqp_basic_ack_t{state->delivery_tag++, 0});
220 } else {
221 state->nack_list.push(amqp_basic_nack_t{state->delivery_tag++, 0});
222 }
223 }
224 return AMQP_STATUS_OK;
225 }
226 return AMQP_STATUS_CONNECTION_CLOSED;
227 }
228
229 const amqp_table_t amqp_empty_table = {0, NULL};
230 const amqp_bytes_t amqp_empty_bytes = {0, NULL};
231
// mock error description: the same fixed text for every error code
const char* amqp_error_string2(int code) {
  static const char mock_error_text[] = "mock error";
  return mock_error_text;
}
236
237 char const* amqp_method_name(amqp_method_number_t methodNumber) {
238 static const char* str = "mock method";
239 return str;
240 }
241
242 amqp_queue_declare_ok_t* amqp_queue_declare(
243 amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
244 amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive,
245 amqp_boolean_t auto_delete, amqp_table_t arguments) {
246 state->queue = new amqp_queue_declare_ok_t;
247 static const char* str = "tmp-queue";
248 state->queue->queue = amqp_cstring_bytes(str);
249 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
250 return state->queue;
251 }
252
253 amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel) {
254 state->confirm = new amqp_confirm_select_ok_t;
255 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
256 return state->confirm;
257 }
258
259 int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) {
260 if (state->socket && state->socket->open_called &&
261 state->login_called && state->channel1 && state->channel2 && state->exchange &&
262 state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) {
263 // "wait" for queue
264 usleep(tv->tv_sec*1000000+tv->tv_usec);
265 // read from queue
266 if (g_multiple) {
267 // pop multiples and reply once at the end
268 for (auto i = 0U; i < g_tag_skip; ++i) {
269 if (REPLY_ACK && !state->ack_list.pop(state->ack)) {
270 // queue is empty
271 return AMQP_STATUS_TIMEOUT;
272 } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) {
273 // queue is empty
274 return AMQP_STATUS_TIMEOUT;
275 }
276 }
277 if (REPLY_ACK) {
278 state->ack.multiple = g_multiple;
279 decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
280 decoded_frame->payload.method.decoded = &state->ack;
281 } else {
282 state->nack.multiple = g_multiple;
283 decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
284 decoded_frame->payload.method.decoded = &state->nack;
285 }
286 decoded_frame->frame_type = AMQP_FRAME_METHOD;
287 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
288 reset_multiple();
289 return AMQP_STATUS_OK;
290 }
291 // pop replies one by one
292 if (REPLY_ACK && state->ack_list.pop(state->ack)) {
293 state->ack.multiple = g_multiple;
294 decoded_frame->frame_type = AMQP_FRAME_METHOD;
295 decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
296 decoded_frame->payload.method.decoded = &state->ack;
297 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
298 return AMQP_STATUS_OK;
299 } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) {
300 state->nack.multiple = g_multiple;
301 decoded_frame->frame_type = AMQP_FRAME_METHOD;
302 decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
303 decoded_frame->payload.method.decoded = &state->nack;
304 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
305 return AMQP_STATUS_OK;
306 } else {
307 // queue is empty
308 return AMQP_STATUS_TIMEOUT;
309 }
310 }
311 return AMQP_STATUS_CONNECTION_CLOSED;
312 }
313
314 amqp_basic_consume_ok_t* amqp_basic_consume(
315 amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
316 amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack,
317 amqp_boolean_t exclusive, amqp_table_t arguments) {
318 state->consume = new amqp_basic_consume_ok_t;
319 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
320 return state->consume;
321 }
322
323 // amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c
324
325 // following functions are the actual implementation copied from rabbitmq-c library
326
327 #include <string.h>
328
329 amqp_bytes_t amqp_cstring_bytes(const char* cstr) {
330 amqp_bytes_t result;
331 result.len = strlen(cstr);
332 result.bytes = (void *)cstr;
333 return result;
334 }
335
336 void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); }
337
338 amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
339 amqp_bytes_t result;
340 result.len = src.len;
341 result.bytes = malloc(src.len);
342 if (result.bytes != NULL) {
343 memcpy(result.bytes, src.bytes, src.len);
344 }
345 return result;
346 }
347