]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/rgw/rgw_multi/tests_ps.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests_ps.py
CommitLineData
11fdf7f2
TL
1import logging
2import json
3import tempfile
eafe8130
TL
4import BaseHTTPServer
5import SocketServer
6import random
7import threading
8import subprocess
9import socket
10import time
11import os
9f95a23c 12from random import randint
eafe8130 13from .tests import get_realm, \
11fdf7f2
TL
14 ZonegroupConns, \
15 zonegroup_meta_checkpoint, \
16 zone_meta_checkpoint, \
17 zone_bucket_checkpoint, \
18 zone_data_checkpoint, \
eafe8130 19 zonegroup_bucket_checkpoint, \
11fdf7f2 20 check_bucket_eq, \
eafe8130
TL
21 gen_bucket_name, \
22 get_user, \
23 get_tenant
9f95a23c
TL
24from .zone_ps import PSTopic, \
25 PSTopicS3, \
26 PSNotification, \
27 PSSubscription, \
28 PSNotificationS3, \
29 print_connection_info, \
30 delete_all_s3_topics, \
31 put_object_tagging, \
32 get_object_tagging, \
33 delete_all_objects
eafe8130 34from multisite import User
11fdf7f2
TL
35from nose import SkipTest
36from nose.tools import assert_not_equal, assert_equal
9f95a23c 37import boto.s3.tagging
11fdf7f2
TL
38
39# configure logging for the tests module
eafe8130
TL
40log = logging.getLogger(__name__)
41
42skip_push_tests = True
11fdf7f2
TL
43
44####################################
45# utility functions for pubsub tests
46####################################
47
eafe8130
TL
def set_contents_from_string(key, content):
    """Upload ``content`` through ``key``; a failure is printed, never raised."""
    try:
        key.set_contents_from_string(content)
    except Exception as error:
        message = 'Error: ' + str(error)
        print(message)
eafe8130
TL
53
54
55# HTTP endpoint functions
56# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
57
class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""

    def do_POST(self):
        """Read the POST body, parse it as JSON and store it in the server.

        Replies with status 100 when the body was read and parsed, and with
        status 400 when anything in that sequence fails (missing or invalid
        Content-Length header, unreadable body, invalid JSON).
        """
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            # only ordinary errors are turned into a 400 reply;
            # KeyboardInterrupt/SystemExit are left to propagate
            log.error('HTTP Server received empty event')
            self.send_response(400)
        else:
            self.send_response(100)
        finally:
            self.end_headers()
74
75
class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
    """HTTP server that keeps the events handed to it by its request handler"""

    def __init__(self, addr, handler, worker_id):
        # the third positional argument (bind_and_activate) is False
        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
        self.events = []
        self.worker_id = worker_id

    def append(self, event):
        """store one received event"""
        self.events.append(event)
85
86
class HTTPServerThread(threading.Thread):
    """daemon thread running one HTTP server on a socket supplied by the caller

    the thread starts itself at the end of construction
    """

    def __init__(self, i, sock, addr):
        threading.Thread.__init__(self)
        self.daemon = True
        self.i = i
        server = HTTPServerWithEvents(addr, HTTPPostHandler, i)
        # serve on the caller's socket instead of the one the server created
        server.socket = sock
        self.httpd = server
        # prevent the HTTP server from re-binding every handler
        # NOTE(review): the second assignment lands on the thread object, not on
        # self.httpd, and the no-op expects one argument - confirm the intent
        noop = lambda self: None
        self.httpd.server_bind = noop
        self.server_close = noop
        self.start()

    def run(self):
        """serve requests until the server is shut down"""
        worker_id = self.i
        try:
            log.info('HTTP Server (%d) started on: %s', worker_id, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', worker_id)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', worker_id, str(error))

    def close(self):
        """ask the server loop to stop"""
        self.httpd.shutdown()

    def get_events(self):
        """return the list of events stored by this thread's server"""
        return self.httpd.events

    def reset_events(self):
        """replace the server's event list with a fresh empty one"""
        self.httpd.events = []
116
117
class StreamingHTTPServer:
    """multi-threaded http server that also collects the events received by its workers

    every worker thread owns a server of its own; all of them share one listening socket
    """

    def __init__(self, host, port, num_workers=100):
        addr = (host, port)
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(addr)
        listener.listen(num_workers)
        self.sock = listener
        workers = []
        for worker_id in range(num_workers):
            workers.append(HTTPServerThread(worker_id, listener, addr))
        self.workers = workers

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify the stored s3 records against a list of keys, clearing the stored records"""
        collected = self.get_and_reset_events()
        verify_s3_records_by_elements(collected, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify the stored events against a list of keys, clearing the stored events"""
        collected = self.get_and_reset_events()
        verify_events_by_elements(collected, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        """return the events of all workers as one list and clear every worker"""
        collected = []
        for worker in self.workers:
            collected.extend(worker.get_events())
            worker.reset_events()
        return collected

    def close(self):
        """close all workers in the http server and wait for them to finish"""
        # the shared socket is closed first:
        # this is needed in case one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # stop each server thread and wait for it
        for worker in self.workers:
            worker.close()
            worker.join()
162
eafe8130
TL
163# AMQP endpoint functions
164
165rabbitmq_port = 5672
166
class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""

    def __init__(self, exchange, topic):
        """Connect to the broker and bind an exclusive queue to ``topic`` on ``exchange``.

        The connection is attempted up to 10 times, one second apart; an
        Exception is raised when every attempt failed.
        """
        import pika
        hostname = get_ip()
        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                # give the broker time to come up; retrying immediately
                # would burn all attempts at once
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        # state used by on_message() is set up before the callback is registered
        self.events = []
        self.topic = topic
        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        # an empty queue name is passed to queue_declare; the actual name is read from the reply
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrives on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys, then clear them"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys, then clear them"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        """return the stored events and start over with an empty list"""
        tmp = self.events
        self.events = []
        return tmp
216
217
def amqp_receiver_thread_runner(receiver):
    """thread entry point: consume on the receiver's channel until consuming stops

    any exception raised while consuming is logged and not propagated
    """
    channel = receiver.channel
    try:
        log.info('AMQP receiver started')
        channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))
226
227
def create_amqp_receiver_thread(exchange, topic):
    """build an AMQP receiver and a daemon thread for it

    returns (thread, receiver); the thread is returned without being started
    """
    receiver = AMQPReceiver(exchange, topic)
    worker = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    worker.daemon = True
    return worker, receiver
234
235
def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait (up to 5 seconds) for it to finish"""
    channel = receiver.channel
    try:
        channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        # a failed stop is only logged; the thread is still joined below
        log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
    task.join(5)
11fdf7f2
TL
244
def check_ps_configured():
    """raise SkipTest unless the master zonegroup has at least one pubsub zone"""
    zonegroup = get_realm().master_zonegroup()
    if not zonegroup.zones_by_type.get("pubsub"):
        raise SkipTest("Requires at least one PS zone")
253
254
def is_ps_zone(zone_conn):
    """tell whether the given zone connection belongs to a pubsub zone

    a missing (falsy) connection is never a pubsub zone
    """
    if zone_conn:
        return zone_conn.zone.tier_type() == "pubsub"
    return False
260
261
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element

    events: either one batch (a dict with an 'events' list) or a list of such batches
    keys: objects exposing ``name`` and ``bucket.name``
    deletions: look for 'OBJECT_DELETE' events instead of 'OBJECT_CREATE' ones
    exact_match: also fail when the number of items differs from the number of keys

    raises AssertionError when a key has no matching event, or on a count
    mismatch while exact_match is set
    """
    # a single batch is handled as a list holding that one batch
    batches = events if isinstance(events, list) else [events]
    wanted = 'OBJECT_DELETE' if deletions else 'OBJECT_CREATE'

    for key in keys:
        key_found = any(
            event['info']['bucket']['name'] == key.bucket.name and
            event['info']['key']['name'] == key.name and
            event['event'] == wanted
            for batch in batches
            for event in batch['events'])
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            raise AssertionError(err)

    # list input keeps its historical count (number of list items); for a single
    # batch the events are counted, since len() of the dict would count its keys
    if isinstance(events, list):
        count = len(events)
    else:
        count = len(events['events'])
    if count != len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            raise AssertionError(err)
302
303
eafe8130
TL
def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
    """ verify there is at least one record per element

    records: either one batch (a dict with a 'Records' list) or a list of such batches
    keys: objects exposing ``name`` and ``bucket.name``
    deletions: look for records whose eventName contains 'ObjectRemoved'
               instead of 'ObjectCreated'
    exact_match: also fail when the number of items differs from the number of keys

    raises AssertionError when a key has no matching record, or on a count
    mismatch while exact_match is set
    """
    # a single batch is handled as a list holding that one batch, so the
    # search and the failure logging below work for both input shapes
    batches = records if isinstance(records, list) else [records]
    wanted = 'ObjectRemoved' if deletions else 'ObjectCreated'

    def log_all_records():
        # dump "bucket,key" of every received record to help debugging a failure
        for batch in batches:
            for record in batch['Records']:
                log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))

    for key in keys:
        key_found = any(
            record['s3']['bucket']['name'] == key.bucket.name and
            record['s3']['object']['key'] == key.name and
            wanted in record['eventName']
            for batch in batches
            for record in batch['Records'])
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log_all_records()
            raise AssertionError(err)

    # list input keeps its historical count (number of list items); for a single
    # batch the records are counted, since len() of the dict would count its keys
    if isinstance(records, list):
        count = len(records)
    else:
        count = len(records['Records'])
    if count != len(keys):
        err = 'superfluous records are found'
        log.warning(err)
        if exact_match:
            log_all_records()
            raise AssertionError(err)
348
349
def init_rabbitmq():
    """ start a rabbitmq broker

    returns the Popen object of the 'rabbitmq-server' process, or None when
    the process could not be spawned
    """
    # TODO: support multiple brokers per host using env, making sure we don't
    # collide with the default. the sketched approach was: pick a random port,
    # create per-port data/log directories, and pass RABBITMQ_NODE_PORT,
    # RABBITMQ_NODENAME, RABBITMQ_USE_LONGNAME, RABBITMQ_MNESIA_BASE and
    # RABBITMQ_LOG_BASE in the environment of the server
    try:
        proc = subprocess.Popen('rabbitmq-server')
    except Exception as error:
        log.info('failed to execute rabbitmq-server: %s', str(error))
        print('failed to execute rabbitmq-server: %s' % str(error))
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return proc
378
379
def clean_rabbitmq(proc):
    """ stop the rabbitmq broker

    best effort: runs 'rabbitmqctl stop', waits 5 seconds, then terminates
    ``proc``; any failure along the way is only logged
    """
    try:
        subprocess.call(['rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except Exception:
        # ordinary failures only; KeyboardInterrupt during the wait still propagates
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup (data and log directories) once multiple
    # brokers are supported
394
395
9f95a23c
TL
396# Kafka endpoint functions
397
398kafka_server = 'localhost'
399
class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""

    def __init__(self, topic, security_type):
        """create a consumer for ``topic``, trying up to 10 times one second apart

        'PLAINTEXT' connects to port 9092; any other security type is treated
        as 'SSL' and connects to port 9093
        """
        from kafka import KafkaConsumer
        if security_type == 'PLAINTEXT':
            port = 9092
        else:
            security_type = 'SSL'
            port = 9093
        total_attempts = 10
        for attempt in range(total_attempts):
            try:
                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                remaining_retries = total_attempts - attempt - 1
                print('failed to connect to kafka (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)
        else:
            # the loop ran out without a successful connection
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys, then clear them"""
        stored = self.events
        verify_s3_records_by_elements(stored, keys, exact_match=exact_match, deletions=deletions)
        self.events = []
431
432
def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver

    until ``receiver.stop`` is set: iterate the consumer, store every message
    value (parsed as JSON) in ``receiver.events``, then pause for 0.1 seconds.
    any exception ends the loop and is logged and printed, not propagated
    """
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            # short pause before polling the consumer again
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))
447
448
def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """build a kafka receiver and a daemon thread for it

    returns (thread, receiver); the thread is returned without being started
    """
    receiver = KafkaReceiver(topic, security_type)
    worker = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    worker.daemon = True
    return worker, receiver
455
def stop_kafka_receiver(receiver, task):
    """flag the receiver to stop, wait up to 1 second for its thread, then close the consumer"""
    receiver.stop = True
    task.join(1)
    consumer = receiver.consumer
    try:
        consumer.close()
    except Exception as error:
        # a failed close is only logged
        log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
464
465
466# follow the instruction here to create and sign a broker certificate:
467# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
468
469# the generated broker certificate should be stored in the java keystore for the use of the server
470# assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost"
471# following lines must be added to $KAFKA_DIR/config/server.properties
472# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
473# sasl.enabled.mechanisms=PLAIN
474# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
475# ssl.keystore.password = abcdefgh
476# ssl.key.password = abcdefgh
477# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
478# ssl.truststore.password = abcdefgh
479
480# notes:
481# (1) we dont test client authentication, hence, no need to generate client keys
482# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
483# should be copied to: $KAFKA_DIR
484
def init_kafka():
    """ start kafka/zookeeper

    the installation directory is read from the KAFKA_DIR environment variable
    (with or without a trailing slash).

    returns (kafka_proc, zk_proc, kafka_log) on success, where kafka_log is the
    open './kafka.log' file receiving kafka's stdout.
    returns (None, None, None) when KAFKA_DIR is unset/empty, or when either
    process could not be spawned or exited during its startup wait
    """
    kafka_dir = os.environ.get('KAFKA_DIR', '')
    if kafka_dir == '':
        log.info('KAFKA_DIR must be set to where kafka is installed')
        print('KAFKA_DIR must be set to where kafka is installed')
        return None, None, None

    print('\nStarting zookeeper...')
    # our handle on the null device is only needed while spawning, so it is
    # closed as soon as the "with" block is left
    with open(os.devnull, 'wb') as devnull:
        try:
            zk_proc = subprocess.Popen([
                os.path.join(kafka_dir, 'bin/zookeeper-server-start.sh'),
                os.path.join(kafka_dir, 'config/zookeeper.properties')],
                stdout=devnull)
        except Exception as error:
            log.info('failed to execute zookeeper: %s', str(error))
            print('failed to execute zookeeper: %s' % str(error))
            return None, None, None

    time.sleep(5)
    if zk_proc.poll() is not None:
        print('zookeeper failed to start')
        return None, None, None
    print('Zookeeper started')
    print('Starting kafka...')
    kafka_log = open('./kafka.log', 'w')
    try:
        kafka_env = os.environ.copy()
        kafka_env['KAFKA_OPTS'] = '-Djava.security.auth.login.config=' + \
            os.path.join(kafka_dir, 'config/kafka_server_jaas.conf')
        kafka_proc = subprocess.Popen([
            os.path.join(kafka_dir, 'bin/kafka-server-start.sh'),
            os.path.join(kafka_dir, 'config/server.properties')],
            stdout=kafka_log,
            env=kafka_env)
    except Exception as error:
        log.info('failed to execute kafka: %s', str(error))
        print('failed to execute kafka: %s' % str(error))
        # do not leave zookeeper running without kafka
        zk_proc.terminate()
        kafka_log.close()
        return None, None, None

    # TODO add kafka checkpoint instead of sleep
    time.sleep(15)
    if kafka_proc.poll() is not None:
        zk_proc.terminate()
        print('kafka failed to start. details in: ./kafka.log')
        kafka_log.close()
        return None, None, None

    print('Kafka started')
    return kafka_proc, zk_proc, kafka_log
539
540
def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper

    best effort: closes the kafka log, then terminates kafka and zookeeper in
    that order, killing a process that is still running 5 seconds after being
    terminated. any failure along the way is only logged
    """
    try:
        kafka_log.close()
        print('Shutdown Kafka...')
        kafka_proc.terminate()
        time.sleep(5)
        if kafka_proc.poll() is None:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutdown zookeeper...')
        zk_proc.terminate()
        time.sleep(5)
        if zk_proc.poll() is None:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        # ordinary failures only; KeyboardInterrupt during the waits still propagates
        log.info('kafka/zookeeper already terminated')
559
560
def init_env(require_ps=True):
    """initialize the environment

    returns (master_zone, ps_zone): the connection of the zonegroup's master
    zone and the connection of a pubsub zone (the last one found), the latter
    being None when there is no pubsub zone and require_ps is False.
    with require_ps set, the test is skipped when no pubsub zone is configured
    """
    if require_ps:
        check_ps_configured()

    zonegroup = get_realm().master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    master_zone = None
    ps_zone = None
    for conn in zonegroup_conns.zones:
        if conn.zone == zonegroup.master_zone:
            master_zone = conn
        if is_ps_zone(conn):
            # wait for the metadata of the pubsub zone before using it
            zone_meta_checkpoint(conn.zone)
            ps_zone = conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone
11fdf7f2
TL
585
586
eafe8130
TL
def get_ip():
    """ return the "primary" IP of the local box (the one with a default route)
    source: https://stackoverflow.com/a/28950776/711085
    needed because on the teuthology machines socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # the address should not be reachable; connecting the datagram socket
        # is only done so that the local address can be read back from it
        probe.connect(('10.255.255.255', 1))
        return probe.getsockname()[0]
    finally:
        probe.close()
599
600
11fdf7f2
TL
601TOPIC_SUFFIX = "_topic"
602SUB_SUFFIX = "_sub"
eafe8130 603NOTIFICATION_SUFFIX = "_notif"
11fdf7f2
TL
604
605##############
606# pubsub tests
607##############
608
eafe8130
TL
def test_ps_info():
    """ log information for manual testing """
    # the exception has to be raised: returning it would make the test count
    # as passed instead of skipped
    raise SkipTest("only used in manual testing")
    # the code below is kept for manual runs (with the skip above removed)
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    print('Zonegroup: ' + zonegroup.name)
    print('user: ' + get_user())
    print('tenant: ' + get_tenant())
    print('Master Zone')
    print_connection_info(master_zone.conn)
    print('PubSub Zone')
    print_connection_info(ps_zone.conn)
    print('Bucket: ' + bucket_name)
11fdf7f2
TL
631
632
eafe8130
TL
def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications

    creating an s3 notification on the pubsub zone is expected to generate a
    topic, a notification and a subscription; deleting it is expected to
    remove them again
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division: any 2xx status passes, whatever "/" means for integers
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    # the status is checked before the body is parsed, so that a failed
    # request is reported as such and not as a parsing error
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status // 100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status // 100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
11fdf7f2
TL
704
705
eafe8130
TL
def test_ps_s3_notification_records():
    """ test s3 records fetching

    objects are created on the master zone, and the records pulled from the
    subscription on the pubsub zone must include a creation record for each
    of them
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division: any 2xx status passes, whatever "/" means for integers
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
11fdf7f2
TL
760
761
eafe8130
TL
def test_ps_s3_notification():
    """ test s3 notification set/get/delete

    two notifications are created on the same bucket and topic, fetched
    together and one by one, and then deleted
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    # floor division: any 2xx status passes, whatever "/" means for integers
    assert_equal(status // 100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status // 100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status // 100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # get all notification on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status // 100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notification on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status // 100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status // 100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status // 100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status // 100, 2)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
825
eafe8130
TL
826
def test_ps_s3_topic_on_master():
    """ test s3 topics set/get/delete on master

    three topics are created on the master zone; one is fetched, one is
    deleted and verified to be gone, and the list of topics is checked before
    and after deleting the remaining two
    """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX
    arn_prefix = 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn, arn_prefix + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn, arn_prefix + '_2')

    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn, arn_prefix + '_3')

    # get topic 3
    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
    # Note that endpoint args may be ordered differently in the result

    # delete topic 1
    # NOTE(review): nothing is asserted on the outcome of this delete. checking
    # "status" here would only re-check the value left over from the get of
    # topic 3 above, since del_config() is used with a single return value whose
    # meaning is not visible in this file - TODO confirm what it returns and
    # assert on it
    topic_conf1.del_config()

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result, status = topic_conf1.get_list()
    assert_equal(status, 200)
    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)

    # delete topics
    topic_conf2.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)
    topic_conf3.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)

    # get topic list, make sure it is empty
    result, status = topic_conf1.get_list()
    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
890
891
def test_ps_s3_topic_with_secret_on_master():
    """ test s3 topics with secret set/get/delete on master

    A topic whose endpoint URL carries user/password must be rejected over the
    plain connection and accepted over the secure one.

    Raises SkipTest when the master zone has no secure connection.
    """
    master_zone, _ = init_env(require_ps=False)
    if master_zone.secure_conn is None:
        # SkipTest must be raised: returning it makes the test count as passed
        raise SkipTest('secure connection is needed to test topic with secrets')

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    try:
        bad_topic_conf.set_config()
    except Exception as err:
        print('Error is expected: ' + str(err))
    else:
        assert False, 'user password configuration set allowed only over HTTPS'

    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)

    # reading the topic over the plain connection is expected to fail (4xx)
    # floor division keeps the status-class check an integer comparison
    _, status = bad_topic_conf.get_config()
    assert_equal(status//100, 4)

    # get topic
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    _, status = bad_topic_conf.get_config()
    assert_equal(status//100, 4)

    _, status = topic_conf.get_list()
    assert_equal(status//100, 2)

    # delete topics
    topic_conf.del_config()
eafe8130
TL
940
941
def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master

    Three notifications sharing one topic are set on a bucket; one is fetched
    and deleted by name, the remaining two are listed and then deleted together.
    """
    master_zone, _ = init_env(require_ps=False)
    zonegroup = get_realm().master_zonegroup()
    bucket_name = gen_bucket_name()
    # the bucket only has to exist, its handle is not needed
    master_zone.create_bucket(bucket_name)

    # s3 topic with an amqp endpoint
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, bucket_name + TOPIC_SUFFIX, zonegroup.name,
                           endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # s3 notifications: creation events, removal events, and an empty event list
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    events_per_notification = (['s3:ObjectCreated:*'], ['s3:ObjectRemoved:*'], [])
    topic_conf_list = [{'Id': notification_name+'_'+str(index),
                        'TopicArn': topic_arn,
                        'Events': events}
                       for index, events in enumerate(events_per_notification, 1)]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    first_notification = notification_name+'_1'

    # fetch a single notification by name
    response, status = s3_notification_conf.get_config(notification=first_notification)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete that notification
    _, status = s3_notification_conf.del_config(notification=first_notification)
    assert_equal(status/100, 2)

    # two notifications should remain on the bucket
    response, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    remaining = response['TopicConfigurations']
    assert_equal(len(remaining), 2)
    assert_equal(remaining[0]['TopicArn'], topic_arn)
    assert_equal(remaining[1]['TopicArn'], topic_arn)

    # delete whatever is left
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # fetch again after deletion (the returned status is not verified)
    _, status = s3_notification_conf.get_config()

    # cleanup: topic first, then the bucket
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
1001
1002
def ps_s3_notification_filter(on_master):
    """ test s3 notification filters (key prefix/suffix/regex and, on master, metadata)

    on_master: when True the topic and notifications are created on the master
    zone; otherwise they are created on the pubsub zone, and the metadata
    filter part of the test is skipped.

    Raises SkipTest when push tests are disabled or rabbitmq could not be started.
    """
    # SkipTest must be raised: returning it makes the caller report a pass
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    if on_master:
        master_zone, _ = init_env(require_ps=False)
        ps_zone = master_zone
    else:
        master_zone, ps_zone = init_env(require_ps=True)

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    if on_master:
        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
    else:
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, _ = topic_conf.set_config()
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        zone_meta_checkpoint(ps_zone.zone)

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    if on_master:
        topic_conf_list = [{'Id': notification_name+'_4',
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                            'Filter': {
                                'Metadata': {
                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                                },
                                'Key': {
                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                                }
                            }
                            }]

        # any failure here (including a non 2xx status) is treated as
        # "metadata filter not supported" and only disables the notif4 checks
        try:
            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
            _, status = s3_notification_conf4.set_config()
            assert_equal(status//100, 2)
            skip_notif4 = False
        except Exception:
            print('note: metadata filter is not supported by boto3 - skipping test')
            skip_notif4 = True
    else:
        print('filtering by attributes only supported on master zone')
        skip_notif4 = True

    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status//100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status//100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    # (a name present in more than one list is uploaded once per list, as before)
    for key_name in expected_in1 + expected_in2 + expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # the key has to be created before its metadata is set; previously the
        # metadata was set on the key of the previous iteration
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    if on_master:
        print('wait for 5sec for the messages...')
        time.sleep(5)
    else:
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    # sort the received events by the notification that produced them
    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)
1195
1196
def test_ps_s3_notification_filter_on_master():
    """ run the s3 notification filter test with on_master=True """
    run_on_master = True
    ps_s3_notification_filter(on_master=run_on_master)
1199
1200
def test_ps_s3_notification_filter():
    """ run the s3 notification filter test with on_master=False """
    run_on_master = False
    ps_s3_notification_filter(on_master=run_on_master)
1203
1204
def test_ps_s3_notification_errors_on_master():
    """ test that invalid s3 notification configurations are rejected on master

    Each case below is expected to make set_config() raise; a case that
    succeeds fails the test with the message attached to it.
    """
    master_zone, _ = init_env(require_ps=False)
    zonegroup = get_realm().master_zonegroup()
    bucket_name = gen_bucket_name()
    # the bucket only has to exist, its handle is not needed
    master_zone.create_bucket(bucket_name)

    # s3 topic used by the cases that need a valid ARN
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, bucket_name + TOPIC_SUFFIX, zonegroup.name,
                           endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    notification_name = bucket_name + NOTIFICATION_SUFFIX

    # (target bucket, notification id, topic arn, event, message if the call succeeds)
    failing_cases = [
        # invalid event name
        (bucket_name, notification_name, topic_arn, 's3:ObjectCreated:Kaboom',
         'invalid event name is expected to fail'),
        # missing notification name
        (bucket_name, '', topic_arn, 's3:ObjectCreated:Put',
         'missing notification name is expected to fail'),
        # invalid topic ARN
        (bucket_name, notification_name, 'kaboom', 's3:ObjectCreated:Put',
         'invalid ARN is expected to fail'),
        # well formed ARN of an unknown topic
        (bucket_name, notification_name, 'arn:aws:sns:a::kaboom', 's3:ObjectCreated:Put',
         'unknown topic is expected to fail'),
        # wrong bucket
        ('kaboom', notification_name, topic_arn, 's3:ObjectCreated:Put',
         'unknown bucket is expected to fail'),
    ]

    for target_bucket, notif_id, arn, event_name, failure_message in failing_cases:
        conf_list = [{'Id': notif_id,
                      'TopicArn': arn,
                      'Events': [event_name]
                      }]
        notification_conf = PSNotificationS3(master_zone.conn, target_bucket, conf_list)
        try:
            _, _ = notification_conf.set_config()
        except Exception as error:
            print(str(error) + ' - is expected')
        else:
            assert False, failure_message

    topic_conf.del_config()

    # deleting the already deleted topic again is not considered an error
    status = topic_conf.del_config()
    assert_equal(status, 200)

    # the topic must be gone
    _, status = topic_conf.get_config()
    assert_equal(status, 404)

    # cleanup: delete the bucket
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
1300
1301
def test_objcet_timing():
    """ measure average object creation/deletion time (manual testing only)

    Always raises SkipTest; remove the raise to run the measurement manually.
    """
    # SkipTest must be raised: returning it makes the test count as passed
    raise SkipTest("only used in manual testing")
    master_zone, _ = init_env(require_ps=False)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
1341
1342
def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master

    Two topics share one exchange: the first notification has an empty event
    list, the second one is limited to object creation events. Receiver 1 is
    therefore checked for creations and deletions, and receiver 2 for
    creations only.

    Raises SkipTest when push tests are disabled or rabbitmq could not be started.
    """
    # SkipTest must be raised: returning it makes the test count as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topic
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check amqp receiver 2 has no deletions: verification of deletion events
    # is expected to fail on it (previously receiver 1 was queried by mistake)
    try:
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except Exception:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
1450
1451
def test_ps_s3_notification_push_kafka():
    """ test pushing kafka s3 notification from the pubsub zone

    The bucket and its objects are created on the master zone, while the topic
    and the notification are created on the pubsub zone.

    Raises SkipTest when push tests are disabled or kafka/zookeeper could not
    be started.
    """
    # SkipTest must be raised: returning it makes the test count as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')

    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='kafka://' + kafka_server,
                         endpoint_args='kafka-ack-level=broker')
    result, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
1527
1528
def test_ps_s3_notification_push_kafka_on_master():
    """ test pushing kafka s3 notification on master

    Two topics are created (with and without broker acks); only the first one
    has a consumer whose events are verified.

    Raises SkipTest when push tests are disabled or kafka/zookeeper could not
    be started.
    """
    # SkipTest must be raised: returning it makes the test count as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
    task.start()

    # create s3 topic
    endpoint_address = 'kafka://' + kafka_server
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
                        'Events': []
                        },
                       {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
1615
1616
def kafka_security(security_type):
    """ test pushing kafka s3 notification on master over a secured kafka endpoint

    security_type: 'SSL_SASL' uses a user/password endpoint on port 9094 and
    creates the topic over the secure connection; any other value uses the
    ssl-only endpoint on port 9093.

    The root CA location is built from the KAFKA_DIR environment variable,
    which must be set.

    Raises SkipTest when push tests are disabled, when the secure connection
    needed for 'SSL_SASL' is missing, or when kafka/zookeeper could not be
    started.
    """
    # SkipTest must be raised: returning it makes the caller report a pass
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
        raise SkipTest("secure connection is needed to test SASL_SSL security")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create s3 topic
    if security_type == 'SSL_SASL':
        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
    else:
        # ssl only
        endpoint_address = 'kafka://' + kafka_server + ':9093'

    KAFKA_DIR = os.environ['KAFKA_DIR']

    # without acks from broker, with root CA
    # NOTE(review): KAFKA_DIR is concatenated as-is, so it presumably has to
    # end with a path separator - confirm
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'

    if security_type == 'SSL_SASL':
        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    else:
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)

    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    s3_notification_conf.set_config()

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    try:
        print('wait for 5sec for the messages...')
        time.sleep(5)
        keys = list(bucket.list())
        receiver.verify_s3_events(keys, exact_match=True)

        # delete objects from the bucket
        client_threads = []
        start_time = time.time()
        for key in bucket.list():
            thr = threading.Thread(target = key.delete, args=())
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        time_diff = time.time() - start_time
        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)
        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
    except Exception as err:
        # any failure is reported as a test failure carrying the error text
        assert False, str(err)
    finally:
        # cleanup runs whether or not the verification succeeded
        s3_notification_conf.del_config()
        topic_conf.del_config()
        # delete the bucket
        for key in bucket.list():
            key.delete()
        master_zone.delete_bucket(bucket_name)
        stop_kafka_receiver(receiver, task)
        clean_kafka(kafka_proc, zk_proc, kafka_log)
1714
1715
def test_ps_s3_notification_push_kafka_security_ssl():
    """ run the kafka security scenario with the 'SSL' security type """
    security_type = 'SSL'
    kafka_security(security_type)
1718
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
    """ run the kafka security scenario with the 'SSL_SASL' security type """
    security_type = 'SSL_SASL'
    kafka_security(security_type)
1721
1722
def test_ps_s3_notification_multi_delete_on_master():
    """ test deletion of multiple keys on master

    Objects are uploaded concurrently, removed with delete_all_objects(),
    and the http receiver is then checked (verify_s3_events with
    deletions=True) against the keys that were listed before the removal.
    """
    if skip_push_tests:
        # raise, not return: a returned SkipTest is discarded by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    # the other on-master tests in this file unpack the first value of
    # init_env() as a single zone connection, so it is used directly here
    # instead of being indexed
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    try:
        # create bucket
        bucket_name = gen_bucket_name()
        bucket = master_zone.create_bucket(bucket_name)
        topic_name = bucket_name + TOPIC_SUFFIX

        # create s3 topic
        endpoint_address = 'http://'+host+':'+str(port)
        endpoint_args = 'push-endpoint='+endpoint_address
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
        # create s3 notification (object removal events only)
        notification_name = bucket_name + NOTIFICATION_SUFFIX
        topic_conf_list = [{'Id': notification_name,
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectRemoved:*']
                            }]
        s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
        _, status = s3_notification_conf.set_config()
        # floor division: any 2xx status must map to exactly 2
        assert_equal(status//100, 2)

        # create objects in the bucket (one uploader thread per object)
        client_threads = []
        for i in range(number_of_objects):
            obj_size = randint(1, 1024)
            content = str(os.urandom(obj_size))
            key = bucket.new_key(str(i))
            thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        # remember the keys before they are removed
        keys = list(bucket.list())

        start_time = time.time()
        delete_all_objects(master_zone.conn, bucket_name)
        time_diff = time.time() - start_time
        print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)

        # check http receiver
        http_server.verify_s3_events(keys, exact_match=True, deletions=True)

        # cleanup
        topic_conf.del_config()
        s3_notification_conf.del_config(notification=notification_name)
        # delete the bucket
        master_zone.delete_bucket(bucket_name)
    finally:
        # always stop the http server, even when a check above failed
        http_server.close()
1789
1790
def test_ps_s3_notification_push_http_on_master():
    """ test pushing http s3 notification on master

    Objects are created and then deleted concurrently; after each phase the
    http receiver is checked against the bucket's keys.
    """
    if skip_push_tests:
        # raise, not return: a returned SkipTest is discarded by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    try:
        # create bucket
        bucket_name = gen_bucket_name()
        bucket = master_zone.create_bucket(bucket_name)
        topic_name = bucket_name + TOPIC_SUFFIX

        # create s3 topic
        endpoint_address = 'http://'+host+':'+str(port)
        endpoint_args = 'push-endpoint='+endpoint_address
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
        # create s3 notification (empty event list)
        notification_name = bucket_name + NOTIFICATION_SUFFIX
        topic_conf_list = [{'Id': notification_name,
                            'TopicArn': topic_arn,
                            'Events': []
                            }]
        s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
        _, status = s3_notification_conf.set_config()
        # floor division: any 2xx status must map to exactly 2
        assert_equal(status//100, 2)

        # create objects in the bucket (one uploader thread per object)
        client_threads = []
        start_time = time.time()
        content = 'bar'
        for i in range(number_of_objects):
            key = bucket.new_key(str(i))
            thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        time_diff = time.time() - start_time
        print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)

        # check http receiver
        keys = list(bucket.list())
        print('total number of objects: ' + str(len(keys)))
        http_server.verify_s3_events(keys, exact_match=True)

        # delete objects from the bucket (one deleter thread per object)
        client_threads = []
        start_time = time.time()
        for key in bucket.list():
            thr = threading.Thread(target=key.delete, args=())
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        time_diff = time.time() - start_time
        print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)

        # check http receiver
        http_server.verify_s3_events(keys, exact_match=True, deletions=True)

        # cleanup
        topic_conf.del_config()
        s3_notification_conf.del_config(notification=notification_name)
        # delete the bucket
        master_zone.delete_bucket(bucket_name)
    finally:
        # always stop the http server, even when a check above failed
        http_server.close()
1873
1874
def test_ps_s3_opaque_data():
    """ test that opaque id set in topic, is sent in notification

    The topic and the notification are configured on the pubsub zone while
    the objects are written on the master zone; every event collected by the
    http receiver must carry the configured opaque data.
    """
    if skip_push_tests:
        # raise, not return: a returned SkipTest is discarded by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    try:
        # create bucket
        bucket_name = gen_bucket_name()
        bucket = master_zone.create_bucket(bucket_name)
        topic_name = bucket_name + TOPIC_SUFFIX
        # wait for sync
        zone_meta_checkpoint(ps_zone.zone)

        # create s3 topic, passing the opaque data as an endpoint argument
        endpoint_address = 'http://'+host+':'+str(port)
        opaque_data = 'http://1.2.3.4:8888'
        endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, status = topic_conf.set_config()
        # floor division: any 2xx status must map to exactly 2
        assert_equal(status//100, 2)
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        # create s3 notification
        notification_name = bucket_name + NOTIFICATION_SUFFIX
        topic_conf_list = [{'Id': notification_name,
                            'TopicArn': topic_arn,
                            'Events': []
                            }]
        s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
        _, status = s3_notification_conf.set_config()
        assert_equal(status//100, 2)

        # create objects in the bucket (one uploader thread per object)
        client_threads = []
        content = 'bar'
        for i in range(number_of_objects):
            key = bucket.new_key(str(i))
            thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        # wait for sync
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

        # check http receiver
        keys = list(bucket.list())
        print('total number of objects: ' + str(len(keys)))
        events = http_server.get_and_reset_events()
        for event in events:
            assert_equal(event['Records'][0]['opaqueData'], opaque_data)

        # cleanup (the uploader threads were already joined above)
        for key in keys:
            key.delete()
        topic_conf.del_config()
        s3_notification_conf.del_config(notification=notification_name)
        # delete the bucket
        master_zone.delete_bucket(bucket_name)
    finally:
        # always stop the http server, even when a check above failed
        http_server.close()
eafe8130
TL
1946
1947
9f95a23c
TL
def test_ps_s3_opaque_data_on_master():
    """ test that opaque id set in topic, is sent in notification on master

    The topic is created with opaque_data on the master zone; every event
    collected by the http receiver must carry that same value.
    """
    if skip_push_tests:
        # raise, not return: a returned SkipTest is discarded by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    try:
        # create bucket
        bucket_name = gen_bucket_name()
        bucket = master_zone.create_bucket(bucket_name)
        topic_name = bucket_name + TOPIC_SUFFIX

        # create s3 topic
        endpoint_address = 'http://'+host+':'+str(port)
        endpoint_args = 'push-endpoint='+endpoint_address
        opaque_data = 'http://1.2.3.4:8888'
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
        topic_arn = topic_conf.set_config()
        # create s3 notification
        notification_name = bucket_name + NOTIFICATION_SUFFIX
        topic_conf_list = [{'Id': notification_name,
                            'TopicArn': topic_arn,
                            'Events': []
                            }]
        s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
        _, status = s3_notification_conf.set_config()
        # floor division: any 2xx status must map to exactly 2
        assert_equal(status//100, 2)

        # create objects in the bucket (one uploader thread per object)
        client_threads = []
        start_time = time.time()
        content = 'bar'
        for i in range(number_of_objects):
            key = bucket.new_key(str(i))
            thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
            thr.start()
            client_threads.append(thr)
        for thr in client_threads:
            thr.join()

        time_diff = time.time() - start_time
        print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)

        # check http receiver
        keys = list(bucket.list())
        print('total number of objects: ' + str(len(keys)))
        events = http_server.get_and_reset_events()
        for event in events:
            assert_equal(event['Records'][0]['opaqueData'], opaque_data)

        # cleanup (the uploader threads were already joined above)
        for key in keys:
            key.delete()
        topic_conf.del_config()
        s3_notification_conf.del_config(notification=notification_name)
        # delete the bucket
        master_zone.delete_bucket(bucket_name)
    finally:
        # always stop the http server, even when a check above failed
        http_server.close()
2018
eafe8130
TL
def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zone = init_env()
    zonegroup = get_realm().master_zonegroup()
    topic_name = gen_bucket_name() + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)

    # set: the request must succeed
    _, set_status = topic_conf.set_config()
    assert_equal(set_status/100, 2)

    # get: name, (empty) subscription list and arn must match
    raw_topic, _ = topic_conf.get_config()
    topic_info = json.loads(raw_topic)
    assert_equal(topic_info['topic']['name'], topic_name)
    assert_equal(len(topic_info['subs']), 0)
    assert_equal(topic_info['topic']['arn'],
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)

    # delete: the request must succeed
    _, del_status = topic_conf.del_config()
    assert_equal(del_status/100, 2)

    # verify the topic is deleted: a further get reports 404 / NoSuchKey
    raw_error, get_status = topic_conf.get_config()
    assert_equal(get_status, 404)
    assert_equal(json.loads(raw_error)['Code'], 'NoSuchKey')
2047
2048
def test_ps_topic_with_endpoint():
    """ test set topic with endpoint"""
    _, ps_zone = init_env()
    topic_name = gen_bucket_name() + TOPIC_SUFFIX

    # create a topic that carries a push endpoint and its arguments
    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=dest_endpoint, endpoint_args=dest_args)
    _, set_status = topic_conf.set_config()
    assert_equal(set_status/100, 2)

    # read the topic back and check the name and the stored endpoint
    raw_topic, _ = topic_conf.get_config()
    topic_info = json.loads(raw_topic)['topic']
    assert_equal(topic_info['name'], topic_name)
    assert_equal(topic_info['dest']['push_endpoint'], dest_endpoint)

    # cleanup
    topic_conf.del_config()
2071
2072
def test_ps_notification():
    """ test set/get/delete of notification """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # the topic is created on the pubsub zone, the bucket on the master zone
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # set the notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name)
    _, set_status = notification_conf.set_config()
    assert_equal(set_status/100, 2)

    # get it back: exactly one topic, carrying our topic name
    raw_config, _ = notification_conf.get_config()
    topics = json.loads(raw_config)['topics']
    assert_equal(len(topics), 1)
    assert_equal(topics[0]['topic']['name'], topic_name)

    # delete it and make sure no topic is listed any more
    _, del_status = notification_conf.del_config()
    assert_equal(del_status/100, 2)
    raw_config, _ = notification_conf.get_config()
    assert_equal(len(json.loads(raw_config)['topics']), 0)
    # TODO: the get after the delete should return 404; assert on the
    # status once it does

    # cleanup
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2109
2110
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events"""
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # the topic is created on the pubsub zone, the bucket on the master zone
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # set a notification restricted to an explicit list of events
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name, events)
    _, set_status = notification_conf.set_config()
    assert_equal(set_status/100, 2)

    # get it back: one topic, our name, and a non-empty event list
    raw_config, _ = notification_conf.get_config()
    topics = json.loads(raw_config)['topics']
    assert_equal(len(topics), 1)
    first_topic = topics[0]
    assert_equal(first_topic['topic']['name'], topic_name)
    assert_not_equal(len(first_topic['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2144
2145
def test_ps_subscription():
    """ test set/get/delete of subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # the topic is created on the pubsub zone, the bucket on the master zone
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # notification on the bucket
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name)
    _, notif_status = notification_conf.set_config()
    assert_equal(notif_status/100, 2)

    # subscription on the topic
    sub_conf = PSSubscription(ps_zone.conn, bucket_name + SUB_SUFFIX, topic_name)
    _, sub_status = sub_conf.set_config()
    assert_equal(sub_status/100, 2)

    # the stored subscription must point at our topic
    raw_sub, _ = sub_conf.get_config()
    assert_equal(json.loads(raw_sub)['topic'], topic_name)

    # create objects in the bucket
    number_of_objects = 10
    for index in range(number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    raw_events, _ = sub_conf.get_events()
    events = json.loads(raw_events)
    for event in events['events']:
        obj_name = str(event['info']['key']['name'])
        event_type = str(event['event'])
        log.debug('Event: objname: "' + obj_name + '" type: "' + event_type + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # TODO: fetch the events again and check the deletions (with exact
    # match); both the creations and the deletions should be visible

    # delete the subscription; a further get reports an empty topic
    _, del_status = sub_conf.del_config()
    assert_equal(del_status/100, 2)
    raw_sub, _ = sub_conf.get_config()
    assert_equal(json.loads(raw_sub)['topic'], '')
    # TODO: the get after the delete should return 404; assert on the
    # status once it does

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130 2216
9f95a23c
TL
def test_ps_incremental_sync():
    """ test that events are only sent on incremental sync """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # the topic is created on the pubsub zone, the bucket on the master zone
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    bucket = master_zone.create_bucket(bucket_name)

    # first batch of objects: written and synced before any notification
    # or subscription exists
    number_of_objects = 10
    for index in range(number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('foo')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # now create the notification ...
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name)
    _, notif_status = notification_conf.set_config()
    assert_equal(notif_status/100, 2)
    # ... and the subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name + SUB_SUFFIX, topic_name)
    _, sub_status = sub_conf.set_config()
    assert_equal(sub_status/100, 2)

    # second batch of objects, written after the subscription was set up
    for index in range(number_of_objects, 2*number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    raw_events, _ = sub_conf.get_events()
    received = json.loads(raw_events)['events']
    for event in received:
        obj_name = str(event['info']['key']['name'])
        event_type = str(event['event'])
        log.debug('Event: objname: "' + obj_name + '" type: "' + event_type + '"')

    # make sure we have 10 and not 20 events
    assert_equal(len(received), number_of_objects)

    # cleanup
    for key in bucket.list():
        key.delete()
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2271
def test_ps_event_type_subscription():
    """ test subscriptions for different events

    Three topic/notification/subscription triplets are set on the same
    bucket: one for OBJECT_CREATE, one for OBJECT_DELETE and one for both.
    The events seen by each subscription are checked after the objects are
    created and again after they are deleted.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()

    def fetch_and_log_events(subscription, label):
        """fetch the events of a subscription, log each one, return the parsed reply"""
        result, _ = subscription.get_events()
        parsed = json.loads(result)
        for event in parsed['events']:
            log.debug('Event (' + label + '): objname: "' + str(event['info']['key']['name']) +
                      '" type: "' + str(event['event']) + '"')
        return parsed

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    # floor division: any 2xx status must map to exactly 2
    assert_equal(status//100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the creation subscription
    events = fetch_and_log_events(sub_create_conf, 'OBJECT_CREATE')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription: nothing was deleted yet
    events = fetch_and_log_events(sub_delete_conf, 'OBJECT_DELETE')
    assert_equal(len(events['events']), 0)
    # get the events from the all events subscription
    events = fetch_and_log_events(sub_conf, 'OBJECT_CREATE,OBJECT_DELETE')
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    events = fetch_and_log_events(sub_create_conf, 'OBJECT_CREATE')
    # deletions should not change the creation events
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    events = fetch_and_log_events(sub_delete_conf, 'OBJECT_DELETE')
    # only deletions should be listed here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    # (this must read sub_conf, not the creation-only subscription)
    events = fetch_and_log_events(sub_conf, 'OBJECT_CREATE,OBJECT_DELETE')
    # both deletions and creations should be here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
    # TODO: (1) test deletions (2) test overall number of events

    # test subscription deletion when topic is specified
    _, status = sub_create_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_delete_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_conf.del_config(topic=True)
    assert_equal(status//100, 2)

    # cleanup
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2409
2410
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # the topic is created on the pubsub zone, the bucket on the master zone
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # notification on the bucket
    notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name)
    _, notif_status = notification_conf.set_config()
    assert_equal(notif_status/100, 2)
    # subscription on the topic
    sub_conf = PSSubscription(ps_zone.conn, bucket_name + SUB_SUFFIX, topic_name)
    _, sub_status = sub_conf.set_config()
    assert_equal(sub_status/100, 2)

    # create objects in the bucket
    number_of_objects = 100
    for index in range(number_of_objects):
        bucket.new_key(str(index)).set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # fetch the events in pages of max_events, following the marker that
    # each reply returns, until an empty marker comes back
    max_events = 15
    total_events_count = 0
    all_events = []
    next_marker = None
    while next_marker != '':
        raw_page, _ = sub_conf.get_events(max_events, next_marker)
        page = json.loads(raw_page)
        page_events = page['events']
        total_events_count += len(page_events)
        all_events.extend(page_events)
        next_marker = page['next_marker']
        for event in page_events:
            obj_name = str(event['info']['key']['name'])
            event_type = str(event['event'])
            log.debug('Event: objname: "' + obj_name + '" type: "' + event_type + '"')

    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements({'events': all_events}, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2467
2468
def test_ps_event_acking():
    """ test acking of some events in a subscription

    Half of the fetched events are acked by id; the subscription is then
    read again and must still hold at least the events that were not acked.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status must map to exactly 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    # the reply is a dict; the events themselves are under 'events', so
    # count that list and not the dict's keys
    original_number_of_events = len(events['events'])
    for event in events['events']:
        log.debug('Event (before ack) id: "' + str(event['id']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events
    events_to_ack = number_of_objects//2
    for event in events['events'][:events_to_ack]:
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (after ack) id: "' + str(event['id']) + '"')
    assert len(events['events']) >= (original_number_of_events - events_to_ack)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2532
2533
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post

    Objects are created on the master zone with a plain PUT, a COPY and a
    multipart upload, and the test expects at least 3 events to be listed
    by the subscription on the pubsub zone.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    uploader = bucket.initiate_multipart_upload('multipart')
    # the same temporary file is written and then rewound: closing it and
    # opening another one would upload an empty part
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bar')
        fp.flush()
        fp.seek(0)
        uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(events['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
eafe8130
TL
2589
2590
def test_ps_s3_creation_triggers_on_master():
    """ test object creation s3 notifications in using put/copy/post on master

    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                       }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)

    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    uploader = bucket.initiate_multipart_upload('multipart')
    # the same temporary file is written and then rewound: closing it and
    # opening another one would upload an empty part
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bar')
        fp.flush()
        fp.seek(0)
        uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2659
2660
def test_ps_s3_multipart_on_master():
    """ test multipart object upload on master

    Three topics/notifications are set on the same bucket, each with a
    different event filter, and a single multipart upload is performed. The
    number of events received for each topic is then verified.
    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
    task1.start()
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
    task2.start()
    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
    task3.start()

    # create s3 topics
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn3 = topic_conf3.set_config()

    # create s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:Post']
                       },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)

    # create objects in the bucket using multi-part upload
    with tempfile.TemporaryFile(mode='w+b') as fp:
        content = bytearray(os.urandom(1024*1024))
        fp.write(content)
        fp.flush()
        # rewind, so that the upload reads the content from its start
        fp.seek(0)
        uploader = bucket.initiate_multipart_upload('multipart')
        uploader.upload_part_from_file(fp, 1)
        uploader.complete_upload()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    events = receiver1.get_and_reset_events()
    assert_equal(len(events), 3)

    events = receiver2.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')

    events = receiver3.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    stop_amqp_receiver(receiver3, task3)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    topic_conf3.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2753
2754
def test_ps_versioned_deletion():
    """ test notification of deletion markers

    Two topics are used: one is notified on 'OBJECT_DELETE' events and the
    other on 'DELETE_MARKER_CREATE' events. Every event listed by the
    subscription of each topic must be of the type set for that topic.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
    _, status = topic_conf1.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
    _, status = topic_conf2.set_config()
    assert_equal(status//100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)

    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications
    event_type1 = 'OBJECT_DELETE'
    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_1',
                                        event_type1)
    _, status = notification_conf1.set_config()
    assert_equal(status//100, 2)
    event_type2 = 'DELETE_MARKER_CREATE'
    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_2',
                                        event_type2)
    _, status = notification_conf2.set_config()
    assert_equal(status//100, 2)

    # create subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                               topic_name+'_1')
    _, status = sub_conf1.set_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                               topic_name+'_2')
    _, status = sub_conf2.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create deletion marker
    delete_marker_key = bucket.delete_key(key.name)

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # delete the deletion marker
    delete_marker_key.delete()
    # delete versions
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf1.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type1)

    result, _ = sub_conf2.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type2)

    # cleanup
    # following is needed for the cleanup in the case of 3-zones
    # see: http://tracker.ceph.com/issues/39142
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    try:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
        master_zone.delete_bucket(bucket_name)
    except Exception as e:
        # best effort: a failure here must not fail the test, but the reason
        # is logged, and KeyboardInterrupt/SystemExit are not swallowed
        log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket: %s', str(e))
    sub_conf1.del_config()
    sub_conf2.del_config()
    notification_conf1.del_config()
    notification_conf2.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
2851
2852
def test_ps_s3_metadata_on_master():
    """ test s3 notification of metadata on master

    The notification is set with a metadata filter, and every event that is
    received must carry the filtered metadata key and value.
    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    meta_key = 'meta1'
    meta_value = 'This is my metadata value'
    meta_prefix = 'x-amz-meta-'
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
        'Filter': {
            'Metadata': {
                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
            }
        }
    }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)

    # create objects in the bucket
    key_name = 'foo'
    key = bucket.new_key(key_name)
    key.set_metadata(meta_key, meta_value)
    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_foo', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    uploader = bucket.initiate_multipart_upload('multipart_foo',
            metadata={meta_key: meta_value})
    # the same temporary file is written and then rewound: closing it and
    # opening another one would upload an empty part
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
        fp.flush()
        fp.seek(0)
        uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count +=1

    # only PUT and POST has the metadata value
    assert_equal(event_count, 2)

    # delete objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count +=1

    # all 3 object has metadata when deleted
    assert_equal(event_count, 3)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
2952
2953
def test_ps_s3_tags_on_master():
    """ test s3 notification of tags on master

    The notification is set with a tag filter ('hello'='world'), and the
    first tag of every event that is received must match that tag.
    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
        'Filter': {
            'Tags': {
                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
            }
        }
    }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)

    # create objects in the bucket with tags
    tags = 'hello=world&ka=boom'
    key_name1 = 'key1'
    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
    tags = 'foo=bar&ka=boom'
    key_name2 = 'key2'
    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
    key_name3 = 'key3'
    key = bucket.new_key(key_name3)
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
    print('wait for 5sec for the messages...')
    time.sleep(5)
    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
    # check amqp receiver
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # delete the objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3033
3034
def test_ps_s3_versioned_deletion_on_master():
    """ test s3 notification of deletion markers on master

    Three notifications with different 's3:ObjectRemoved' filters are set
    over the same topic. The test counts the 'Delete' and the
    'DeleteMarkerCreated' events and verifies which notification sent them.
    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
                       },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:Delete']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create delete marker (non versioned deletion)
    delete_marker_key = bucket.delete_key(key.name)

    time.sleep(1)

    # versioned deletion
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    delete_marker_key.delete()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    events = receiver.get_and_reset_events()
    delete_events = 0
    delete_marker_create_events = 0
    for event_list in events:
        for event in event_list['Records']:
            if event['eventName'] == 's3:ObjectRemoved:Delete':
                delete_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
                delete_marker_create_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']

    # 3 key versions were deleted (v1, v2 and the deletion marker)
    # notified over the same topic via 2 notifications (1,3)
    assert_equal(delete_events, 3*2)
    # 1 deletion marker was created
    # notified over the same topic over 2 notifications (1,2)
    assert_equal(delete_marker_create_events, 1*2)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3125
3126
def test_ps_push_http():
    """ test pushing to http endpoint

    A local http server is started on a random port, and is set as the push
    endpoint of the subscription. The events received by the server are
    verified after objects are created, and again after they are deleted.
    The test is skipped when push tests are disabled.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='http://'+host+':'+str(port))
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
3187
3188
def test_ps_s3_push_http():
    """ test pushing to http endpoint s3 record format

    A local http server is started on a random port, and is set as the push
    endpoint of the topic. The s3 events received by the server are verified
    after objects are created, and again after they are deleted.
    The test is skipped when push tests are disabled.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+host+':'+str(port))
    result, status = topic_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
3250
3251
def test_ps_push_amqp():
    """ test pushing to amqp endpoint

    An amqp receiver is started, and the amqp endpoint is set as the push
    endpoint of the subscription. The events received by the receiver are
    verified after objects are created, and again after they are deleted.
    The test is skipped when push tests are disabled or when
    init_rabbitmq() does not return a process.
    """
    if skip_push_tests:
        # SkipTest has to be raised: returning it reports the test as passed
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division keeps the status class an integer on python2 and python3
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='amqp://'+hostname,
                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
3315
3316
def test_ps_s3_push_amqp():
    """ test pushing to amqp endpoint s3 record format

    Creates a topic with an amqp push endpoint and an s3 notification for
    object creation and removal, uploads and then deletes objects in the
    bucket, and checks after each step that the amqp receiver saw the
    matching s3 events.
    Skipped when push tests are disabled or rabbitmq could not be started.
    """
    # raise (not return) SkipTest so the runner reports the test as skipped
    # instead of silently counting it as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # floor division keeps the 2xx check exact on both python 2 and 3
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # 'keys' still holds the listing taken before the deletion
    # TODO: use exact match
    receiver.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
11fdf7f2
TL
3381
3382
eafe8130
TL
def test_ps_delete_bucket():
    """ test notification status upon bucket deletion

    Creates one s3 notification and one non-s3 notification on a bucket,
    uploads and deletes objects, deletes the bucket, and then checks that
    the events are still readable from the auto-generated subscription
    while both notifications answer with 404.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    # floor division keeps the 2xx check exact on both python 2 and 3
    assert_equal(status // 100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)

    # create non-s3 notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # remember the keys before they are removed, for the verification below
    keys = list(bucket.list())
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    # wait for meta sync
    zone_meta_checkpoint(ps_zone.zone)

    # get the events from the auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # s3 notification is deleted with bucket
    _, status = s3_notification_conf.get_config(notification=notification_name)
    assert_equal(status, 404)
    # non-s3 notification is deleted with bucket
    _, status = notification_conf.get_config()
    assert_equal(status, 404)
    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()
eafe8130
TL
3450
3451
def test_ps_missing_topic():
    """ test creating a subscription when no topic info exists

    Builds an s3 notification that points at a topic ARN for which no
    topic was created, and expects setting that notification to raise.
    The bucket is removed before the final assertion so that a failing
    test does not leave it behind.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_arn = 'arn:aws:sns:::' + topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    rejected = False
    try:
        s3_notification_conf.set_config()
    except Exception:
        # catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate
        log.info('missing topic is expected')
        rejected = True

    # cleanup
    master_zone.delete_bucket(bucket_name)

    # the original asserted a non-empty string literal, which is always
    # true; assert on the recorded outcome so the test can actually fail
    assert rejected, 'missing topic is expected'
11fdf7f2
TL
3479
3480
eafe8130
TL
def test_ps_s3_topic_update():
    """ test updating topic associated with a notification

    Creates a topic with an amqp endpoint and an s3 notification on it,
    then re-creates the same topic with an http endpoint. Events are
    expected to keep arriving at amqp until the notification itself is set
    again, after which they are expected at the http server.
    Skipped when push tests are disabled or rabbitmq could not be started.
    """
    # raise (not return) SkipTest so the runner reports the test as skipped
    # instead of silently counting it as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create amqp topic
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # floor division keeps the 2xx check exact on both python 2 and 3
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update the same topic with new endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status // 100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to update the endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
3602
3603
def test_ps_s3_notification_update():
    """ test updating the topic of a notification

    Creates two topics, one with an amqp endpoint and one with an http
    endpoint. An s3 notification is first bound to the amqp topic and then
    set again with the http topic; events are expected at the amqp
    receiver before the change and at the http server after it.
    Skipped when push tests are disabled or rabbitmq could not be started.
    """
    # raise (not return) SkipTest so the runner reports the test as skipped
    # instead of silently counting it as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so a failed request is reported as a
    # bad status and not as a json/key error; floor division keeps the 2xx
    # check exact on both python 2 and 3
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
11fdf7f2
TL
3702
3703
eafe8130
TL
def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics

    Creates an amqp topic and an http topic, sets one s3 notification
    configuration holding an entry for each, and checks that both entries
    and their auto-generated subscriptions exist. After uploading objects,
    the events pulled from each subscription and the events pushed to each
    endpoint are verified against the bucket listing.
    Skipped when push tests are disabled or rabbitmq could not be started.
    """
    # raise (not return) SkipTest so the runner reports the test as skipped
    # instead of silently counting it as passed
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so a failed request is reported as a
    # bad status and not as a json/key error; floor division keeps the 2xx
    # check exact on both python 2 and 3
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status // 100, 2)
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status // 100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)

    # get auto-generated subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status // 100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status // 100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # the bucket content does not change below, so one listing serves the
    # verification of both subscriptions
    keys = list(bucket.list())

    # get the events from both of the subscription
    result, _ = sub_conf1.get_events()
    records1 = json.loads(result)
    for record in records1['Records']:
        log.debug(record)
    # TODO: use exact match
    verify_s3_records_by_elements(records1, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)

    result, _ = sub_conf2.get_events()
    records2 = json.loads(result)
    for record in records2['Records']:
        log.debug(record)
    # verify the records of the second subscription; the original passed
    # the first subscription's records here, leaving this one unchecked
    # TODO: use exact match
    verify_s3_records_by_elements(records2, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)