1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <amqp_tcp_socket.h>
10 #include <boost/lockfree/queue.hpp>
14 std::mutex set_valid_lock
;
16 std::string
VALID_HOST("localhost");
17 std::string
VALID_VHOST("/");
18 std::string
VALID_USER("guest");
19 std::string
VALID_PASSWORD("guest");
21 void set_valid_port(int port
) {
22 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
26 void set_valid_host(const std::string
& host
) {
27 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
31 void set_valid_vhost(const std::string
& vhost
) {
32 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
36 void set_valid_user(const std::string
& user
, const std::string
& password
) {
37 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
39 VALID_PASSWORD
= password
;
42 bool FAIL_NEXT_WRITE(false);
43 bool FAIL_NEXT_READ(false);
47 using namespace amqp_mock
;
49 struct amqp_connection_state_t_
{
50 amqp_socket_t
* socket
;
51 amqp_channel_open_ok_t
* channel1
;
52 amqp_channel_open_ok_t
* channel2
;
53 amqp_exchange_declare_ok_t
* exchange
;
54 amqp_queue_declare_ok_t
* queue
;
55 amqp_confirm_select_ok_t
* confirm
;
56 amqp_basic_consume_ok_t
* consume
;
58 boost::lockfree::queue
<amqp_basic_ack_t
> ack_list
;
59 boost::lockfree::queue
<amqp_basic_nack_t
> nack_list
;
60 std::atomic
<uint64_t> delivery_tag
;
61 amqp_rpc_reply_t reply
;
63 amqp_basic_nack_t nack
;
65 amqp_connection_state_t_() :
77 reply
.reply_type
= AMQP_RESPONSE_NONE
;
81 struct amqp_socket_t_
{
84 amqp_socket_t_() : open_called(false) {
88 amqp_connection_state_t AMQP_CALL
amqp_new_connection(void) {
89 auto s
= new amqp_connection_state_t_
;
93 int amqp_destroy_connection(amqp_connection_state_t state
) {
95 delete state
->channel1
;
96 delete state
->channel2
;
97 delete state
->exchange
;
99 delete state
->confirm
;
100 delete state
->consume
;
105 amqp_socket_t
* amqp_tcp_socket_new(amqp_connection_state_t state
) {
106 state
->socket
= new amqp_socket_t
;
107 return state
->socket
;
110 int amqp_socket_open(amqp_socket_t
*self
, const char *host
, int port
) {
115 std::lock_guard
<std::mutex
> lock(set_valid_lock
);
116 if (std::string(host
) != VALID_HOST
) {
119 if (port
!= VALID_PORT
) {
123 self
->open_called
= true;
127 amqp_rpc_reply_t
amqp_login(
128 amqp_connection_state_t state
,
133 amqp_sasl_method_enum sasl_method
, ...) {
134 state
->reply
.reply_type
= AMQP_RESPONSE_SERVER_EXCEPTION
;
135 state
->reply
.library_error
= 0;
136 state
->reply
.reply
.decoded
= nullptr;
137 state
->reply
.reply
.id
= 0;
138 if (std::string(vhost
) != VALID_VHOST
) {
141 if (sasl_method
!= AMQP_SASL_METHOD_PLAIN
) {
145 va_start(args
, sasl_method
);
146 char* user
= va_arg(args
, char*);
147 char* password
= va_arg(args
, char*);
149 if (std::string(user
) != VALID_USER
) {
152 if (std::string(password
) != VALID_PASSWORD
) {
155 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
156 state
->login_called
= true;
160 amqp_channel_open_ok_t
* amqp_channel_open(amqp_connection_state_t state
, amqp_channel_t channel
) {
161 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
162 if (state
->channel1
== nullptr) {
163 state
->channel1
= new amqp_channel_open_ok_t
;
164 return state
->channel1
;
167 state
->channel2
= new amqp_channel_open_ok_t
;
168 return state
->channel2
;
171 amqp_exchange_declare_ok_t
* amqp_exchange_declare(
172 amqp_connection_state_t state
,
173 amqp_channel_t channel
,
174 amqp_bytes_t exchange
,
176 amqp_boolean_t passive
,
177 amqp_boolean_t durable
,
178 amqp_boolean_t auto_delete
,
179 amqp_boolean_t internal
,
180 amqp_table_t arguments
) {
181 state
->exchange
= new amqp_exchange_declare_ok_t
;
182 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
183 return state
->exchange
;
186 amqp_rpc_reply_t
amqp_get_rpc_reply(amqp_connection_state_t state
) {
190 int amqp_basic_publish(
191 amqp_connection_state_t state
,
192 amqp_channel_t channel
,
193 amqp_bytes_t exchange
,
194 amqp_bytes_t routing_key
,
195 amqp_boolean_t mandatory
,
196 amqp_boolean_t immediate
,
197 struct amqp_basic_properties_t_
const *properties
,
199 // make sure that all calls happened before publish
200 if (state
->socket
&& state
->socket
->open_called
&&
201 state
->login_called
&& state
->channel1
&& state
->channel2
&& state
->exchange
&&
203 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
206 state
->ack_list
.push(amqp_basic_ack_t
{state
->delivery_tag
++, 0});
208 state
->nack_list
.push(amqp_basic_nack_t
{state
->delivery_tag
++, 0});
211 return AMQP_STATUS_OK
;
213 return AMQP_STATUS_CONNECTION_CLOSED
;
216 const amqp_table_t amqp_empty_table
= {0, NULL
};
217 const amqp_bytes_t amqp_empty_bytes
= {0, NULL
};
219 const char* amqp_error_string2(int code
) {
220 static const char* str
= "mock error";
224 char const* amqp_method_name(amqp_method_number_t methodNumber
) {
225 static const char* str
= "mock method";
229 amqp_queue_declare_ok_t
* amqp_queue_declare(
230 amqp_connection_state_t state
, amqp_channel_t channel
, amqp_bytes_t queue
,
231 amqp_boolean_t passive
, amqp_boolean_t durable
, amqp_boolean_t exclusive
,
232 amqp_boolean_t auto_delete
, amqp_table_t arguments
) {
233 state
->queue
= new amqp_queue_declare_ok_t
;
234 static const char* str
= "tmp-queue";
235 state
->queue
->queue
= amqp_cstring_bytes(str
);
236 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
240 amqp_confirm_select_ok_t
* amqp_confirm_select(amqp_connection_state_t state
, amqp_channel_t channel
) {
241 state
->confirm
= new amqp_confirm_select_ok_t
;
242 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
243 return state
->confirm
;
246 int amqp_simple_wait_frame_noblock(amqp_connection_state_t state
, amqp_frame_t
*decoded_frame
, struct timeval
* tv
) {
247 if (state
->socket
&& state
->socket
->open_called
&&
248 state
->login_called
&& state
->channel1
&& state
->channel2
&& state
->exchange
&&
249 state
->queue
&& state
->consume
&& state
->confirm
&& !FAIL_NEXT_READ
) {
251 usleep(tv
->tv_sec
*1000000+tv
->tv_usec
);
254 if (state
->ack_list
.pop(state
->ack
)) {
255 decoded_frame
->frame_type
= AMQP_FRAME_METHOD
;
256 decoded_frame
->payload
.method
.id
= AMQP_BASIC_ACK_METHOD
;
257 decoded_frame
->payload
.method
.decoded
= &state
->ack
;
258 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
259 return AMQP_STATUS_OK
;
262 return AMQP_STATUS_TIMEOUT
;
265 if (state
->nack_list
.pop(state
->nack
)) {
266 decoded_frame
->frame_type
= AMQP_FRAME_METHOD
;
267 decoded_frame
->payload
.method
.id
= AMQP_BASIC_NACK_METHOD
;
268 decoded_frame
->payload
.method
.decoded
= &state
->nack
;
269 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
270 return AMQP_STATUS_OK
;
273 return AMQP_STATUS_TIMEOUT
;
277 return AMQP_STATUS_CONNECTION_CLOSED
;
280 amqp_basic_consume_ok_t
* amqp_basic_consume(
281 amqp_connection_state_t state
, amqp_channel_t channel
, amqp_bytes_t queue
,
282 amqp_bytes_t consumer_tag
, amqp_boolean_t no_local
, amqp_boolean_t no_ack
,
283 amqp_boolean_t exclusive
, amqp_table_t arguments
) {
284 state
->consume
= new amqp_basic_consume_ok_t
;
285 state
->reply
.reply_type
= AMQP_RESPONSE_NORMAL
;
286 return state
->consume
;
289 // amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c
291 // following functions are the actual implementation copied from rabbitmq-c library
295 amqp_bytes_t
amqp_cstring_bytes(const char* cstr
) {
297 result
.len
= strlen(cstr
);
298 result
.bytes
= (void *)cstr
;
302 void amqp_bytes_free(amqp_bytes_t bytes
) { free(bytes
.bytes
); }
304 amqp_bytes_t
amqp_bytes_malloc_dup(amqp_bytes_t src
) {
306 result
.len
= src
.len
;
307 result
.bytes
= malloc(src
.len
);
308 if (result
.bytes
!= NULL
) {
309 memcpy(result
.bytes
, src
.bytes
, src
.len
);