]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/amqp_mock.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / rgw / amqp_mock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "amqp_mock.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdarg.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <boost/lockfree/queue.hpp>
11
// Knobs that steer the mocked AMQP calls defined further down in this file.
namespace amqp_mock {

// serializes updates of the VALID_* values below
std::mutex set_valid_lock;

// values that the mocked connection calls compare their arguments against
int VALID_PORT = 5672;
std::string VALID_HOST = "localhost";
std::string VALID_VHOST = "/";
std::string VALID_USER = "guest";
std::string VALID_PASSWORD = "guest";

// replace the port accepted by the mock
void set_valid_port(int port) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_PORT = port;
}

// replace the host name accepted by the mock
void set_valid_host(const std::string& host) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_HOST = host;
}

// replace the vhost accepted by the mock
void set_valid_vhost(const std::string& vhost) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_VHOST = vhost;
}

// replace the user/password pair accepted by the mock;
// both values are updated under a single lock acquisition
void set_valid_user(const std::string& user, const std::string& password) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_USER = user;
  VALID_PASSWORD = password;
}

// when set, the mocked publish call reports a closed connection
bool FAIL_NEXT_WRITE = false;
// when set, the mocked frame-wait call reports a closed connection
bool FAIL_NEXT_READ = false;
// selects whether publishes are confirmed with an ack (true) or a nack (false)
bool REPLY_ACK = true;

}
46
47 using namespace amqp_mock;
48
49 struct amqp_connection_state_t_ {
50 amqp_socket_t* socket;
51 amqp_channel_open_ok_t* channel1;
52 amqp_channel_open_ok_t* channel2;
53 amqp_exchange_declare_ok_t* exchange;
54 amqp_queue_declare_ok_t* queue;
55 amqp_confirm_select_ok_t* confirm;
56 amqp_basic_consume_ok_t* consume;
57 bool login_called;
58 boost::lockfree::queue<amqp_basic_ack_t> ack_list;
59 boost::lockfree::queue<amqp_basic_nack_t> nack_list;
60 std::atomic<uint64_t> delivery_tag;
61 amqp_rpc_reply_t reply;
62 amqp_basic_ack_t ack;
63 amqp_basic_nack_t nack;
64 // ctor
65 amqp_connection_state_t_() :
66 socket(nullptr),
67 channel1(nullptr),
68 channel2(nullptr),
69 exchange(nullptr),
70 queue(nullptr),
71 confirm(nullptr),
72 consume(nullptr),
73 login_called(false),
74 ack_list(1024),
75 nack_list(1024),
76 delivery_tag(1) {
77 reply.reply_type = AMQP_RESPONSE_NONE;
78 }
79 };
80
// Mock definition of the socket type: only remembers whether it was opened.
struct amqp_socket_t_ {
  // becomes true after a successful amqp_socket_open()
  bool open_called{false};
  // ctor
  amqp_socket_t_() = default;
};
87
88 amqp_connection_state_t AMQP_CALL amqp_new_connection(void) {
89 auto s = new amqp_connection_state_t_;
90 return s;
91 }
92
93 int amqp_destroy_connection(amqp_connection_state_t state) {
94 delete state->socket;
95 delete state->channel1;
96 delete state->channel2;
97 delete state->exchange;
98 delete state->queue;
99 delete state->confirm;
100 delete state->consume;
101 delete state;
102 return 0;
103 }
104
105 amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) {
106 state->socket = new amqp_socket_t;
107 return state->socket;
108 }
109
110 int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
111 if (!self) {
112 return -1;
113 }
114 {
115 std::lock_guard<std::mutex> lock(set_valid_lock);
116 if (std::string(host) != VALID_HOST) {
117 return -2;
118 }
119 if (port != VALID_PORT) {
120 return -3;
121 }
122 }
123 self->open_called = true;
124 return 0;
125 }
126
127 amqp_rpc_reply_t amqp_login(
128 amqp_connection_state_t state,
129 char const *vhost,
130 int channel_max,
131 int frame_max,
132 int heartbeat,
133 amqp_sasl_method_enum sasl_method, ...) {
134 state->reply.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
135 state->reply.library_error = 0;
136 state->reply.reply.decoded = nullptr;
137 state->reply.reply.id = 0;
138 if (std::string(vhost) != VALID_VHOST) {
139 return state->reply;
140 }
141 if (sasl_method != AMQP_SASL_METHOD_PLAIN) {
142 return state->reply;
143 }
144 va_list args;
145 va_start(args, sasl_method);
146 char* user = va_arg(args, char*);
147 char* password = va_arg(args, char*);
148 va_end(args);
149 if (std::string(user) != VALID_USER) {
150 return state->reply;
151 }
152 if (std::string(password) != VALID_PASSWORD) {
153 return state->reply;
154 }
155 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
156 state->login_called = true;
157 return state->reply;
158 }
159
160 amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) {
161 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
162 if (state->channel1 == nullptr) {
163 state->channel1 = new amqp_channel_open_ok_t;
164 return state->channel1;
165 }
166
167 state->channel2 = new amqp_channel_open_ok_t;
168 return state->channel2;
169 }
170
171 amqp_exchange_declare_ok_t* amqp_exchange_declare(
172 amqp_connection_state_t state,
173 amqp_channel_t channel,
174 amqp_bytes_t exchange,
175 amqp_bytes_t type,
176 amqp_boolean_t passive,
177 amqp_boolean_t durable,
178 amqp_boolean_t auto_delete,
179 amqp_boolean_t internal,
180 amqp_table_t arguments) {
181 state->exchange = new amqp_exchange_declare_ok_t;
182 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
183 return state->exchange;
184 }
185
186 amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
187 return state->reply;
188 }
189
190 int amqp_basic_publish(
191 amqp_connection_state_t state,
192 amqp_channel_t channel,
193 amqp_bytes_t exchange,
194 amqp_bytes_t routing_key,
195 amqp_boolean_t mandatory,
196 amqp_boolean_t immediate,
197 struct amqp_basic_properties_t_ const *properties,
198 amqp_bytes_t body) {
199 // make sure that all calls happened before publish
200 if (state->socket && state->socket->open_called &&
201 state->login_called && state->channel1 && state->channel2 && state->exchange &&
202 !FAIL_NEXT_WRITE) {
203 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
204 if (properties) {
205 if (REPLY_ACK) {
206 state->ack_list.push(amqp_basic_ack_t{state->delivery_tag++, 0});
207 } else {
208 state->nack_list.push(amqp_basic_nack_t{state->delivery_tag++, 0});
209 }
210 }
211 return AMQP_STATUS_OK;
212 }
213 return AMQP_STATUS_CONNECTION_CLOSED;
214 }
215
216 const amqp_table_t amqp_empty_table = {0, NULL};
217 const amqp_bytes_t amqp_empty_bytes = {0, NULL};
218
// Mocked error lookup: every code maps to the same fixed text.
const char* amqp_error_string2(int code) {
  static const char mock_text[] = "mock error";
  return mock_text;
}
223
224 char const* amqp_method_name(amqp_method_number_t methodNumber) {
225 static const char* str = "mock method";
226 return str;
227 }
228
229 amqp_queue_declare_ok_t* amqp_queue_declare(
230 amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
231 amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive,
232 amqp_boolean_t auto_delete, amqp_table_t arguments) {
233 state->queue = new amqp_queue_declare_ok_t;
234 static const char* str = "tmp-queue";
235 state->queue->queue = amqp_cstring_bytes(str);
236 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
237 return state->queue;
238 }
239
240 amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel) {
241 state->confirm = new amqp_confirm_select_ok_t;
242 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
243 return state->confirm;
244 }
245
246 int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) {
247 if (state->socket && state->socket->open_called &&
248 state->login_called && state->channel1 && state->channel2 && state->exchange &&
249 state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) {
250 // "wait" for queue
251 usleep(tv->tv_sec*1000000+tv->tv_usec);
252 // read from queue
253 if (REPLY_ACK) {
254 if (state->ack_list.pop(state->ack)) {
255 decoded_frame->frame_type = AMQP_FRAME_METHOD;
256 decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
257 decoded_frame->payload.method.decoded = &state->ack;
258 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
259 return AMQP_STATUS_OK;
260 } else {
261 // queue is empty
262 return AMQP_STATUS_TIMEOUT;
263 }
264 } else {
265 if (state->nack_list.pop(state->nack)) {
266 decoded_frame->frame_type = AMQP_FRAME_METHOD;
267 decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
268 decoded_frame->payload.method.decoded = &state->nack;
269 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
270 return AMQP_STATUS_OK;
271 } else {
272 // queue is empty
273 return AMQP_STATUS_TIMEOUT;
274 }
275 }
276 }
277 return AMQP_STATUS_CONNECTION_CLOSED;
278 }
279
280 amqp_basic_consume_ok_t* amqp_basic_consume(
281 amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
282 amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack,
283 amqp_boolean_t exclusive, amqp_table_t arguments) {
284 state->consume = new amqp_basic_consume_ok_t;
285 state->reply.reply_type = AMQP_RESPONSE_NORMAL;
286 return state->consume;
287 }
288
289 // amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c
290
// the following functions are actual implementations copied from the rabbitmq-c library
292
293 #include <string.h>
294
295 amqp_bytes_t amqp_cstring_bytes(const char* cstr) {
296 amqp_bytes_t result;
297 result.len = strlen(cstr);
298 result.bytes = (void *)cstr;
299 return result;
300 }
301
302 void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); }
303
304 amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
305 amqp_bytes_t result;
306 result.len = src.len;
307 result.bytes = malloc(src.len);
308 if (result.bytes != NULL) {
309 memcpy(result.bytes, src.bytes, src.len);
310 }
311 return result;
312 }
313