1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <amqp_tcp_socket.h>
10 #include <boost/lockfree/queue.hpp>
14 std::mutex set_valid_lock
;
16 std::string
VALID_HOST("localhost");
17 std::string
VALID_VHOST("/");
18 std::string
VALID_USER("guest");
19 std::string
VALID_PASSWORD("guest");
21 void set_valid_port(int port
) {
22 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
26 void set_valid_host(const std::string
& host
) {
27 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
31 void set_valid_vhost(const std::string
& vhost
) {
32 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
36 void set_valid_user(const std::string
& user
, const std::string
& password
) {
37 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
39 VALID_PASSWORD
= password
;
42 std::atomic
<unsigned> g_tag_skip
= 0;
43 std::atomic
<int> g_multiple
= 0;
45 void set_multiple(unsigned tag_skip
) {
47 g_tag_skip
= tag_skip
;
50 void reset_multiple() {
55 bool FAIL_NEXT_WRITE(false);
56 bool FAIL_NEXT_READ(false);
60 using namespace amqp_mock
;
62 struct amqp_connection_state_t_
{
63 amqp_socket_t
* socket
;
64 amqp_channel_open_ok_t
* channel1
;
65 amqp_channel_open_ok_t
* channel2
;
66 amqp_exchange_declare_ok_t
* exchange
;
67 amqp_queue_declare_ok_t
* queue
;
68 amqp_confirm_select_ok_t
* confirm
;
69 amqp_basic_consume_ok_t
* consume
;
71 boost::lockfree::queue
<amqp_basic_ack_t
> ack_list
;
72 boost::lockfree::queue
<amqp_basic_nack_t
> nack_list
;
73 std::atomic
<uint64_t> delivery_tag
;
74 amqp_rpc_reply_t reply
;
76 amqp_basic_nack_t nack
;
78 amqp_connection_state_t_() :
90 reply
.reply_type
= AMQP_RESPONSE_NONE
;
94 struct amqp_socket_t_
{
97 amqp_socket_t_() : open_called(false) {
101 amqp_connection_state_t AMQP_CALL
amqp_new_connection(void) {
102 auto s
= new amqp_connection_state_t_
;
106 int amqp_destroy_connection(amqp_connection_state_t state
) {
107 delete state
->socket
;
108 delete state
->channel1
;
109 delete state
->channel2
;
110 delete state
->exchange
;
112 delete state
->confirm
;
113 delete state
->consume
;
118 amqp_socket_t
* amqp_tcp_socket_new(amqp_connection_state_t state
) {
119 state
->socket
= new amqp_socket_t
;
120 return state
->socket
;
123 int amqp_socket_open(amqp_socket_t
*self
, const char *host
, int port
) {
128 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
129 if (std::string(host
) != VALID_HOST
) {
132 if (port
!= VALID_PORT
) {
136 self
->open_called
= true;
140 amqp_rpc_reply_t
amqp_login(
141 amqp_connection_state_t state
,
146 amqp_sasl_method_enum sasl_method
, ...) {
147 state
->reply
.reply_type
= AMQP_RESPONSE_SERVER_EXCEPTION
;
148 state
->reply
.library_error
= 0;
149 state
->reply
.reply
.decoded
= nullptr;
150 state
->reply
.reply
.id
= 0;
151 if (std::string(vhost
) != VALID_VHOST
) {
154 if (sasl_method
!= AMQP_SASL_METHOD_PLAIN
) {
158 va_start(args
, sasl_method
);
159 char* user
= va_arg(args
, char*);
160 char* password
= va_arg(args
, char*);
162 if (std::string(user
) != VALID_USER
) {
165 if (std::string(password
) != VALID_PASSWORD
) {
168 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
169 state
->login_called
= true;
173 amqp_channel_open_ok_t
* amqp_channel_open(amqp_connection_state_t state
, amqp_channel_t channel
) {
174 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
175 if (state
->channel1
== nullptr) {
176 state
->channel1
= new amqp_channel_open_ok_t
;
177 return state
->channel1
;
180 state
->channel2
= new amqp_channel_open_ok_t
;
181 return state
->channel2
;
184 amqp_exchange_declare_ok_t
* amqp_exchange_declare(
185 amqp_connection_state_t state
,
186 amqp_channel_t channel
,
187 amqp_bytes_t exchange
,
189 amqp_boolean_t passive
,
190 amqp_boolean_t durable
,
191 amqp_boolean_t auto_delete
,
192 amqp_boolean_t internal
,
193 amqp_table_t arguments
) {
194 state
->exchange
= new amqp_exchange_declare_ok_t
;
195 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
196 return state
->exchange
;
199 amqp_rpc_reply_t
amqp_get_rpc_reply(amqp_connection_state_t state
) {
203 int amqp_basic_publish(
204 amqp_connection_state_t state
,
205 amqp_channel_t channel
,
206 amqp_bytes_t exchange
,
207 amqp_bytes_t routing_key
,
208 amqp_boolean_t mandatory
,
209 amqp_boolean_t immediate
,
210 struct amqp_basic_properties_t_
const *properties
,
212 // make sure that all calls happened before publish
213 if (state
->socket
&& state
->socket
->open_called
&&
214 state
->login_called
&& state
->channel1
&& state
->channel2
&& state
->exchange
&&
216 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
219 state
->ack_list
.push(amqp_basic_ack_t
{state
->delivery_tag
++, 0});
221 state
->nack_list
.push(amqp_basic_nack_t
{state
->delivery_tag
++, 0});
224 return AMQP_STATUS_OK
;
226 return AMQP_STATUS_CONNECTION_CLOSED
;
229 const amqp_table_t amqp_empty_table
= {0, NULL
};
230 const amqp_bytes_t amqp_empty_bytes
= {0, NULL
};
232 const char* amqp_error_string2(int code
) {
233 static const char* str
= "mock error";
237 char const* amqp_method_name(amqp_method_number_t methodNumber
) {
238 static const char* str
= "mock method";
242 amqp_queue_declare_ok_t
* amqp_queue_declare(
243 amqp_connection_state_t state
, amqp_channel_t channel
, amqp_bytes_t queue
,
244 amqp_boolean_t passive
, amqp_boolean_t durable
, amqp_boolean_t exclusive
,
245 amqp_boolean_t auto_delete
, amqp_table_t arguments
) {
246 state
->queue
= new amqp_queue_declare_ok_t
;
247 static const char* str
= "tmp-queue";
248 state
->queue
->queue
= amqp_cstring_bytes(str
);
249 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
253 amqp_confirm_select_ok_t
* amqp_confirm_select(amqp_connection_state_t state
, amqp_channel_t channel
) {
254 state
->confirm
= new amqp_confirm_select_ok_t
;
255 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
256 return state
->confirm
;
259 int amqp_simple_wait_frame_noblock(amqp_connection_state_t state
, amqp_frame_t
*decoded_frame
, struct timeval
* tv
) {
260 if (state
->socket
&& state
->socket
->open_called
&&
261 state
->login_called
&& state
->channel1
&& state
->channel2
&& state
->exchange
&&
262 state
->queue
&& state
->consume
&& state
->confirm
&& !FAIL_NEXT_READ
) {
264 usleep(tv
->tv_sec
*1000000+tv
->tv_usec
);
267 // pop multiples and reply once at the end
268 for (auto i
= 0U; i
< g_tag_skip
; ++i
) {
269 if (REPLY_ACK
&& !state
->ack_list
.pop(state
->ack
)) {
271 return AMQP_STATUS_TIMEOUT
;
272 } else if (!REPLY_ACK
&& !state
->nack_list
.pop(state
->nack
)) {
274 return AMQP_STATUS_TIMEOUT
;
278 state
->ack
.multiple
= g_multiple
;
279 decoded_frame
->payload
.method
.id
= AMQP_BASIC_ACK_METHOD
;
280 decoded_frame
->payload
.method
.decoded
= &state
->ack
;
282 state
->nack
.multiple
= g_multiple
;
283 decoded_frame
->payload
.method
.id
= AMQP_BASIC_NACK_METHOD
;
284 decoded_frame
->payload
.method
.decoded
= &state
->nack
;
286 decoded_frame
->frame_type
= AMQP_FRAME_METHOD
;
287 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
289 return AMQP_STATUS_OK
;
291 // pop replies one by one
292 if (REPLY_ACK
&& state
->ack_list
.pop(state
->ack
)) {
293 state
->ack
.multiple
= g_multiple
;
294 decoded_frame
->frame_type
= AMQP_FRAME_METHOD
;
295 decoded_frame
->payload
.method
.id
= AMQP_BASIC_ACK_METHOD
;
296 decoded_frame
->payload
.method
.decoded
= &state
->ack
;
297 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
298 return AMQP_STATUS_OK
;
299 } else if (!REPLY_ACK
&& state
->nack_list
.pop(state
->nack
)) {
300 state
->nack
.multiple
= g_multiple
;
301 decoded_frame
->frame_type
= AMQP_FRAME_METHOD
;
302 decoded_frame
->payload
.method
.id
= AMQP_BASIC_NACK_METHOD
;
303 decoded_frame
->payload
.method
.decoded
= &state
->nack
;
304 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
305 return AMQP_STATUS_OK
;
308 return AMQP_STATUS_TIMEOUT
;
311 return AMQP_STATUS_CONNECTION_CLOSED
;
314 amqp_basic_consume_ok_t
* amqp_basic_consume(
315 amqp_connection_state_t state
, amqp_channel_t channel
, amqp_bytes_t queue
,
316 amqp_bytes_t consumer_tag
, amqp_boolean_t no_local
, amqp_boolean_t no_ack
,
317 amqp_boolean_t exclusive
, amqp_table_t arguments
) {
318 state
->consume
= new amqp_basic_consume_ok_t
;
319 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
320 return state
->consume
;
323 // amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c
325 // following functions are the actual implementation copied from rabbitmq-c library
329 amqp_bytes_t
amqp_cstring_bytes(const char* cstr
) {
331 result
.len
= strlen(cstr
);
332 result
.bytes
= (void *)cstr
;
336 void amqp_bytes_free(amqp_bytes_t bytes
) { free(bytes
.bytes
); }
338 amqp_bytes_t
amqp_bytes_malloc_dup(amqp_bytes_t src
) {
340 result
.len
= src
.len
;
341 result
.bytes
= malloc(src
.len
);
342 if (result
.bytes
!= NULL
) {
343 memcpy(result
.bytes
, src
.bytes
, src
.len
);