10 from http
import server
as http_server
11 from random
import randint
12 from .tests
import get_realm
, \
14 zonegroup_meta_checkpoint
, \
15 zone_meta_checkpoint
, \
16 zone_bucket_checkpoint
, \
17 zone_data_checkpoint
, \
18 zonegroup_bucket_checkpoint
, \
23 from .zone_ps
import PSTopic
, \
28 print_connection_info
, \
30 from .multisite
import User
31 from nose
import SkipTest
32 from nose
.tools
import assert_not_equal
, assert_equal
33 import boto
.s3
.tagging
35 # configure logging for the tests module
36 log
= logging
.getLogger(__name__
)
38 skip_push_tests
= True
40 ####################################
41 # utility functions for pubsub tests
42 ####################################
def set_contents_from_string(key, content):
    """Upload ``content`` into ``key``, reporting failures instead of raising.

    Used as a thread target, where an escaping exception would only be
    printed by the threading machinery; the error is printed here instead.

    :param key: object exposing ``set_contents_from_string(content)``
    :param content: payload handed to the key unchanged
    :return: None
    """
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print('Error: ' + str(e))
51 # HTTP endpoint functions
52 # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""

    def do_POST(self):
        """implementation of POST handler

        Reads ``Content-Length`` bytes from the request, parses them as JSON
        and appends the result to the owning server. Any failure while
        reading or parsing is answered with 400.
        """
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            # missing/invalid Content-Length or a body that is not JSON
            log.error('HTTP Server received empty event')
            self.send_response(400)
        else:
            if self.headers.get('Expect') == '100-continue':
                self.send_response(100)
            else:
                self.send_response(200)
        finally:
            # optional artificial latency configured on the server
            if self.server.delay > 0:
                time.sleep(self.server.delay)
            # NOTE(review): the terminating end_headers() call is not visible
            # in the extracted source; a status line without it is not a
            # complete response - confirm against the full file
            self.end_headers()
class HTTPServerWithEvents(http_server.HTTPServer):
    """HTTP server used by the handler to store events"""

    def __init__(self, addr, handler, worker_id, delay=0):
        """
        :param addr: ``(host, port)`` tuple passed to ``HTTPServer``
        :param handler: request handler class
        :param worker_id: identifier used by the handler in its log lines
        :param delay: value the handler passes to ``time.sleep`` before replying
        """
        # third argument is bind_and_activate=False: the socket is not bound here
        super().__init__(addr, handler, False)
        self.worker_id = worker_id
        # events appended by the handler through append()
        self.events = []
        self.delay = delay

    def append(self, event):
        """store a single received event"""
        self.events.append(event)
class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reusing the same socket for all threads"""

    def __init__(self, i, sock, addr, delay=0):
        """
        :param i: worker index, used in log lines and as the server's worker id
        :param sock: socket assigned to the server as-is
        :param addr: ``(host, port)`` tuple handed to the server
        :param delay: forwarded to ``HTTPServerWithEvents``
        """
        super().__init__()
        self.i = i
        # NOTE(review): daemon flag and the start() call below are not visible
        # in the extracted source; nothing else in view starts the workers
        self.daemon = True
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay)
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding every handler
        # NOTE(review): the chained assignment also creates a `server_close`
        # attribute on the thread, not on httpd - kept as in the original
        self.httpd.server_bind = self.server_close = lambda self: None
        self.start()

    def run(self):
        """serve requests until the server is shut down"""
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def close(self):
        """ask the server loop to stop"""
        self.httpd.shutdown()

    def get_events(self):
        """return the list of events stored in the server"""
        return self.httpd.events

    def reset_events(self):
        """replace the server's event list with a fresh empty one"""
        self.httpd.events = []
class StreamingHTTPServer:
    """multi-threaded http server class also holding list of events received into the handler
    each thread has its own server, and all servers share the same socket"""

    def __init__(self, host, port, num_workers=100, delay=0):
        """
        :param host: address to listen on
        :param port: port to listen on
        :param num_workers: number of server threads, also the listen backlog
        :param delay: forwarded to every worker
        """
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes=None):
        """verify stored s3 records against a list of keys

        The events of all workers are drained before verification.
        ``expected_sizes`` defaults to "no size check"; ``None`` is used
        instead of a shared mutable ``{}`` default.
        """
        events = self.get_and_reset_events()
        if expected_sizes is None:
            expected_sizes = {}
        verify_s3_records_by_elements(events, keys, exact_match=exact_match,
                                      deletions=deletions, expected_sizes=expected_sizes)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        events = self.get_and_reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        """collect the events of every worker into one list and clear the workers"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for it to finish"""
        # make sure that the shared socket is closed
        # this is needed in case that one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.close()
            worker.join()
166 # AMQP endpoint functions
class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""

    def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
        """
        :param exchange: topic exchange to declare and bind to
        :param topic: routing key the exclusive queue is bound with
        :param external_endpoint_address: if set, broker URL passed to ``pika.URLParameters``
        :param ca_location: if set, CA file loaded into the SSL context
        :raises Exception: when the broker cannot be reached after 10 attempts
        """
        # imported here so the module loads without these packages
        import pika
        import ssl

        if ca_location:
            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(cafile=ca_location)
            ssl_options = pika.SSLOptions(ssl_context)
            # NOTE(review): port values are not visible in the extracted
            # source; 5671/5672 are the standard AMQP TLS/plain ports - confirm
            rabbitmq_port = 5671
        else:
            rabbitmq_port = 5672
            ssl_options = None

        if external_endpoint_address:
            params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
        else:
            # NOTE(review): the origin of `hostname` is not visible in the
            # extracted source; presumably the module's local-IP helper - confirm
            hostname = get_ip()
            params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)

        remaining_retries = 10
        for attempt in range(remaining_retries):
            try:
                connection = pika.BlockingConnection(params)
                break
            except Exception as error:
                print('failed to connect to rabbitmq (remaining retries '
                      + str(remaining_retries - attempt - 1) + '): ' + str(error))
                # NOTE(review): back-off between attempts is assumed - confirm
                time.sleep(1)
        else:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        # NOTE(review): trailing arguments of basic_consume are not visible in
        # the extracted source; auto_ack=True is assumed - confirm
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrive on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys, then clear them"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys, then clear them"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        """return the stored events and start a fresh list"""
        tmp = self.events
        self.events = []
        return tmp
def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver

    Blocks in ``receiver.channel.start_consuming()``; any exception ends the
    thread with a log line instead of propagating.
    """
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))
def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
    """create amqp receiver and thread

    :return: ``(task, receiver)``; the thread is created but not started here
    """
    receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    # NOTE(review): daemon flag is not visible in the extracted source;
    # assumed so a stuck consumer cannot block interpreter exit - confirm
    task.daemon = True
    return task, receiver
def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
    # bounded wait so a consumer that refuses to stop cannot hang the test
    # NOTE(review): timeout value is assumed - confirm
    task.join(5)
def check_ps_configured():
    """check if at least one pubsub zone exist

    :raises SkipTest: when the master zonegroup has no zone of type "pubsub"
    """
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    ps_zones = zonegroup.zones_by_type.get("pubsub")
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")
def is_ps_zone(zone_conn):
    """check if a specific zone is pubsub zone

    :param zone_conn: zone connection exposing ``zone.tier_type()``, or a falsy value
    :return: True only when the zone's tier type is "pubsub"
    """
    if not zone_conn:
        return False
    return zone_conn.zone.tier_type() == "pubsub"
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element

    :param events: either one dict with an 'events' list, or a list of such dicts
    :param keys: objects exposing ``name`` and ``bucket.name``
    :param exact_match: when True, a length mismatch between ``events`` and
        ``keys`` is an error and not only logged
    :param deletions: look for 'OBJECT_DELETE' events instead of 'OBJECT_CREATE'
    :raises AssertionError: when a key has no matching event, or on a length
        mismatch with ``exact_match``
    """
    wanted_event = 'OBJECT_DELETE' if deletions else 'OBJECT_CREATE'
    # treat the single-dict form as a one-element list so both shapes share one loop
    event_lists = events if isinstance(events, list) else [events]

    for key in keys:
        key_found = any(
            event['info']['bucket']['name'] == key.bucket.name
            and event['info']['key']['name'] == key.name
            and event['event'] == wanted_event
            for event_list in event_lists
            for event in event_list['events'])
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            # explicit raise: `assert False` disappears under `python -O`
            raise AssertionError(err)

    if not len(events) == len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            raise AssertionError(err)
def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes=None):
    """ verify there is at least one record per element

    :param records: either one dict with a 'Records' list, or a list of such dicts
    :param keys: objects exposing ``name`` and ``bucket.name``
    :param exact_match: when True, a length mismatch between ``records`` and
        ``keys`` is an error and not only logged
    :param deletions: look for 'ObjectRemoved*' records instead of 'ObjectCreated*'
    :param expected_sizes: optional mapping of key name to expected object
        size; when non-empty the size of the matched record is compared
    :raises AssertionError: when a key has no matching record, a size
        differs, or on a length mismatch with ``exact_match``
    """
    wanted_prefix = 'ObjectRemoved' if deletions else 'ObjectCreated'
    # treat the single-dict form as a one-element list so both shapes share one loop
    record_lists = records if isinstance(records, list) else [records]

    for key in keys:
        key_found = False
        object_size = 0
        for record_list in record_lists:
            for record in record_list['Records']:
                if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name and \
                        record['eventName'].startswith(wanted_prefix):
                    key_found = True
                    object_size = record['s3']['object']['size']
                    break
            if key_found:
                break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            # explicit raise: `assert False` disappears under `python -O`
            raise AssertionError(err)
        if expected_sizes:
            assert_equal(object_size, expected_sizes.get(key.name))

    if not len(records) == len(keys):
        err = 'superfluous records are found'
        log.warning(err)
        if exact_match:
            # iterate the normalised lists so the dump also works for the dict form
            for record_list in record_lists:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            raise AssertionError(err)
373 """ start a rabbitmq broker """
375 #port = str(random.randint(20000, 30000))
376 #data_dir = './' + port + '_data'
377 #log_dir = './' + port + '_log'
383 # print('rabbitmq directories already exists')
384 #env = {'RABBITMQ_NODE_PORT': port,
385 # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
386 # 'RABBITMQ_USE_LONGNAME': 'true',
387 # 'RABBITMQ_MNESIA_BASE': data_dir,
388 # 'RABBITMQ_LOG_BASE': log_dir}
389 # TODO: support multiple brokers per host using env
390 # make sure we don't collide with the default
392 proc
= subprocess
.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
393 except Exception as error
:
394 log
.info('failed to execute rabbitmq-server: %s', str(error
))
395 print('failed to execute rabbitmq-server: %s' % str(error
))
397 # TODO add rabbitmq checkpoint instead of sleep
399 return proc
#, port, data_dir, log_dir
def clean_rabbitmq(proc): #, data_dir, log_dir)
    """ stop the rabbitmq broker

    :param proc: process handle of the broker
    """
    try:
        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
        # NOTE(review): the two statements between the `rabbitmqctl stop` call
        # and the handler are not visible in the extracted source; a short
        # wait followed by terminating the process is assumed - confirm
        time.sleep(5)
        proc.terminate()
    except Exception:
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup once multiple brokers are supported
418 # Kafka endpoint functions
420 kafka_server
= 'localhost'
class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""

    def __init__(self, topic, security_type):
        """
        :param topic: kafka topic to consume
        :param security_type: 'PLAINTEXT', anything else is treated as 'SSL'
        :raises Exception: when the broker cannot be reached after 10 attempts
        """
        from kafka import KafkaConsumer
        remaining_retries = 10
        # NOTE(review): port values are not visible in the extracted source;
        # they follow the listeners documented in this file
        # (PLAINTEXT 9092, SSL 9093) - confirm
        port = 9092
        if security_type != 'PLAINTEXT':
            security_type = 'SSL'
            port = 9093
        for attempt in range(remaining_retries):
            try:
                self.consumer = KafkaConsumer(topic,
                                              bootstrap_servers=kafka_server + ':' + str(port),
                                              security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                print('failed to connect to kafka (remaining retries '
                      + str(remaining_retries - attempt - 1) + '): ' + str(error))
                # NOTE(review): back-off between attempts is assumed - confirm
                time.sleep(1)
        else:
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        # polled by the receiver thread to know when to exit
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys, then clear them"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []
def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver

    Appends every consumed message (parsed as JSON) to ``receiver.events``
    until ``receiver.stop`` is set; any exception ends the thread with a
    log line instead of propagating.
    """
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            # NOTE(review): the statement after the inner loop is not visible
            # in the extracted source; a short pause is assumed - confirm
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))
def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create kafka receiver and thread

    :return: ``(task, receiver)``; the thread is created but not started here
    """
    receiver = KafkaReceiver(topic, security_type)
    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    # NOTE(review): daemon flag is not visible in the extracted source;
    # assumed so a stuck consumer cannot block interpreter exit - confirm
    task.daemon = True
    return task, receiver
def stop_kafka_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    # NOTE(review): the statements before the consumer is closed are not
    # visible in the extracted source; raising the stop flag polled by the
    # runner and a bounded join are assumed - confirm
    receiver.stop = True
    task.join(1)
    try:
        receiver.consumer.close()
    except Exception as error:
        log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
488 # follow the instruction here to create and sign a broker certificate:
489 # https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka
491 # the generated broker certificate should be stored in the java keystore for the use of the server
492 # assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost"
493 # following lines must be added to $KAFKA_DIR/config/server.properties
494 # listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
495 # sasl.enabled.mechanisms=PLAIN
496 # ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
497 # ssl.keystore.password = abcdefgh
498 # ssl.key.password = abcdefgh
499 # ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
500 # ssl.truststore.password = abcdefgh
503 # (1) we dont test client authentication, hence, no need to generate client keys
504 # (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
505 # should be copied to: $KAFKA_DIR
508 """ start kafka/zookeeper """
510 KAFKA_DIR
= os
.environ
['KAFKA_DIR']
515 log
.info('KAFKA_DIR must be set to where kafka is installed')
516 print('KAFKA_DIR must be set to where kafka is installed')
517 return None, None, None
519 DEVNULL
= open(os
.devnull
, 'wb')
521 print('\nStarting zookeeper...')
523 zk_proc
= subprocess
.Popen([KAFKA_DIR
+'bin/zookeeper-server-start.sh', KAFKA_DIR
+'config/zookeeper.properties'], stdout
=DEVNULL
)
524 except Exception as error
:
525 log
.info('failed to execute zookeeper: %s', str(error
))
526 print('failed to execute zookeeper: %s' % str(error
))
527 return None, None, None
530 if zk_proc
.poll() is not None:
531 print('zookeeper failed to start')
532 return None, None, None
533 print('Zookeeper started')
534 print('Starting kafka...')
535 kafka_log
= open('./kafka.log', 'w')
537 kafka_env
= os
.environ
.copy()
538 kafka_env
['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR
+'config/kafka_server_jaas.conf'
539 kafka_proc
= subprocess
.Popen([
540 KAFKA_DIR
+'bin/kafka-server-start.sh',
541 KAFKA_DIR
+'config/server.properties'],
544 except Exception as error
:
545 log
.info('failed to execute kafka: %s', str(error
))
546 print('failed to execute kafka: %s' % str(error
))
549 return None, None, None
551 # TODO add kafka checkpoint instead of sleep
553 if kafka_proc
.poll() is not None:
555 print('kafka failed to start. details in: ./kafka.log')
557 return None, None, None
559 print('Kafka started')
560 return kafka_proc
, zk_proc
, kafka_log
def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper

    Each process is terminated, given a bounded time to exit and killed if
    it is still running.

    :param kafka_proc: kafka broker process handle
    :param zk_proc: zookeeper process handle
    :param kafka_log: open log file of the broker, closed here
    """
    # NOTE(review): grace period is assumed, the original value is not
    # visible in the extracted source - confirm
    grace_seconds = 5
    try:
        kafka_log.close()
        print('Shutdown Kafka...')
        kafka_proc.terminate()
        try:
            # returns as soon as the process exits, unlike a fixed sleep
            kafka_proc.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutdown zookeeper...')
        zk_proc.terminate()
        try:
            zk_proc.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        log.info('kafka/zookeeper already terminated')
def init_env(require_ps=True):
    """initialize the environment

    :param require_ps: when True the test is skipped unless a pubsub zone is
        configured, and a pubsub zone connection must be found
    :return: ``(master_zone, ps_zone)`` zone connections; ``ps_zone`` is None
        when no pubsub zone was found
    """
    if require_ps:
        check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zone = None
    master_zone = None
    for conn in zonegroup_conns.zones:
        if conn.zone == zonegroup.master_zone:
            master_zone = conn
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zone = conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone
610 """ This method returns the "primary" IP on the local box (the one with a default route)
611 source: https://stackoverflow.com/a/28950776/711085
612 this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
613 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
615 # address should not be reachable
616 s
.connect(('10.255.255.255', 1))
617 ip
= s
.getsockname()[0]
623 TOPIC_SUFFIX
= "_topic"
625 NOTIFICATION_SUFFIX
= "_notif"
632 """ log information for manual testing """
633 return SkipTest("only used in manual testing")
634 master_zone
, ps_zone
= init_env()
636 zonegroup
= realm
.master_zonegroup()
637 bucket_name
= gen_bucket_name()
638 # create bucket on the first of the rados zones
639 bucket
= master_zone
.create_bucket(bucket_name
)
640 # create objects in the bucket
641 number_of_objects
= 10
642 for i
in range(number_of_objects
):
643 key
= bucket
.new_key(str(i
))
644 key
.set_contents_from_string('bar')
645 print('Zonegroup: ' + zonegroup
.name
)
646 print('user: ' + get_user())
647 print('tenant: ' + get_tenant())
649 print_connection_info(master_zone
.conn
)
651 print_connection_info(ps_zone
.conn
)
652 print('Bucket: ' + bucket_name
)
def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division: on python3 `status/100` is a float, so e.g. 201 would
    # give 2.01 and fail the comparison although it is a success code
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status//100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification_records():
    """ test s3 records fetching """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    # floor division: on python3 `status/100` is a float, so e.g. 201 would
    # give 2.01 and fail the comparison although it is a success code
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    # NOTE(review): the last argument is not visible in the extracted source;
    # the generated topic name is assumed, built as in the low-level test - confirm
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              notification_name+'_'+topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        # NOTE(review): loop body is not visible in the extracted source;
        # logging each record is assumed - confirm
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    # floor division: on python3 `status/100` is a float, so e.g. 201 would
    # give 2.01 and fail the comparison although it is a success code
    assert_equal(status//100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status//100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # get all notification on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status//100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notification on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status//100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status//100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status//100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status//100, 2)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
849 def test_ps_s3_notification_filter():
850 """ test s3 notification filter on master """
852 return SkipTest("PubSub push tests don't run in teuthology")
854 proc
= init_rabbitmq()
856 return SkipTest('end2end amqp tests require rabbitmq-server installed')
858 master_zone
, ps_zone
= init_env(require_ps
=True)
862 zonegroup
= realm
.master_zonegroup()
865 bucket_name
= gen_bucket_name()
866 bucket
= master_zone
.create_bucket(bucket_name
)
867 topic_name
= bucket_name
+ TOPIC_SUFFIX
869 # start amqp receivers
871 task
, receiver
= create_amqp_receiver_thread(exchange
, topic_name
)
875 endpoint_address
= 'amqp://' + hostname
876 endpoint_args
= 'push-endpoint='+endpoint_address
+'&amqp-exchange=' + exchange
+'&amqp-ack-level=broker'
878 topic_conf
= PSTopic(ps_zone
.conn
, topic_name
, endpoint
=endpoint_address
, endpoint_args
=endpoint_args
)
879 result
, _
= topic_conf
.set_config()
880 parsed_result
= json
.loads(result
)
881 topic_arn
= parsed_result
['arn']
882 zone_meta_checkpoint(ps_zone
.zone
)
884 # create s3 notification
885 notification_name
= bucket_name
+ NOTIFICATION_SUFFIX
886 topic_conf_list
= [{'Id': notification_name
+'_1',
887 'TopicArn': topic_arn
,
888 'Events': ['s3:ObjectCreated:*'],
891 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
895 {'Id': notification_name
+'_2',
896 'TopicArn': topic_arn
,
897 'Events': ['s3:ObjectCreated:*'],
900 'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
901 {'Name': 'suffix', 'Value': 'log'}]
905 {'Id': notification_name
+'_3',
906 'TopicArn': topic_arn
,
910 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
915 s3_notification_conf
= PSNotificationS3(ps_zone
.conn
, bucket_name
, topic_conf_list
)
916 result
, status
= s3_notification_conf
.set_config()
917 assert_equal(status
/100, 2)
919 print('filtering by attributes only supported on master zone')
922 # get all notifications
923 result
, status
= s3_notification_conf
.get_config()
924 assert_equal(status
/100, 2)
925 for conf
in result
['TopicConfigurations']:
926 filter_name
= conf
['Filter']['Key']['FilterRules'][0]['Name']
927 assert filter_name
== 'prefix' or filter_name
== 'suffix' or filter_name
== 'regex', filter_name
930 result
, status
= s3_notification_conf4
.get_config(notification
=notification_name
+'_4')
931 assert_equal(status
/100, 2)
932 filter_name
= result
['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
933 assert filter_name
== 'x-amz-meta-foo' or filter_name
== 'x-amz-meta-hello'
935 expected_in1
= ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
936 expected_in2
= ['world1.log', 'world2log', 'world3.log']
937 expected_in3
= ['hello.txt', 'hell.txt', 'worldlog.txt']
938 expected_in4
= ['foo', 'bar', 'hello', 'world']
939 filtered
= ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
940 filtered_with_attr
= ['nofoo', 'nobar', 'nohello', 'noworld']
941 # create objects in bucket
942 for key_name
in expected_in1
:
943 key
= bucket
.new_key(key_name
)
944 key
.set_contents_from_string('bar')
945 for key_name
in expected_in2
:
946 key
= bucket
.new_key(key_name
)
947 key
.set_contents_from_string('bar')
948 for key_name
in expected_in3
:
949 key
= bucket
.new_key(key_name
)
950 key
.set_contents_from_string('bar')
952 for key_name
in expected_in4
:
953 key
= bucket
.new_key(key_name
)
954 key
.set_metadata('foo', 'bar')
955 key
.set_metadata('hello', 'world')
956 key
.set_metadata('goodbye', 'cruel world')
957 key
.set_contents_from_string('bar')
958 for key_name
in filtered
:
959 key
= bucket
.new_key(key_name
)
960 key
.set_contents_from_string('bar')
961 for key_name
in filtered_with_attr
:
962 key
.set_metadata('foo', 'nobar')
963 key
.set_metadata('hello', 'noworld')
964 key
.set_metadata('goodbye', 'cruel world')
965 key
= bucket
.new_key(key_name
)
966 key
.set_contents_from_string('bar')
968 zone_bucket_checkpoint(ps_zone
.zone
, master_zone
.zone
, bucket_name
)
975 for event
in receiver
.get_and_reset_events():
976 notif_id
= event
['Records'][0]['s3']['configurationId']
977 key_name
= event
['Records'][0]['s3']['object']['key']
978 if notif_id
== notification_name
+'_1':
979 found_in1
.append(key_name
)
980 elif notif_id
== notification_name
+'_2':
981 found_in2
.append(key_name
)
982 elif notif_id
== notification_name
+'_3':
983 found_in3
.append(key_name
)
984 elif not skip_notif4
and notif_id
== notification_name
+'_4':
985 found_in4
.append(key_name
)
987 assert False, 'invalid notification: ' + notif_id
989 assert_equal(set(found_in1
), set(expected_in1
))
990 assert_equal(set(found_in2
), set(expected_in2
))
991 assert_equal(set(found_in3
), set(expected_in3
))
993 assert_equal(set(found_in4
), set(expected_in4
))
996 s3_notification_conf
.del_config()
998 s3_notification_conf4
.del_config()
999 topic_conf
.del_config()
1001 for key
in bucket
.list():
1003 master_zone
.delete_bucket(bucket_name
)
1004 stop_amqp_receiver(receiver
, task
)
1005 clean_rabbitmq(proc
)
def test_object_timing():
    """ measure average object creation/deletion time (manual testing only) """
    # raise, not return: a returned SkipTest instance is ignored by the
    # runner and the test is reported as passed instead of skipped
    raise SkipTest("only used in manual testing")
    master_zone, _ = init_env(require_ps=False)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)
def test_ps_s3_opaque_data():
    """ test that opaque id set in topic, is sent in notification """
    if skip_push_tests:
        # raise (not return) so the runner reports a skip instead of a pass
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create topic carrying the opaque data in its endpoint arguments
    endpoint_address = 'http://'+host+':'+str(port)
    opaque_data = 'http://1.2.3.4:8888'
    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
    result, status = topic_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    for thr in client_threads:
        thr.join()

    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup (the upload threads were already joined above)
    for key in keys:
        key.delete()
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    # NOTE(review): restored from a line missing in this chunk - confirm
    http_server.close()
def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)

    # get topic and verify its content
    result, _ = topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    expected_arn = 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name
    assert_equal(parsed_result['topic']['arn'], expected_arn)

    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)

    # verify topic is deleted
    result, status = topic_conf.get_config()
    assert_equal(status, 404)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')
def test_ps_topic_with_endpoint():
    """ test set topic with endpoint"""
    _, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic with a push endpoint
    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint=dest_endpoint,
                         endpoint_args=dest_args)
    _, status = topic_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)

    # get topic and verify its content
    result, _ = topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)

    # cleanup
    topic_conf.del_config()
def test_ps_notification():
    """ test set/get/delete of notification """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)

    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)

    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status//100, 2)
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events"""
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications restricted to an explicit event list
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)

    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_subscription():
    """ test set/get/delete of subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)

    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # TODO: pull the events again and check the deletions (with exact match):
    # both the creations and the deletions should be listed, i.e.
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)

    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status//100, 2)
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_admin():
    """ test radosgw-admin commands """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX
    sub_name = bucket_name+SUB_SUFFIX
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    def run_admin(zone_conn, args):
        """Run an admin command for the test user against the given zone.

        Appends '--uid <user>' after the two leading sub-command words and
        the zone arguments at the end; returns the (result, status) pair.
        """
        cmd = args[:2] + ['--uid', get_user()] + args[2:]
        return zone_conn.zone.cluster.admin(cmd + zone_conn.zone.zone_arg())

    # create topic on the pubsub zone and check it via REST and admin commands
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    result, status = run_admin(ps_zone, ['topic', 'list'])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert len(parsed_result['topics']) > 0
    result, status = run_admin(ps_zone, ['topic', 'get', '--topic', topic_name])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)

    # create s3 topic on the master zone and check it the same way
    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_conf_s3.set_config()
    result, status = topic_conf_s3.get_config()
    assert_equal(status, 200)
    assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name)
    result, status = run_admin(master_zone, ['topic', 'list'])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert len(parsed_result['topics']) > 0
    result, status = run_admin(master_zone, ['topic', 'get', '--topic', topic_name])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, sub_name,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    result, status = run_admin(ps_zone, ['subscription', 'get', '--subscription', sub_name])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['name'], sub_name)

    # create objects in the bucket
    number_of_objects = 110
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')

    # get events from subscription: first pull, then a second pull that
    # continues from the marker returned by the first one
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    result, status = run_admin(ps_zone, ['subscription', 'pull', '--subscription', sub_name])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    marker = parsed_result['next_marker']
    events1 = parsed_result['events']
    result, status = run_admin(ps_zone, ['subscription', 'pull', '--subscription', sub_name,
                                         '--marker', marker])
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    events2 = parsed_result['events']

    keys = list(bucket.list())
    verify_events_by_elements({"events": events1+events2}, keys, exact_match=False)

    # ack an event in the subscription
    result, status = run_admin(ps_zone, ['subscription', 'ack', '--subscription', sub_name,
                                         '--event-id', events2[0]['id']])
    assert_equal(status, 0)

    # remove the subscription
    result, status = run_admin(ps_zone, ['subscription', 'rm', '--subscription', sub_name])
    assert_equal(status, 0)

    # remove the topics
    result, status = run_admin(ps_zone, ['topic', 'rm', '--topic', topic_name])
    assert_equal(status, 0)
    result, status = run_admin(master_zone, ['topic', 'rm', '--topic', topic_name])
    assert_equal(status, 0)

    # cleanup
    for key in bucket.list():
        key.delete()
    notification_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_incremental_sync():
    """ test that events are only sent on incremental sync """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket before any notification exists
    number_of_objects = 10
    for i in range(0, number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('foo')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # create more objects in the bucket
    for i in range(number_of_objects, 2*number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # make sure we have 10 and not 20 events
    assert_equal(len(events['events']), number_of_objects)

    # cleanup
    for key in bucket.list():
        key.delete()
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_event_type_subscription():
    """ test subscriptions for different events """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # sync checkpoints on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)

    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the creation subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # nothing was deleted yet
    assert_equal(len(events['events']), 0)
    # get the events from the all events subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
                  str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # deletions should not change the creation events
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # only deletions should be listed here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    # (the original pulled from sub_create_conf here, so the "all events"
    # subscription was never checked after the deletions)
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # both deletions and creations should be here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
    # TODO: (1) test deletions (deletions=True) (2) test overall number of events

    # test subscription deletion when topic is specified
    _, status = sub_create_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_delete_conf.del_config(topic=True)
    assert_equal(status//100, 2)
    _, status = sub_conf.del_config(topic=True)
    assert_equal(status//100, 2)

    # cleanup
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # NOTE(review): the initial values of max_events/next_marker are not
    # visible in this chunk (their lines are missing) - confirm them
    max_events = 15
    next_marker = None
    all_events = []
    # pull pages of events until the subscription reports an empty marker
    while True:
        # get the events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        events = json.loads(result)
        all_events.extend(events['events'])
        next_marker = events['next_marker']
        for event in events['events']:
            log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        if next_marker == '':
            break
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements({'events': all_events}, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
def test_ps_event_acking():
    """ test acking of some events in a subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    # count the events themselves: len(events) would be the number of keys
    # in the response dict, not the number of events
    original_number_of_events = len(events['events'])
    for event in events['events']:
        log.debug('Event (before ack) id: "' + str(event['id']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)

    # ack half of the events (integer count, taken from the head of the list)
    events_to_ack = number_of_objects//2
    for event in events['events'][:events_to_ack]:
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (after ack) id: "' + str(event['id']) + '"')
    assert len(events['events']) >= (original_number_of_events - events_to_ack)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)

    # create objects in the bucket using multi-part upload; the context
    # manager closes the temporary file even if the upload raises
    with tempfile.NamedTemporaryFile(mode='w+b') as fp:
        # NOTE(review): the object size is not visible in this chunk (its
        # line is missing) - confirm the value
        object_size = 1024
        content = bytearray(os.urandom(object_size))
        fp.write(content)
        fp.flush()
        fp.seek(0)
        uploader = bucket.initiate_multipart_upload('multipart')
        uploader.upload_part_from_file(fp, 1)
        uploader.complete_upload()

    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(events['events']) >= 3

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
def test_ps_versioned_deletion():
    """ test notification of deletion markers """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topics
    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
    _, status = topic_conf1.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
    _, status = topic_conf2.set_config()
    assert_equal(status//100, 2)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    bucket.configure_versioning(True)

    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)

    # create notifications: one per event type, each on its own topic
    event_type1 = 'OBJECT_DELETE'
    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_1',
                                        event_type1)
    _, status = notification_conf1.set_config()
    assert_equal(status//100, 2)
    event_type2 = 'DELETE_MARKER_CREATE'
    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name+'_2',
                                        event_type2)
    _, status = notification_conf2.set_config()
    assert_equal(status//100, 2)

    # create subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                               topic_name+'_1')
    _, status = sub_conf1.set_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                               topic_name+'_2')
    _, status = sub_conf2.set_config()
    assert_equal(status//100, 2)

    # create two versions of one object, remembering each version id
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create deletion marker
    delete_marker_key = bucket.delete_key(key.name)

    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # delete the deletion marker
    delete_marker_key.delete()
    # delete versions
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)

    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf1.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type1)

    # get the deletion-marker events from the second subscription
    result, _ = sub_conf2.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type2)

    # cleanup
    # following is needed for the cleanup in the case of 3-zones
    # see: http://tracker.ceph.com/issues/39142
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    try:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
        master_zone.delete_bucket(bucket_name)
    except Exception:
        # best effort: keep going with the rest of the cleanup, but do not
        # swallow KeyboardInterrupt/SystemExit as a bare except would
        log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
    sub_conf1.del_config()
    sub_conf2.del_config()
    notification_conf1.del_config()
    notification_conf2.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
def test_ps_push_http():
    """ test pushing to http endpoint """
    if skip_push_tests:
        # raise (not return) so the runner reports a skip instead of a pass
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division: any 2xx status is a success (true division only matches 200)
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # metadata sync checkpoint on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription that pushes to the local http server
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='http://'+host+':'+str(port))
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # bucket sync checkpoint between the master and pubsub zones
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check the creation events received by the http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # sync checkpoints on the pubsub zone
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check the deletion events received by the http server
    # TODO: use exact match
    http_server.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    # NOTE(review): restored from a line missing in this chunk - confirm
    http_server.close()
def test_ps_s3_push_http():
    """ test pushing to http endpoint s3 record format

    Creates a topic whose push endpoint is a local StreamingHTTPServer and
    an s3-compatible notification on that topic for object creation and
    removal, then checks that the http server received s3 records for the
    uploaded objects and, after deleting them, for the deletions.

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests``.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+host+':'+str(port))
    result, status = topic_conf.set_config()
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
def test_ps_push_amqp():
    """ test pushing to amqp endpoint

    Starts a local rabbitmq broker and an amqp receiver thread, creates a
    topic, a notification and a subscription that pushes to the broker with
    broker-level acks, then checks that the receiver got the events for
    object creation and for object deletion.

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests`` or
            when init_rabbitmq() returns None.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create exchange
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='amqp://'+hostname,
                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_s3_push_amqp():
    """ test pushing to amqp endpoint s3 record format

    Starts a local rabbitmq broker and an amqp receiver thread, creates a
    topic that pushes to the broker (ack level "none") and an s3-compatible
    notification for object creation and removal, then checks that the
    receiver got s3 records for the uploaded objects and for their deletion.

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests`` or
            when init_rabbitmq() returns None.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create exchange
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)
def test_ps_delete_bucket():
    """ test notification status upon bucket deletion

    Creates an s3 notification and a non-s3 notification on the same
    bucket, uploads and deletes objects, deletes the bucket, and then
    checks that:
    - the events can still be fetched from the subscription named after
      the s3 notification
    - fetching either notification now returns 404
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create non-s3 notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # remember the keys before deleting them, for the verification below
    keys = list(bucket.list())
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    # wait for meta sync
    zone_meta_checkpoint(ps_zone.zone)

    # get the events from the auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # s3 notification is deleted with bucket
    _, status = s3_notification_conf.get_config(notification=notification_name)
    assert_equal(status, 404)
    # non-s3 notification is deleted with bucket
    _, status = notification_conf.get_config()
    assert_equal(status, 404)
    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()
def test_ps_missing_topic():
    """ test creating a subscription when no topic info exists

    Builds an s3 notification that points at the ARN of a topic that was
    never created and expects set_config() to raise.

    Raises:
        AssertionError: when the notification is created successfully
            although its topic does not exist.
    """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    # the topic itself is intentionally never created
    topic_arn = 'arn:aws:sns:::' + topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    try:
        s3_notification_conf.set_config()
    except Exception:
        # "except Exception" so that KeyboardInterrupt/SystemExit propagate
        log.info('missing topic is expected')
        missing_topic_detected = True
    else:
        missing_topic_detected = False

    # cleanup (done before the check so the bucket is removed on failure too)
    master_zone.delete_bucket(bucket_name)

    # asserting a non-empty string literal always passes,
    # so fail explicitly when no error was raised
    if not missing_topic_detected:
        raise AssertionError('missing topic is expected')
def test_ps_s3_topic_update():
    """ test updating topic associated with a notification

    Flow:
    1. create a topic with an amqp endpoint and an s3 notification on it,
       verify that events reach the amqp receiver
    2. re-create the topic under the same name with an http endpoint,
       verify that events are still sent to amqp
    3. re-create the notification, verify that events now reach the
       http server

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests`` or
            when init_rabbitmq() returns None.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create amqp topic
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update the same topic with new endpoint
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to update the endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
def test_ps_s3_notification_update():
    """ test updating the topic of a notification

    Creates two topics (one with an amqp endpoint, one with an http
    endpoint) and an s3 notification on the amqp topic, verifies that
    events reach the amqp receiver, then re-creates the notification with
    the http topic and verifies that new events reach the http server.

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests`` or
            when init_rabbitmq() returns None.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")

    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so that a failed request is reported
    # as a status mismatch and not as a json/key error.
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)
def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics

    Creates two topics (amqp endpoint and http endpoint) and a single s3
    notification configuration holding one entry per topic, then verifies
    that:
    - both entries are returned, in order, when the configuration is read
    - a subscription named after each notification exists
    - after uploading objects, each subscription holds the matching
      records and each push endpoint received the events

    Raises:
        SkipTest: when push tests are disabled via ``skip_push_tests`` or
            when init_rabbitmq() returns None.
    """
    if skip_push_tests:
        # raise (not return): a returned SkipTest is ignored by the runner
        # and the test would be reported as passed instead of skipped
        raise SkipTest("PubSub push tests don't run in teuthology")

    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    # check the status before parsing, so that a failed request is reported
    # as a status mismatch and not as a json/key error.
    # integer division: any 2xx status is a success
    # (with "/" only status 200 compares equal to 2 on python3)
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)

    # get auto-generated subscriptions
    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status//100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from both of the subscription
    keys = list(bucket.list())

    result, _ = sub_conf1.get_events()
    records1 = json.loads(result)
    for record in records1['Records']:
        log.debug(record)
    # TODO: use exact match
    verify_s3_records_by_elements(records1, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)

    result, _ = sub_conf2.get_events()
    records2 = json.loads(result)
    for record in records2['Records']:
        log.debug(record)
    # TODO: use exact match
    # verify the records of the second subscription (and not, again, the
    # records that were fetched from the first one)
    verify_s3_records_by_elements(records2, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    http_server.close()
    clean_rabbitmq(rabbit_proc)