from .tests import get_realm, \
    ZonegroupConns, \
    zonegroup_meta_checkpoint, \
    zone_meta_checkpoint, \
    zone_bucket_checkpoint, \
    zone_data_checkpoint, \
    zonegroup_bucket_checkpoint, \
    gen_bucket_name, \
    get_user, \
    get_tenant
from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal

# configure logging for the tests module
log = logging.getLogger(__name__)

skip_push_tests = True
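
# NOTE: when skip_push_tests is set, the tests below that push notifications to
# external HTTP/AMQP endpoints are expected to skip themselves (see the SkipTest
# returns in the push tests).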

####################################
# utility functions for pubsub tests
####################################

def set_contents_from_string(key, content):
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print 'Error: ' + str(e)

# HTTP endpoint functions
# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/

class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""
    def do_POST(self):
        """implementation of POST handler"""
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except:
            log.error('HTTP Server received empty event')
            self.send_response(400)
        else:
            self.send_response(100)
        finally:
            self.end_headers()

class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
    """HTTP server used by the handler to store events"""
    def __init__(self, addr, handler, worker_id):
        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
        self.worker_id = worker_id
        self.events = []

    def append(self, event):
        self.events.append(event)

class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reusing the same socket for all threads"""
    def __init__(self, i, sock, addr):
        threading.Thread.__init__(self)
        self.i = i
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding every handler
        self.httpd.server_bind = self.server_close = lambda self: None
        self.start()

    def run(self):
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def get_events(self):
        return self.httpd.events

    def reset_events(self):
        self.httpd.events = []

class StreamingHTTPServer:
    """multi-threaded http server class also holding list of events received into the handler
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100):
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def close(self):
        """close all workers in the http server and wait for it to finish"""
        # make sure that the shared socket is closed
        # this is needed in case that one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.join()
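
# Typical flow for the HTTP push tests below (sketch): create a StreamingHTTPServer
# on the local IP and a random port, configure a topic with a push-endpoint pointing
# at it, perform the bucket operations, then call verify_s3_events()/verify_events()
# to match what the server received against the bucket keys; close() releases the socket.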

# AMQP endpoint functions

class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic):
        hostname = get_ip()
        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
                break
            except Exception as error:
                remaining_retries -= 1
                print 'failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrives on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        tmp = self.events
        self.events = []
        return tmp

def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver"""
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))


def create_amqp_receiver_thread(exchange, topic):
    """create amqp receiver and thread"""
    receiver = AMQPReceiver(exchange, topic)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver


def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefully stop AMQP receiver: %s', str(error))
    task.join(5)
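
# AMQP receiver lifecycle used by the tests below: create_amqp_receiver_thread()
# returns a (thread, receiver) pair, the caller starts the thread before triggering
# bucket operations, and stop_amqp_receiver() stops consumption once verification is done.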

def check_ps_configured():
    """check if at least one pubsub zone exists"""
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    ps_zones = zonegroup.zones_by_type.get("pubsub")
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")


def is_ps_zone(zone_conn):
    """check if a specific zone is a pubsub zone"""
    if not zone_conn:
        return False
    return zone_conn.zone.tier_type() == "pubsub"

def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element """
    err = ''
    for key in keys:
        key_found = False
        if type(events) is list:
            for event_list in events:
                if key_found:
                    break
                for event in event_list['events']:
                    if event['info']['bucket']['name'] == key.bucket.name and \
                        event['info']['key']['name'] == key.name:
                        if deletions and event['event'] == 'OBJECT_DELETE':
                            key_found = True
                            break
                        elif not deletions and event['event'] == 'OBJECT_CREATE':
                            key_found = True
                            break
        else:
            for event in events['events']:
                if event['info']['bucket']['name'] == key.bucket.name and \
                    event['info']['key']['name'] == key.name:
                    if deletions and event['event'] == 'OBJECT_DELETE':
                        key_found = True
                        break
                    elif not deletions and event['event'] == 'OBJECT_CREATE':
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            assert False, err

    if not len(events) == len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            assert False, err

def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
    """ verify there is at least one record per element """
    err = ''
    for key in keys:
        key_found = False
        if type(records) is list:
            for record_list in records:
                if key_found:
                    break
                for record in record_list['Records']:
                    if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name:
                        if deletions and 'ObjectRemoved' in record['eventName']:
                            key_found = True
                            break
                        elif not deletions and 'ObjectCreated' in record['eventName']:
                            key_found = True
                            break
        else:
            for record in records['Records']:
                if record['s3']['bucket']['name'] == key.bucket.name and \
                    record['s3']['object']['key'] == key.name:
                    if deletions and 'ObjectRemoved' in record['eventName']:
                        key_found = True
                        break
                    elif not deletions and 'ObjectCreated' in record['eventName']:
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err

    if not len(records) == len(keys):
        err = 'superfluous records are found'
        log.debug(err)
        if exact_match:
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err
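
# Note on the verifier flags: 'deletions' selects whether a delete or a create
# event/record is expected for every key, and 'exact_match' additionally fails the
# check when more events/records were received than there are keys.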
334 """ start a rabbitmq broker """
336 #port = str(random.randint(20000, 30000))
337 #data_dir = './' + port + '_data'
338 #log_dir = './' + port + '_log'
344 # print('rabbitmq directories already exists')
345 #env = {'RABBITMQ_NODE_PORT': port,
346 # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
347 # 'RABBITMQ_USE_LONGNAME': 'true',
348 # 'RABBITMQ_MNESIA_BASE': data_dir,
349 # 'RABBITMQ_LOG_BASE': log_dir}
350 # TODO: support multiple brokers per host using env
351 # make sure we don't collide with the default
353 proc
= subprocess
.Popen('rabbitmq-server')
354 except Exception as error
:
355 log
.info('failed to execute rabbitmq-server: %s', str(error
))
356 print 'failed to execute rabbitmq-server: %s' % str(error
)
358 # TODO add rabbitmq checkpoint instead of sleep
360 return proc
#, port, data_dir, log_dir
363 def clean_rabbitmq(proc
): #, data_dir, log_dir)
364 """ stop the rabbitmq broker """
366 subprocess
.call(['rabbitmqctl', 'stop'])
370 log
.info('rabbitmq server already terminated')
371 # TODO: add directory cleanup once multiple brokers are supported
376 # log.info('rabbitmq directories already removed')

def init_env(require_ps=True):
    """initialize the environment"""
    if require_ps:
        check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zones = []
    zones = []
    for conn in zonegroup_conns.zones:
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zones.append(conn)
        elif not conn.zone.is_read_only():
            zones.append(conn)

    assert_not_equal(len(zones), 0)
    if require_ps:
        assert_not_equal(len(ps_zones), 0)
    return zones, ps_zones
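
# init_env() returns two lists of connections: 'zones' are the regular (rados) zones
# that buckets and objects are written to, and 'ps_zones' are the pubsub zones whose
# REST API is used for topics, notifications and subscriptions.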
406 """ This method returns the "primary" IP on the local box (the one with a default route)
407 source: https://stackoverflow.com/a/28950776/711085
408 this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
409 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
411 # address should not be reachable
412 s
.connect(('10.255.255.255', 1))
413 ip
= s
.getsockname()[0]
419 TOPIC_SUFFIX
= "_topic"
421 NOTIFICATION_SUFFIX
= "_notif"
428 """ log information for manual testing """
429 return SkipTest("only used in manual testing")
430 zones
, ps_zones
= init_env()
432 zonegroup
= realm
.master_zonegroup()
433 bucket_name
= gen_bucket_name()
434 # create bucket on the first of the rados zones
435 bucket
= zones
[0].create_bucket(bucket_name
)
436 # create objects in the bucket
437 number_of_objects
= 10
438 for i
in range(number_of_objects
):
439 key
= bucket
.new_key(str(i
))
440 key
.set_contents_from_string('bar')
441 print 'Zonegroup: ' + zonegroup
.name
442 print 'user: ' + get_user()
443 print 'tenant: ' + get_tenant()
445 print_connection_info(zones
[0].conn
)
447 print_connection_info(ps_zones
[0].conn
)
448 print 'Bucket: ' + bucket_name

def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zones[0].zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zones[0].conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status/100, 2)
    _, status = topic_conf.del_config()
    assert_equal(status/100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_s3_notification_records():
    """ test s3 records fetching """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zones[0].zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)

def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                       }]
    s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zones[0].zone)

    # get all notification on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notification on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status/100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status/100, 2)

    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_s3_topic_on_master():
    """ test s3 notification set/get/delete on master """
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    delete_all_s3_topics(zones[0].conn, zonegroup.name)

    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')

    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
    # Note that endpoint args may be ordered differently in the result

    result = topic_conf1.del_config()
    assert_equal(status, 200)

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result = topic_conf1.get_list()
    assert_equal(len(result['Topics']), 2)

    result = topic_conf2.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)
    result = topic_conf3.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)

    # get topic list, make sure it is empty
    result = topic_conf1.get_list()
    assert_equal(len(result['Topics']), 0)

def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master """
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # get notifications on a bucket
    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete specific notifications
    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)

    # get the remaining 2 notifications on a bucket
    response, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # delete remaining notifications
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # make sure that the notifications are now deleted
    _, status = s3_notification_conf.get_config()

    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def ps_s3_notification_filter(on_master):
    """ test s3 notification filter on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    if on_master:
        zones, _ = init_env(require_ps=False)
        ps_zone = zones[0]
    else:
        zones, ps_zones = init_env(require_ps=True)
        ps_zone = ps_zones[0]

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    if on_master:
        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
    else:
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, _ = topic_conf.set_config()
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        zone_meta_checkpoint(ps_zone.zone)

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                       },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                       },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                       }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    if on_master:
        topic_conf_list = [{'Id': notification_name+'_4',
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                            'Filter': {
                                'Metadata': {
                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                                },
                                'Key': {
                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                                }
                            }
                           }]
        try:
            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
            _, status = s3_notification_conf4.set_config()
            assert_equal(status/100, 2)
            skip_notif4 = False
        except Exception as error:
            print 'note: metadata filter is not supported by boto3 - skipping test'
            skip_notif4 = True
    else:
        print 'filtering by attributes only supported on master zone'
        skip_notif4 = True

    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status/100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    for key_name in expected_in1:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in2:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in4:
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'bar')
        key.set_metadata('hello', 'world')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')

    if on_master:
        print 'wait for 5sec for the messages...'
        time.sleep(5)
    else:
        zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)

def test_ps_s3_notification_filter_on_master():
    ps_s3_notification_filter(on_master=True)


def test_ps_s3_notification_filter():
    ps_s3_notification_filter(on_master=False)

def test_ps_s3_notification_errors_on_master():
    """ test s3 notification set/get/delete on master """
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # create s3 notification with invalid event name
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Kaboom']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    try:
        result, status = s3_notification_conf.set_config()
    except Exception as error:
        print str(error) + ' - is expected'
    else:
        assert False, 'invalid event name is expected to fail'

    # create s3 notification with missing name
    topic_conf_list = [{'Id': '',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print str(error) + ' - is expected'
    else:
        assert False, 'missing notification name is expected to fail'

    # create s3 notification with invalid topic ARN
    invalid_topic_arn = 'kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print str(error) + ' - is expected'
    else:
        assert False, 'invalid ARN is expected to fail'

    # create s3 notification with unknown topic ARN
    invalid_topic_arn = 'arn:aws:sns:a::kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print str(error) + ' - is expected'
    else:
        assert False, 'unknown topic is expected to fail'

    # create s3 notification with wrong bucket
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print str(error) + ' - is expected'
    else:
        assert False, 'unknown bucket is expected to fail'

    topic_conf.del_config()

    status = topic_conf.del_config()
    # deleting an unknown notification is not considered an error
    assert_equal(status, 200)

    _, status = topic_conf.get_config()
    assert_equal(status, 404)

    zones[0].delete_bucket(bucket_name)

def test_objcet_timing():
    return SkipTest("only used in manual testing")
    zones, _ = init_env(require_ps=False)
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    # create objects in the bucket (async)
    print 'creating objects...'
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    print 'total number of objects: ' + str(len(list(bucket.list())))

    print 'deleting objects...'
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    zones[0].delete_bucket(bucket_name)

def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topic
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=none'
    topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print 'total number of objects: ' + str(len(keys))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check amqp receiver 2 has no deletions
    try:
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)

def test_ps_s3_notification_push_http_on_master():
    """ test pushing http s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 100
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    client_threads = []
    start_time = time.time()
    content = str(os.urandom(1024*1024))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check http receiver
    keys = list(bucket.list())
    print 'total number of objects: ' + str(len(keys))
    http_server.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check http receiver
    http_server.verify_s3_events(keys, exact_match=True, deletions=True)

    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    zones[0].delete_bucket(bucket_name)
    http_server.close()

def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zones = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    assert_equal(parsed_result['topic']['arn'],
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
    _, status = topic_conf.del_config()
    assert_equal(status/100, 2)
    # verify topic is deleted
    result, status = topic_conf.get_config()
    assert_equal(status, 404)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')

def test_ps_topic_with_endpoint():
    """ test set topic with endpoint"""
    _, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                         endpoint=dest_endpoint,
                         endpoint_args=dest_args)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
    topic_conf.del_config()

def test_ps_notification():
    """ test set/get/delete of notification """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status/100, 2)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)

    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_notification_events():
    """ test set/get/delete of notification on specific events"""
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_subscription():
    """ test set/get/delete of subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscriptions
    result, _ = sub_conf.get_events()
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: check deletions
    # TODO: use exact match
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status/100, 2)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_event_type_subscription():
    """ test subscriptions for different events """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    assert_equal(status/100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status/100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the events from the creation subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    assert_equal(len(events['events']), 0)
    # get the events from the all events subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
                  str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # deletions should not change the creation events
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # only deletions should be listed here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    result, _ = sub_create_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                  '" type: "' + str(event['event']) + '"')
    # both deletions and creations should be here
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False, deletions=False)
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # TODO: (1) test deletions (2) test overall number of events

    # test subscription deletion when topic is specified
    _, status = sub_create_conf.del_config(topic=True)
    assert_equal(status/100, 2)
    _, status = sub_delete_conf.del_config(topic=True)
    assert_equal(status/100, 2)
    _, status = sub_conf.del_config(topic=True)
    assert_equal(status/100, 2)

    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)

def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    max_events = 15
    total_events_count = 0
    next_marker = None
    all_events = []
    while True:
        # get the events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        events = json.loads(result)
        total_events_count += len(events['events'])
        all_events.extend(events['events'])
        next_marker = events['next_marker']
        for event in events['events']:
            log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        if next_marker == '':
            break
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements({'events': all_events}, keys, exact_match=False)

    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)

def test_ps_event_acking():
    """ test acking of some events in a subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    original_number_of_events = len(events)
    for event in events['events']:
        log.debug('Event (before ack) id: "' + str(event['id']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events
    events_to_ack = number_of_objects/2
    for event in events['events']:
        if events_to_ack == 0:
            break
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status/100, 2)
        events_to_ack -= 1

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event (after ack) id: "' + str(event['id']) + '"')
    assert len(events) >= (original_number_of_events - number_of_objects/2)

    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)


def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w')
    fp.write('bar')
    fp.close()
    uploader = bucket.initiate_multipart_upload('multipart')
    fp = tempfile.TemporaryFile(mode='r')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(events['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)


def test_ps_s3_creation_triggers_on_master():
    """ test object creation s3 notifications in using put/copy/post on master"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                        }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w')
    fp.write('bar')
    fp.close()
    uploader = bucket.initiate_multipart_upload('multipart')
    fp = tempfile.TemporaryFile(mode='r')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)
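

# Illustrative sketch (not part of the original test suite): PSTopicS3 in the tests
# above receives the push endpoint as a single query-string style 'endpoint_args' blob.
# The helper below only mirrors the string literals used in these tests for an amqp
# endpoint; the helper name and the default ack level are arbitrary and it is not
# called by any test here.
def build_amqp_endpoint_args(amqp_host, exchange, ack_level='broker'):
    """ compose the endpoint_args string for an amqp push endpoint """
    endpoint_address = 'amqp://' + amqp_host
    return 'push-endpoint=' + endpoint_address + \
           '&amqp-exchange=' + exchange + \
           '&amqp-ack-level=' + ack_level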


def test_ps_s3_multipart_on_master():
    """ test multipart object upload on master"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
    task1.start()
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
    task2.start()
    task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
    task3.start()

    # create s3 topics
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn3 = topic_conf3.set_config()

    # create s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:Post']
                        },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                        'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                        }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket using multi-part upload
    fp = tempfile.TemporaryFile(mode='w+b')
    content = bytearray(os.urandom(1024*1024))
    fp.write(content)
    fp.flush()
    fp.seek(0)
    uploader = bucket.initiate_multipart_upload('multipart')
    uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    fp.close()

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check amqp receiver
    events = receiver1.get_and_reset_events()
    assert_equal(len(events), 3)

    events = receiver2.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')

    events = receiver3.get_and_reset_events()
    assert_equal(len(events), 1)
    assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
    assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    stop_amqp_receiver(receiver3, task3)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    topic_conf3.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_versioned_deletion():
    """ test notification of deletion markers """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topics
    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
    _, status = topic_conf1.set_config()
    assert_equal(status/100, 2)
    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
    _, status = topic_conf2.set_config()
    assert_equal(status/100, 2)

    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    bucket.configure_versioning(True)

    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)

    # create notifications
    event_type1 = 'OBJECT_DELETE'
    notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
                                        topic_name+'_1',
                                        event_type1)
    _, status = notification_conf1.set_config()
    assert_equal(status/100, 2)
    event_type2 = 'DELETE_MARKER_CREATE'
    notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
                                        topic_name+'_2',
                                        event_type2)
    _, status = notification_conf2.set_config()
    assert_equal(status/100, 2)

    # create subscriptions
    sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
                               topic_name+'_1')
    _, status = sub_conf1.set_config()
    assert_equal(status/100, 2)
    sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
                               topic_name+'_2')
    _, status = sub_conf2.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create deletion marker
    delete_marker_key = bucket.delete_key(key.name)

    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # delete the deletion marker
    delete_marker_key.delete()
    # delete the object versions
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)

    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf1.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type1)

    result, _ = sub_conf2.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        assert_equal(str(event['event']), event_type2)

    # cleanup
    # following is needed for the cleanup in the case of 3-zones
    # see: http://tracker.ceph.com/issues/39142
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    try:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
        zones[0].delete_bucket(bucket_name)
    except:
        log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
    sub_conf1.del_config()
    sub_conf2.del_config()
    notification_conf1.del_config()
    notification_conf2.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()


def test_ps_s3_metadata_on_master():
    """ test s3 notification of metadata on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_metadata('meta1', 'This is my metadata value')
    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
    keys = list(bucket.list())
    print 'wait for 5sec for the messages...'
    time.sleep(5)
    # check amqp receiver
    receiver.verify_s3_events(keys, exact_match=True)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_s3_versioned_deletion_on_master():
    """ test s3 notification of deletion markers on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = zones[0].create_bucket(bucket_name)
    bucket.configure_versioning(True)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange+'&amqp-ack-level=broker'
    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
                        },
                       {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:Delete']
                        }]
    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # create delete marker (non versioned deletion)
    delete_marker_key = bucket.delete_key(key.name)

    # versioned deletion
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    delete_marker_key.delete()

    print 'wait for 5sec for the messages...'
    time.sleep(5)

    # check amqp receiver
    events = receiver.get_and_reset_events()
    delete_events = 0
    delete_marker_create_events = 0
    for event_list in events:
        for event in event_list['Records']:
            if event['eventName'] == 's3:ObjectRemoved:Delete':
                delete_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
            if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
                delete_marker_create_events += 1
                assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']

    # 3 key versions were deleted (v1, v2 and the deletion marker)
    # notified over the same topic via 2 notifications (1,3)
    assert_equal(delete_events, 3*2)
    # 1 deletion marker was created
    # notified over the same topic over 2 notifications (1,2)
    assert_equal(delete_marker_create_events, 1*2)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_push_http():
    """ test pushing to http endpoint """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='http://'+host+':'+str(port))
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)


def test_ps_s3_push_http():
    """ test pushing to http endpoint s3 record format"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(host, port)

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                         endpoint='http://'+host+':'+str(port))
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check http server
    keys = list(bucket.list())
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check http server
    # TODO: use exact match
    http_server.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)


def test_ps_push_amqp():
    """ test pushing to amqp endpoint """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name, endpoint='amqp://'+hostname,
                              endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
    _, status = sub_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_s3_push_amqp():
    """ test pushing to amqp endpoint s3 record format"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check amqp receiver
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # check amqp receiver
    # TODO: use exact match
    receiver.verify_s3_events(keys, deletions=True, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, task)
    s3_notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_delete_bucket():
    """ test notification status upon bucket deletion """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create non-s3 notification
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    keys = list(bucket.list())
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for bucket sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # delete the bucket
    zones[0].delete_bucket(bucket_name)
    # wait for meta sync
    zone_meta_checkpoint(ps_zones[0].zone)

    # get the events from the auto-generated subscription
    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
                              topic_name)
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # s3 notification is deleted with bucket
    _, status = s3_notification_conf.get_config(notification=notification_name)
    assert_equal(status, 404)
    # non-s3 notification is deleted with bucket
    _, status = notification_conf.get_config()
    assert_equal(status, 404)
    # cleanup
    sub_conf.del_config()
    topic_conf.del_config()


def test_ps_missing_topic():
    """ test creating a subscription when no topic info exists"""
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_arn = 'arn:aws:sns:::' + topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    try:
        s3_notification_conf.set_config()
    except:
        log.info('missing topic is expected')
    else:
        assert False, 'missing topic is expected'

    # cleanup
    zones[0].delete_bucket(bucket_name)


def test_ps_s3_topic_update():
    """ test updating topic associated with a notification"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create amqp topic
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                         endpoint='amqp://' + hostname,
                         endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']

    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)

    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update the same topic with new endpoint
    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
                         endpoint='http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)

    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to update the endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(rabbit_proc)


def test_ps_s3_notification_update():
    """ test updating the topic of a notification"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')

    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    assert_equal(status/100, 2)
    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']
    assert_equal(status/100, 2)

    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)

    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(rabbit_proc)


def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics"""
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    rabbit_proc = init_rabbitmq()
    if rabbit_proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')

    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX

    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)

    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    result, status = topic_conf1.set_config()
    parsed_result = json.loads(result)
    topic_arn1 = parsed_result['arn']
    assert_equal(status/100, 2)
    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
                          endpoint='http://'+hostname+':'+str(http_port))
    result, status = topic_conf2.set_config()
    parsed_result = json.loads(result)
    topic_arn2 = parsed_result['arn']
    assert_equal(status/100, 2)

    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)

    # get auto-generated subscriptions
    sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status/100, 2)
    sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status/100, 2)

    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the events from both of the subscription
    result, _ = sub_conf1.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)

    result, _ = sub_conf2.get_events()
    parsed_result = json.loads(result)
    for record in parsed_result['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)

    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
    clean_rabbitmq(rabbit_proc)