]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import logging |
2 | import json | |
3 | import tempfile | |
eafe8130 TL |
4 | import BaseHTTPServer |
5 | import SocketServer | |
6 | import random | |
7 | import threading | |
8 | import subprocess | |
9 | import socket | |
10 | import time | |
11 | import os | |
9f95a23c | 12 | from random import randint |
eafe8130 | 13 | from .tests import get_realm, \ |
11fdf7f2 TL |
14 | ZonegroupConns, \ |
15 | zonegroup_meta_checkpoint, \ | |
16 | zone_meta_checkpoint, \ | |
17 | zone_bucket_checkpoint, \ | |
18 | zone_data_checkpoint, \ | |
eafe8130 | 19 | zonegroup_bucket_checkpoint, \ |
11fdf7f2 | 20 | check_bucket_eq, \ |
eafe8130 TL |
21 | gen_bucket_name, \ |
22 | get_user, \ | |
23 | get_tenant | |
9f95a23c TL |
24 | from .zone_ps import PSTopic, \ |
25 | PSTopicS3, \ | |
26 | PSNotification, \ | |
27 | PSSubscription, \ | |
28 | PSNotificationS3, \ | |
29 | print_connection_info, \ | |
30 | delete_all_s3_topics, \ | |
31 | put_object_tagging, \ | |
32 | get_object_tagging, \ | |
33 | delete_all_objects | |
eafe8130 | 34 | from multisite import User |
11fdf7f2 TL |
35 | from nose import SkipTest |
36 | from nose.tools import assert_not_equal, assert_equal | |
9f95a23c | 37 | import boto.s3.tagging |
11fdf7f2 TL |
38 | |
39 | # configure logging for the tests module | |
eafe8130 TL |
40 | log = logging.getLogger(__name__) |
41 | ||
42 | skip_push_tests = True | |
11fdf7f2 TL |
43 | |
44 | #################################### | |
45 | # utility functions for pubsub tests | |
46 | #################################### | |
47 | ||
eafe8130 TL |
def set_contents_from_string(key, content):
    """upload content into the key. a failure is printed and not raised"""
    try:
        key.set_contents_from_string(content)
    except Exception as error:
        message = 'Error: ' + str(error)
        print(message)
eafe8130 TL |
53 | |
54 | ||
55 | # HTTP endpoint functions | |
56 | # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/ | |
57 | ||
class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""
    def do_POST(self):
        """implementation of POST handler

        reads 'Content-Length' bytes from the request, parses them as JSON and
        appends the parsed event to the server owning this handler.
        replies with status 100 if the event was stored and with 400 on any failure
        """
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            # any failure to read or parse the event is reported to the sender as a bad request.
            # "Exception" is used instead of a bare except so that SystemExit and
            # KeyboardInterrupt are not swallowed by the handler
            log.error('HTTP Server received empty event')
            self.send_response(400)
        else:
            self.send_response(100)
        finally:
            # headers are terminated regardless of the status that was sent
            self.end_headers()
74 | ||
75 | ||
class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
    """HTTP server keeping the events handed to it by its request handler"""
    def __init__(self, addr, handler, worker_id):
        # the server is created without binding to, or activating, the address
        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, bind_and_activate=False)
        self.events = []
        self.worker_id = worker_id

    def append(self, event):
        """store a single event"""
        self.events.append(event)
85 | ||
86 | ||
class HTTPServerThread(threading.Thread):
    """daemon thread running one HTTP server. the listening socket is shared by all threads"""
    def __init__(self, i, sock, addr):
        threading.Thread.__init__(self)
        self.daemon = True
        self.i = i
        server = HTTPServerWithEvents(addr, HTTPPostHandler, i)
        # use the socket provided by the caller instead of the one created by the server
        server.socket = sock
        # prevent the HTTP server from re-binding every handler
        do_nothing = lambda self: None
        server.server_bind = do_nothing
        self.server_close = do_nothing
        self.httpd = server
        # the thread starts itself as part of its construction
        self.start()

    def run(self):
        """serve requests until the server is shut down"""
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def close(self):
        """ask the server loop to stop"""
        self.httpd.shutdown()

    def get_events(self):
        """return the list of events stored in the server"""
        return self.httpd.events

    def reset_events(self):
        """replace the events stored in the server with an empty list"""
        self.httpd.events = []
116 | ||
117 | ||
class StreamingHTTPServer(object):
    """multi-threaded http server class also holding list of events received into the handler
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100):
        """bind and listen on (host, port) and start num_workers server threads on that socket"""
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys
        the stored events are cleared as part of the verification"""
        events = self.get_and_reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys
        the stored events are cleared as part of the verification"""
        events = self.get_and_reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        """return the events collected from all workers, clearing them in the workers"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for it to finish"""
        # make sure that the shared socket is closed
        # this is needed in case that one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.close()
            worker.join()
162 | ||
eafe8130 TL |
163 | # AMQP endpoint functions |
164 | ||
165 | rabbitmq_port = 5672 | |
166 | ||
class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic):
        """connect to the broker on the local IP and subscribe to the topic on the exchange

        the connection is attempted up to 10 times, with a pause between the attempts.
        raises Exception when all of the attempts failed
        """
        import pika
        hostname = get_ip()
        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries '
                    + str(remaining_retries) + '): ' + str(error))
                # pause before the next attempt so that the retries are not exhausted immediately
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        # an empty queue name is passed, the actual name is taken from the reply
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrive on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        """return the stored events and start over with an empty list"""
        tmp = self.events
        self.events = []
        return tmp
216 | ||
217 | ||
def amqp_receiver_thread_runner(receiver):
    """thread entry point: consume messages on the receiver's channel until consuming ends"""
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))
    else:
        log.info('AMQP receiver ended')
226 | ||
227 | ||
def create_amqp_receiver_thread(exchange, topic):
    """create an AMQP receiver and a (not yet started) daemon thread running it
    returns the thread and the receiver, in that order"""
    amqp_receiver = AMQPReceiver(exchange, topic)
    runner_thread = threading.Thread(target=amqp_receiver_thread_runner,
                                     args=(amqp_receiver,))
    runner_thread.daemon = True
    return runner_thread, amqp_receiver
234 | ||
235 | ||
def stop_amqp_receiver(receiver, task):
    """ask the receiver to stop consuming and wait up to 5 seconds for its thread"""
    try:
        receiver.channel.stop_consuming()
    except Exception as error:
        log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
    else:
        log.info('stopping AMQP receiver')
    task.join(5)
11fdf7f2 TL |
244 | |
def check_ps_configured():
    """raise SkipTest unless the master zonegroup has at least one pubsub zone"""
    zonegroup = get_realm().master_zonegroup()
    if zonegroup.zones_by_type.get("pubsub"):
        return
    raise SkipTest("Requires at least one PS zone")
253 | ||
254 | ||
def is_ps_zone(zone_conn):
    """return True when the connection is set and the tier type of its zone is pubsub"""
    if zone_conn:
        return zone_conn.zone.tier_type() == "pubsub"
    return False
260 | ||
261 | ||
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element

    events: a dict holding an 'events' list, or a list of such dicts
    keys: objects exposing "name" and "bucket.name"
    exact_match: when set, fail if the number of events differs from the number of keys
    deletions: when set, look for 'OBJECT_DELETE' events instead of 'OBJECT_CREATE'
    raises AssertionError if a key has no matching event, or on a count mismatch
    when exact_match is set
    """
    expected_event = 'OBJECT_DELETE' if deletions else 'OBJECT_CREATE'
    # a single container is handled as a list of one, so both input forms share the same code
    event_lists = events if isinstance(events, list) else [events]
    all_events = [event for event_list in event_lists for event in event_list['events']]

    for key in keys:
        key_found = any(event['event'] == expected_event
                        for event in all_events
                        if event['info']['bucket']['name'] == key.bucket.name and
                        event['info']['key']['name'] == key.name)
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            # raised explicitly so that the check is not removed when running with -O
            raise AssertionError(err)

    # the events themselves are counted, not their containers (or the entries of a dict)
    if len(all_events) != len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            raise AssertionError(err)
302 | ||
303 | ||
eafe8130 TL |
def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
    """ verify there is at least one record per element

    records: a dict holding a 'Records' list, or a list of such dicts
    keys: objects exposing "name" and "bucket.name"
    exact_match: when set, fail if the number of records differs from the number of keys
    deletions: when set, look for 'ObjectRemoved' events instead of 'ObjectCreated'
    raises AssertionError if a key has no matching record, or on a count mismatch
    when exact_match is set
    """
    expected_event = 'ObjectRemoved' if deletions else 'ObjectCreated'
    # a single container is handled as a list of one, so both input forms share the same code
    record_lists = records if isinstance(records, list) else [records]
    all_records = [record for record_list in record_lists for record in record_list['Records']]

    def log_all_records():
        # log "bucket,key" of every record to help with analyzing the failure
        for record in all_records:
            log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))

    for key in keys:
        key_found = any(expected_event in record['eventName']
                        for record in all_records
                        if record['s3']['bucket']['name'] == key.bucket.name and
                        record['s3']['object']['key'] == key.name)
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log_all_records()
            # raised explicitly so that the check is not removed when running with -O
            raise AssertionError(err)

    # the records themselves are counted, not their containers (or the entries of a dict)
    if len(all_records) != len(keys):
        err = 'superfluous records are found'
        log.warning(err)
        if exact_match:
            log_all_records()
            raise AssertionError(err)
348 | ||
349 | ||
def init_rabbitmq():
    """ launch a rabbitmq broker
    returns the broker process, or None if it could not be executed """
    # NOTE: the hostname is only referenced by the disabled configuration below
    hostname = get_ip()
    #port = str(random.randint(20000, 30000))
    #data_dir = './' + port + '_data'
    #log_dir = './' + port + '_log'
    #print('')
    #try:
    #    os.mkdir(data_dir)
    #    os.mkdir(log_dir)
    #except:
    #    print('rabbitmq directories already exists')
    #env = {'RABBITMQ_NODE_PORT': port,
    #       'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
    #       'RABBITMQ_USE_LONGNAME': 'true',
    #       'RABBITMQ_MNESIA_BASE': data_dir,
    #       'RABBITMQ_LOG_BASE': log_dir}
    # TODO: support multiple brokers per host using env
    # make sure we don't collide with the default
    try:
        broker = subprocess.Popen('rabbitmq-server')
    except Exception as error:
        reason = str(error)
        log.info('failed to execute rabbitmq-server: %s', reason)
        print('failed to execute rabbitmq-server: %s' % reason)
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return broker #, port, data_dir, log_dir
378 | ||
379 | ||
def clean_rabbitmq(proc): #, data_dir, log_dir)
    """ stop the rabbitmq broker

    proc: the broker process to terminate after "rabbitmqctl stop" was issued
    a failure to stop the broker is logged and not raised
    """
    try:
        subprocess.call(['rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except Exception:
        # "Exception" is used instead of a bare except so that SystemExit and
        # KeyboardInterrupt are not swallowed during cleanup
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup once multiple brokers are supported
    #try:
    #    os.rmdir(data_dir)
    #    os.rmdir(log_dir)
    #except:
    #    log.info('rabbitmq directories already removed')
394 | ||
395 | ||
9f95a23c TL |
396 | # Kafka endpoint functions |
397 | ||
398 | kafka_server = 'localhost' | |
399 | ||
class KafkaReceiver(object):
    """receiver storing the messages consumed on a topic from the kafka broker"""
    def __init__(self, topic, security_type):
        from kafka import KafkaConsumer
        # any security type other than PLAINTEXT is handled as SSL, on a different port
        if security_type == 'PLAINTEXT':
            port = 9092
        else:
            security_type = 'SSL'
            port = 9093
        # up to 10 attempts, counting down the attempts that are left
        for retries_left in range(9, -1, -1):
            try:
                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                print('failed to connect to kafka (remaining retries '
                    + str(retries_left) + '): ' + str(error))
                time.sleep(1)
        else:
            # the loop ended without a successful connection
            raise Exception('failed to connect to kafka - no retries left')

        self.stop = False
        self.topic = topic
        self.events = []

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """check the stored s3 records against a list of keys, then clear them"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []
431 | ||
432 | ||
def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver

    appends every consumed message (parsed as JSON) to the events of the receiver,
    until the "stop" flag of the receiver is set.
    any error ends the thread and is logged and printed, not raised
    """
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            # short pause before iterating over the consumer again
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))
447 | ||
448 | ||
def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create a kafka receiver and a (not yet started) daemon thread running it
    returns the thread and the receiver, in that order"""
    kafka_receiver = KafkaReceiver(topic, security_type)
    runner_thread = threading.Thread(target=kafka_receiver_thread_runner,
                                     args=(kafka_receiver,))
    runner_thread.daemon = True
    return runner_thread, kafka_receiver
455 | ||
def stop_kafka_receiver(receiver, task):
    """flag the receiver to stop, wait up to a second for its thread and close the consumer"""
    receiver.stop = True
    task.join(1)
    try:
        kafka_consumer = receiver.consumer
        kafka_consumer.close()
    except Exception as error:
        log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
464 | ||
465 | ||
# follow the instructions here to create and sign a broker certificate:
# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka

# the generated broker certificate should be stored in the java keystore for the use of the server
# assuming the jks files were copied to $KAFKA_DIR and the broker name is "localhost",
# the following lines must be added to $KAFKA_DIR/config/server.properties
# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
# sasl.enabled.mechanisms=PLAIN
# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
# ssl.keystore.password = abcdefgh
# ssl.key.password = abcdefgh
# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
# ssl.truststore.password = abcdefgh

# notes:
# (1) we don't test client authentication, hence, there is no need to generate client keys
# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
# should be copied to: $KAFKA_DIR
484 | ||
def init_kafka():
    """ start kafka/zookeeper

    the installation directory is taken from the KAFKA_DIR environment variable.
    the output of kafka is written to ./kafka.log
    returns (kafka_proc, zk_proc, kafka_log) on success. the log file is left open
    and should be closed by the caller. (None, None, None) is returned on any failure
    """
    KAFKA_DIR = os.environ.get('KAFKA_DIR', '')
    if KAFKA_DIR == '':
        log.info('KAFKA_DIR must be set to where kafka is installed')
        print('KAFKA_DIR must be set to where kafka is installed')
        return None, None, None

    print('\nStarting zookeeper...')
    DEVNULL = open(os.devnull, 'wb')
    try:
        # paths are joined so that KAFKA_DIR works with or without a trailing slash
        zk_proc = subprocess.Popen([os.path.join(KAFKA_DIR, 'bin/zookeeper-server-start.sh'),
                                    os.path.join(KAFKA_DIR, 'config/zookeeper.properties')],
                                   stdout=DEVNULL)
    except Exception as error:
        log.info('failed to execute zookeeper: %s', str(error))
        print('failed to execute zookeeper: %s' % str(error))
        return None, None, None
    finally:
        # the child process has its own copy of the descriptor, ours is not needed anymore
        DEVNULL.close()

    time.sleep(5)
    if zk_proc.poll() is not None:
        print('zookeeper failed to start')
        return None, None, None
    print('Zookeeper started')
    print('Starting kafka...')
    kafka_log = open('./kafka.log', 'w')
    try:
        kafka_env = os.environ.copy()
        kafka_env['KAFKA_OPTS'] = '-Djava.security.auth.login.config=' + \
                                  os.path.join(KAFKA_DIR, 'config/kafka_server_jaas.conf')
        kafka_proc = subprocess.Popen([
            os.path.join(KAFKA_DIR, 'bin/kafka-server-start.sh'),
            os.path.join(KAFKA_DIR, 'config/server.properties')],
            stdout=kafka_log,
            env=kafka_env)
    except Exception as error:
        log.info('failed to execute kafka: %s', str(error))
        print('failed to execute kafka: %s' % str(error))
        # zookeeper is already running and has to be stopped
        zk_proc.terminate()
        kafka_log.close()
        return None, None, None

    # TODO add kafka checkpoint instead of sleep
    time.sleep(15)
    if kafka_proc.poll() is not None:
        zk_proc.terminate()
        print('kafka failed to start. details in: ./kafka.log')
        kafka_log.close()
        return None, None, None

    print('Kafka started')
    return kafka_proc, zk_proc, kafka_log
539 | ||
540 | ||
def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper

    kafka_proc, zk_proc: the processes to stop. each is terminated, and killed
    if it is still running 5 seconds later
    kafka_log: the file that is closed before the processes are stopped
    a failure during the shutdown is logged and not raised
    """
    try:
        kafka_log.close()
        print('Shutdown Kafka...')
        kafka_proc.terminate()
        time.sleep(5)
        if kafka_proc.poll() is None:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutdown zookeeper...')
        zk_proc.terminate()
        time.sleep(5)
        if zk_proc.poll() is None:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        # "Exception" is used instead of a bare except so that SystemExit and
        # KeyboardInterrupt are not swallowed during cleanup
        log.info('kafka/zookeeper already terminated')
559 | ||
560 | ||
def init_env(require_ps=True):
    """prepare the environment for a test
    returns the connections of the master zone and of the pubsub zone (in that order).
    without require_ps the pubsub zone connection may be None"""
    if require_ps:
        check_ps_configured()

    zonegroup = get_realm().master_zonegroup()
    conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    master_zone, ps_zone = None, None
    for zone_conn in conns.zones:
        if zone_conn.zone == zonegroup.master_zone:
            master_zone = zone_conn
        if not is_ps_zone(zone_conn):
            continue
        # wait for the metadata to sync into the pubsub zone
        zone_meta_checkpoint(zone_conn.zone)
        ps_zone = zone_conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone
11fdf7f2 TL |
585 | |
586 | ||
eafe8130 TL |
def get_ip():
    """ This method returns the "primary" IP on the local box (the one with a default route)
    source: https://stackoverflow.com/a/28950776/711085
    this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # address should not be reachable
        probe.connect(('10.255.255.255', 1))
        # the local address of the socket is a tuple starting with the IP
        local_address = probe.getsockname()
        return local_address[0]
    finally:
        probe.close()
599 | ||
600 | ||
11fdf7f2 TL |
601 | TOPIC_SUFFIX = "_topic" |
602 | SUB_SUFFIX = "_sub" | |
eafe8130 | 603 | NOTIFICATION_SUFFIX = "_notif" |
11fdf7f2 TL |
604 | |
605 | ############## | |
606 | # pubsub tests | |
607 | ############## | |
608 | ||
eafe8130 TL |
def test_ps_info():
    """ log information for manual testing """
    # the exception has to be raised (and not returned) for the test to be reported as skipped.
    # the code below is unreachable, and is used by removing the line below in manual testing
    raise SkipTest("only used in manual testing")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    print('Zonegroup: ' + zonegroup.name)
    print('user: ' + get_user())
    print('tenant: ' + get_tenant())
    print('Master Zone')
    print_connection_info(master_zone.conn)
    print('PubSub Zone')
    print_connection_info(ps_zone.conn)
    print('Bucket: ' + bucket_name)
11fdf7f2 TL |
631 | |
632 | ||
eafe8130 TL |
def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications

    creates a topic and an s3 notification on a bucket, verifies that the generated
    topic, notification and subscription can be fetched, and that they are
    gone after the s3 notification and the topic are deleted
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division is used so that any 2xx status passes regardless of the python version
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status//100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
11fdf7f2 TL |
704 | |
705 | ||
eafe8130 TL |
def test_ps_s3_notification_records():
    """ test s3 records fetching

    creates a topic and an s3 notification on a bucket, uploads objects to the bucket
    and verifies that the records fetched from the subscription cover all of the objects
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division is used so that any 2xx status passes regardless of the python version
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
11fdf7f2 TL |
760 | |
761 | ||
eafe8130 TL |
def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # the bucket is created on the master zone, then metadata sync is awaited
    master_zone.create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zone.zone)

    # create the topic and extract its ARN from the JSON reply
    topic_conf = PSTopic(ps_zone.conn, bucket_name + TOPIC_SUFFIX)
    reply, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    topic_arn = json.loads(reply)['arn']

    # first s3 notification: creation events only
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    s3_notification_conf1 = PSNotificationS3(
        ps_zone.conn, bucket_name,
        [{'Id': notification_name1,
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:*']
          }])
    reply, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)

    # second s3 notification on the same topic: creation and removal events
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    s3_notification_conf2 = PSNotificationS3(
        ps_zone.conn, bucket_name,
        [{'Id': notification_name2,
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
          }])
    reply, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # listing all notifications on the bucket must show both, on the same topic
    reply, status = s3_notification_conf1.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(reply['TopicConfigurations']), 2)
    for index in (0, 1):
        assert_equal(reply['TopicConfigurations'][index]['TopicArn'], topic_arn)

    notifications = ((s3_notification_conf1, notification_name1),
                     (s3_notification_conf2, notification_name2))

    # each notification can be fetched by name
    for conf, name in notifications:
        reply, status = conf.get_config(notification=name)
        assert_equal(status/100, 2)
        assert_equal(reply['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
        assert_equal(reply['NotificationConfiguration']['TopicConfiguration']['Id'], name)

    # each notification can be deleted by name
    for conf, name in notifications:
        _, status = conf.del_config(notification=name)
        assert_equal(status/100, 2)

    # cleanup: topic first, then the bucket
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
825 | ||
eafe8130 TL |
826 | |
def test_ps_s3_topic_on_master():
    """ test s3 topics set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')

    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')

    # get topic 3 (topic_arn and endpoint_address still hold the values of topic 3)
    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
    # Note that endpoint args may be ordered differently in the result

    # delete topic 1
    # check the status returned by the delete itself; previously the assertion
    # looked at the stale status of the get_config() call above
    status = topic_conf1.del_config()
    assert_equal(status, 200)

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result, status = topic_conf1.get_list()
    assert_equal(status, 200)
    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)

    # delete topics
    topic_conf2.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)
    topic_conf3.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)

    # get topic list, make sure it is empty
    result, _ = topic_conf1.get_list()
    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
890 | ||
891 | ||
def test_ps_s3_topic_with_secret_on_master():
    """ test s3 topics with secret set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    if master_zone.secure_conn is None:
        # raise (not return) the exception, otherwise the test is reported
        # as passed instead of skipped
        raise SkipTest('secure connection is needed to test topic with secrets')

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    # a topic with user/password in its endpoint, set over the non-secure
    # connection, is expected to be rejected
    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    try:
        bad_topic_conf.set_config()
    except Exception as err:
        print('Error is expected: ' + str(err))
    else:
        assert False, 'user password configuration set allowed only over HTTPS'

    # the same topic over the secure connection must be accepted
    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)

    # reading the topic over the non-secure connection is expected to fail (4xx)
    _, status = bad_topic_conf.get_config()
    assert_equal(status//100, 4)

    # get topic
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    _, status = bad_topic_conf.get_config()
    assert_equal(status//100, 4)

    _, status = topic_conf.get_list()
    assert_equal(status//100, 2)

    # delete topics
    topic_conf.del_config()
eafe8130 TL |
940 | |
941 | ||
def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    zonegroup = get_realm().master_zonegroup()
    bucket_name = gen_bucket_name()
    # the bucket only needs to exist; the returned handle is not used
    master_zone.create_bucket(bucket_name)

    # create s3 topic with an amqp push endpoint
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, bucket_name + TOPIC_SUFFIX,
                           zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # three notifications on the same topic, named <name>_1.._3, differing
    # only in their event lists
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    event_lists = (['s3:ObjectCreated:*'], ['s3:ObjectRemoved:*'], [])
    topic_conf_list = [{'Id': notification_name + '_' + str(index),
                        'TopicArn': topic_arn,
                        'Events': events}
                       for index, events in enumerate(event_lists, 1)]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    first_notification = notification_name + '_1'

    # fetch one notification by name
    reply, status = s3_notification_conf.get_config(notification=first_notification)
    assert_equal(status/100, 2)
    assert_equal(reply['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete that notification by name
    _, status = s3_notification_conf.del_config(notification=first_notification)
    assert_equal(status/100, 2)

    # the other two notifications must still be listed
    reply, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(reply['TopicConfigurations']), 2)
    for index in (0, 1):
        assert_equal(reply['TopicConfigurations'][index]['TopicArn'], topic_arn)

    # delete whatever is left
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # query again after the delete; the result is not asserted on
    _, _ = s3_notification_conf.get_config()

    # cleanup: topic first, then the bucket
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
1001 | |
1002 | ||
def ps_s3_notification_filter(on_master):
    """ test s3 notification filters

    on_master: when True, the topic and notifications are created on the
    master zone and a metadata filter is tested in addition to the key
    filters; otherwise they are created on the pubsub zone
    """
    if skip_push_tests:
        # raise (not return) the exception, otherwise the test is reported
        # as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    if on_master:
        master_zone, _ = init_env(require_ps=False)
        ps_zone = master_zone
    else:
        master_zone, ps_zone = init_env(require_ps=True)

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    if on_master:
        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
    else:
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, _ = topic_conf.set_config()
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        zone_meta_checkpoint(ps_zone.zone)

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    if on_master:
        topic_conf_list = [{'Id': notification_name+'_4',
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                            'Filter': {
                                'Metadata': {
                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                                },
                                'Key': {
                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                                }
                            }
                            }]

        try:
            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
            _, status = s3_notification_conf4.set_config()
            assert_equal(status//100, 2)
            skip_notif4 = False
        except Exception:
            # any failure here is treated as "metadata filter unavailable" and
            # the notification 4 part of the test is skipped
            print('note: metadata filter is not supported by boto3 - skipping test')
            skip_notif4 = True
    else:
        print('filtering by attributes only supported on master zone')
        skip_notif4 = True

    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status//100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name in ('prefix', 'suffix', 'regex'), filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status//100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    for key_name in expected_in1 + expected_in2 + expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # the key must be created before its metadata is set; previously the
        # metadata was applied to the key of the previous iteration, so these
        # objects were uploaded without any metadata
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    if on_master:
        print('wait for 5sec for the messages...')
        time.sleep(5)
    else:
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    # sort the received events by the notification that produced them
    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    # compare as sets: 'hello.txt' is uploaded twice and matches more than one filter
    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)
1195 | ||
1196 | ||
def test_ps_s3_notification_filter_on_master():
    """ run the s3 notification filter test with on_master enabled """
    ps_s3_notification_filter(True)
1199 | ||
1200 | ||
def test_ps_s3_notification_filter():
    """ run the s3 notification filter test with on_master disabled """
    ps_s3_notification_filter(False)
1203 | ||
1204 | ||
def test_ps_s3_notification_errors_on_master():
    """ test that invalid s3 notification configurations fail on master """
    master_zone, _ = init_env(require_ps=False)
    zonegroup = get_realm().master_zonegroup()
    bucket_name = gen_bucket_name()
    # the bucket only needs to exist; the returned handle is not used
    master_zone.create_bucket(bucket_name)

    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, bucket_name + TOPIC_SUFFIX,
                           zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    notification_name = bucket_name + NOTIFICATION_SUFFIX

    # each case: (target bucket, notification configuration, message used if
    # setting the configuration unexpectedly succeeds)
    failing_cases = [
        # invalid event name
        (bucket_name,
         {'Id': notification_name,
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:Kaboom']},
         'invalid event name is expected to fail'),
        # missing name
        (bucket_name,
         {'Id': '',
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:Put']},
         'missing notification name is expected to fail'),
        # invalid topic ARN
        (bucket_name,
         {'Id': notification_name,
          'TopicArn': 'kaboom',
          'Events': ['s3:ObjectCreated:Put']},
         'invalid ARN is expected to fail'),
        # unknown topic ARN
        (bucket_name,
         {'Id': notification_name,
          'TopicArn': 'arn:aws:sns:a::kaboom',
          'Events': ['s3:ObjectCreated:Put']},
         'unknown topic is expected to fail'),
        # wrong bucket
        ('kaboom',
         {'Id': notification_name,
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:Put']},
         'unknown bucket is expected to fail'),
    ]

    for target_bucket, notification, failure_message in failing_cases:
        s3_notification_conf = PSNotificationS3(master_zone.conn, target_bucket, [notification])
        try:
            _, _ = s3_notification_conf.set_config()
        except Exception as error:
            print(str(error) + ' - is expected')
        else:
            assert False, failure_message

    topic_conf.del_config()

    # deleting the topic a second time is still expected to return 200
    status = topic_conf.del_config()
    assert_equal(status, 200)

    _, status = topic_conf.get_config()
    assert_equal(status, 404)

    # cleanup: delete the bucket
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
1300 | |
1301 | ||
def test_objcet_timing():
    """ measure average object creation and deletion time (manual testing only)

    The test is always skipped; remove the skip below to run it by hand.
    """
    # raise (not return) the exception, otherwise the test is reported as
    # passed instead of skipped
    raise SkipTest("only used in manual testing")
    master_zone, _ = init_env(require_ps=False)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    # the same 1MB random content is shared by all objects
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
1341 | |
1342 | ||
def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master """
    if skip_push_tests:
        # raise (not return) the exception, otherwise the test is reported
        # as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topic
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    # notification 1 has an empty event list, notification 2 is limited to
    # object creation events
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check amqp receiver 2 has no deletions
    # the verification is expected to fail for receiver 2; previously this
    # check was run against receiver 1, so receiver 2 was never examined
    try:
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except Exception:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
1450 | ||
1451 | ||
def test_ps_s3_notification_push_kafka():
    """ test pushing kafka s3 notification through the pubsub zone

    The topic and the s3 notification are created on the pubsub zone, objects
    are written to and then deleted from the bucket on the master zone, and the
    kafka receiver is checked for the creation and the deletion events.
    The test is skipped when push tests are disabled or when init_kafka()
    does not return both a kafka and a zookeeper process.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')

    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='kafka://' + kafka_server,
                         endpoint_args='kafka-ack-level=broker')
    result, status = topic_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
1527 | ||
1528 | ||
def test_ps_s3_notification_push_kafka_on_master():
    """ test pushing kafka s3 notification on master

    Two topics are created on the master zone (one with broker acks, one
    without) and both are attached to the bucket. The kafka receiver listens
    only on the first topic and is checked for the creation and the deletion
    events. The test is skipped when push tests are disabled or when
    init_kafka() does not return both a kafka and a zookeeper process.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
    task.start()

    # create s3 topics
    endpoint_address = 'kafka://' + kafka_server
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
                        'Events': []
                        },
                       {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
1615 | ||
1616 | ||
def kafka_security(security_type):
    """ test pushing kafka s3 notification on master over a secured endpoint

    security_type: 'SSL_SASL' uses the user/password endpoint on port 9094 and
    creates the topic over the zone's secure connection; any other value uses
    the ssl-only endpoint on port 9093.
    The root CA is read from the directory named by the KAFKA_DIR environment
    variable (a KeyError is raised when it is not set).
    The test is skipped when push tests are disabled, when SSL_SASL is asked
    for without a secure connection, or when init_kafka() does not return both
    a kafka and a zookeeper process.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
        raise SkipTest("secure connection is needed to test SASL_SSL security")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create s3 topic
    if security_type == 'SSL_SASL':
        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
    else:
        # ssl only
        endpoint_address = 'kafka://' + kafka_server + ':9093'

    KAFKA_DIR = os.environ['KAFKA_DIR']
    # join instead of concatenating, so that KAFKA_DIR works with or without
    # a trailing slash
    ca_location = os.path.join(KAFKA_DIR, 'rootCA.crt')

    # without acks from broker, with root CA
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+ca_location

    if security_type == 'SSL_SASL':
        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    else:
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)

    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    s3_notification_conf.set_config()

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    try:
        print('wait for 5sec for the messages...')
        time.sleep(5)
        keys = list(bucket.list())
        receiver.verify_s3_events(keys, exact_match=True)

        # delete objects from the bucket
        client_threads = []
        start_time = time.time()
        for key in bucket.list():
            thr = threading.Thread(target = key.delete, args=())
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        time_diff = time.time() - start_time
        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)
        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
    except Exception as err:
        # any failure in the verification phase is reported as a test failure
        assert False, str(err)
    finally:
        # cleanup (runs whether or not the verification succeeded)
        s3_notification_conf.del_config()
        topic_conf.del_config()
        # delete the bucket, after removing whatever objects are left in it
        for key in bucket.list():
            key.delete()
        master_zone.delete_bucket(bucket_name)
        stop_kafka_receiver(receiver, task)
        clean_kafka(kafka_proc, zk_proc, kafka_log)
1714 | ||
1715 | ||
def test_ps_s3_notification_push_kafka_security_ssl():
    """ test pushing kafka s3 notification with the 'SSL' security type """
    kafka_security(security_type='SSL')
1718 | ||
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
    """ test pushing kafka s3 notification with the 'SSL_SASL' security type """
    kafka_security(security_type='SSL_SASL')
1721 | ||
1722 | ||
def test_ps_s3_notification_multi_delete_on_master():
    """ test deletion of multiple keys on master

    A notification restricted to 's3:ObjectRemoved:*' is set on the bucket,
    objects are created, then all of them are removed through
    delete_all_objects() and the http receiver is checked for the deletion
    events. The test is skipped when push tests are disabled.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    # the first value returned by init_env() is used directly as the master
    # zone, in the same way as in the other "on_master" tests of this module
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    for i in range(number_of_objects):
        obj_size = randint(1, 1024)
        content = str(os.urandom(obj_size))
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    keys = list(bucket.list())

    start_time = time.time()
    delete_all_objects(master_zone.conn, bucket_name)
    time_diff = time.time() - start_time
    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    http_server.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
1789 | ||
1790 | ||
def test_ps_s3_notification_push_http_on_master():
    """ test pushing http s3 notification on master

    A topic with an http push endpoint and a notification are created on the
    master zone; objects are created in and then deleted from the bucket, and
    the http receiver is checked for the creation and the deletion events.
    The test is skipped when push tests are disabled.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    start_time = time.time()
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    http_server.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    http_server.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
1873 | ||
1874 | ||
def test_ps_s3_opaque_data():
    """ test that opaque id set in topic, is sent in notification

    The opaque data is passed to the pubsub zone topic as part of the endpoint
    arguments; every event collected by the http receiver must carry it in
    its first record. The test is skipped when push tests are disabled.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    opaque_data = 'http://1.2.3.4:8888'
    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
    result, status = topic_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    # NOTE(review): this loop passes vacuously when no event was received
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup (the upload threads were already joined above)
    for key in keys:
        key.delete()
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
eafe8130 TL |
1946 | |
1947 | ||
9f95a23c TL |
def test_ps_s3_opaque_data_on_master():
    """ test that opaque id set in topic, is sent in notification on master

    The opaque data is passed to the master zone topic through the opaque_data
    argument of PSTopicS3; every event collected by the http receiver must
    carry it in its first record. The test is skipped when push tests are
    disabled.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored and the test
        # would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    opaque_data = 'http://1.2.3.4:8888'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    start_time = time.time()
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    # NOTE(review): this loop passes vacuously when no event was received
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup (the upload threads were already joined above)
    for key in keys:
        key.delete()
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
2018 | ||
eafe8130 TL |
def test_ps_topic():
    """ test set/get/delete of topic

    Creates a topic on the pubsub zone, checks its name, its (empty)
    subscription list and its ARN, then deletes it and checks that fetching
    it returns 404 with the 'NoSuchKey' code.
    """
    _, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    assert_equal(parsed_result['topic']['arn'],
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)
    # verify topic is deleted
    result, status = topic_conf.get_config()
    assert_equal(status, 404)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')
2047 | ||
2048 | ||
def test_ps_topic_with_endpoint():
    """ test set topic with endpoint

    Creates a topic with an amqp push endpoint on the pubsub zone and checks
    that the stored topic reports the same name and push endpoint.
    """
    _, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint=dest_endpoint,
                         endpoint_args=dest_args)
    _, status = topic_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
    # cleanup
    topic_conf.del_config()
2071 | ||
2072 | ||
def test_ps_notification():
    """ test set/get/delete of notification

    Creates a topic on the pubsub zone and a bucket on the master zone, sets
    a notification for the bucket, checks that it is listed with the topic
    name, then deletes it and checks that no topic is listed any more.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status//100, 2)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2109 | |
2110 | ||
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events

    Same flow as the plain notification test, but the notification is created
    with an explicit event list and the stored notification is checked to
    report a non-empty list of events.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2144 | |
2145 | ||
def test_ps_subscription():
    """ test set/get/delete of subscription

    Creates a topic, a bucket notification and a subscription on the pubsub
    zone, writes objects to the bucket on the master zone, and verifies the
    events pulled from the subscription against the bucket keys (non-exact
    match). The objects are then deleted; deletion events are not verified
    yet (see the TODOs). Finally the subscription is deleted and checked to
    report an empty topic.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division keeps the status class an int on python 2 and python 3
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscriptions
    #result, _ = sub_conf.get_events()
    #for event in events['events']:
    #    log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: check deletions
    # TODO: use exact match
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status//100, 2)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 | 2216 | |
9f95a23c TL |
def test_ps_incremental_sync():
    """ test that events are only sent on incremental sync

    A first batch of objects is written and synced before any notification
    or subscription exists, a second batch is written afterwards, and the
    subscription is expected to hold events for one batch only.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)

    # first batch: written before the notification is configured
    number_of_objects = 10
    for index in range(0, number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('foo')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)

    # second batch: written after the notification is configured
    for index in range(number_of_objects, 2*number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    fetched = json.loads(result)
    for entry in fetched['events']:
        obj_name = str(entry['info']['key']['name'])
        event_type = str(entry['event'])
        log.debug('Event: objname: "' + obj_name + '" type: "' + event_type + '"')

    # make sure we have 10 and not 20 events
    assert_equal(len(fetched['events']), number_of_objects)

    # cleanup
    for stale_key in bucket.list():
        stale_key.delete()
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2271 | |
def test_ps_event_type_subscription():
    """ test subscriptions for different events

    Three topics are created on the pubsub zone for the same bucket, each
    with its own notification and subscription: one restricted to
    OBJECT_CREATE, one restricted to OBJECT_DELETE and one for both.
    The events held by each subscription are checked after the objects are
    created and again after they are deleted.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()

    def fetch_events(sub, label):
        # pull the events of one subscription, log each of them and
        # return the parsed reply
        result, _ = sub.get_events()
        events = json.loads(result)
        for event in events['events']:
            log.debug('Event (' + label + '): objname: "' + str(event['info']['key']['name']) +
                      '" type: "' + str(event['event']) + '"')
        return events

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the creation subscription
    events = fetch_events(sub_create_conf, 'OBJECT_CREATE')
    # the key list is captured now, while the objects still exist, and is
    # reused for the checks that follow the deletion
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    events = fetch_events(sub_delete_conf, 'OBJECT_DELETE')
    # nothing was deleted yet
    assert_equal(len(events['events']), 0)
    # get the events from the all events subscription
    events = fetch_events(sub_conf, 'OBJECT_CREATE,OBJECT_DELETE')
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    events = fetch_events(sub_create_conf, 'OBJECT_CREATE')
    # deletions should not change the creation events
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    events = fetch_events(sub_delete_conf, 'OBJECT_DELETE')
    # only deletions should be listed here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    # (this must read sub_conf, not the creation-only subscription)
    events = fetch_events(sub_conf, 'OBJECT_CREATE,OBJECT_DELETE')
    # both deletions and creations should be here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
    # TODO: (1) test deletions (2) test overall number of events

    # test subscription deletion when topic is specified
    _, status = sub_create_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_delete_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_conf.del_config(topic=True)
    assert_equal(status//100, 2)

    # cleanup
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2409 | |
2410 | ||
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription

    Events are pulled in pages of at most ``max_events`` entries, passing
    the marker returned by the previous reply, and the accumulated pages
    are verified against the keys of the bucket.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    max_events = 15
    next_marker = None
    all_events = []
    while True:
        # get the next page of events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        events = json.loads(result)
        all_events.extend(events['events'])
        next_marker = events['next_marker']
        for event in events['events']:
            log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        # the loop ends once a reply carries an empty marker
        if next_marker == '':
            break
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements({'events': all_events}, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2467 | |
2468 | ||
def test_ps_event_acking():
    """ test acking of some events in a subscription

    Objects are created, the resulting events are fetched, half of them
    are acked by id and the subscription is fetched again to compare the
    number of events before and after the acks.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    # count the events themselves: "events" is the parsed reply dict, so
    # len(events) would only count its top-level keys
    original_number_of_events = len(events['events'])
    for event in events['events']:
        log.debug('Event (before ack) id: "' + str(event['id']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events
    number_of_acks = number_of_objects//2
    events_to_ack = number_of_acks
    for event in events['events']:
        if events_to_ack == 0:
            break
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)
        events_to_ack -= 1

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (after ack) id: "' + str(event['id']) + '"')
    # NOTE(review): this is only a lower bound on the remaining events (no
    # more than the acked ones disappeared); it does not prove that the
    # acked ids are gone - consider tightening once the behavior is confirmed
    assert len(events['events']) >= (original_number_of_events - number_of_acks)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2532 | |
2533 | ||
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post

    Objects are created with a plain PUT, a COPY and a multi-part upload,
    and the subscription is expected to hold at least three events.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    # the part is read back from the very file it was written to: closing
    # a TemporaryFile discards it, and a second TemporaryFile() is a new,
    # empty file
    fp = tempfile.TemporaryFile(mode='w+b')
    fp.write(b'bar')
    fp.flush()
    fp.seek(0)
    uploader = bucket.initiate_multipart_upload('multipart')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(events['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130 TL |
2589 | |
2590 | ||
def test_ps_s3_creation_triggers_on_master():
    """ test object creation s3 notifications in using put/copy/post on master

    Skipped when push tests are disabled or when rabbitmq could not be
    started. Otherwise an AMQP receiver is attached to an s3 topic on the
    master zone, objects are created with PUT, COPY and a multi-part
    upload, and the received events are verified against the bucket keys.
    """
    # SkipTest has to be raised: returning the exception object would make
    # the test count as passed instead of skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                       }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    # the part is read back from the very file it was written to: closing
    # a TemporaryFile discards it, and a second TemporaryFile() is a new,
    # empty file
    fp = tempfile.TemporaryFile(mode='w+b')
    fp.write(b'bar')
    fp.flush()
    fp.seek(0)
    uploader = bucket.initiate_multipart_upload('multipart')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2659 | ||
2660 | ||
def test_ps_s3_multipart_on_master():
    """ test multipart object upload on master

    Skipped when push tests are disabled or when rabbitmq could not be
    started. Otherwise three s3 topics, each with its own AMQP receiver,
    are bound to the bucket with different event filters
    ('s3:ObjectCreated:*', 's3:ObjectCreated:Post' and
    's3:ObjectCreated:CompleteMultipartUpload'), a single multi-part
    upload is performed and the events seen by each receiver are checked.
    """
    # SkipTest has to be raised: returning the exception object would make
    # the test count as passed instead of skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
    task1.start()
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
    task2.start()
    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
    task3.start()

    # create s3 topics
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn3 = topic_conf3.set_config()

    # create s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:Post']
                       },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w+b')
    content = bytearray(os.urandom(1024*1024))
    fp.write(content)
    fp.flush()
    # rewind so that the upload reads the content that was just written
    fp.seek(0)
    uploader = bucket.initiate_multipart_upload('multipart')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    # the wildcard notification is expected to see three events
    events = receiver1.get_and_reset_events()
    assert_equal(len(events), 3)

    events = receiver2.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')

    events = receiver3.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    stop_amqp_receiver(receiver3, task3)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    topic_conf3.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2753 | ||
2754 | ||
def test_ps_versioned_deletion():
    """ test notification of deletion markers

    Two topics are bound to a versioned bucket, one notified on
    OBJECT_DELETE and one on DELETE_MARKER_CREATE. An object with two
    versions is created, a deletion marker is added, then the marker and
    both versions are removed. Every event found in each subscription
    must be of the type its notification was created for.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
    _, status = topic_conf1.set_config()
    assert_equal(status//100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
    _, status = topic_conf2.set_config()
    assert_equal(status//100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)

    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications
    event_type1 = 'OBJECT_DELETE'
    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_1',
                                        event_type1)
    _, status = notification_conf1.set_config()
    assert_equal(status//100, 2)
    event_type2 = 'DELETE_MARKER_CREATE'
    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_2',
                                        event_type2)
    _, status = notification_conf2.set_config()
    assert_equal(status//100, 2)

    # create subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                               topic_name+'_1')
    _, status = sub_conf1.set_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                               topic_name+'_2')
    _, status = sub_conf2.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create deletion marker
    delete_marker_key = bucket.delete_key(key.name)

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # delete the deletion marker
    delete_marker_key.delete()
    # delete versions
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf1.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type1)

    # get the deletion marker events from the subscription
    result, _ = sub_conf2.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type2)

    # cleanup
    # following is needed for the cleanup in the case of 3-zones
    # see: http://tracker.ceph.com/issues/39142
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    try:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
        master_zone.delete_bucket(bucket_name)
    except Exception as e:
        # best-effort: a failed checkpoint or bucket removal must not fail
        # the test, but KeyboardInterrupt/SystemExit are no longer swallowed
        log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
        log.debug('cleanup error: %s', e)
    sub_conf1.del_config()
    sub_conf2.del_config()
    notification_conf1.del_config()
    notification_conf2.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
2851 | ||
2852 | ||
def test_ps_s3_metadata_on_master():
    """ test s3 notification of metadata on master

    Creates a bucket on the master zone with an S3 notification that filters
    on a single ``x-amz-meta-`` attribute, then creates objects through PUT,
    COPY and multipart upload and checks the events collected by the AMQP
    receiver, first for the creations and then for the deletions.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    meta_key = 'meta1'
    meta_value = 'This is my metadata value'
    meta_prefix = 'x-amz-meta-'
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
        'Filter': {
            'Metadata': {
                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
            }
        }
    }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the 2xx check an integer comparison on python3 too
    assert_equal(status//100, 2)

    # create objects in the bucket
    key_name = 'foo'
    key = bucket.new_key(key_name)
    key.set_metadata(meta_key, meta_value)
    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_foo', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    uploader = bucket.initiate_multipart_upload('multipart_foo',
            metadata={meta_key: meta_value})
    # write and upload from the SAME temporary file: the data has to be
    # rewound before it is read back, a second TemporaryFile would be empty
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
        fp.seek(0)
        uploader.upload_part_from_file(fp, 1)
        uploader.complete_upload()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count += 1

    # only PUT and POST has the metadata value
    assert_equal(event_count, 2)

    # delete objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count += 1

    # all 3 object has metadata when deleted
    assert_equal(event_count, 3)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2952 | ||
2953 | ||
def test_ps_s3_tags_on_master():
    """ test s3 notification of tags on master

    Creates a bucket on the master zone with an S3 notification filtered on
    the tag ``hello=world``, creates tagged and untagged objects (including
    a COPY of the tagged one) and checks the first tag reported in every
    event collected by the AMQP receiver, for creations and for deletions.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
        'Filter': {
            'Tags': {
                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
            }
        }
    }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the 2xx check an integer comparison on python3 too
    assert_equal(status//100, 2)

    # create objects in the bucket with tags
    tags = 'hello=world&ka=boom'
    key_name1 = 'key1'
    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
    tags = 'foo=bar&ka=boom'
    key_name2 = 'key2'
    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
    key_name3 = 'key3'
    key = bucket.new_key(key_name3)
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # only the first entry is compared against the events below
    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
    # check amqp receiver
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # delete the objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3033 | ||
3034 | ||
def test_ps_s3_versioned_deletion_on_master():
    """ test s3 notification of deletion markers on master

    Enables versioning on a bucket, registers three notifications on the
    same topic (all removals, delete-marker creation only, delete only),
    writes two versions of one key, creates a delete marker and then removes
    both versions and the marker. The events collected by the AMQP receiver
    are counted per event name and matched against the notification ids.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
                       },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:Delete']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the 2xx check an integer comparison on python3 too
    assert_equal(status//100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create delete marker (non versioned deletion)
    delete_marker_key = bucket.delete_key(key.name)

    time.sleep(1)

    # versioned deletion
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    delete_marker_key.delete()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    events = receiver.get_and_reset_events()
    delete_events = 0
    delete_marker_create_events = 0
    for event_list in events:
        for event in event_list['Records']:
            if event['eventName'] == 's3:ObjectRemoved:Delete':
                delete_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
                delete_marker_create_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']

    # 3 key versions were deleted (v1, v2 and the deletion marker)
    # notified over the same topic via 2 notifications (1,3)
    assert_equal(delete_events, 3*2)
    # 1 deletion marker was created
    # notified over the same topic over 2 notifications (1,2)
    assert_equal(delete_marker_create_events, 1*2)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3125 | ||
3126 | ||
def test_ps_push_http():
    """ test pushing to http endpoint

    Starts a local ``StreamingHTTPServer`` on a random port, creates a topic,
    a notification and a subscription whose endpoint is that server on the
    pubsub zone, then creates and deletes objects in a bucket on the master
    zone and asks the http server to verify the events after each sync.

    Raises:
        SkipTest: when ``skip_push_tests`` is set.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division keeps the 2xx checks integer comparisons on python3 too
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='http://'+host+':'+str(port))
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
3187 | ||
3188 | ||
def test_ps_s3_push_http():
    """ test pushing to http endpoint s3 record format

    Starts a local ``StreamingHTTPServer`` on a random port, creates a topic
    on the pubsub zone whose endpoint is that server, attaches an S3
    notification for object creation and removal, then creates and deletes
    objects in a bucket on the master zone and asks the http server to
    verify the S3 events after each sync.

    Raises:
        SkipTest: when ``skip_push_tests`` is set.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+host+':'+str(port))
    result, status = topic_conf.set_config()
    # floor division keeps the 2xx checks integer comparisons on python3 too
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
3250 | ||
3251 | ||
def test_ps_push_amqp():
    """ test pushing to amqp endpoint

    Starts an AMQP receiver thread, creates a topic, a notification and a
    subscription with an ``amqp://`` endpoint on the pubsub zone, then
    creates and deletes objects in a bucket on the master zone and asks the
    receiver to verify the events after each sync.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division keeps the 2xx checks integer comparisons on python3 too
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='amqp://'+hostname,
                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3315 | ||
3316 | ||
def test_ps_s3_push_amqp():
    """ test pushing to amqp endpoint s3 record format

    Starts an AMQP receiver thread, creates a topic with an ``amqp://``
    endpoint on the pubsub zone, attaches an S3 notification for object
    creation and removal, then creates and deletes objects in a bucket on
    the master zone and asks the receiver to verify the S3 events after each
    sync.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # floor division keeps the 2xx checks integer comparisons on python3 too
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
11fdf7f2 TL |
3381 | |
3382 | ||
eafe8130 TL |
def test_ps_delete_bucket():
    """ test notification status upon bucket deletion

    Registers one S3 notification and one non-S3 notification on a bucket,
    creates and deletes objects, deletes the bucket, and then checks that
    the events are still readable through the subscription named after the
    S3 notification while both notifications answer 404.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX
    notification_name = bucket_name + NOTIFICATION_SUFFIX

    # the bucket lives on the first of the rados zones; wait for meta sync
    bucket = master_zone.create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zone.zone)

    # topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_reply, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    topic_arn = json.loads(topic_reply)['arn']

    # one s3 notification
    s3_notification_conf = PSNotificationS3(
        ps_zone.conn,
        bucket_name,
        [{'Id': notification_name,
          'TopicArn': topic_arn,
          'Events': ['s3:ObjectCreated:*']
         }])
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # one non-s3 notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)

    # populate the bucket and wait for bucket sync
    number_of_objects = 10
    for index in range(number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # remember the keys before they are removed
    keys = list(bucket.list())

    # empty the bucket and wait for bucket sync
    for obj in bucket.list():
        obj.delete()
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # remove the bucket itself and wait for meta sync
    master_zone.delete_bucket(bucket_name)
    zone_meta_checkpoint(ps_zone.zone)

    # read the events from the auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name, topic_name)
    raw_events, _ = sub_conf.get_events()
    records = json.loads(raw_events)
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # the s3 notification is gone together with the bucket
    _, status = s3_notification_conf.get_config(notification=notification_name)
    assert_equal(status, 404)
    # and so is the non-s3 notification
    _, status = notification_conf.get_config()
    assert_equal(status, 404)

    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()
eafe8130 TL |
3450 | |
3451 | ||
def test_ps_missing_topic():
    """ test creating a subscription when no topic info exists

    Builds an S3 notification that points at a topic ARN for which no topic
    was created, and expects ``set_config()`` to raise.

    Raises:
        AssertionError: when setting the notification unexpectedly succeeds.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_arn = 'arn:aws:sns:::' + topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    rejected = False
    try:
        s3_notification_conf.set_config()
    except Exception:
        log.info('missing topic is expected')
        rejected = True

    # cleanup (done before the verdict so a failure does not leak the bucket)
    master_zone.delete_bucket(bucket_name)

    # asserting on a non-empty string is always true and could never fail
    # the test; fail explicitly when the notification was accepted
    if not rejected:
        raise AssertionError('missing topic is expected')
11fdf7f2 TL |
3479 | |
3480 | ||
eafe8130 TL |
def test_ps_s3_topic_update():
    """ test updating topic associated with a notification

    Creates a topic with an ``amqp://`` endpoint and an S3 notification on
    it, then re-creates the same topic with an ``http://`` endpoint. Events
    are expected on the AMQP receiver until the notification itself is set
    again, after which they are expected on the http server.

    Raises:
        SkipTest: when ``skip_push_tests`` is set or ``init_rabbitmq()``
            returns None.
    """
    # SkipTest is an exception: it must be raised, returning it makes the
    # test silently pass instead of being reported as skipped
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create amqp topic
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # floor division keeps the 2xx checks integer comparisons on python3 too
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update the same topic with new endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to update the endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
3602 | ||
3603 | ||
def test_ps_s3_notification_update():
    """ test updating the topic of a notification

    Flow:
      1. create two topics: one with an amqp endpoint, one with an http endpoint
      2. create an s3 notification on a bucket pointing at the amqp topic,
         upload objects and verify the events on the amqp receiver
      3. re-create the notification (same Id) pointing at the http topic,
         replace the objects and verify the events on the http server

    Raises SkipTest when push tests are disabled or rabbitmq could not be
    started.
    """
    if skip_push_tests:
        # raise (not return) so the runner reports the test as skipped
        # instead of silently passing
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so a failed request is reported as
    # such and not as a JSON/KeyError
    assert_equal(status//100, 2)
    topic_arn1 = json.loads(result)['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status//100, 2)
    topic_arn2 = json.loads(result)['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # delete current objects and create new objects in the bucket
    # (new key names, so the events are distinguishable from the first batch)
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
11fdf7f2 TL |
3702 | |
3703 | ||
eafe8130 TL |
def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics

    Flow:
      1. create two topics: one with an amqp endpoint, one with an http endpoint
      2. create a single s3 notification configuration on a bucket holding
         two entries, one per topic, and read it back
      3. fetch the subscription named after each notification Id
      4. upload objects and verify the events, per topic, both by pulling
         them from the subscription and on the push endpoint

    Raises SkipTest when push tests are disabled or rabbitmq could not be
    started.
    """
    if skip_push_tests:
        # raise (not return) so the runner reports the test as skipped
        # instead of silently passing
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so a failed request is reported as
    # such and not as a JSON/KeyError
    assert_equal(status//100, 2)
    topic_arn1 = json.loads(result)['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status//100, 2)
    topic_arn2 = json.loads(result)['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)

    # get auto-generated subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from both of the subscription
    result, _ = sub_conf1.get_events()
    records1 = json.loads(result)
    for record in records1['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records1, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)

    result, _ = sub_conf2.get_events()
    records2 = json.loads(result)
    for record in records2['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    # verify the records pulled from the *second* subscription (previously
    # the first subscription's records were verified a second time here)
    verify_s3_records_by_elements(records2, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)