]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "amqp_mock.h" | |
5 | #include <amqp.h> | |
6 | #include <amqp_tcp_socket.h> | |
7 | #include <string> | |
8 | #include <stdarg.h> | |
9 | #include <mutex> | |
10 | #include <boost/lockfree/queue.hpp> | |
11 | ||
namespace amqp_mock {

// serializes access to the VALID_* connection parameters
std::mutex set_valid_lock;
// connection parameters that the mocked broker accepts
int VALID_PORT(5672);
std::string VALID_HOST("localhost");
std::string VALID_VHOST("/");
std::string VALID_USER("guest");
std::string VALID_PASSWORD("guest");

// replace the port accepted by amqp_socket_open()
void set_valid_port(int port) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_PORT = port;
}

// replace the host accepted by amqp_socket_open()
void set_valid_host(const std::string& host) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_HOST = host;
}

// replace the vhost accepted by amqp_login()
void set_valid_vhost(const std::string& vhost) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_VHOST = vhost;
}

// replace the user/password pair accepted by amqp_login()
// both values are updated under a single lock acquisition
void set_valid_user(const std::string& user, const std::string& password) {
  std::lock_guard<std::mutex> lock(set_valid_lock);
  VALID_USER = user;
  VALID_PASSWORD = password;
}

// number of queued replies consumed by the next "multiple" reply
std::atomic<unsigned> g_tag_skip{0};
// non-zero while the next reply should be sent with the "multiple" flag
std::atomic<int> g_multiple{0};

// request that the next reply consumes "tag_skip" queued replies and
// is delivered once, with the "multiple" flag set
void set_multiple(unsigned tag_skip) {
  // store the count before raising the flag: a reader that observes the
  // flag must never pair it with a stale (zero) count
  g_tag_skip = tag_skip;
  g_multiple = 1;
}

// go back to replying one by one
void reset_multiple() {
  // lower the flag before clearing the count (reverse of set_multiple)
  g_multiple = 0;
  g_tag_skip = 0;
}

// when set, amqp_basic_publish() fails with "connection closed"
bool FAIL_NEXT_WRITE(false);
// when set, amqp_simple_wait_frame_noblock() fails with "connection closed"
bool FAIL_NEXT_READ(false);
// selects whether publishes are answered with an ack (true) or a nack (false)
bool REPLY_ACK(true);
}
59 | ||
60 | using namespace amqp_mock; | |
61 | ||
62 | struct amqp_connection_state_t_ { | |
63 | amqp_socket_t* socket; | |
64 | amqp_channel_open_ok_t* channel1; | |
65 | amqp_channel_open_ok_t* channel2; | |
66 | amqp_exchange_declare_ok_t* exchange; | |
67 | amqp_queue_declare_ok_t* queue; | |
68 | amqp_confirm_select_ok_t* confirm; | |
69 | amqp_basic_consume_ok_t* consume; | |
70 | bool login_called; | |
71 | boost::lockfree::queue<amqp_basic_ack_t> ack_list; | |
72 | boost::lockfree::queue<amqp_basic_nack_t> nack_list; | |
73 | std::atomic<uint64_t> delivery_tag; | |
74 | amqp_rpc_reply_t reply; | |
75 | amqp_basic_ack_t ack; | |
76 | amqp_basic_nack_t nack; | |
77 | // ctor | |
78 | amqp_connection_state_t_() : | |
79 | socket(nullptr), | |
80 | channel1(nullptr), | |
81 | channel2(nullptr), | |
82 | exchange(nullptr), | |
83 | queue(nullptr), | |
84 | confirm(nullptr), | |
85 | consume(nullptr), | |
86 | login_called(false), | |
87 | ack_list(1024), | |
88 | nack_list(1024), | |
89 | delivery_tag(1) { | |
90 | reply.reply_type = AMQP_RESPONSE_NONE; | |
91 | } | |
92 | }; | |
93 | ||
// mocked socket: only remembers whether it was opened
struct amqp_socket_t_ {
  // becomes true once amqp_socket_open() succeeds on this socket
  bool open_called = false;
};
100 | ||
101 | amqp_connection_state_t AMQP_CALL amqp_new_connection(void) { | |
102 | auto s = new amqp_connection_state_t_; | |
103 | return s; | |
104 | } | |
105 | ||
106 | int amqp_destroy_connection(amqp_connection_state_t state) { | |
107 | delete state->socket; | |
108 | delete state->channel1; | |
109 | delete state->channel2; | |
110 | delete state->exchange; | |
111 | delete state->queue; | |
112 | delete state->confirm; | |
113 | delete state->consume; | |
114 | delete state; | |
115 | return 0; | |
116 | } | |
117 | ||
118 | amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) { | |
119 | state->socket = new amqp_socket_t; | |
120 | return state->socket; | |
121 | } | |
122 | ||
123 | int amqp_socket_open(amqp_socket_t *self, const char *host, int port) { | |
124 | if (!self) { | |
125 | return -1; | |
126 | } | |
127 | { | |
128 | std::lock_guard<std::mutex> lock(set_valid_lock); | |
129 | if (std::string(host) != VALID_HOST) { | |
130 | return -2; | |
131 | } | |
132 | if (port != VALID_PORT) { | |
133 | return -3; | |
134 | } | |
135 | } | |
136 | self->open_called = true; | |
137 | return 0; | |
138 | } | |
139 | ||
140 | amqp_rpc_reply_t amqp_login( | |
141 | amqp_connection_state_t state, | |
142 | char const *vhost, | |
143 | int channel_max, | |
144 | int frame_max, | |
145 | int heartbeat, | |
146 | amqp_sasl_method_enum sasl_method, ...) { | |
147 | state->reply.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; | |
148 | state->reply.library_error = 0; | |
149 | state->reply.reply.decoded = nullptr; | |
150 | state->reply.reply.id = 0; | |
151 | if (std::string(vhost) != VALID_VHOST) { | |
152 | return state->reply; | |
153 | } | |
154 | if (sasl_method != AMQP_SASL_METHOD_PLAIN) { | |
155 | return state->reply; | |
156 | } | |
157 | va_list args; | |
158 | va_start(args, sasl_method); | |
159 | char* user = va_arg(args, char*); | |
160 | char* password = va_arg(args, char*); | |
161 | va_end(args); | |
162 | if (std::string(user) != VALID_USER) { | |
163 | return state->reply; | |
164 | } | |
165 | if (std::string(password) != VALID_PASSWORD) { | |
166 | return state->reply; | |
167 | } | |
168 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
169 | state->login_called = true; | |
170 | return state->reply; | |
171 | } | |
172 | ||
173 | amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) { | |
174 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
175 | if (state->channel1 == nullptr) { | |
176 | state->channel1 = new amqp_channel_open_ok_t; | |
177 | return state->channel1; | |
178 | } | |
179 | ||
180 | state->channel2 = new amqp_channel_open_ok_t; | |
181 | return state->channel2; | |
182 | } | |
183 | ||
184 | amqp_exchange_declare_ok_t* amqp_exchange_declare( | |
185 | amqp_connection_state_t state, | |
186 | amqp_channel_t channel, | |
187 | amqp_bytes_t exchange, | |
188 | amqp_bytes_t type, | |
189 | amqp_boolean_t passive, | |
190 | amqp_boolean_t durable, | |
191 | amqp_boolean_t auto_delete, | |
192 | amqp_boolean_t internal, | |
193 | amqp_table_t arguments) { | |
194 | state->exchange = new amqp_exchange_declare_ok_t; | |
195 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
196 | return state->exchange; | |
197 | } | |
198 | ||
199 | amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { | |
200 | return state->reply; | |
201 | } | |
202 | ||
203 | int amqp_basic_publish( | |
204 | amqp_connection_state_t state, | |
205 | amqp_channel_t channel, | |
206 | amqp_bytes_t exchange, | |
207 | amqp_bytes_t routing_key, | |
208 | amqp_boolean_t mandatory, | |
209 | amqp_boolean_t immediate, | |
210 | struct amqp_basic_properties_t_ const *properties, | |
211 | amqp_bytes_t body) { | |
212 | // make sure that all calls happened before publish | |
213 | if (state->socket && state->socket->open_called && | |
214 | state->login_called && state->channel1 && state->channel2 && state->exchange && | |
215 | !FAIL_NEXT_WRITE) { | |
216 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
217 | if (properties) { | |
218 | if (REPLY_ACK) { | |
219 | state->ack_list.push(amqp_basic_ack_t{state->delivery_tag++, 0}); | |
220 | } else { | |
221 | state->nack_list.push(amqp_basic_nack_t{state->delivery_tag++, 0}); | |
222 | } | |
223 | } | |
224 | return AMQP_STATUS_OK; | |
225 | } | |
226 | return AMQP_STATUS_CONNECTION_CLOSED; | |
227 | } | |
228 | ||
229 | const amqp_table_t amqp_empty_table = {0, NULL}; | |
230 | const amqp_bytes_t amqp_empty_bytes = {0, NULL}; | |
231 | ||
// mocked error lookup: the same fixed text is returned for every code
const char* amqp_error_string2(int code) {
  return "mock error";
}
236 | ||
237 | char const* amqp_method_name(amqp_method_number_t methodNumber) { | |
238 | static const char* str = "mock method"; | |
239 | return str; | |
240 | } | |
241 | ||
242 | amqp_queue_declare_ok_t* amqp_queue_declare( | |
243 | amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, | |
244 | amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, | |
245 | amqp_boolean_t auto_delete, amqp_table_t arguments) { | |
246 | state->queue = new amqp_queue_declare_ok_t; | |
247 | static const char* str = "tmp-queue"; | |
248 | state->queue->queue = amqp_cstring_bytes(str); | |
249 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
250 | return state->queue; | |
251 | } | |
252 | ||
253 | amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel) { | |
254 | state->confirm = new amqp_confirm_select_ok_t; | |
255 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
256 | return state->confirm; | |
257 | } | |
258 | ||
259 | int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) { | |
260 | if (state->socket && state->socket->open_called && | |
261 | state->login_called && state->channel1 && state->channel2 && state->exchange && | |
262 | state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) { | |
263 | // "wait" for queue | |
264 | usleep(tv->tv_sec*1000000+tv->tv_usec); | |
265 | // read from queue | |
eafe8130 TL |
266 | if (g_multiple) { |
267 | // pop multiples and reply once at the end | |
268 | for (auto i = 0U; i < g_tag_skip; ++i) { | |
269 | if (REPLY_ACK && !state->ack_list.pop(state->ack)) { | |
270 | // queue is empty | |
271 | return AMQP_STATUS_TIMEOUT; | |
272 | } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) { | |
273 | // queue is empty | |
274 | return AMQP_STATUS_TIMEOUT; | |
275 | } | |
276 | } | |
277 | if (REPLY_ACK) { | |
278 | state->ack.multiple = g_multiple; | |
11fdf7f2 TL |
279 | decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD; |
280 | decoded_frame->payload.method.decoded = &state->ack; | |
11fdf7f2 | 281 | } else { |
eafe8130 | 282 | state->nack.multiple = g_multiple; |
11fdf7f2 TL |
283 | decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD; |
284 | decoded_frame->payload.method.decoded = &state->nack; | |
11fdf7f2 | 285 | } |
eafe8130 TL |
286 | decoded_frame->frame_type = AMQP_FRAME_METHOD; |
287 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
288 | reset_multiple(); | |
289 | return AMQP_STATUS_OK; | |
290 | } | |
291 | // pop replies one by one | |
292 | if (REPLY_ACK && state->ack_list.pop(state->ack)) { | |
293 | state->ack.multiple = g_multiple; | |
294 | decoded_frame->frame_type = AMQP_FRAME_METHOD; | |
295 | decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD; | |
296 | decoded_frame->payload.method.decoded = &state->ack; | |
297 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
298 | return AMQP_STATUS_OK; | |
299 | } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) { | |
300 | state->nack.multiple = g_multiple; | |
301 | decoded_frame->frame_type = AMQP_FRAME_METHOD; | |
302 | decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD; | |
303 | decoded_frame->payload.method.decoded = &state->nack; | |
304 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
305 | return AMQP_STATUS_OK; | |
306 | } else { | |
307 | // queue is empty | |
308 | return AMQP_STATUS_TIMEOUT; | |
11fdf7f2 TL |
309 | } |
310 | } | |
311 | return AMQP_STATUS_CONNECTION_CLOSED; | |
312 | } | |
313 | ||
314 | amqp_basic_consume_ok_t* amqp_basic_consume( | |
315 | amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, | |
316 | amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, | |
317 | amqp_boolean_t exclusive, amqp_table_t arguments) { | |
318 | state->consume = new amqp_basic_consume_ok_t; | |
319 | state->reply.reply_type = AMQP_RESPONSE_NORMAL; | |
320 | return state->consume; | |
321 | } | |
322 | ||
323 | // amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c | |
324 | ||
325 | // following functions are the actual implementation copied from rabbitmq-c library | |
326 | ||
327 | #include <string.h> | |
328 | ||
329 | amqp_bytes_t amqp_cstring_bytes(const char* cstr) { | |
330 | amqp_bytes_t result; | |
331 | result.len = strlen(cstr); | |
332 | result.bytes = (void *)cstr; | |
333 | return result; | |
334 | } | |
335 | ||
336 | void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); } | |
337 | ||
338 | amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { | |
339 | amqp_bytes_t result; | |
340 | result.len = src.len; | |
341 | result.bytes = malloc(src.len); | |
342 | if (result.bytes != NULL) { | |
343 | memcpy(result.bytes, src.bytes, src.len); | |
344 | } | |
345 | return result; | |
346 | } | |
347 |