git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/rgw/rgw_multi/tests_ps.py
import quincy beta 17.1.0
index 1d436c5b35adba627015f39a3b563975a43037c4..f2f27ff8f8036e0872336744c3fad44d02d294af 100644
@@ -26,10 +26,7 @@ from .zone_ps import PSTopic, \
     PSSubscription, \
     PSNotificationS3, \
     print_connection_info, \
-    delete_all_s3_topics, \
-    put_object_tagging, \
-    get_object_tagging, \
-    delete_all_objects
+    get_object_tagging
 from .multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
@@ -849,219 +846,7 @@ def test_ps_s3_notification():
     master_zone.delete_bucket(bucket_name)
 
 
-def test_ps_s3_topic_on_master():
-    """ test s3 topics set/get/delete on master """
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # clean all topics
-    delete_all_s3_topics(master_zone, zonegroup.name)
-   
-    # create s3 topics
-    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf1.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
-
-    endpoint_address = 'http://127.0.0.1:9001'
-    endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf2.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
-    endpoint_address = 'http://127.0.0.1:9002'
-    endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf3.set_config()
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
-
-    # get topic 3
-    result, status = topic_conf3.get_config()
-    assert_equal(status, 200)
-    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
-    # Note that endpoint args may be ordered differently in the result
-    result = topic_conf3.get_attributes()
-    assert_equal(topic_arn, result['Attributes']['TopicArn'])
-    json_endpoint = json.loads(result['Attributes']['EndPoint'])
-    assert_equal(endpoint_address, json_endpoint['EndpointAddress'])
-
-    # delete topic 1
-    result = topic_conf1.del_config()
-    assert_equal(status, 200)
-
-    # try to get a deleted topic
-    _, status = topic_conf1.get_config()
-    assert_equal(status, 404)
-    try:
-        topic_conf1.get_attributes()
-    except:
-        print('topic already deleted - this is expected')
-    else:
-        assert False, 'topic 1 should be deleted at this point'
-
-    # get the remaining 2 topics
-    result, status = topic_conf1.get_list()
-    assert_equal(status, 200)
-    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)
-    
-    # delete topics
-    result = topic_conf2.del_config()
-    # TODO: should be 200OK
-    # assert_equal(status, 200)
-    result = topic_conf3.del_config()
-    # TODO: should be 200OK
-    # assert_equal(status, 200)
-
-    # get topic list, make sure it is empty
-    result, status = topic_conf1.get_list()
-    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
-
-
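For reference, the ARNs checked by the asserts above follow the SNS-style layout arn:aws:sns:<zonegroup>:<tenant>:<topic>. A minimal sketch of composing and splitting such an ARN (the helper names are illustrative only, not part of this test suite):

    def make_topic_arn(zonegroup, tenant, topic):
        # same layout the asserts above expect: arn:aws:sns:<zonegroup>:<tenant>:<topic>
        return 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic

    def parse_topic_arn(arn):
        # split back into (zonegroup, tenant, topic); tenant may be empty
        prefix, partition, service, zonegroup, tenant, topic = arn.split(':', 5)
        if (prefix, partition, service) != ('arn', 'aws', 'sns'):
            raise ValueError('unexpected ARN prefix: ' + arn)
        return zonegroup, tenant, topic
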
-def test_ps_s3_topic_with_secret_on_master():
-    """ test s3 topics with secret set/get/delete on master """
-    master_zone, _ = init_env(require_ps=False)
-    if master_zone.secure_conn is None:
-        return SkipTest('secure connection is needed to test topic with secrets')
-
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # clean all topics
-    delete_all_s3_topics(master_zone, zonegroup.name)
-   
-    # create s3 topics
-    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    try:
-        result = bad_topic_conf.set_config()
-    except Exception as err:
-        print('Error is expected: ' + str(err))
-    else:
-        assert False, 'user password configuration set allowed only over HTTPS'
-    
-    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-
-    assert_equal(topic_arn,
-                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
-
-    _, status = bad_topic_conf.get_config()
-    assert_equal(status/100, 4)
-
-    # get topic
-    result, status = topic_conf.get_config()
-    assert_equal(status, 200)
-    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
-    
-    _, status = bad_topic_conf.get_config()
-    assert_equal(status/100, 4)
-    
-    _, status = topic_conf.get_list()
-    assert_equal(status/100, 2)
-
-    # delete topics
-    result = topic_conf.del_config()
-
-
-def test_ps_s3_notification_on_master():
-    """ test s3 notification set/get/delete on master """
-    master_zone, _  = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    # create bucket
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # create s3 topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1',
-                        'TopicArn': topic_arn1,
-                        'Events': ['s3:ObjectCreated:*']
-                       },
-                       {'Id': notification_name+'_2',
-                        'TopicArn': topic_arn1,
-                        'Events': ['s3:ObjectRemoved:*']
-                       },
-                       {'Id': notification_name+'_3',
-                        'TopicArn': topic_arn1,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # get notifications on a bucket
-    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
-    assert_equal(status/100, 2)
-    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn1)
-
-    # delete specific notifications
-    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
-    assert_equal(status/100, 2)
-
-    # get the remaining 2 notifications on a bucket
-    response, status = s3_notification_conf.get_config()
-    assert_equal(status/100, 2)
-    assert_equal(len(response['TopicConfigurations']), 2)
-    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn1)
-    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn1)
-
-    # delete remaining notifications
-    _, status = s3_notification_conf.del_config()
-    assert_equal(status/100, 2)
-
-    # make sure that the notifications are now deleted
-    response, status = s3_notification_conf.get_config()
-    try:
-        dummy = response['TopicConfigurations']
-    except:
-        print('"TopicConfigurations" is not in response')
-    else:
-        assert False, '"TopicConfigurations" should not be in response'
-
-    # create another s3 notification
-    topic_conf_list = [{'Id': notification_name+'_1',
-                        'TopicArn': topic_arn1,
-                        'Events': ['s3:ObjectCreated:*']
-                       }]
-    _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # make sure the notification and auto-generated topic are deleted
-    response, status = topic_conf1.get_list()
-    topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member']
-    before_delete = len(topics)
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    response, status = topic_conf2.get_list()
-    topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member']
-    after_delete = len(topics)
-    assert_equal(before_delete - after_delete, 3)
-
-    # cleanup
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-
-
-def ps_s3_notification_filter(on_master):
+def test_ps_s3_notification_filter():
     """ test s3 notification filter on master """
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
@@ -1069,12 +854,9 @@ def ps_s3_notification_filter(on_master):
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    if on_master:
-        master_zone, _ = init_env(require_ps=False)
-        ps_zone = master_zone
-    else:
-        master_zone, ps_zone = init_env(require_ps=True)
-        ps_zone = ps_zone
+
+    master_zone, ps_zone = init_env(require_ps=True)
 
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
@@ -1092,15 +874,12 @@ def ps_s3_notification_filter(on_master):
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    if on_master:
-        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-        topic_arn = topic_conf.set_config()
-    else:
-        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
-        result, _ = topic_conf.set_config()
-        parsed_result = json.loads(result)
-        topic_arn = parsed_result['arn']
-        zone_meta_checkpoint(ps_zone.zone)
+
+    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    result, _ = topic_conf.set_config()
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
+    zone_meta_checkpoint(ps_zone.zone)
 
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1137,33 +916,8 @@ def ps_s3_notification_filter(on_master):
     result, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
-    if on_master:
-        topic_conf_list = [{'Id': notification_name+'_4',
-                            'TopicArn': topic_arn,
-                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
-                           'Filter': {
-                               'Metadata': {
-                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
-                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
-                               },
-                                'Key': {
-                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
-                                }
-                            }
-                            }]
-
-        try:
-            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
-            _, status = s3_notification_conf4.set_config()
-            assert_equal(status/100, 2)
-            skip_notif4 = False
-        except Exception as error:
-            print('note: metadata filter is not supported by boto3 - skipping test')
-            skip_notif4 = True
-    else:
-        print('filtering by attributes only supported on master zone')
-        skip_notif4 = True
-
+    print('filtering by attributes only supported on master zone')
+    skip_notif4 = True
 
     # get all notifications
     result, status = s3_notification_conf.get_config()
@@ -1211,11 +965,7 @@ def ps_s3_notification_filter(on_master):
         key = bucket.new_key(key_name)
         key.set_contents_from_string('bar')
 
-    if on_master:
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
-    else:
-        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     found_in1 = []
     found_in2 = []
@@ -1255,112 +1005,7 @@ def ps_s3_notification_filter(on_master):
     clean_rabbitmq(proc)
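
The filters exercised in this test follow the S3 bucket notification filter schema: 'Key' rules use the standard prefix/suffix names plus RGW's regex extension, and the removed master-zone branch above also carried a 'Metadata' section. A minimal sketch of a combined key filter, assuming the same PSNotificationS3 wrapper used throughout (the trailing calls are commented out, as an illustration rather than part of the test):

    topic_conf_list = [{'Id': notification_name + '_example',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'},
                                                {'Name': 'suffix', 'Value': '.txt'}]
                            }
                        }
                       }]
    # s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    # _, status = s3_notification_conf.set_config()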
 
 
-def test_ps_s3_notification_filter_on_master():
-    ps_s3_notification_filter(on_master=True)
-
-
-def test_ps_s3_notification_filter():
-    ps_s3_notification_filter(on_master=False)
-
-
-def test_ps_s3_notification_errors_on_master():
-    """ test s3 notification set/get/delete on master """
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    bucket_name = gen_bucket_name()
-    # create bucket
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # create s3 topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-
-    # create s3 notification with invalid event name
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:Kaboom']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    try:
-      result, status = s3_notification_conf.set_config()
-    except Exception as error:
-      print(str(error) + ' - is expected')
-    else:
-      assert False, 'invalid event name is expected to fail'
-
-    # create s3 notification with missing name
-    topic_conf_list = [{'Id': '',
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:Put']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    try:
-      _, _ = s3_notification_conf.set_config()
-    except Exception as error:
-      print(str(error) + ' - is expected')
-    else:
-      assert False, 'missing notification name is expected to fail'
-
-    # create s3 notification with invalid topic ARN
-    invalid_topic_arn = 'kaboom'
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': invalid_topic_arn,
-                        'Events': ['s3:ObjectCreated:Put']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    try:
-      _, _ = s3_notification_conf.set_config()
-    except Exception as error:
-      print(str(error) + ' - is expected')
-    else:
-      assert False, 'invalid ARN is expected to fail'
-
-    # create s3 notification with unknown topic ARN
-    invalid_topic_arn = 'arn:aws:sns:a::kaboom'
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': invalid_topic_arn ,
-                        'Events': ['s3:ObjectCreated:Put']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    try:
-      _, _ = s3_notification_conf.set_config()
-    except Exception as error:
-      print(str(error) + ' - is expected')
-    else:
-      assert False, 'unknown topic is expected to fail'
-
-    # create s3 notification with wrong bucket
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:Put']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
-    try:
-      _, _ = s3_notification_conf.set_config()
-    except Exception as error:
-      print(str(error) + ' - is expected')
-    else:
-      assert False, 'unknown bucket is expected to fail'
-
-    topic_conf.del_config()
-
-    status = topic_conf.del_config()
-    # deleting an unknown notification is not considered an error
-    assert_equal(status, 200)
-    
-    _, status = topic_conf.get_config()
-    assert_equal(status, 404)
-    
-    # cleanup
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-
-
-def test_objcet_timing():
+def test_object_timing():
     return SkipTest("only used in manual testing")
     master_zone, _ = init_env(require_ps=False)
     
@@ -1401,1401 +1046,79 @@ def test_objcet_timing():
     master_zone.delete_bucket(bucket_name)
 
 
-def test_ps_s3_notification_push_amqp_on_master():
-    """ test pushing amqp s3 notification on master """
+def test_ps_s3_opaque_data():
+    """ test that opaque id set in topic, is sent in notification """
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is  None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = master_zone.create_bucket(bucket_name)
-    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
-    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
-
-    # start amqp receivers
-    exchange = 'ex1'
-    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
-    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
-    task1.start()
-    task2.start()
+    topic_name = bucket_name + TOPIC_SUFFIX
+    # wait for sync
+    zone_meta_checkpoint(ps_zone.zone)
 
-    # create two s3 topic
-    endpoint_address = 'amqp://' + hostname
-    # with acks from broker
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    # without acks from broker
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    opaque_data = 'http://1.2.3.4:8888'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
+    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    result, status = topic_conf.set_config()
+    assert_equal(status/100, 2)
+    parsed_result = json.loads(result)
+    topic_arn = parsed_result['arn']
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
-                         'Events': []
-                       },
-                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
-                         'Events': ['s3:ObjectCreated:*']
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
                        }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
-    # create objects in the bucket (async)
-    number_of_objects = 100
+    # create objects in the bucket
     client_threads = []
-    start_time = time.time()
+    content = 'bar'
     for i in range(number_of_objects):
         key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
     [thr.join() for thr in client_threads] 
 
-    time_diff = time.time() - start_time
-    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    # wait for sync
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
-    # check amqp receiver
+    # check http receiver
     keys = list(bucket.list())
     print('total number of objects: ' + str(len(keys)))
-    receiver1.verify_s3_events(keys, exact_match=True)
-    receiver2.verify_s3_events(keys, exact_match=True)
-    
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    time_diff = time.time() - start_time
-    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
     
-    # check amqp receiver 1 for deletions
-    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
-    # check amqp receiver 2 has no deletions
-    try:
-        receiver1.verify_s3_events(keys, exact_match=False, deletions=True)
-    except:
-        pass
-    else:
-        err = 'amqp receiver 2 should have no deletions'
-        assert False, err
-
     # cleanup
-    stop_amqp_receiver(receiver1, task1)
-    stop_amqp_receiver(receiver2, task2)
-    s3_notification_conf.del_config()
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
-
-
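The create_amqp_receiver_thread/stop_amqp_receiver helpers used above are defined elsewhere in this module. Purely as an illustration of what such a receiver does, a minimal synchronous consumer along these lines could drain the pushed events; this sketch assumes the pika client and that the exchange was already declared by the pushing gateway, and is not the helper the tests actually use:

    import json
    import pika  # assumed dependency for this sketch only

    def drain_amqp_events(exchange, routing_key, host='localhost'):
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        channel = connection.channel()
        # bind a throwaway queue to the topic's routing key on the existing exchange
        queue = channel.queue_declare(queue='', exclusive=True).method.queue
        channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)
        events = []
        while True:
            method, _, body = channel.basic_get(queue=queue, auto_ack=True)
            if method is None:
                break  # nothing left waiting on the queue
            events.append(json.loads(body))
        connection.close()
        return events
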
-def test_ps_s3_persistent_cleanup():
-    """ test reservation cleanup after gateway crash """
-    return SkipTest("only used in manual testing")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 200
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-
-    gw = master_zone
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = gw.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic_conf = PSTopicS3(gw.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-        'Events': ['s3:ObjectCreated:Put']
-        }]
-    s3_notification_conf = PSNotificationS3(gw.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    # stop gateway while clients are sending
-    os.system("killall -9 radosgw");
-    zonegroup.master_zone.gateways[0].stop()
-    print('wait for 10 sec for before restarting the gateway')
-    time.sleep(10)
-    zonegroup.master_zone.gateways[0].start()
-    [thr.join() for thr in client_threads] 
-   
-    keys = list(bucket.list())
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    # check http receiver
-    events = http_server.get_and_reset_events()
-
-    print(str(len(events) ) + " events found out of " + str(number_of_objects))
-    
-    # make sure that things are working now
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-   
-    keys = list(bucket.list())
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    print('wait for 180 sec for reservations to be stale before queue deletion')
-    time.sleep(180)
-
-    # check http receiver
-    events = http_server.get_and_reset_events()
-
-    print(str(len(events)) + " events found out of " + str(number_of_objects))
-
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    gw.delete_bucket(bucket_name)
-    http_server.close()
-
-
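StreamingHTTPServer above is a test helper that collects the JSON events POSTed by the push endpoint and exposes them via get_and_reset_events/verify_s3_events. A rough stdlib-only stand-in, for illustration only (the class and attribute names here are hypothetical):

    import json
    from http.server import BaseHTTPRequestHandler, HTTPServer

    class EventSink(BaseHTTPRequestHandler):
        # collects every notification body POSTed by the gateway
        received = []

        def do_POST(self):
            length = int(self.headers.get('Content-Length', 0))
            EventSink.received.append(json.loads(self.rfile.read(length)))
            self.send_response(200)
            self.end_headers()

    # HTTPServer((host, port), EventSink).serve_forever() would run it, typically on its own thread
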
-def test_ps_s3_persistent_gateways_recovery():
-    """ test gateway recovery of persistent notifications """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    if len(zonegroup.master_zone.gateways) < 2:
-        return SkipTest("this test requires two gateways")
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-
-    gw1 = master_zone
-    gw2 = zonegroup.master_zone.gateways[1]
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = gw1.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    
-    # create two s3 topics
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic_conf1 = PSTopicS3(gw1.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData=fromgw1')
-    topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(gw2.connection, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData=fromgw2')
-    topic_arn2 = topic_conf2.set_config()
-
-    # create two s3 notifications
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
-        'Events': ['s3:ObjectCreated:Put']
-        }]
-    s3_notification_conf1 = PSNotificationS3(gw1.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf1.set_config()
-    assert_equal(status/100, 2)
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
-        'Events': ['s3:ObjectRemoved:Delete']
-        }]
-    s3_notification_conf2 = PSNotificationS3(gw2.connection, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf2.set_config()
-    assert_equal(status/100, 2)
-
-    # stop gateway 2
-    print('stopping gateway2...')
-    gw2.stop()
-
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-   
-    keys = list(bucket.list())
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    print('wait for 60 sec for before restarting the gateway')
-    time.sleep(60)
-    gw2.start()
-    
-    # check http receiver
-    events = http_server.get_and_reset_events()
-    for key in keys:
-        creations = 0
-        deletions = 0
-        for event in events:
-            if event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \
-                    key.name == event['Records'][0]['s3']['object']['key']:
-                creations += 1
-            elif event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \
-                    key.name == event['Records'][0]['s3']['object']['key']:
-                deletions += 1
-        assert_equal(creations, 1)
-        assert_equal(deletions, 1)
-
-    # cleanup
-    s3_notification_conf1.del_config()
-    topic_conf1.del_config()
-    gw1.delete_bucket(bucket_name)
-    time.sleep(10)
-    s3_notification_conf2.del_config()
-    topic_conf2.del_config()
-    http_server.close()
-
-
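The assertions in this and the following tests read the delivered notifications as S3 event records. A sketch of the fields they inspect (the values are illustrative):

    event = {'Records': [{
        'eventName': 's3:ObjectCreated:Put',   # or 's3:ObjectRemoved:Delete'
        'opaqueData': 'fromgw1',                # the OpaqueData configured on the topic, if any
        's3': {'object': {'key': 'gw1_0'}},     # the object the event refers to
    }]}
    assert event['Records'][0]['s3']['object']['key'] == 'gw1_0'
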
-def test_ps_s3_persistent_multiple_gateways():
-    """ test pushing persistent notification via two gateways """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    if len(zonegroup.master_zone.gateways) < 2:
-        return SkipTest("this test requires two gateways")
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-
-    gw1 = master_zone
-    gw2 = zonegroup.master_zone.gateways[1]
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket1 = gw1.create_bucket(bucket_name)
-    bucket2 = gw2.connection.get_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    
-    # create two s3 topics
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic1_opaque = 'fromgw1'
-    topic_conf1 = PSTopicS3(gw1.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque)
-    topic_arn1 = topic_conf1.set_config()
-    topic2_opaque = 'fromgw2'
-    topic_conf2 = PSTopicS3(gw2.connection, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque)
-    topic_arn2 = topic_conf2.set_config()
-
-    # create two s3 notifications
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
-                         'Events': []
-                       }]
-    s3_notification_conf1 = PSNotificationS3(gw1.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf1.set_config()
-    assert_equal(status/100, 2)
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
-                         'Events': []
-                       }]
-    s3_notification_conf2 = PSNotificationS3(gw2.connection, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf2.set_config()
-    assert_equal(status/100, 2)
-
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket1.new_key('gw1_'+str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-        key = bucket2.new_key('gw2_'+str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    keys = list(bucket1.list())
-
-    delay = 30
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    events = http_server.get_and_reset_events()
-    for key in keys:
-        topic1_count = 0
-        topic2_count = 0
-        for event in events:
-            if event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \
-                    key.name == event['Records'][0]['s3']['object']['key'] and \
-                    topic1_opaque == event['Records'][0]['opaqueData']:
-                topic1_count += 1
-            elif event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \
-                    key.name == event['Records'][0]['s3']['object']['key'] and \
-                    topic2_opaque == event['Records'][0]['opaqueData']:
-                topic2_count += 1
-        assert_equal(topic1_count, 1)
-        assert_equal(topic2_count, 1)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket1.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    events = http_server.get_and_reset_events()
-    for key in keys:
-        topic1_count = 0
-        topic2_count = 0
-        for event in events:
-            if event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \
-                    key.name == event['Records'][0]['s3']['object']['key'] and \
-                    topic1_opaque == event['Records'][0]['opaqueData']:
-                topic1_count += 1
-            elif event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \
-                    key.name == event['Records'][0]['s3']['object']['key'] and \
-                    topic2_opaque == event['Records'][0]['opaqueData']:
-                topic2_count += 1
-        assert_equal(topic1_count, 1)
-        assert_equal(topic2_count, 1)
-    
-    # cleanup
-    s3_notification_conf1.del_config()
-    topic_conf1.del_config()
-    s3_notification_conf2.del_config()
-    topic_conf2.del_config()
-    gw1.delete_bucket(bucket_name)
-    http_server.close()
-
-
-def test_ps_s3_persistent_multiple_endpoints():
-    """ test pushing persistent notification when one of the endpoints has error """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    
-    # create two s3 topics
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    endpoint_address = 'http://kaboom:9999'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-
-    # create two s3 notifications
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
-                         'Events': []
-                       }]
-    s3_notification_conf1 = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf1.set_config()
-    assert_equal(status/100, 2)
-    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
-                         'Events': []
-                       }]
-    s3_notification_conf2 = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf2.set_config()
-    assert_equal(status/100, 2)
-
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    keys = list(bucket.list())
-
-    delay = 30
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-    
-    http_server.verify_s3_events(keys, exact_match=False, deletions=False)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-    
-    http_server.verify_s3_events(keys, exact_match=False, deletions=True)
-
-    # cleanup
-    s3_notification_conf1.del_config()
-    topic_conf1.del_config()
-    s3_notification_conf2.del_config()
-    topic_conf2.del_config()
-    master_zone.delete_bucket(bucket_name)
-    http_server.close()
-
-
-def persistent_notification(endpoint_type):
-    """ test pushing persistent notification """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    receiver = {}
-    host = get_ip()
-    if endpoint_type == 'http':
-        # create random port for the http server
-        port = random.randint(10000, 20000)
-        # start an http server in a separate thread
-        receiver = StreamingHTTPServer(host, port, num_workers=10)
-        endpoint_address = 'http://'+host+':'+str(port)
-        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-        # the http server does not guarantee order, so duplicates are expected
-        exact_match = False
-    elif endpoint_type == 'amqp':
-        proc = init_rabbitmq()
-        if proc is None:
-            return SkipTest('end2end amqp tests require rabbitmq-server installed')
-        # start amqp receiver
-        exchange = 'ex1'
-        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-        task.start()
-        endpoint_address = 'amqp://' + host
-        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
-        # amqp broker guarantees ordering
-        exact_match = True
-    else:
-        return SkipTest('Unknown endpoint type: ' + endpoint_type)
-
-
-    # create s3 topic
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket (async)
-    number_of_objects = 100
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    
-    keys = list(bucket.list())
-
-    delay = 40
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    time_diff = time.time() - start_time
-    print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-    
-    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
-
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    if endpoint_type == 'http':
-        receiver.close()
-    else:
-        stop_amqp_receiver(receiver, task)
-        clean_rabbitmq(proc)
-
-
-def test_ps_s3_persistent_notification_http():
-    """ test pushing persistent notification http """
-    persistent_notification('http')
-
-
-def test_ps_s3_persistent_notification_amqp():
-    """ test pushing persistent notification amqp """
-    persistent_notification('amqp')
-
-
-def random_string(length):
-    import string
-    letters = string.ascii_letters
-    return ''.join(random.choice(letters) for i in range(length))
-
-
-def test_ps_s3_persistent_notification_large():
-    """ test pushing persistent notification of large notifications """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    receiver = {}
-    host = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
-    endpoint_address = 'amqp://' + host
-    opaque_data = random_string(1024*2)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
-    # amqp broker guarantees ordering
-    exact_match = True
-
-    # create s3 topic
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket (async)
-    number_of_objects = 100
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key_value = random_string(63)
-        key = bucket.new_key(key_value)
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    
-    keys = list(bucket.list())
-
-    delay = 40
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    time_diff = time.time() - start_time
-    print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-    
-    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
-
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    stop_amqp_receiver(receiver, task)
-    clean_rabbitmq(proc)
-
-
-def test_ps_s3_persistent_notification_pushback():
-    """ test pushing persistent notification pushback """
-    return SkipTest("only used in manual testing")
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5)
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket (async)
-    for j in range(100):
-        number_of_objects = randint(500, 1000)
-        client_threads = []
-        start_time = time.time()
-        for i in range(number_of_objects):
-            key = bucket.new_key(str(j)+'-'+str(i))
-            content = str(os.urandom(1024*1024))
-            thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-            thr.start()
-            client_threads.append(thr)
-        [thr.join() for thr in client_threads]
-        time_diff = time.time() - start_time
-        print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-    
-    keys = list(bucket.list())
-
-    delay = 30
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    count = 0
-    for key in bucket.list():
-        count += 1
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-        if count%100 == 0:
-            [thr.join() for thr in client_threads]
-            time_diff = time.time() - start_time
-            print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-            client_threads = []
-            start_time = time.time()
-
-    print('wait for '+str(delay)+'sec for the messages...')
-    time.sleep(delay)
-    
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    time.sleep(delay)
-    http_server.close()
-
-
-def test_ps_s3_notification_push_kafka():
-    """ test pushing kafka s3 notification on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    kafka_proc, zk_proc, kafka_log = init_kafka()
-    if kafka_proc is  None or zk_proc is None:
-        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
-
-    master_zone, ps_zone = init_env()
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    # wait for sync
-    zone_meta_checkpoint(ps_zone.zone)
-    # name is constant for manual testing
-    topic_name = bucket_name+'_topic'
-    # create consumer on the topic
-    task, receiver = create_kafka_receiver_thread(topic_name)
-    task.start()
-
-    # create topic
-    topic_conf = PSTopic(ps_zone.conn, topic_name,
-                         endpoint='kafka://' + kafka_server,
-                         endpoint_args='kafka-ack-level=broker')
-    result, status = topic_conf.set_config()
-    assert_equal(status/100, 2)
-    parsed_result = json.loads(result)
-    topic_arn = parsed_result['arn']
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket (async)
-    number_of_objects = 10
-    client_threads = []
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    # wait for sync
-    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
-    keys = list(bucket.list())
-    receiver.verify_s3_events(keys, exact_match=True)
-
-    # delete objects from the bucket
-    client_threads = []
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
-    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
-    
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    stop_kafka_receiver(receiver, task)
-    clean_kafka(kafka_proc, zk_proc, kafka_log)
-
-
-def test_ps_s3_notification_push_kafka_on_master():
-    """ test pushing kafka s3 notification on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    kafka_proc, zk_proc, kafka_log = init_kafka()
-    if kafka_proc is None or zk_proc is None:
-        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    # name is constant for manual testing
-    topic_name = bucket_name+'_topic'
-    # create consumer on the topic
-    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
-    task.start()
-
-    # create s3 topic
-    endpoint_address = 'kafka://' + kafka_server
-    # without acks from broker
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
-                         'Events': []
-                       },
-                       {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket (async)
-    number_of_objects = 10
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    keys = list(bucket.list())
-    receiver.verify_s3_events(keys, exact_match=True)
-
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    time_diff = time.time() - start_time
-    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    receiver.verify_s3_events(keys, exact_match=True, deletions=True)
-    
-    # cleanup
-    s3_notification_conf.del_config()
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    stop_kafka_receiver(receiver, task)
-    clean_kafka(kafka_proc, zk_proc, kafka_log)
-
-
-def kafka_security(security_type):
-    """ test pushing kafka s3 notification on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    master_zone, _ = init_env(require_ps=False)
-    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
-        return SkipTest("secure connection is needed to test SASL_SSL security")
-    kafka_proc, zk_proc, kafka_log = init_kafka()
-    if kafka_proc is None or zk_proc is None:
-        return SkipTest('end2end kafka tests require kafka/zookeeper installed')
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    # name is constant for manual testing
-    topic_name = bucket_name+'_topic'
-    # create consumer on the topic
-    task, receiver = create_kafka_receiver_thread(topic_name)
-    task.start()
-
-    # create s3 topic
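-    # the broker's SASL_SSL listener is assumed to be on port 9094 (credentials embedded in the URI)
-    # and the SSL-only listener on port 9093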
-    if security_type == 'SSL_SASL':
-        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
-    else:
-        # ssl only
-        endpoint_address = 'kafka://' + kafka_server + ':9093'
-
-    KAFKA_DIR = os.environ['KAFKA_DIR']
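-    # KAFKA_DIR is assumed to end with a path separator and to contain the rootCA.crt used below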
-
-    # without acks from broker, with root CA
-    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
-
-    if security_type == 'SSL_SASL':
-        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    else:
-        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                         'Events': []
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    s3_notification_conf.set_config()
-
-    # create objects in the bucket (async)
-    number_of_objects = 10
-    client_threads = []
-    start_time = time.time()
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        content = str(os.urandom(1024*1024))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    try:
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
-        keys = list(bucket.list())
-        receiver.verify_s3_events(keys, exact_match=True)
-
-        # delete objects from the bucket
-        client_threads = []
-        start_time = time.time()
-        for key in bucket.list():
-            thr = threading.Thread(target = key.delete, args=())
-            thr.start()
-            client_threads.append(thr)
-        [thr.join() for thr in client_threads] 
-        
-        time_diff = time.time() - start_time
-        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-        print('wait for 5sec for the messages...')
-        time.sleep(5)
-        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
-    except Exception as err:
-        assert False, str(err)
-    finally: 
-        # cleanup
-        s3_notification_conf.del_config()
-        topic_conf.del_config()
-        # delete the bucket
-        for key in bucket.list():
-            key.delete()
-        master_zone.delete_bucket(bucket_name)
-        stop_kafka_receiver(receiver, task)
-        clean_kafka(kafka_proc, zk_proc, kafka_log)
-
-
-def test_ps_s3_notification_push_kafka_security_ssl():
-    kafka_security('SSL')
-
-def test_ps_s3_notification_push_kafka_security_ssl_sasl():
-    kafka_security('SSL_SASL')
-
-
-def test_ps_s3_notification_multi_delete_on_master():
-    """ test deletion of multiple keys on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    master_zone, _  = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectRemoved:*']
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    client_threads = []
-    objects_size = {}
-    for i in range(number_of_objects):
-        content = str(os.urandom(randint(1, 1024)))
-        object_size = len(content)
-        key = bucket.new_key(str(i))
-        objects_size[key.name] = object_size
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    keys = list(bucket.list())
-
-    start_time = time.time()
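-    # delete all objects with a single multi-object delete request; each removed key is still
-    # expected to produce its own ObjectRemoved notification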
-    delete_all_objects(master_zone.conn, bucket_name)
-    time_diff = time.time() - start_time
-    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-   
-    # check http receiver
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
-    
-    # cleanup
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    http_server.close()
-
-
-def test_ps_s3_notification_push_http_on_master():
-    """ test pushing http s3 notification on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    client_threads = []
-    objects_size = {}
-    start_time = time.time()
-    for i in range(number_of_objects):
-        content = str(os.urandom(randint(1, 1024)))
-        object_size = len(content)
-        key = bucket.new_key(str(i))
-        objects_size[key.name] = object_size
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    
-    # check http receiver
-    keys = list(bucket.list())
-    http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
-    
-    # delete objects from the bucket
-    client_threads = []
-    start_time = time.time()
-    for key in bucket.list():
-        thr = threading.Thread(target = key.delete, args=())
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-    
-    time_diff = time.time() - start_time
-    print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    
-    # check http receiver
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
-    
-    # cleanup
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    http_server.close()
-
-
-def test_ps_s3_opaque_data():
-    """ test that opaque id set in topic, is sent in notification """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    master_zone, ps_zone = init_env()
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # wait for sync
-    zone_meta_checkpoint(ps_zone.zone)
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    opaque_data = 'http://1.2.3.4:8888'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
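-    # the OpaqueData value set on the topic should be echoed back in every notification record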
-    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
-    result, status = topic_conf.set_config()
-    assert_equal(status/100, 2)
-    parsed_result = json.loads(result)
-    topic_arn = parsed_result['arn']
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    client_threads = []
-    content = 'bar'
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    # wait for sync
-    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
-
-    # check http receiver
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    events = http_server.get_and_reset_events()
-    for event in events:
-        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
-    
-    # cleanup
-    for key in keys:
-        key.delete()
-    [thr.join() for thr in client_threads] 
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads] 
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
     master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
-def test_ps_s3_opaque_data_on_master():
-    """ test that opaque id set in topic, is sent in notification on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create random port for the http server
-    host = get_ip()
-    port = random.randint(10000, 20000)
-    # start an http server in a separate thread
-    number_of_objects = 10
-    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # create s3 topic
-    endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
-    opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    client_threads = []
-    start_time = time.time()
-    content = 'bar'
-    for i in range(number_of_objects):
-        key = bucket.new_key(str(i))
-        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
-        thr.start()
-        client_threads.append(thr)
-    [thr.join() for thr in client_threads] 
-
-    time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    
-    # check http receiver
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    events = http_server.get_and_reset_events()
-    for event in events:
-        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
-    
-    # cleanup
-    for key in keys:
-        key.delete()
-    [thr.join() for thr in client_threads] 
-    topic_conf.del_config()
-    s3_notification_conf.del_config(notification=notification_name)
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    http_server.close()
-
 def test_ps_topic():
     """ test set/get/delete of topic """
     _, ps_zone = init_env()
@@ -3422,331 +1745,32 @@ def test_ps_event_acking():
 
 def test_ps_creation_triggers():
     """ test object creation notifications in using put/copy/post """
-    master_zone, ps_zone = init_env()
-    bucket_name = gen_bucket_name()
-    topic_name = bucket_name+TOPIC_SUFFIX
-
-    # create topic
-    topic_conf = PSTopic(ps_zone.conn, topic_name)
-    topic_conf.set_config()
-    # create bucket on the first of the rados zones
-    bucket = master_zone.create_bucket(bucket_name)
-    # wait for sync
-    zone_meta_checkpoint(ps_zone.zone)
-    # create notifications
-    notification_conf = PSNotification(ps_zone.conn, bucket_name,
-                                       topic_name)
-    _, status = notification_conf.set_config()
-    assert_equal(status/100, 2)
-    # create subscription
-    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
-                              topic_name)
-    _, status = sub_conf.set_config()
-    assert_equal(status/100, 2)
-    # create objects in the bucket using PUT
-    key = bucket.new_key('put')
-    key.set_contents_from_string('bar')
-    # create objects in the bucket using COPY
-    bucket.copy_key('copy', bucket.name, key.name)
-
-    # create objects in the bucket using multi-part upload
-    fp = tempfile.NamedTemporaryFile(mode='w+b')
-    object_size = 1024
-    content = bytearray(os.urandom(object_size))
-    fp.write(content)
-    fp.flush()
-    fp.seek(0)
-    uploader = bucket.initiate_multipart_upload('multipart')
-    uploader.upload_part_from_file(fp, 1)
-    uploader.complete_upload()
-    fp.close()
-    
-    # wait for sync
-    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
-
-    # get the create events from the subscription
-    result, _ = sub_conf.get_events()
-    events = json.loads(result)
-    for event in events['events']:
-        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
-
-    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
-    assert len(events['events']) >= 3
-    # cleanup
-    sub_conf.del_config()
-    notification_conf.del_config()
-    topic_conf.del_config()
-    for key in bucket.list():
-        key.delete()
-    master_zone.delete_bucket(bucket_name)
-
-def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
-    """ test object creation s3 notifications in using put/copy/post on master"""
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    if not external_endpoint_address:
-        hostname = get_ip()
-        proc = init_rabbitmq()
-    if proc is None:
-            return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    else:
-        proc = None
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
-    task.start()
-
-    # create s3 topic
-    if external_endpoint_address:
-        endpoint_address = external_endpoint_address
-    elif ca_location:
-        endpoint_address = 'amqps://' + hostname
-    else:
-        endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
-    if ca_location:
-        endpoint_args += '&ca-location={}'.format(ca_location)
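-    # verify-ssl controls TLS certificate validation for the broker connection; ca-location supplies
-    # the CA bundle when a self-signed certificate is used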
-    if external_endpoint_address:
-        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    else:
-        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
-                       }]
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket using PUT
-    key = bucket.new_key('put')
-    key.set_contents_from_string('bar')
-    # create objects in the bucket using COPY
-    bucket.copy_key('copy', bucket.name, key.name)
-
-    # create objects in the bucket using multi-part upload
-    fp = tempfile.NamedTemporaryFile(mode='w+b')
-    object_size = 10*1024*1024
-    content = bytearray(os.urandom(object_size))
-    fp.write(content)
-    fp.flush()
-    fp.seek(0)
-    uploader = bucket.initiate_multipart_upload('multipart')
-    uploader.upload_part_from_file(fp, 1)
-    uploader.complete_upload()
-    fp.close()
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check amqp receiver
-    keys = list(bucket.list())
-    receiver.verify_s3_events(keys, exact_match=True)
-
-    # cleanup
-    stop_amqp_receiver(receiver, task)
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    for key in bucket.list():
-        key.delete()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    if proc:
-        clean_rabbitmq(proc)
-
-
-def test_ps_s3_creation_triggers_on_master():
-    ps_s3_creation_triggers_on_master()
-
-
-def test_ps_s3_creation_triggers_on_master_external():
-    from distutils.util import strtobool
-
-    if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
-        try:
-            if strtobool(os.environ['AMQP_VERIFY_SSL']):
-                verify_ssl = 'true'
-            else:
-                verify_ssl = 'false'
-        except Exception:
-            verify_ssl = 'true'
-
-        ps_s3_creation_triggers_on_master(
-            external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
-            verify_ssl=verify_ssl)
-    else:
-        return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
-
-def test_ps_s3_creation_triggers_on_master_ssl():
-    import datetime
-    import textwrap
-    import stat
-    from cryptography import x509
-    from cryptography.x509.oid import NameOID
-    from cryptography.hazmat.primitives import hashes
-    from cryptography.hazmat.backends import default_backend
-    from cryptography.hazmat.primitives import serialization
-    from cryptography.hazmat.primitives.asymmetric import rsa
-    from tempfile import TemporaryDirectory
-
-    with TemporaryDirectory() as tempdir:
-        # modify permissions to ensure that the rabbitmq user can access them
-        os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
-        CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
-        CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
-        KEYFILE = os.path.join(tempdir, 'server_key.pem')
-        RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
-
-        root_key = rsa.generate_private_key(
-            public_exponent=65537,
-            key_size=2048,
-            backend=default_backend()
-        )
-        subject = issuer = x509.Name([
-            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
-            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
-            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
-            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
-            x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
-        ])
-        root_cert = x509.CertificateBuilder().subject_name(
-            subject
-        ).issuer_name(
-            issuer
-        ).public_key(
-            root_key.public_key()
-        ).serial_number(
-            x509.random_serial_number()
-        ).not_valid_before(
-            datetime.datetime.utcnow()
-        ).not_valid_after(
-            datetime.datetime.utcnow() + datetime.timedelta(days=3650)
-        ).add_extension(
-            x509.BasicConstraints(ca=True, path_length=None), critical=True
-        ).sign(root_key, hashes.SHA256(), default_backend())
-        with open(CACERTFILE, "wb") as f:
-            f.write(root_cert.public_bytes(serialization.Encoding.PEM))
-
-        # Now we want to generate a cert from that root
-        cert_key = rsa.generate_private_key(
-            public_exponent=65537,
-            key_size=2048,
-            backend=default_backend()
-        )
-        with open(KEYFILE, "wb") as f:
-            f.write(cert_key.private_bytes(
-                encoding=serialization.Encoding.PEM,
-                format=serialization.PrivateFormat.TraditionalOpenSSL,
-                encryption_algorithm=serialization.NoEncryption(),
-            ))
-        new_subject = x509.Name([
-            x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
-            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
-            x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
-            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
-        ])
-        cert = x509.CertificateBuilder().subject_name(
-            new_subject
-        ).issuer_name(
-            root_cert.issuer
-        ).public_key(
-            cert_key.public_key()
-        ).serial_number(
-            x509.random_serial_number()
-        ).not_valid_before(
-            datetime.datetime.utcnow()
-        ).not_valid_after(
-            datetime.datetime.utcnow() + datetime.timedelta(days=30)
-        ).add_extension(
-            x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
-            critical=False,
-        ).sign(root_key, hashes.SHA256(), default_backend())
-        # Write our certificate out to disk.
-        with open(CERTFILE, "wb") as f:
-            f.write(cert.public_bytes(serialization.Encoding.PEM))
-
-        with open(RABBITMQ_CONF_FILE, "w") as f:
-            # use the old style config format to ensure it also runs on older RabbitMQ versions.
-            f.write(textwrap.dedent(f'''
-                [
-                    {{rabbit, [
-                        {{ssl_listeners, [5671]}},
-                        {{ssl_options, [{{cacertfile,           "{CACERTFILE}"}},
-                                        {{certfile,             "{CERTFILE}"}},
-                                        {{keyfile,              "{KEYFILE}"}},
-                                        {{verify,               verify_peer}},
-                                        {{fail_if_no_peer_cert, false}}]}}]}}
-                ].
-            '''))
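-        # older RabbitMQ versions expect RABBITMQ_CONFIG_FILE without the .config extension, hence the splitext below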
-        os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
-
-        ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
-
-        del os.environ['RABBITMQ_CONFIG_FILE']
-
-
-def test_ps_s3_multipart_on_master():
-    """ test multipart object upload on master"""
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receivers
-    exchange = 'ex1'
-    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
-    task1.start()
-    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
-    task2.start()
-    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
-    task3.start()
+    master_zone, ps_zone = init_env()
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name+TOPIC_SUFFIX
 
-    # create s3 topics
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn2 = topic_conf2.set_config()
-    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn3 = topic_conf3.set_config()
-
-    # create s3 notifications
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
-                        'Events': ['s3:ObjectCreated:*']
-                       },
-                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
-                        'Events': ['s3:ObjectCreated:Post']
-                       },
-                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
-                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
-                       }]
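-    # notification _1 is expected to receive all creation events, _2 only the Post event emitted at
-    # multipart-init, and _3 only CompleteMultipartUpload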
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
+    # create topic
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
+    topic_conf.set_config()
+    # create bucket on the first of the rados zones
+    bucket = master_zone.create_bucket(bucket_name)
+    # wait for sync
+    zone_meta_checkpoint(ps_zone.zone)
+    # create notifications
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
+                                       topic_name)
+    _, status = notification_conf.set_config()
+    assert_equal(status/100, 2)
+    # create subscription
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
+                              topic_name)
+    _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
+    # create objects in the bucket using PUT
+    key = bucket.new_key('put')
+    key.set_contents_from_string('bar')
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy', bucket.name, key.name)
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
@@ -3759,38 +1783,25 @@ def test_ps_s3_multipart_on_master():
     uploader.upload_part_from_file(fp, 1)
     uploader.complete_upload()
     fp.close()
+    
+    # wait for sync
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check amqp receiver
-    events = receiver1.get_and_reset_events()
-    assert_equal(len(events), 3)
-
-    events = receiver2.get_and_reset_events()
-    assert_equal(len(events), 1)
-    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
-    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')
-
-    events = receiver3.get_and_reset_events()
-    assert_equal(len(events), 1)
-    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
-    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
-    print(events[0]['Records'][0]['s3']['object']['size'])
+    # get the create events from the subscription
+    result, _ = sub_conf.get_events()
+    events = json.loads(result)
+    for event in events['events']:
+        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
 
+    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
+    assert len(events['events']) >= 3
     # cleanup
-    stop_amqp_receiver(receiver1, task1)
-    stop_amqp_receiver(receiver2, task2)
-    stop_amqp_receiver(receiver3, task3)
-    s3_notification_conf.del_config()
-    topic_conf1.del_config()
-    topic_conf2.del_config()
-    topic_conf3.del_config()
+    sub_conf.del_config()
+    notification_conf.del_config()
+    topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    # delete the bucket
     master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
 
 
 def test_ps_versioned_deletion():
@@ -3891,375 +1902,6 @@ def test_ps_versioned_deletion():
     topic_conf2.del_config()
 
 
-def test_ps_s3_metadata_on_master():
-    """ test s3 notification of metadata on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
-
-    # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    meta_key = 'meta1'
-    meta_value = 'This is my metadata value'
-    meta_prefix = 'x-amz-meta-'
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
-        'Filter': {
-            'Metadata': {
-                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
-            }
-        }
-    }]
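-    # only objects whose x-amz-meta-meta1 metadata equals the configured value should trigger notifications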
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    expected_keys = []
-    # create objects in the bucket
-    key_name = 'foo'
-    key = bucket.new_key(key_name)
-    key.set_metadata(meta_key, meta_value)
-    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
-    expected_keys.append(key_name)
-    
-    # create objects in the bucket using COPY
-    key_name = 'copy_of_foo'
-    bucket.copy_key(key_name, bucket.name, key.name)
-    expected_keys.append(key_name)
-
-    # create another object in the bucket using COPY
-    # but override the metadata value
-    key_name = 'another_copy_of_foo'
-    bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
-
-    # create objects in the bucket using multi-part upload
-    fp = tempfile.NamedTemporaryFile(mode='w+b')
-    chunk_size = 1024*1024*5 # 5MB
-    object_size = 10*chunk_size
-    content = bytearray(os.urandom(object_size))
-    fp.write(content)
-    fp.flush()
-    fp.seek(0)
-    key_name = 'multipart_foo'
-    uploader = bucket.initiate_multipart_upload(key_name,
-                                                metadata={meta_key: meta_value})
-    for i in range(1,5):
-        uploader.upload_part_from_file(fp, i, size=chunk_size)
-        fp.seek(i*chunk_size)
-    uploader.complete_upload()
-    fp.close()
-    expected_keys.append(key_name)
-    
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    # check amqp receiver
-    events = receiver.get_and_reset_events()
-    assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End
-    for event in events:
-        assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
-
-    # delete objects
-    for key in bucket.list():
-        key.delete()
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    # check amqp receiver: count the deletion events for keys that carried the filtered metadata
-    event_count = 0
-    for event in receiver.get_and_reset_events():
-        if event['Records'][0]['s3']['object']['key'] in expected_keys:
-            event_count += 1
-
-    # all 3 objects had the metadata when deleted
-    assert_equal(event_count, 3)
-
-    # cleanup
-    stop_amqp_receiver(receiver, task)
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
-
-
-def test_ps_s3_tags_on_master():
-    """ test s3 notification of tags on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
-
-    # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
-        'Filter': {
-            'Tags': {
-                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
-            }
-        }
-    }]
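-    # only objects tagged with hello=world should pass the tag filter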
-
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket with tags
-    tags = 'hello=world&ka=boom'
-    key_name1 = 'key1'
-    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
-    tags = 'foo=bar&ka=boom'
-    key_name2 = 'key2'
-    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
-    key_name3 = 'key3'
-    key = bucket.new_key(key_name3)
-    key.set_contents_from_string('bar')
-    # create objects in the bucket using COPY
-    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
-    # check amqp receiver
-    filtered_count = 0
-    for event in receiver.get_and_reset_events():
-        obj_tags = event['Records'][0]['s3']['object']['tags']
-        assert_equal(obj_tags[0], expected_tags[0])
-        filtered_count += 1
-    assert_equal(filtered_count, 2)
-    
-    # delete the objects
-    for key in bucket.list():
-        key.delete()
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-    # check amqp receiver
-    filtered_count = 0
-    for event in receiver.get_and_reset_events():
-        obj_tags = event['Records'][0]['s3']['object']['tags']
-        assert_equal(obj_tags[0], expected_tags[0])
-        filtered_count += 1
-    assert_equal(filtered_count, 2)
-
-    # cleanup
-    stop_amqp_receiver(receiver, task)
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
-
-
-def test_ps_s3_versioning_on_master():
-    """ test s3 notification of object versions """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    bucket.configure_versioning(True)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
-
-    # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    _, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    key_value = 'foo'
-    key = bucket.new_key(key_value)
-    key.set_contents_from_string('hello')
-    ver1 = key.version_id
-    key.set_contents_from_string('world')
-    ver2 = key.version_id
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check amqp receiver
-    events = receiver.get_and_reset_events()
-    num_of_versions = 0
-    for event_list in events:
-        for event in event_list['Records']:
-            assert_equal(event['s3']['object']['key'], key_value)
-            version = event['s3']['object']['versionId']
-            num_of_versions += 1
-            if version not in (ver1, ver2):
-                print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')')
-                assert False, 'version mismatch'
-            else:
-                print('version ok: '+version+' in: ('+ver1+', '+ver2+')')
-
-    assert_equal(num_of_versions, 2)
-
-    # cleanup
-    stop_amqp_receiver(receiver, task)
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    bucket.delete_key(key.name, version_id=ver2)
-    bucket.delete_key(key.name, version_id=ver1)
-    master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
-
-
-def test_ps_s3_versioned_deletion_on_master():
-    """ test s3 notification of deletion markers on master """
-    if skip_push_tests:
-        return SkipTest("PubSub push tests don't run in teuthology")
-    hostname = get_ip()
-    proc = init_rabbitmq()
-    if proc is None:
-        return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    master_zone, _ = init_env(require_ps=False)
-    realm = get_realm()
-    zonegroup = realm.master_zonegroup()
-    
-    # create bucket
-    bucket_name = gen_bucket_name()
-    bucket = master_zone.create_bucket(bucket_name)
-    bucket.configure_versioning(True)
-    topic_name = bucket_name + TOPIC_SUFFIX
-
-    # start amqp receiver
-    exchange = 'ex1'
-    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
-    task.start()
-
-    # create s3 topic
-    endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-    # create s3 notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectRemoved:*']
-                       },
-                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
-                       },
-                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
-                         'Events': ['s3:ObjectRemoved:Delete']
-                       }]
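-    # _1 matches all removal events, _2 only delete-marker creation, _3 only version deletions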
-    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
-    assert_equal(status/100, 2)
-
-    # create objects in the bucket
-    key = bucket.new_key('foo')
-    content = str(os.urandom(512))
-    size1 = len(content)
-    key.set_contents_from_string(content)
-    ver1 = key.version_id
-    content = str(os.urandom(511))
-    size2 = len(content)
-    key.set_contents_from_string(content)
-    ver2 = key.version_id
-    # create delete marker (non versioned deletion)
-    delete_marker_key = bucket.delete_key(key.name)
-    
-    time.sleep(1)
-    
-    # versioned deletion
-    bucket.delete_key(key.name, version_id=ver2)
-    bucket.delete_key(key.name, version_id=ver1)
-
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
-
-    # check amqp receiver
-    events = receiver.get_and_reset_events()
-    delete_events = 0
-    delete_marker_create_events = 0
-    for event_list in events:
-        for event in event_list['Records']:
-            size = event['s3']['object']['size']
-            if event['eventName'] == 's3:ObjectRemoved:Delete':
-                delete_events += 1
-                assert size in [size1, size2]
-                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
-            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
-                delete_marker_create_events += 1
-                assert size == size2
-                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
-   
-    # 2 key versions were deleted
-    # notified over the same topic via 2 notifications (1,3)
-    assert_equal(delete_events, 2*2)
-    # 1 deletion marker was created
-    # notified over the same topic over 2 notifications (1,2)
-    assert_equal(delete_marker_create_events, 1*2)
-
-    # cleanup
-    delete_marker_key.delete()
-    stop_amqp_receiver(receiver, task)
-    s3_notification_conf.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    master_zone.delete_bucket(bucket_name)
-    clean_rabbitmq(proc)
-
-
 def test_ps_push_http():
     """ test pushing to http endpoint """
     if skip_push_tests: