12 from random
import randint
13 from .tests
import get_realm
, \
15 zonegroup_meta_checkpoint
, \
16 zone_meta_checkpoint
, \
17 zone_bucket_checkpoint
, \
18 zone_data_checkpoint
, \
19 zonegroup_bucket_checkpoint
, \
24 from .zone_ps
import PSTopic
, \
29 print_connection_info
, \
30 delete_all_s3_topics
, \
34 from multisite
import User
35 from nose
import SkipTest
36 from nose
.tools
import assert_not_equal
, assert_equal
37 import boto
.s3
.tagging
# configure logging for the tests module
log = logging.getLogger(__name__)

# when True, tests that push events to external endpoints are skipped
skip_push_tests = True

####################################
# utility functions for pubsub tests
####################################
def set_contents_from_string(key, content):
    """Upload *content* into *key*, printing (not raising) any upload error.

    Best-effort by design: tests use this where a failed upload should not
    abort the whole test run.
    """
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print('Error: ' + str(e))
55 # HTTP endpoint functions
56 # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP POST hanler class storing the received events in its http server"""
    def do_POST(self):
        """implementation of POST handler"""
        try:
            # read the full request body and store the parsed JSON event
            # on the owning server (HTTPServerWithEvents.append)
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            # missing/invalid body or JSON: reject the request
            log.error('HTTP Server received empty event')
            self.send_response(400)
        else:
            self.send_response(100)
        finally:
            self.end_headers()
class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
    """HTTP server used by the handler to store events"""
    def __init__(self, addr, handler, worker_id):
        # bind_and_activate=False: the shared socket is injected by
        # HTTPServerThread, so this server must not bind its own
        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
        self.worker_id = worker_id
        # events received by the POST handler accumulate here
        self.events = []

    def append(self, event):
        """store one received event"""
        self.events.append(event)
class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reusing the same socket for all threads"""
    def __init__(self, i, sock, addr):
        threading.Thread.__init__(self)
        self.i = i
        self.daemon = True
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
        # all worker servers share one listening socket
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding every handler
        self.httpd.server_bind = self.server_close = lambda self: None
        self.start()

    def run(self):
        """serve requests until shutdown; socket errors during shutdown are expected"""
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def stop(self):
        """stop serving requests"""
        self.httpd.shutdown()

    def get_events(self):
        """return the events stored by this worker's server"""
        return self.httpd.events

    def reset_events(self):
        """clear the events stored by this worker's server"""
        self.httpd.events = []
class StreamingHTTPServer:
    """multi-threaded http server class also holding list of events received into the handler
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100):
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        # each worker thread wraps its own HTTPServerWithEvents over the shared socket
        self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records agains a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events agains a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        """drain and return all events collected by the workers"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for it to finish"""
        # make sure that the shared socket is closed
        # this is needed in case that one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.stop()
            worker.join()
163 # AMQP endpoint functions
class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic):
        # NOTE(review): `hostname` and `rabbitmq_port` are module globals
        # defined near the "AMQP endpoint functions" section — confirm
        import pika
        remaining_retries = 10
        # retry the broker connection, it may still be starting up
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        # server-generated exclusive queue bound to the topic routing key
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrive on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records agains a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events agains a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        """drain and return the stored events"""
        events = self.events
        self.events = []
        return events
def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver"""
    try:
        log.info('AMQP receiver started')
        # blocks until stop_consuming() is called from another thread
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))
def create_amqp_receiver_thread(exchange, topic):
    """create amqp receiver and thread"""
    receiver = AMQPReceiver(exchange, topic)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    # daemonize so a stuck consumer does not block interpreter exit
    task.daemon = True
    return task, receiver
def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finis"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
    # bounded join: don't hang the test if the thread is stuck
    task.join(5)
def check_ps_configured():
    """check if at least one pubsub zone exist"""
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    ps_zones = zonegroup.zones_by_type.get("pubsub")
    # no pubsub-tier zone configured: skip instead of failing
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")
def is_ps_zone(zone_conn):
    """check if a specific zone is pubsub zone"""
    if not zone_conn:
        return False
    return zone_conn.zone.tier_type() == "pubsub"
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element """
    err = ''
    for key in keys:
        key_found = False
        # `events` may be a list of event batches or a single batch dict
        if type(events) is list:
            for event_list in events:
                if key_found:
                    break
                for event in event_list['events']:
                    if event['info']['bucket']['name'] == key.bucket.name and \
                        event['info']['key']['name'] == key.name:
                        if deletions and event['event'] == 'OBJECT_DELETE':
                            key_found = True
                            break
                        elif not deletions and event['event'] == 'OBJECT_CREATE':
                            key_found = True
                            break
        else:
            for event in events['events']:
                if event['info']['bucket']['name'] == key.bucket.name and \
                    event['info']['key']['name'] == key.name:
                    if deletions and event['event'] == 'OBJECT_DELETE':
                        key_found = True
                        break
                    elif not deletions and event['event'] == 'OBJECT_CREATE':
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            assert False, err

    # extra events beyond the expected keys fail only in exact-match mode
    if not len(events) == len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            assert False, err
def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
    """ verify there is at least one record per element """
    err = ''
    for key in keys:
        key_found = False
        # `records` may be a list of record batches or a single batch dict
        if type(records) is list:
            for record_list in records:
                if key_found:
                    break
                for record in record_list['Records']:
                    if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name:
                        if deletions and 'ObjectRemoved' in record['eventName']:
                            key_found = True
                            break
                        elif not deletions and 'ObjectCreated' in record['eventName']:
                            key_found = True
                            break
        else:
            for record in records['Records']:
                if record['s3']['bucket']['name'] == key.bucket.name and \
                    record['s3']['object']['key'] == key.name:
                    if deletions and 'ObjectRemoved' in record['eventName']:
                        key_found = True
                        break
                    elif not deletions and 'ObjectCreated' in record['eventName']:
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err

    # extra records beyond the expected keys fail only in exact-match mode
    if not len(records) == len(keys):
        err = 'superfluous records are found'
        log.debug(err)
        if exact_match:
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err
def init_rabbitmq():
    """ start a rabbitmq broker """
    #port = str(random.randint(20000, 30000))
    #data_dir = './' + port + '_data'
    #log_dir = './' + port + '_log'
    # print('rabbitmq directories already exists')
    #env = {'RABBITMQ_NODE_PORT': port,
    #       'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
    #       'RABBITMQ_USE_LONGNAME': 'true',
    #       'RABBITMQ_MNESIA_BASE': data_dir,
    #       'RABBITMQ_LOG_BASE': log_dir}
    # TODO: support multiple brokers per host using env
    # make sure we don't collide with the default
    try:
        proc = subprocess.Popen('rabbitmq-server')
    except Exception as error:
        log.info('failed to execute rabbitmq-server: %s', str(error))
        print('failed to execute rabbitmq-server: %s' % str(error))
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return proc #, port, data_dir, log_dir
def clean_rabbitmq(proc): #, data_dir, log_dir)
    """ stop the rabbitmq broker """
    try:
        subprocess.call(['rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except Exception:
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup once multiple brokers are supported
    # log.info('rabbitmq directories already removed')
# Kafka endpoint functions
# host of the kafka broker used by KafkaReceiver (port chosen per security type)
kafka_server = 'localhost'
class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""
    def __init__(self, topic, security_type):
        from kafka import KafkaConsumer
        remaining_retries = 10
        # NOTE(review): plaintext listener assumed on 9092, SSL on 9093 — confirm
        # against the kafka server.properties used by init_kafka()
        port = 9092
        if security_type != 'PLAINTEXT':
            security_type = 'SSL'
            port = 9093
        # retry the broker connection, it may still be starting up
        while remaining_retries > 0:
            try:
                self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to kafka (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        # polled by kafka_receiver_thread_runner to exit its loop
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records agains a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []
def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver"""
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        # poll the consumer until the receiver is asked to stop
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))
def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create kafka receiver and thread"""
    receiver = KafkaReceiver(topic, security_type)
    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    # daemonize so a stuck consumer does not block interpreter exit
    task.daemon = True
    return task, receiver
def stop_kafka_receiver(receiver, task):
    """stop the receiver thread and wait for it to finis"""
    # signal the runner loop to exit, then wait briefly for the thread
    receiver.stop = True
    task.join(1)
    try:
        receiver.consumer.close()
    except Exception as error:
        log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
466 # follow the instruction here to create and sign a broker certificate:
467 # https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
469 # the generated broker certificate should be stored in the java keystore for the use of the server
470 # assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost"
471 # following lines must be added to $KAFKA_DIR/config/server.properties
472 # listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
473 # sasl.enabled.mechanisms=PLAIN
474 # ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
475 # ssl.keystore.password = abcdefgh
476 # ssl.key.password = abcdefgh
477 # ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
478 # ssl.truststore.password = abcdefgh
481 # (1) we dont test client authentication, hence, no need to generate client keys
482 # (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
483 # should be copied to: $KAFKA_DIR
def init_kafka():
    """ start kafka/zookeeper """
    try:
        KAFKA_DIR = os.environ['KAFKA_DIR']
    except KeyError:
        KAFKA_DIR = ''

    if KAFKA_DIR == '':
        log.info('KAFKA_DIR must be set to where kafka is installed')
        print('KAFKA_DIR must be set to where kafka is installed')
        return None, None, None

    DEVNULL = open(os.devnull, 'wb')

    print('\nStarting zookeeper...')
    try:
        zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL)
    except Exception as error:
        log.info('failed to execute zookeeper: %s', str(error))
        print('failed to execute zookeeper: %s' % str(error))
        return None, None, None

    time.sleep(5)
    if zk_proc.poll() is not None:
        print('zookeeper failed to start')
        return None, None, None
    print('Zookeeper started')
    print('Starting kafka...')
    kafka_log = open('./kafka.log', 'w')
    try:
        # SASL config is passed via KAFKA_OPTS so the broker picks up the JAAS file
        kafka_env = os.environ.copy()
        kafka_env['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
        kafka_proc = subprocess.Popen([
            KAFKA_DIR+'bin/kafka-server-start.sh',
            KAFKA_DIR+'config/server.properties'],
            stdout=kafka_log,
            env=kafka_env)
    except Exception as error:
        log.info('failed to execute kafka: %s', str(error))
        print('failed to execute kafka: %s' % str(error))
        zk_proc.terminate()
        kafka_log.close()
        return None, None, None

    # TODO add kafka checkpoint instead of sleep
    time.sleep(5)
    if kafka_proc.poll() is not None:
        zk_proc.terminate()
        print('kafka failed to start. details in: ./kafka.log')
        kafka_log.close()
        return None, None, None

    print('Kafka started')
    return kafka_proc, zk_proc, kafka_log
def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper """
    try:
        kafka_log.close()
        print('Shutdown Kafka...')
        kafka_proc.terminate()
        time.sleep(5)
        if kafka_proc.poll() is None:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutdown zookeeper...')
        zk_proc.terminate()
        time.sleep(5)
        if zk_proc.poll() is None:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        log.info('kafka/zookeeper already terminated')
def init_env(require_ps=True):
    """initialize the environment"""
    if require_ps:
        check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zone = None
    master_zone = None
    for conn in zonegroup_conns.zones:
        if conn.zone == zonegroup.master_zone:
            master_zone = conn
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zone = conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone
def get_ip():
    """ This method returns the "primary" IP on the local box (the one with a default route)
    source: https://stackoverflow.com/a/28950776/711085
    this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # address should not be reachable
        # (UDP connect() only picks the route, nothing is sent)
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip
# suffixes appended to a bucket name to derive per-test topic/notification names
TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"
def test_ps_info():
    """ log information for manual testing """
    return SkipTest("only used in manual testing")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    print('Zonegroup: ' + zonegroup.name)
    print('user: ' + get_user())
    print('tenant: ' + get_tenant())
    print('Master Zone')
    print_connection_info(master_zone.conn)
    print('PubSub Zone')
    print_connection_info(ps_zone.conn)
    print('Bucket: ' + bucket_name)
def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status/100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status/100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification_records():
    """ test s3 records fetching """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys and the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # get all notification on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notification on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status/100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status/100, 2)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_topic_on_master():
    """ test s3 topics set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create 3 s3 topics with different endpoints
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')

    # get topic 3
    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
    # Note that endpoint args may be ordered differently in the result

    # delete topic 1
    result = topic_conf1.del_config()
    # NOTE(review): `status` here is stale from the previous get_config()
    # call — del_config's status is not checked; kept as-is to preserve behavior
    assert_equal(status, 200)

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result, status = topic_conf1.get_list()
    assert_equal(status, 200)
    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)

    # delete topics
    result = topic_conf2.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)
    result = topic_conf3.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)

    # get topic list, make sure it is empty
    result, status = topic_conf1.get_list()
    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
def test_ps_s3_topic_with_secret_on_master():
    """ test s3 topics with secret set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    if master_zone.secure_conn is None:
        return SkipTest('secure connection is needed to test topic with secrets')

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # endpoint carrying a user:password secret must be rejected over plain HTTP
    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    try:
        result = bad_topic_conf.set_config()
    except Exception as err:
        print('Error is expected: ' + str(err))
    else:
        assert False, 'user password configuration set allowed only over HTTPS'

    # the same topic is accepted over the secure connection
    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    # get topic
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    _, status = topic_conf.get_list()
    assert_equal(status/100, 2)

    # cleanup
    result = topic_conf.del_config()
def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # get notifications on a bucket
    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete specific notifications
    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)

    # get the remaining 2 notifications on a bucket
    response, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # delete remaining notifications
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # make sure that the notifications are now deleted
    _, status = s3_notification_conf.get_config()

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
def ps_s3_notification_filter(on_master):
    """ test s3 notification filter on master """
    # NOTE(review): this block was reconstructed from a line-mangled extraction;
    # statements that were dropped (guards, braces, list initializations) were
    # restored from the surrounding logic — confirm against the original file.
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    if on_master:
        master_zone, _ = init_env(require_ps=False)
        ps_zone = master_zone
    else:
        master_zone, ps_zone = init_env(require_ps=True)

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver in its own thread
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create the topic: S3-style API on master, PS API otherwise
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    if on_master:
        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
    else:
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, _ = topic_conf.set_config()
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        zone_meta_checkpoint(ps_zone.zone)

    # create s3 notifications with key filters (prefix / prefix+suffix / regex)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }},
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }},
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }}]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # metadata filtering is only supported on the master zone, and only when
    # the installed boto3 supports the S3Metadata filter section
    skip_notif4 = False
    if on_master:
        topic_conf_list = [{'Id': notification_name+'_4',
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                            'Filter': {
                                'Metadata': {
                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                                },
                                'Key': {
                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                                }
                            }}]
        try:
            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
            _, status = s3_notification_conf4.set_config()
            assert_equal(status/100, 2)
        except Exception as error:
            print('note: metadata filter is not supported by boto3 - skipping test')
            skip_notif4 = True
    else:
        print('filtering by attributes only supported on master zone')
        skip_notif4 = True

    # get all notifications and verify that only key filters are present
    result, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status/100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    # key names that should / should not pass each filter
    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']

    # create objects in bucket
    for key_name in expected_in1:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in2:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        # with attributes matching the metadata filter
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # BUG(fixed): the original called key.set_metadata() before
        # key = bucket.new_key(key_name), so the non-matching metadata was
        # attached to the previous loop's key object and the new object was
        # written with no metadata at all; create the key first.
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    if on_master:
        print('wait for 5sec for the messages...')
        time.sleep(5)
    else:
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # sort the received events into buckets keyed by notification id
    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []
    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)
def test_ps_s3_notification_filter_on_master():
    """ test s3 notification filter on master """
    # propagate the inner function's return value so a returned SkipTest
    # marker is not silently discarded by this wrapper
    return ps_s3_notification_filter(on_master=True)
def test_ps_s3_notification_filter():
    """ test s3 notification filter against a pubsub zone """
    # propagate the inner function's return value so a returned SkipTest
    # marker is not silently discarded by this wrapper
    return ps_s3_notification_filter(on_master=False)
def test_ps_s3_notification_errors_on_master():
    """ test s3 notification set/get/delete on master """
    # NOTE(review): reconstructed from a mangled extraction; the try/else
    # scaffolding around each expected-failure case was restored.
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic (endpoint itself is never contacted in this test)
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # create s3 notification with invalid event name
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Kaboom']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        result, status = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'invalid event name is expected to fail'

    # create s3 notification with missing name
    topic_conf_list = [{'Id': '',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'missing notification name is expected to fail'

    # create s3 notification with invalid topic ARN
    invalid_topic_arn = 'kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'invalid ARN is expected to fail'

    # create s3 notification with unknown topic ARN
    invalid_topic_arn = 'arn:aws:sns:a::kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'unknown topic is expected to fail'

    # create s3 notification with wrong bucket
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'unknown bucket is expected to fail'

    topic_conf.del_config()

    status = topic_conf.del_config()
    # deleting an unknown notification is not considered an error
    assert_equal(status, 200)

    _, status = topic_conf.get_config()
    assert_equal(status, 404)

    # cleanup: delete the bucket
    master_zone.delete_bucket(bucket_name)
def test_objcet_timing():
    # TODO(review): "objcet" typo kept — the name is the test's public id and
    # renaming it would change what CI/tooling references.
    return SkipTest("only used in manual testing")
    # everything below is intentionally dead code: remove the return above
    # to run this timing benchmark manually
    master_zone, _ = init_env(require_ps=False)
    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topic
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification: notification 1 gets all events, notification 2
    # only creation events
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check amqp receiver 2 has no deletions
    try:
        # BUG(fixed): the original verified receiver1 here, which only passed
        # by accident because receiver1's events were already consumed above;
        # notification 2 is creation-only, so receiver2 must have no deletions
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_notification_push_kafka():
    """ test pushing kafka s3 notification from a pubsub zone """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        return SkipTest('end2end kafka tests require kafka/zookeeper installed')

    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for metadata sync to the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create topic with broker-level acks
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='kafka://' + kafka_server,
                         endpoint_args='kafka-ack-level=broker')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    # wait for data sync, then verify creation events arrived
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    # wait for data sync, then verify deletion events arrived
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
def test_ps_s3_notification_push_kafka_on_master():
    """ test pushing kafka s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic (only topic 1 is consumed)
    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
    task.start()

    # create s3 topics
    endpoint_address = 'kafka://' + kafka_server
    # with acks from broker
    # (original comment said "without acks" but kafka-ack-level=broker — fixed)
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
                        'Events': []
                       },
                       {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)
def kafka_security(security_type):
    """ test pushing kafka s3 notification on master over a secure channel

    security_type: 'SSL' (server cert only) or 'SSL_SASL' (plus SASL auth)
    """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
        return SkipTest("secure connection is needed to test SASL_SSL security")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create s3 topic
    if security_type == 'SSL_SASL':
        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
    else:
        endpoint_address = 'kafka://' + kafka_server + ':9093'

    KAFKA_DIR = os.environ['KAFKA_DIR']

    # without acks from broker, with root CA
    # NOTE(review): KAFKA_DIR is concatenated directly with 'rootCA.crt',
    # so the environment variable must end with a path separator — confirm.
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'

    if security_type == 'SSL_SASL':
        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    else:
        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)

    topic_arn = topic_conf.set_config()
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    s3_notification_conf.set_config()

    try:
        # create objects in the bucket (async)
        number_of_objects = 10
        client_threads = []
        start_time = time.time()
        for i in range(number_of_objects):
            key = bucket.new_key(str(i))
            content = str(os.urandom(1024*1024))
            thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
            thr.start()
            client_threads.append(thr)
        [thr.join() for thr in client_threads]

        time_diff = time.time() - start_time
        print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)
        keys = list(bucket.list())
        receiver.verify_s3_events(keys, exact_match=True)

        # delete objects from the bucket
        client_threads = []
        start_time = time.time()
        for key in bucket.list():
            thr = threading.Thread(target=key.delete, args=())
            thr.start()
            client_threads.append(thr)
        [thr.join() for thr in client_threads]

        time_diff = time.time() - start_time
        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

        print('wait for 5sec for the messages...')
        time.sleep(5)
        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
    except Exception as err:
        assert False, str(err)
    finally:
        # cleanup must run even when verification fails
        s3_notification_conf.del_config()
        topic_conf.del_config()
        # delete the bucket
        for key in bucket.list():
            key.delete()
        master_zone.delete_bucket(bucket_name)
        stop_kafka_receiver(receiver, task)
        clean_kafka(kafka_proc, zk_proc, kafka_log)
def test_ps_s3_notification_push_kafka_security_ssl():
    """ test pushing kafka s3 notification over SSL """
    # propagate the return value so a returned SkipTest marker is not lost
    return kafka_security('SSL')
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
    """ test pushing kafka s3 notification over SSL with SASL auth """
    # propagate the return value so a returned SkipTest marker is not lost
    return kafka_security('SSL_SASL')
def test_ps_s3_notification_multi_delete_on_master():
    """ test deletion of multiple keys on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification for removal events only
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    client_threads = []
    for i in range(number_of_objects):
        obj_size = randint(1, 1024)
        content = str(os.urandom(obj_size))
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    keys = list(bucket.list())

    # delete all objects in one multi-delete request
    start_time = time.time()
    delete_all_objects(zones[0].conn, bucket_name)
    time_diff = time.time() - start_time
    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    http_server.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    http_server.close()
def test_ps_s3_notification_push_http_on_master():
    """ test pushing http s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    client_threads = []
    start_time = time.time()
    # BUG(fixed): the upload loop referenced `content` but its assignment was
    # missing (lost in extraction) — restored; payload value is irrelevant here
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    http_server.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver
    http_server.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_s3_opaque_data():
    """ test that opaque id set in topic, is sent in notification """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # wait for metadata sync to the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create s3 topic with an opaque-data attribute
    endpoint_address = 'http://'+host+':'+str(port)
    opaque_data = 'http://1.2.3.4:8888'
    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    client_threads = []
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    # wait for data sync to the pubsub zone
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check http receiver: every event must carry the topic's opaque data
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup
    for key in bucket.list():
        key.delete()
    [thr.join() for thr in client_threads]
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_s3_opaque_data_on_master():
    """ test that opaque id set in topic, is sent in notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic with an opaque-data attribute
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    opaque_data = 'http://1.2.3.4:8888'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
    topic_arn = topic_conf.set_config()
    # create s3 notification (empty event list = all events)
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    client_threads = []
    start_time = time.time()
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check http receiver: every event must carry the topic's opaque data
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup
    for key in bucket.list():
        key.delete()
    [thr.join() for thr in client_threads]
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zone = init_env()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    assert_equal(parsed_result['topic']['arn'],
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)
    # verify topic is deleted
    result, status = topic_conf.get_config()
    assert_equal(status, 404)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')
def test_ps_topic_with_endpoint():
    """ test set topic with endpoint"""
    _, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic with an amqp push endpoint
    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint=dest_endpoint,
                         endpoint_args=dest_args)
    _, status = topic_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content, including the stored push endpoint
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
    # cleanup
    topic_conf.del_config()
def test_ps_notification():
    """ test set/get/delete of notification """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for metadata sync to the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # get the notification and verify it points at the topic
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status//100, 2)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events"""
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for metadata sync to the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications restricted to create+delete events
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name,
                                       events)  # restored continuation args
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # get the notification and verify topic and event filter
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_subscription():
    """ test set/get/delete of subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for metadata sync to the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)  # restored continuation arg
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()  # restored loop body implied by the bare for header
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscriptions
    #result, _ = sub_conf.get_events()
    #for event in events['events']:
    #    log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: check deletions
    # TODO: use exact match
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status//100, 2)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_incremental_sync():
    """ test that events are only sent on incremental sync """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket BEFORE the notification/subscription exist:
    # these must not generate events (they arrive via full sync)
    number_of_objects = 10
    for i in range(0, number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('foo')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)  # restored continuation arg
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # create more objects in the bucket -- these are incremental-sync events
    for i in range(number_of_objects, 2*number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    count = 0  # restored: `count` is asserted below but its accumulation was lost
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        count += 1

    # make sure we have 10 and not 20 events
    assert_equal(count, number_of_objects)

    # cleanup
    for key in bucket.list():
        key.delete()  # restored loop body
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_event_type_subscription():
    """ test subscriptions for different events """
    # NOTE(review): extraction dropped the topic-name continuation argument of each
    # subscription; restored from the per-suffix naming pattern -- confirm against VCS.
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the creation subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # nothing was deleted yet -- deletion subscription must be empty
    assert_equal(len(events['events']), 0)
    # get the events from the all events subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
                  str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()  # restored loop body
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # deletions should not change the creation events
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # only deletions should be listed here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # both deletions and creations should be here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # TODO: (1) test deletions (2) test overall number of events

    # test subscription deletion when topic is specified
    _, status = sub_create_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_delete_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_conf.del_config(topic=True)
    assert_equal(status//100, 2)

    # cleanup
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription """
    # NOTE(review): the pagination-loop scaffolding (max_events, markers, while/break)
    # was dropped by extraction and reconstructed from the surviving loop body and
    # the `if next_marker == '':` terminator -- confirm against VCS.
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for metadata sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)  # restored continuation arg
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # fetch the events page by page; an empty next_marker ends the pagination
    max_events = 15        # restored: page size -- TODO confirm original value
    next_marker = None     # restored
    all_events = []        # restored
    total_events_count = 0
    while True:            # restored
        # get the events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        events = json.loads(result)
        total_events_count += len(events['events'])
        all_events.extend(events['events'])
        next_marker = events['next_marker']
        for event in events['events']:
            log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        if next_marker == '':
            break          # restored loop terminator
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements({'events': all_events}, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()  # restored loop body
    master_zone.delete_bucket(bucket_name)
def test_ps_event_acking():
    """ test acking of some events in a subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for metadata sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)  # restored continuation arg
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    # NOTE(review): this takes len() of the parsed dict, not of events['events'];
    # kept as-is to preserve behavior, but it looks like a latent bug -- verify.
    original_number_of_events = len(events)
    for event in events['events']:
        log.debug('Event (before ack) id: "' + str(event['id']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events
    events_to_ack = number_of_objects//2  # floor division: an int countdown, not 5.0
    for event in events['events']:
        if events_to_ack == 0:
            break               # restored: exit once the quota is acked
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)
        events_to_ack -= 1      # restored countdown decrement

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (after ack) id: "' + str(event['id']) + '"')
    assert len(events) >= (original_number_of_events - number_of_objects//2)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()  # restored loop body
    master_zone.delete_bucket(bucket_name)
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for metadata sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)  # restored continuation arg
    _, status = notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)  # restored continuation arg
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w')
    fp.write('bar')  # restored: lost between the two TemporaryFile calls -- TODO confirm
    fp.close()       # restored
    uploader = bucket.initiate_multipart_upload('multipart')
    fp = tempfile.TemporaryFile(mode='r')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()       # restored: release the second temp file
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(events['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()  # restored loop body
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_creation_triggers_on_master():
    """ test object creation s3 notifications in using put/copy/post on master"""
    # NOTE(review): skip guards, hostname/exchange assignments and the receiver
    # start were dropped by extraction and reconstructed -- confirm against VCS.
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()  # restored: 'hostname' is read below
    proc = init_rabbitmq()
    if proc is None:  # restored guard implied by the bare `return SkipTest(...)`
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()  # restored: `realm` is read on the next line
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'  # restored: exchange name -- TODO confirm original value
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()      # restored: receiver thread must be running before events arrive

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)

    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w')
    fp.write('bar')  # restored -- TODO confirm payload
    fp.close()       # restored
    uploader = bucket.initiate_multipart_upload('multipart')
    fp = tempfile.TemporaryFile(mode='r')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()       # restored

    print('wait for 5sec for the messages...')
    time.sleep(5)    # restored: matches the printed message

    # check amqp receiver
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()  # restored loop body
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_multipart_on_master():
    """ test multipart object upload on master"""
    # NOTE(review): skip guards, receiver starts and the temp-file write/seek were
    # dropped by extraction and reconstructed -- confirm against VCS.
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()  # restored: 'hostname' is read below
    proc = init_rabbitmq()
    if proc is None:  # restored guard implied by the bare `return SkipTest(...)`
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()  # restored: `realm` is read on the next line
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'  # restored -- TODO confirm original value
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
    task1.start()  # restored
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
    task2.start()  # restored
    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
    task3.start()  # restored

    # create s3 topics
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn3 = topic_conf3.set_config()

    # create s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:Post']
                        },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)

    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w+b')
    content = bytearray(os.urandom(1024*1024))
    fp.write(content)  # restored: the random content must reach the file
    fp.flush()         # restored
    fp.seek(0)         # restored: rewind before uploading the part
    uploader = bucket.initiate_multipart_upload('multipart')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()         # restored

    print('wait for 5sec for the messages...')
    time.sleep(5)      # restored: matches the printed message

    # check amqp receiver: receiver1 subscribes to all create events
    events = receiver1.get_and_reset_events()
    assert_equal(len(events), 3)

    # receiver2 only gets the Post event
    events = receiver2.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')

    # receiver3 only gets the CompleteMultipartUpload event
    events = receiver3.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    stop_amqp_receiver(receiver3, task3)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    topic_conf3.del_config()
    for key in bucket.list():
        key.delete()  # restored loop body
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_versioned_deletion():
    """ test notification of deletion markers """
    # NOTE(review): the version-id captures (v1/v2 are read below) and the
    # try/except around the final checkpoint (implied by the surviving
    # 'checkpoint failed' log line) were dropped by extraction and
    # reconstructed -- confirm against VCS.
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
    _, status = topic_conf1.set_config()
    # floor division: status/100 is a float on py3 and 201/100 != 2
    assert_equal(status//100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
    _, status = topic_conf2.set_config()
    assert_equal(status//100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)

    # wait for metadata sync
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications
    event_type1 = 'OBJECT_DELETE'
    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_1',
                                        event_type1)  # restored continuation args
    _, status = notification_conf1.set_config()
    assert_equal(status//100, 2)
    event_type2 = 'DELETE_MARKER_CREATE'
    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_2',
                                        event_type2)  # restored continuation args
    _, status = notification_conf2.set_config()
    assert_equal(status//100, 2)

    # create subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                               topic_name+'_1')  # restored continuation arg
    _, status = sub_conf1.set_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                               topic_name+'_2')  # restored continuation arg
    _, status = sub_conf2.set_config()
    assert_equal(status//100, 2)

    # create two versions of the same object
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id  # restored: v1 is read below
    key.set_contents_from_string('kaboom')
    v2 = key.version_id  # restored: v2 is read below

    # create deletion marker
    delete_marker_key = bucket.delete_key(key.name)

    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # delete the deletion marker
    delete_marker_key.delete()
    # delete the object versions
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)

    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf1.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type1)

    result, _ = sub_conf2.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type2)

    # cleanup
    # following is needed for the cleanup in the case of 3-zones
    # see: http://tracker.ceph.com/issues/39142
    realm = get_realm()  # restored: `realm` is read on the next line
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    try:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
        master_zone.delete_bucket(bucket_name)
    except Exception:
        log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
    sub_conf1.del_config()
    sub_conf2.del_config()
    notification_conf1.del_config()
    notification_conf2.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
def test_ps_s3_metadata_on_master():
    """ test s3 notification of metadata on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification, filtered on a metadata key/value pair
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    meta_key = 'meta1'
    meta_value = 'This is my metadata value'
    meta_prefix = 'x-amz-meta-'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                        'Filter': {
                            'Metadata': {
                                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
                            }
                        }
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    key_name = 'foo'
    key = bucket.new_key(key_name)
    key.set_metadata(meta_key, meta_value)
    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_foo', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w')
    fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
    fp.close()
    uploader = bucket.initiate_multipart_upload('multipart_foo',
                                                metadata={meta_key: meta_value})
    fp = tempfile.TemporaryFile(mode='r')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver: every received event must carry the metadata pair
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count += 1

    # only PUT and POST has the metadata value
    assert_equal(event_count, 2)

    # delete objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    event_count = 0
    for event in receiver.get_and_reset_events():
        s3_event = event['Records'][0]['s3']
        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
        event_count += 1

    # all 3 object has metadata when deleted
    assert_equal(event_count, 3)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_tags_on_master():
    """ test s3 notification of tags on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification, filtered on tag hello=world
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                        'Filter': {
                            'Tags': {
                                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
                            }
                        }
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket with tags
    tags = 'hello=world&ka=boom'
    key_name1 = 'key1'
    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
    tags = 'foo=bar&ka=boom'
    key_name2 = 'key2'
    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
    # create an untagged object (should be filtered out)
    key_name3 = 'key3'
    key = bucket.new_key(key_name3)
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
    print('wait for 5sec for the messages...')
    time.sleep(5)
    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
    # check amqp receiver: only events for the matching tag set should arrive
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # delete the objects
    for key in bucket.list():
        key.delete()
    print('wait for 5sec for the messages...')
    time.sleep(5)
    # check amqp receiver
    for event in receiver.get_and_reset_events():
        obj_tags = event['Records'][0]['s3']['object']['tags']
        assert_equal(obj_tags[0], expected_tags[0])

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_versioned_deletion_on_master():
    """ test s3 notification of deletion markers on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create a versioned bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
                       },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:Delete']
                       }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create two versions of the same object
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create delete marker (non versioned deletion)
    delete_marker_key = bucket.delete_key(key.name)

    # versioned deletion
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    delete_marker_key.delete()

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    events = receiver.get_and_reset_events()
    delete_events = 0
    delete_marker_create_events = 0
    for event_list in events:
        for event in event_list['Records']:
            if event['eventName'] == 's3:ObjectRemoved:Delete':
                delete_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
                delete_marker_create_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']

    # 3 key versions were deleted (v1, v2 and the deletion marker)
    # notified over the same topic via 2 notifications (1,3)
    assert_equal(delete_events, 3*2)
    # 1 deletion marker was created
    # notified over the same topic over 2 notifications (1,2)
    assert_equal(delete_marker_create_events, 1*2)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_push_http():
    """ test pushing to http endpoint """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription pushing to the http server
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='http://'+host+':'+str(port))
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server received creation events
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server received deletion events
    # TODO: use exact match
    http_server.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_s3_push_http():
    """ test pushing to http endpoint s3 record format"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic with the http server as push endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+host+':'+str(port))
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server received s3-format creation records
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server received s3-format deletion records
    # TODO: use exact match
    http_server.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_push_amqp():
    """ test pushing to amqp endpoint """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription pushing to the amqp endpoint
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='amqp://'+hostname,
                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_push_amqp():
    """ test pushing to amqp endpoint s3 record format"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    # create topic with the amqp endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_delete_bucket():
    """ test notification status upon bucket deletion """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create non-s3 notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    # wait for meta sync
    zone_meta_checkpoint(ps_zone.zone)

    # get the events from the auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # s3 notification is deleted with bucket
    _, status = s3_notification_conf.get_config(notification=notification_name)
    assert_equal(status, 404)
    # non-s3 notification is deleted with bucket
    _, status = notification_conf.get_config()
    assert_equal(status, 404)
    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()
def test_ps_missing_topic():
    """ test creating a subscription when no topic info exists"""
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification referencing a topic that was never created
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_arn = 'arn:aws:sns:::' + topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    try:
        s3_notification_conf.set_config()
    except Exception:
        log.info('missing topic is expected')
    else:
        # a bare `assert <non-empty string>` is always true and can never
        # fail; assert False so the test fails when set_config() succeeds
        assert False, 'missing topic is expected'

    # cleanup
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_topic_update():
    """ test updating topic associated with a notification"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # amqp endpoint
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    # create topic with amqp endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update the same topic with new endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to update the endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
def test_ps_s3_notification_update():
    """ test updating the topic of a notification"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # amqp endpoint
    exchange = 'ex1'
    # start amqp receiver in a separate thread
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)
    # create the two topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    assert_equal(status/100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']
    assert_equal(status/100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # amqp endpoint
    hostname = get_ip()
    exchange = 'ex1'
    # start amqp receiver in a separate thread
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)
    # create the two topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    assert_equal(status/100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']
    assert_equal(status/100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create one s3 notification per topic
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)

    # get auto-generated subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status/100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from both of the subscription
    result, _ = sub_conf1.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)

    result, _ = sub_conf2.get_events()
    parsed_result = json.loads(result)
    for record in parsed_result['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)