]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests_ps.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests_ps.py
1 import logging
2 import json
3 import tempfile
4 import BaseHTTPServer
5 import SocketServer
6 import random
7 import threading
8 import subprocess
9 import socket
10 import time
11 import os
12 from .tests import get_realm, \
13 ZonegroupConns, \
14 zonegroup_meta_checkpoint, \
15 zone_meta_checkpoint, \
16 zone_bucket_checkpoint, \
17 zone_data_checkpoint, \
18 zonegroup_bucket_checkpoint, \
19 check_bucket_eq, \
20 gen_bucket_name, \
21 get_user, \
22 get_tenant
23 from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
24 from multisite import User
25 from nose import SkipTest
26 from nose.tools import assert_not_equal, assert_equal
27
28 # configure logging for the tests module
29 log = logging.getLogger(__name__)
30
31 skip_push_tests = True
32
33 ####################################
34 # utility functions for pubsub tests
35 ####################################
36
37 def set_contents_from_string(key, content):
38 try:
39 key.set_contents_from_string(content)
40 except Exception as e:
41 print 'Error: ' + str(e)
42
43
44 # HTTP endpoint functions
45 # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
46
47 class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
48 """HTTP POST hanler class storing the received events in its http server"""
49 def do_POST(self):
50 """implementation of POST handler"""
51 try:
52 content_length = int(self.headers['Content-Length'])
53 body = self.rfile.read(content_length)
54 log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
55 self.server.append(json.loads(body))
56 except:
57 log.error('HTTP Server received empty event')
58 self.send_response(400)
59 else:
60 self.send_response(100)
61 finally:
62 self.end_headers()
63
64
65 class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
66 """HTTP server used by the handler to store events"""
67 def __init__(self, addr, handler, worker_id):
68 BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
69 self.worker_id = worker_id
70 self.events = []
71
72 def append(self, event):
73 self.events.append(event)
74
75
76 class HTTPServerThread(threading.Thread):
77 """thread for running the HTTP server. reusing the same socket for all threads"""
78 def __init__(self, i, sock, addr):
79 threading.Thread.__init__(self)
80 self.i = i
81 self.daemon = True
82 self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
83 self.httpd.socket = sock
84 # prevent the HTTP server from re-binding every handler
85 self.httpd.server_bind = self.server_close = lambda self: None
86 self.start()
87
88 def run(self):
89 try:
90 log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
91 self.httpd.serve_forever()
92 log.info('HTTP Server (%d) ended', self.i)
93 except Exception as error:
94 # could happen if the server r/w to a closing socket during shutdown
95 log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))
96
97 def close(self):
98 self.httpd.shutdown()
99
100 def get_events(self):
101 return self.httpd.events
102
103 def reset_events(self):
104 self.httpd.events = []
105
106
107 class StreamingHTTPServer:
108 """multi-threaded http server class also holding list of events received into the handler
109 each thread has its own server, and all servers share the same socket"""
110 def __init__(self, host, port, num_workers=100):
111 addr = (host, port)
112 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
113 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
114 self.sock.bind(addr)
115 self.sock.listen(num_workers)
116 self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
117
118 def verify_s3_events(self, keys, exact_match=False, deletions=False):
119 """verify stored s3 records agains a list of keys"""
120 events = []
121 for worker in self.workers:
122 events += worker.get_events()
123 worker.reset_events()
124 verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
125
126 def verify_events(self, keys, exact_match=False, deletions=False):
127 """verify stored events agains a list of keys"""
128 events = []
129 for worker in self.workers:
130 events += worker.get_events()
131 worker.reset_events()
132 verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
133
134 def close(self):
135 """close all workers in the http server and wait for it to finish"""
136 # make sure that the shared socket is closed
137 # this is needed in case that one of the threads is blocked on the socket
138 self.sock.shutdown(socket.SHUT_RDWR)
139 self.sock.close()
140 # wait for server threads to finish
141 for worker in self.workers:
142 worker.close()
143 worker.join()
144
145
146 # AMQP endpoint functions
147
148 rabbitmq_port = 5672
149
150 class AMQPReceiver(object):
151 """class for receiving and storing messages on a topic from the AMQP broker"""
152 def __init__(self, exchange, topic):
153 import pika
154 hostname = get_ip()
155 remaining_retries = 10
156 while remaining_retries > 0:
157 try:
158 connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
159 break
160 except Exception as error:
161 remaining_retries -= 1
162 print 'failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)
163 time.sleep(0.5)
164
165 if remaining_retries == 0:
166 raise Exception('failed to connect to rabbitmq - no retries left')
167
168 self.channel = connection.channel()
169 self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
170 result = self.channel.queue_declare('', exclusive=True)
171 queue_name = result.method.queue
172 self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
173 self.channel.basic_consume(queue=queue_name,
174 on_message_callback=self.on_message,
175 auto_ack=True)
176 self.events = []
177 self.topic = topic
178
179 def on_message(self, ch, method, properties, body):
180 """callback invoked when a new message arrive on the topic"""
181 log.info('AMQP received event for topic %s:\n %s', self.topic, body)
182 self.events.append(json.loads(body))
183
184 # TODO create a base class for the AMQP and HTTP cases
185 def verify_s3_events(self, keys, exact_match=False, deletions=False):
186 """verify stored s3 records agains a list of keys"""
187 verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
188 self.events = []
189
190 def verify_events(self, keys, exact_match=False, deletions=False):
191 """verify stored events agains a list of keys"""
192 verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
193 self.events = []
194
195 def get_and_reset_events(self):
196 tmp = self.events
197 self.events = []
198 return tmp
199
200
201 def amqp_receiver_thread_runner(receiver):
202 """main thread function for the amqp receiver"""
203 try:
204 log.info('AMQP receiver started')
205 receiver.channel.start_consuming()
206 log.info('AMQP receiver ended')
207 except Exception as error:
208 log.info('AMQP receiver ended unexpectedly: %s', str(error))
209
210
211 def create_amqp_receiver_thread(exchange, topic):
212 """create amqp receiver and thread"""
213 receiver = AMQPReceiver(exchange, topic)
214 task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
215 task.daemon = True
216 return task, receiver
217
218
219 def stop_amqp_receiver(receiver, task):
220 """stop the receiver thread and wait for it to finis"""
221 try:
222 receiver.channel.stop_consuming()
223 log.info('stopping AMQP receiver')
224 except Exception as error:
225 log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
226 task.join(5)
227
228 def check_ps_configured():
229 """check if at least one pubsub zone exist"""
230 realm = get_realm()
231 zonegroup = realm.master_zonegroup()
232
233 ps_zones = zonegroup.zones_by_type.get("pubsub")
234 if not ps_zones:
235 raise SkipTest("Requires at least one PS zone")
236
237
238 def is_ps_zone(zone_conn):
239 """check if a specific zone is pubsub zone"""
240 if not zone_conn:
241 return False
242 return zone_conn.zone.tier_type() == "pubsub"
243
244
245 def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
246 """ verify there is at least one event per element """
247 err = ''
248 for key in keys:
249 key_found = False
250 if type(events) is list:
251 for event_list in events:
252 if key_found:
253 break
254 for event in event_list['events']:
255 if event['info']['bucket']['name'] == key.bucket.name and \
256 event['info']['key']['name'] == key.name:
257 if deletions and event['event'] == 'OBJECT_DELETE':
258 key_found = True
259 break
260 elif not deletions and event['event'] == 'OBJECT_CREATE':
261 key_found = True
262 break
263 else:
264 for event in events['events']:
265 if event['info']['bucket']['name'] == key.bucket.name and \
266 event['info']['key']['name'] == key.name:
267 if deletions and event['event'] == 'OBJECT_DELETE':
268 key_found = True
269 break
270 elif not deletions and event['event'] == 'OBJECT_CREATE':
271 key_found = True
272 break
273
274 if not key_found:
275 err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
276 log.error(events)
277 assert False, err
278
279 if not len(events) == len(keys):
280 err = 'superfluous events are found'
281 log.debug(err)
282 if exact_match:
283 log.error(events)
284 assert False, err
285
286
287 def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
288 """ verify there is at least one record per element """
289 err = ''
290 for key in keys:
291 key_found = False
292 if type(records) is list:
293 for record_list in records:
294 if key_found:
295 break
296 for record in record_list['Records']:
297 if record['s3']['bucket']['name'] == key.bucket.name and \
298 record['s3']['object']['key'] == key.name:
299 if deletions and 'ObjectRemoved' in record['eventName']:
300 key_found = True
301 break
302 elif not deletions and 'ObjectCreated' in record['eventName']:
303 key_found = True
304 break
305 else:
306 for record in records['Records']:
307 if record['s3']['bucket']['name'] == key.bucket.name and \
308 record['s3']['object']['key'] == key.name:
309 if deletions and 'ObjectRemoved' in record['eventName']:
310 key_found = True
311 break
312 elif not deletions and 'ObjectCreated' in record['eventName']:
313 key_found = True
314 break
315
316 if not key_found:
317 err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
318 for record_list in records:
319 for record in record_list['Records']:
320 log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
321 assert False, err
322
323 if not len(records) == len(keys):
324 err = 'superfluous records are found'
325 log.warning(err)
326 if exact_match:
327 for record_list in records:
328 for record in record_list['Records']:
329 log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
330 assert False, err
331
332
333 def init_rabbitmq():
334 """ start a rabbitmq broker """
335 hostname = get_ip()
336 #port = str(random.randint(20000, 30000))
337 #data_dir = './' + port + '_data'
338 #log_dir = './' + port + '_log'
339 #print('')
340 #try:
341 # os.mkdir(data_dir)
342 # os.mkdir(log_dir)
343 #except:
344 # print('rabbitmq directories already exists')
345 #env = {'RABBITMQ_NODE_PORT': port,
346 # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
347 # 'RABBITMQ_USE_LONGNAME': 'true',
348 # 'RABBITMQ_MNESIA_BASE': data_dir,
349 # 'RABBITMQ_LOG_BASE': log_dir}
350 # TODO: support multiple brokers per host using env
351 # make sure we don't collide with the default
352 try:
353 proc = subprocess.Popen('rabbitmq-server')
354 except Exception as error:
355 log.info('failed to execute rabbitmq-server: %s', str(error))
356 print 'failed to execute rabbitmq-server: %s' % str(error)
357 return None
358 # TODO add rabbitmq checkpoint instead of sleep
359 time.sleep(5)
360 return proc #, port, data_dir, log_dir
361
362
363 def clean_rabbitmq(proc): #, data_dir, log_dir)
364 """ stop the rabbitmq broker """
365 try:
366 subprocess.call(['rabbitmqctl', 'stop'])
367 time.sleep(5)
368 proc.terminate()
369 except:
370 log.info('rabbitmq server already terminated')
371 # TODO: add directory cleanup once multiple brokers are supported
372 #try:
373 # os.rmdir(data_dir)
374 # os.rmdir(log_dir)
375 #except:
376 # log.info('rabbitmq directories already removed')
377
378
379 def init_env(require_ps=True):
380 """initialize the environment"""
381 if require_ps:
382 check_ps_configured()
383
384 realm = get_realm()
385 zonegroup = realm.master_zonegroup()
386 zonegroup_conns = ZonegroupConns(zonegroup)
387
388 zonegroup_meta_checkpoint(zonegroup)
389
390 ps_zones = []
391 zones = []
392 for conn in zonegroup_conns.zones:
393 if is_ps_zone(conn):
394 zone_meta_checkpoint(conn.zone)
395 ps_zones.append(conn)
396 elif not conn.zone.is_read_only():
397 zones.append(conn)
398
399 assert_not_equal(len(zones), 0)
400 if require_ps:
401 assert_not_equal(len(ps_zones), 0)
402 return zones, ps_zones
403
404
405 def get_ip():
406 """ This method returns the "primary" IP on the local box (the one with a default route)
407 source: https://stackoverflow.com/a/28950776/711085
408 this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
409 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
410 try:
411 # address should not be reachable
412 s.connect(('10.255.255.255', 1))
413 ip = s.getsockname()[0]
414 finally:
415 s.close()
416 return ip
417
418
419 TOPIC_SUFFIX = "_topic"
420 SUB_SUFFIX = "_sub"
421 NOTIFICATION_SUFFIX = "_notif"
422
423 ##############
424 # pubsub tests
425 ##############
426
427 def test_ps_info():
428 """ log information for manual testing """
429 return SkipTest("only used in manual testing")
430 zones, ps_zones = init_env()
431 realm = get_realm()
432 zonegroup = realm.master_zonegroup()
433 bucket_name = gen_bucket_name()
434 # create bucket on the first of the rados zones
435 bucket = zones[0].create_bucket(bucket_name)
436 # create objects in the bucket
437 number_of_objects = 10
438 for i in range(number_of_objects):
439 key = bucket.new_key(str(i))
440 key.set_contents_from_string('bar')
441 print 'Zonegroup: ' + zonegroup.name
442 print 'user: ' + get_user()
443 print 'tenant: ' + get_tenant()
444 print 'Master Zone'
445 print_connection_info(zones[0].conn)
446 print 'PubSub Zone'
447 print_connection_info(ps_zones[0].conn)
448 print 'Bucket: ' + bucket_name
449
450
451 def test_ps_s3_notification_low_level():
452 """ test low level implementation of s3 notifications """
453 zones, ps_zones = init_env()
454 bucket_name = gen_bucket_name()
455 # create bucket on the first of the rados zones
456 zones[0].create_bucket(bucket_name)
457 # wait for sync
458 zone_meta_checkpoint(ps_zones[0].zone)
459 # create topic
460 topic_name = bucket_name + TOPIC_SUFFIX
461 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
462 result, status = topic_conf.set_config()
463 assert_equal(status/100, 2)
464 parsed_result = json.loads(result)
465 topic_arn = parsed_result['arn']
466 # create s3 notification
467 notification_name = bucket_name + NOTIFICATION_SUFFIX
468 generated_topic_name = notification_name+'_'+topic_name
469 topic_conf_list = [{'Id': notification_name,
470 'TopicArn': topic_arn,
471 'Events': ['s3:ObjectCreated:*']
472 }]
473 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
474 _, status = s3_notification_conf.set_config()
475 assert_equal(status/100, 2)
476 zone_meta_checkpoint(ps_zones[0].zone)
477 # get auto-generated topic
478 generated_topic_conf = PSTopic(ps_zones[0].conn, generated_topic_name)
479 result, status = generated_topic_conf.get_config()
480 parsed_result = json.loads(result)
481 assert_equal(status/100, 2)
482 assert_equal(parsed_result['topic']['name'], generated_topic_name)
483 # get auto-generated notification
484 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
485 generated_topic_name)
486 result, status = notification_conf.get_config()
487 parsed_result = json.loads(result)
488 assert_equal(status/100, 2)
489 assert_equal(len(parsed_result['topics']), 1)
490 # get auto-generated subscription
491 sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
492 generated_topic_name)
493 result, status = sub_conf.get_config()
494 parsed_result = json.loads(result)
495 assert_equal(status/100, 2)
496 assert_equal(parsed_result['topic'], generated_topic_name)
497 # delete s3 notification
498 _, status = s3_notification_conf.del_config(notification=notification_name)
499 assert_equal(status/100, 2)
500 # delete topic
501 _, status = topic_conf.del_config()
502 assert_equal(status/100, 2)
503
504 # verify low-level cleanup
505 _, status = generated_topic_conf.get_config()
506 assert_equal(status, 404)
507 result, status = notification_conf.get_config()
508 parsed_result = json.loads(result)
509 assert_equal(len(parsed_result['topics']), 0)
510 # TODO should return 404
511 # assert_equal(status, 404)
512 result, status = sub_conf.get_config()
513 parsed_result = json.loads(result)
514 assert_equal(parsed_result['topic'], '')
515 # TODO should return 404
516 # assert_equal(status, 404)
517
518 # cleanup
519 topic_conf.del_config()
520 # delete the bucket
521 zones[0].delete_bucket(bucket_name)
522
523
524 def test_ps_s3_notification_records():
525 """ test s3 records fetching """
526 zones, ps_zones = init_env()
527 bucket_name = gen_bucket_name()
528 # create bucket on the first of the rados zones
529 bucket = zones[0].create_bucket(bucket_name)
530 # wait for sync
531 zone_meta_checkpoint(ps_zones[0].zone)
532 # create topic
533 topic_name = bucket_name + TOPIC_SUFFIX
534 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
535 result, status = topic_conf.set_config()
536 assert_equal(status/100, 2)
537 parsed_result = json.loads(result)
538 topic_arn = parsed_result['arn']
539 # create s3 notification
540 notification_name = bucket_name + NOTIFICATION_SUFFIX
541 topic_conf_list = [{'Id': notification_name,
542 'TopicArn': topic_arn,
543 'Events': ['s3:ObjectCreated:*']
544 }]
545 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
546 _, status = s3_notification_conf.set_config()
547 assert_equal(status/100, 2)
548 zone_meta_checkpoint(ps_zones[0].zone)
549 # get auto-generated subscription
550 sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
551 topic_name)
552 _, status = sub_conf.get_config()
553 assert_equal(status/100, 2)
554 # create objects in the bucket
555 number_of_objects = 10
556 for i in range(number_of_objects):
557 key = bucket.new_key(str(i))
558 key.set_contents_from_string('bar')
559 # wait for sync
560 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
561
562 # get the events from the subscription
563 result, _ = sub_conf.get_events()
564 records = json.loads(result)
565 for record in records['Records']:
566 log.debug(record)
567 keys = list(bucket.list())
568 # TODO: use exact match
569 verify_s3_records_by_elements(records, keys, exact_match=False)
570
571 # cleanup
572 _, status = s3_notification_conf.del_config()
573 topic_conf.del_config()
574 # delete the keys
575 for key in bucket.list():
576 key.delete()
577 zones[0].delete_bucket(bucket_name)
578
579
580 def test_ps_s3_notification():
581 """ test s3 notification set/get/delete """
582 zones, ps_zones = init_env()
583 bucket_name = gen_bucket_name()
584 # create bucket on the first of the rados zones
585 zones[0].create_bucket(bucket_name)
586 # wait for sync
587 zone_meta_checkpoint(ps_zones[0].zone)
588 topic_name = bucket_name + TOPIC_SUFFIX
589 # create topic
590 topic_name = bucket_name + TOPIC_SUFFIX
591 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
592 response, status = topic_conf.set_config()
593 assert_equal(status/100, 2)
594 parsed_result = json.loads(response)
595 topic_arn = parsed_result['arn']
596 # create one s3 notification
597 notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
598 topic_conf_list = [{'Id': notification_name1,
599 'TopicArn': topic_arn,
600 'Events': ['s3:ObjectCreated:*']
601 }]
602 s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
603 response, status = s3_notification_conf1.set_config()
604 assert_equal(status/100, 2)
605 # create another s3 notification with the same topic
606 notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
607 topic_conf_list = [{'Id': notification_name2,
608 'TopicArn': topic_arn,
609 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
610 }]
611 s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
612 response, status = s3_notification_conf2.set_config()
613 assert_equal(status/100, 2)
614 zone_meta_checkpoint(ps_zones[0].zone)
615
616 # get all notification on a bucket
617 response, status = s3_notification_conf1.get_config()
618 assert_equal(status/100, 2)
619 assert_equal(len(response['TopicConfigurations']), 2)
620 assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
621 assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
622
623 # get specific notification on a bucket
624 response, status = s3_notification_conf1.get_config(notification=notification_name1)
625 assert_equal(status/100, 2)
626 assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
627 assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
628 response, status = s3_notification_conf2.get_config(notification=notification_name2)
629 assert_equal(status/100, 2)
630 assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
631 assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)
632
633 # delete specific notifications
634 _, status = s3_notification_conf1.del_config(notification=notification_name1)
635 assert_equal(status/100, 2)
636 _, status = s3_notification_conf2.del_config(notification=notification_name2)
637 assert_equal(status/100, 2)
638
639 # cleanup
640 topic_conf.del_config()
641 # delete the bucket
642 zones[0].delete_bucket(bucket_name)
643
644 def test_ps_s3_topic_on_master():
645 """ test s3 notification set/get/delete on master """
646 zones, _ = init_env(require_ps=False)
647 realm = get_realm()
648 zonegroup = realm.master_zonegroup()
649 bucket_name = gen_bucket_name()
650 topic_name = bucket_name + TOPIC_SUFFIX
651
652 # clean all topics
653 delete_all_s3_topics(zones[0].conn, zonegroup.name)
654
655 # create s3 topics
656 endpoint_address = 'amqp://127.0.0.1:7001'
657 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
658 topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
659 topic_arn = topic_conf1.set_config()
660 assert_equal(topic_arn,
661 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
662
663 endpoint_address = 'http://127.0.0.1:9001'
664 endpoint_args = 'push-endpoint='+endpoint_address
665 topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
666 topic_arn = topic_conf2.set_config()
667 assert_equal(topic_arn,
668 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
669 endpoint_address = 'http://127.0.0.1:9002'
670 endpoint_args = 'push-endpoint='+endpoint_address
671 topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
672 topic_arn = topic_conf3.set_config()
673 assert_equal(topic_arn,
674 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
675
676 # get topic 3
677 result, status = topic_conf3.get_config()
678 assert_equal(status, 200)
679 assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
680 assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
681 # Note that endpoint args may be ordered differently in the result
682
683 # delete topic 1
684 result = topic_conf1.del_config()
685 assert_equal(status, 200)
686
687 # try to get a deleted topic
688 _, status = topic_conf1.get_config()
689 assert_equal(status, 404)
690
691 # get the remaining 2 topics
692 result = topic_conf1.get_list()
693 assert_equal(len(result['Topics']), 2)
694
695 # delete topics
696 result = topic_conf2.del_config()
697 # TODO: should be 200OK
698 # assert_equal(status, 200)
699 result = topic_conf3.del_config()
700 # TODO: should be 200OK
701 # assert_equal(status, 200)
702
703 # get topic list, make sure it is empty
704 result = topic_conf1.get_list()
705 assert_equal(len(result['Topics']), 0)
706
707
708 def test_ps_s3_notification_on_master():
709 """ test s3 notification set/get/delete on master """
710 zones, _ = init_env(require_ps=False)
711 realm = get_realm()
712 zonegroup = realm.master_zonegroup()
713 bucket_name = gen_bucket_name()
714 # create bucket
715 bucket = zones[0].create_bucket(bucket_name)
716 topic_name = bucket_name + TOPIC_SUFFIX
717 # create s3 topic
718 endpoint_address = 'amqp://127.0.0.1:7001'
719 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
720 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
721 topic_arn = topic_conf.set_config()
722 # create s3 notification
723 notification_name = bucket_name + NOTIFICATION_SUFFIX
724 topic_conf_list = [{'Id': notification_name+'_1',
725 'TopicArn': topic_arn,
726 'Events': ['s3:ObjectCreated:*']
727 },
728 {'Id': notification_name+'_2',
729 'TopicArn': topic_arn,
730 'Events': ['s3:ObjectRemoved:*']
731 },
732 {'Id': notification_name+'_3',
733 'TopicArn': topic_arn,
734 'Events': []
735 }]
736 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
737 _, status = s3_notification_conf.set_config()
738 assert_equal(status/100, 2)
739
740 # get notifications on a bucket
741 response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
742 assert_equal(status/100, 2)
743 assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
744
745 # delete specific notifications
746 _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
747 assert_equal(status/100, 2)
748
749 # get the remaining 2 notifications on a bucket
750 response, status = s3_notification_conf.get_config()
751 assert_equal(status/100, 2)
752 assert_equal(len(response['TopicConfigurations']), 2)
753 assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
754 assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
755
756 # delete remaining notifications
757 _, status = s3_notification_conf.del_config()
758 assert_equal(status/100, 2)
759
760 # make sure that the notifications are now deleted
761 _, status = s3_notification_conf.get_config()
762
763 # cleanup
764 topic_conf.del_config()
765 # delete the bucket
766 zones[0].delete_bucket(bucket_name)
767
768
769 def ps_s3_notification_filter(on_master):
770 """ test s3 notification filter on master """
771 if skip_push_tests:
772 return SkipTest("PubSub push tests don't run in teuthology")
773 hostname = get_ip()
774 proc = init_rabbitmq()
775 if proc is None:
776 return SkipTest('end2end amqp tests require rabbitmq-server installed')
777 if on_master:
778 zones, _ = init_env(require_ps=False)
779 ps_zone = zones[0]
780 else:
781 zones, ps_zones = init_env(require_ps=True)
782 ps_zone = ps_zones[0]
783
784 realm = get_realm()
785 zonegroup = realm.master_zonegroup()
786
787 # create bucket
788 bucket_name = gen_bucket_name()
789 bucket = zones[0].create_bucket(bucket_name)
790 topic_name = bucket_name + TOPIC_SUFFIX
791
792 # start amqp receivers
793 exchange = 'ex1'
794 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
795 task.start()
796
797 # create s3 topic
798 endpoint_address = 'amqp://' + hostname
799 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
800 if on_master:
801 topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
802 topic_arn = topic_conf.set_config()
803 else:
804 topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
805 result, _ = topic_conf.set_config()
806 parsed_result = json.loads(result)
807 topic_arn = parsed_result['arn']
808 zone_meta_checkpoint(ps_zone.zone)
809
810 # create s3 notification
811 notification_name = bucket_name + NOTIFICATION_SUFFIX
812 topic_conf_list = [{'Id': notification_name+'_1',
813 'TopicArn': topic_arn,
814 'Events': ['s3:ObjectCreated:*'],
815 'Filter': {
816 'Key': {
817 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
818 }
819 }
820 },
821 {'Id': notification_name+'_2',
822 'TopicArn': topic_arn,
823 'Events': ['s3:ObjectCreated:*'],
824 'Filter': {
825 'Key': {
826 'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
827 {'Name': 'suffix', 'Value': 'log'}]
828 }
829 }
830 },
831 {'Id': notification_name+'_3',
832 'TopicArn': topic_arn,
833 'Events': [],
834 'Filter': {
835 'Key': {
836 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
837 }
838 }
839 }]
840
841 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
842 result, status = s3_notification_conf.set_config()
843 assert_equal(status/100, 2)
844
845 if on_master:
846 topic_conf_list = [{'Id': notification_name+'_4',
847 'TopicArn': topic_arn,
848 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
849 'Filter': {
850 'Metadata': {
851 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
852 {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
853 },
854 'Key': {
855 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
856 }
857 }
858 }]
859
860 try:
861 s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
862 _, status = s3_notification_conf4.set_config()
863 assert_equal(status/100, 2)
864 skip_notif4 = False
865 except Exception as error:
866 print 'note: metadata filter is not supported by boto3 - skipping test'
867 skip_notif4 = True
868 else:
869 print 'filtering by attributes only supported on master zone'
870 skip_notif4 = True
871
872
873 # get all notifications
874 result, status = s3_notification_conf.get_config()
875 assert_equal(status/100, 2)
876 for conf in result['TopicConfigurations']:
877 filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
878 assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
879
880 if not skip_notif4:
881 result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
882 assert_equal(status/100, 2)
883 filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
884 assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
885
886 expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
887 expected_in2 = ['world1.log', 'world2log', 'world3.log']
888 expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
889 expected_in4 = ['foo', 'bar', 'hello', 'world']
890 filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
891 filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
892 # create objects in bucket
893 for key_name in expected_in1:
894 key = bucket.new_key(key_name)
895 key.set_contents_from_string('bar')
896 for key_name in expected_in2:
897 key = bucket.new_key(key_name)
898 key.set_contents_from_string('bar')
899 for key_name in expected_in3:
900 key = bucket.new_key(key_name)
901 key.set_contents_from_string('bar')
902 if not skip_notif4:
903 for key_name in expected_in4:
904 key = bucket.new_key(key_name)
905 key.set_metadata('foo', 'bar')
906 key.set_metadata('hello', 'world')
907 key.set_metadata('goodbye', 'cruel world')
908 key.set_contents_from_string('bar')
909 for key_name in filtered:
910 key = bucket.new_key(key_name)
911 key.set_contents_from_string('bar')
912 for key_name in filtered_with_attr:
913 key.set_metadata('foo', 'nobar')
914 key.set_metadata('hello', 'noworld')
915 key.set_metadata('goodbye', 'cruel world')
916 key = bucket.new_key(key_name)
917 key.set_contents_from_string('bar')
918
919 if on_master:
920 print 'wait for 5sec for the messages...'
921 time.sleep(5)
922 else:
923 zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
924
925 found_in1 = []
926 found_in2 = []
927 found_in3 = []
928 found_in4 = []
929
930 for event in receiver.get_and_reset_events():
931 notif_id = event['Records'][0]['s3']['configurationId']
932 key_name = event['Records'][0]['s3']['object']['key']
933 if notif_id == notification_name+'_1':
934 found_in1.append(key_name)
935 elif notif_id == notification_name+'_2':
936 found_in2.append(key_name)
937 elif notif_id == notification_name+'_3':
938 found_in3.append(key_name)
939 elif not skip_notif4 and notif_id == notification_name+'_4':
940 found_in4.append(key_name)
941 else:
942 assert False, 'invalid notification: ' + notif_id
943
944 assert_equal(set(found_in1), set(expected_in1))
945 assert_equal(set(found_in2), set(expected_in2))
946 assert_equal(set(found_in3), set(expected_in3))
947 if not skip_notif4:
948 assert_equal(set(found_in4), set(expected_in4))
949
950 # cleanup
951 s3_notification_conf.del_config()
952 if not skip_notif4:
953 s3_notification_conf4.del_config()
954 topic_conf.del_config()
955 # delete the bucket
956 for key in bucket.list():
957 key.delete()
958 zones[0].delete_bucket(bucket_name)
959 stop_amqp_receiver(receiver, task)
960 clean_rabbitmq(proc)
961
962
963 def test_ps_s3_notification_filter_on_master():
964 ps_s3_notification_filter(on_master=True)
965
966
967 def test_ps_s3_notification_filter():
968 ps_s3_notification_filter(on_master=False)
969
970
971 def test_ps_s3_notification_errors_on_master():
972 """ test s3 notification set/get/delete on master """
973 zones, _ = init_env(require_ps=False)
974 realm = get_realm()
975 zonegroup = realm.master_zonegroup()
976 bucket_name = gen_bucket_name()
977 # create bucket
978 bucket = zones[0].create_bucket(bucket_name)
979 topic_name = bucket_name + TOPIC_SUFFIX
980 # create s3 topic
981 endpoint_address = 'amqp://127.0.0.1:7001'
982 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
983 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
984 topic_arn = topic_conf.set_config()
985
986 # create s3 notification with invalid event name
987 notification_name = bucket_name + NOTIFICATION_SUFFIX
988 topic_conf_list = [{'Id': notification_name,
989 'TopicArn': topic_arn,
990 'Events': ['s3:ObjectCreated:Kaboom']
991 }]
992 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
993 try:
994 result, status = s3_notification_conf.set_config()
995 except Exception as error:
996 print str(error) + ' - is expected'
997 else:
998 assert False, 'invalid event name is expected to fail'
999
1000 # create s3 notification with missing name
1001 topic_conf_list = [{'Id': '',
1002 'TopicArn': topic_arn,
1003 'Events': ['s3:ObjectCreated:Put']
1004 }]
1005 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1006 try:
1007 _, _ = s3_notification_conf.set_config()
1008 except Exception as error:
1009 print str(error) + ' - is expected'
1010 else:
1011 assert False, 'missing notification name is expected to fail'
1012
1013 # create s3 notification with invalid topic ARN
1014 invalid_topic_arn = 'kaboom'
1015 topic_conf_list = [{'Id': notification_name,
1016 'TopicArn': invalid_topic_arn,
1017 'Events': ['s3:ObjectCreated:Put']
1018 }]
1019 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1020 try:
1021 _, _ = s3_notification_conf.set_config()
1022 except Exception as error:
1023 print str(error) + ' - is expected'
1024 else:
1025 assert False, 'invalid ARN is expected to fail'
1026
1027 # create s3 notification with unknown topic ARN
1028 invalid_topic_arn = 'arn:aws:sns:a::kaboom'
1029 topic_conf_list = [{'Id': notification_name,
1030 'TopicArn': invalid_topic_arn ,
1031 'Events': ['s3:ObjectCreated:Put']
1032 }]
1033 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1034 try:
1035 _, _ = s3_notification_conf.set_config()
1036 except Exception as error:
1037 print str(error) + ' - is expected'
1038 else:
1039 assert False, 'unknown topic is expected to fail'
1040
1041 # create s3 notification with wrong bucket
1042 topic_conf_list = [{'Id': notification_name,
1043 'TopicArn': topic_arn,
1044 'Events': ['s3:ObjectCreated:Put']
1045 }]
1046 s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
1047 try:
1048 _, _ = s3_notification_conf.set_config()
1049 except Exception as error:
1050 print str(error) + ' - is expected'
1051 else:
1052 assert False, 'unknown bucket is expected to fail'
1053
1054 topic_conf.del_config()
1055
1056 status = topic_conf.del_config()
1057 # deleting an unknown notification is not considered an error
1058 assert_equal(status, 200)
1059
1060 _, status = topic_conf.get_config()
1061 assert_equal(status, 404)
1062
1063 # cleanup
1064 # delete the bucket
1065 zones[0].delete_bucket(bucket_name)
1066
1067
1068 def test_objcet_timing():
1069 return SkipTest("only used in manual testing")
1070 zones, _ = init_env(require_ps=False)
1071
1072 # create bucket
1073 bucket_name = gen_bucket_name()
1074 bucket = zones[0].create_bucket(bucket_name)
1075 # create objects in the bucket (async)
1076 print 'creating objects...'
1077 number_of_objects = 1000
1078 client_threads = []
1079 start_time = time.time()
1080 content = str(bytearray(os.urandom(1024*1024)))
1081 for i in range(number_of_objects):
1082 key = bucket.new_key(str(i))
1083 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1084 thr.start()
1085 client_threads.append(thr)
1086 [thr.join() for thr in client_threads]
1087
1088 time_diff = time.time() - start_time
1089 print 'average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1090
1091 print 'total number of objects: ' + str(len(list(bucket.list())))
1092
1093 print 'deleting objects...'
1094 client_threads = []
1095 start_time = time.time()
1096 for key in bucket.list():
1097 thr = threading.Thread(target = key.delete, args=())
1098 thr.start()
1099 client_threads.append(thr)
1100 [thr.join() for thr in client_threads]
1101
1102 time_diff = time.time() - start_time
1103 print 'average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1104
1105 # cleanup
1106 zones[0].delete_bucket(bucket_name)
1107
1108
1109 def test_ps_s3_notification_push_amqp_on_master():
1110 """ test pushing amqp s3 notification on master """
1111 if skip_push_tests:
1112 return SkipTest("PubSub push tests don't run in teuthology")
1113 hostname = get_ip()
1114 proc = init_rabbitmq()
1115 if proc is None:
1116 return SkipTest('end2end amqp tests require rabbitmq-server installed')
1117 zones, _ = init_env(require_ps=False)
1118 realm = get_realm()
1119 zonegroup = realm.master_zonegroup()
1120
1121 # create bucket
1122 bucket_name = gen_bucket_name()
1123 bucket = zones[0].create_bucket(bucket_name)
1124 topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
1125 topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
1126
1127 # start amqp receivers
1128 exchange = 'ex1'
1129 task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
1130 task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
1131 task1.start()
1132 task2.start()
1133
1134 # create two s3 topic
1135 endpoint_address = 'amqp://' + hostname
1136 # with acks from broker
1137 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
1138 topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
1139 topic_arn1 = topic_conf1.set_config()
1140 # without acks from broker
1141 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
1142 topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
1143 topic_arn2 = topic_conf2.set_config()
1144 # create s3 notification
1145 notification_name = bucket_name + NOTIFICATION_SUFFIX
1146 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
1147 'Events': []
1148 },
1149 {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
1150 'Events': ['s3:ObjectCreated:*']
1151 }]
1152
1153 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1154 response, status = s3_notification_conf.set_config()
1155 assert_equal(status/100, 2)
1156
1157 # create objects in the bucket (async)
1158 number_of_objects = 100
1159 client_threads = []
1160 start_time = time.time()
1161 for i in range(number_of_objects):
1162 key = bucket.new_key(str(i))
1163 content = str(os.urandom(1024*1024))
1164 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1165 thr.start()
1166 client_threads.append(thr)
1167 [thr.join() for thr in client_threads]
1168
1169 time_diff = time.time() - start_time
1170 print 'average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1171
1172 print 'wait for 5sec for the messages...'
1173 time.sleep(5)
1174
1175 # check amqp receiver
1176 keys = list(bucket.list())
1177 print 'total number of objects: ' + str(len(keys))
1178 receiver1.verify_s3_events(keys, exact_match=True)
1179 receiver2.verify_s3_events(keys, exact_match=True)
1180
1181 # delete objects from the bucket
1182 client_threads = []
1183 start_time = time.time()
1184 for key in bucket.list():
1185 thr = threading.Thread(target = key.delete, args=())
1186 thr.start()
1187 client_threads.append(thr)
1188 [thr.join() for thr in client_threads]
1189
1190 time_diff = time.time() - start_time
1191 print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1192
1193 print 'wait for 5sec for the messages...'
1194 time.sleep(5)
1195
1196 # check amqp receiver 1 for deletions
1197 receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
1198 # check amqp receiver 2 has no deletions
1199 try:
1200 receiver1.verify_s3_events(keys, exact_match=False, deletions=True)
1201 except:
1202 pass
1203 else:
1204 err = 'amqp receiver 2 should have no deletions'
1205 assert False, err
1206
1207
1208 # cleanup
1209 stop_amqp_receiver(receiver1, task1)
1210 stop_amqp_receiver(receiver2, task2)
1211 s3_notification_conf.del_config()
1212 topic_conf1.del_config()
1213 topic_conf2.del_config()
1214 # delete the bucket
1215 zones[0].delete_bucket(bucket_name)
1216 clean_rabbitmq(proc)
1217
1218
1219 def test_ps_s3_notification_push_http_on_master():
1220 """ test pushing http s3 notification on master """
1221 if skip_push_tests:
1222 return SkipTest("PubSub push tests don't run in teuthology")
1223 hostname = get_ip()
1224 zones, _ = init_env(require_ps=False)
1225 realm = get_realm()
1226 zonegroup = realm.master_zonegroup()
1227
1228 # create random port for the http server
1229 host = get_ip()
1230 port = random.randint(10000, 20000)
1231 # start an http server in a separate thread
1232 number_of_objects = 100
1233 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1234
1235 # create bucket
1236 bucket_name = gen_bucket_name()
1237 bucket = zones[0].create_bucket(bucket_name)
1238 topic_name = bucket_name + TOPIC_SUFFIX
1239
1240 # create s3 topic
1241 endpoint_address = 'http://'+host+':'+str(port)
1242 endpoint_args = 'push-endpoint='+endpoint_address
1243 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1244 topic_arn = topic_conf.set_config()
1245 # create s3 notification
1246 notification_name = bucket_name + NOTIFICATION_SUFFIX
1247 topic_conf_list = [{'Id': notification_name,
1248 'TopicArn': topic_arn,
1249 'Events': []
1250 }]
1251 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1252 response, status = s3_notification_conf.set_config()
1253 assert_equal(status/100, 2)
1254
1255 # create objects in the bucket
1256 client_threads = []
1257 start_time = time.time()
1258 content = 'bar'
1259 for i in range(number_of_objects):
1260 key = bucket.new_key(str(i))
1261 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1262 thr.start()
1263 client_threads.append(thr)
1264 [thr.join() for thr in client_threads]
1265
1266 time_diff = time.time() - start_time
1267 print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1268
1269 print 'wait for 5sec for the messages...'
1270 time.sleep(5)
1271
1272 # check http receiver
1273 keys = list(bucket.list())
1274 print 'total number of objects: ' + str(len(keys))
1275 http_server.verify_s3_events(keys, exact_match=True)
1276
1277 # delete objects from the bucket
1278 client_threads = []
1279 start_time = time.time()
1280 for key in bucket.list():
1281 thr = threading.Thread(target = key.delete, args=())
1282 thr.start()
1283 client_threads.append(thr)
1284 [thr.join() for thr in client_threads]
1285
1286 time_diff = time.time() - start_time
1287 print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
1288
1289 print 'wait for 5sec for the messages...'
1290 time.sleep(5)
1291
1292 # check http receiver
1293 http_server.verify_s3_events(keys, exact_match=True, deletions=True)
1294
1295 # cleanup
1296 topic_conf.del_config()
1297 s3_notification_conf.del_config(notification=notification_name)
1298 # delete the bucket
1299 zones[0].delete_bucket(bucket_name)
1300 http_server.close()
1301
1302
1303 def test_ps_topic():
1304 """ test set/get/delete of topic """
1305 _, ps_zones = init_env()
1306 realm = get_realm()
1307 zonegroup = realm.master_zonegroup()
1308 bucket_name = gen_bucket_name()
1309 topic_name = bucket_name+TOPIC_SUFFIX
1310
1311 # create topic
1312 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1313 _, status = topic_conf.set_config()
1314 assert_equal(status/100, 2)
1315 # get topic
1316 result, _ = topic_conf.get_config()
1317 # verify topic content
1318 parsed_result = json.loads(result)
1319 assert_equal(parsed_result['topic']['name'], topic_name)
1320 assert_equal(len(parsed_result['subs']), 0)
1321 assert_equal(parsed_result['topic']['arn'],
1322 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
1323 # delete topic
1324 _, status = topic_conf.del_config()
1325 assert_equal(status/100, 2)
1326 # verift topic is deleted
1327 result, status = topic_conf.get_config()
1328 assert_equal(status, 404)
1329 parsed_result = json.loads(result)
1330 assert_equal(parsed_result['Code'], 'NoSuchKey')
1331
1332
1333 def test_ps_topic_with_endpoint():
1334 """ test set topic with endpoint"""
1335 _, ps_zones = init_env()
1336 bucket_name = gen_bucket_name()
1337 topic_name = bucket_name+TOPIC_SUFFIX
1338
1339 # create topic
1340 dest_endpoint = 'amqp://localhost:7001'
1341 dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
1342 topic_conf = PSTopic(ps_zones[0].conn, topic_name,
1343 endpoint=dest_endpoint,
1344 endpoint_args=dest_args)
1345 _, status = topic_conf.set_config()
1346 assert_equal(status/100, 2)
1347 # get topic
1348 result, _ = topic_conf.get_config()
1349 # verify topic content
1350 parsed_result = json.loads(result)
1351 assert_equal(parsed_result['topic']['name'], topic_name)
1352 assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
1353 # cleanup
1354 topic_conf.del_config()
1355
1356
1357 def test_ps_notification():
1358 """ test set/get/delete of notification """
1359 zones, ps_zones = init_env()
1360 bucket_name = gen_bucket_name()
1361 topic_name = bucket_name+TOPIC_SUFFIX
1362
1363 # create topic
1364 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1365 topic_conf.set_config()
1366 # create bucket on the first of the rados zones
1367 zones[0].create_bucket(bucket_name)
1368 # wait for sync
1369 zone_meta_checkpoint(ps_zones[0].zone)
1370 # create notifications
1371 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1372 topic_name)
1373 _, status = notification_conf.set_config()
1374 assert_equal(status/100, 2)
1375 # get notification
1376 result, _ = notification_conf.get_config()
1377 parsed_result = json.loads(result)
1378 assert_equal(len(parsed_result['topics']), 1)
1379 assert_equal(parsed_result['topics'][0]['topic']['name'],
1380 topic_name)
1381 # delete notification
1382 _, status = notification_conf.del_config()
1383 assert_equal(status/100, 2)
1384 result, status = notification_conf.get_config()
1385 parsed_result = json.loads(result)
1386 assert_equal(len(parsed_result['topics']), 0)
1387 # TODO should return 404
1388 # assert_equal(status, 404)
1389
1390 # cleanup
1391 topic_conf.del_config()
1392 zones[0].delete_bucket(bucket_name)
1393
1394
1395 def test_ps_notification_events():
1396 """ test set/get/delete of notification on specific events"""
1397 zones, ps_zones = init_env()
1398 bucket_name = gen_bucket_name()
1399 topic_name = bucket_name+TOPIC_SUFFIX
1400
1401 # create topic
1402 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1403 topic_conf.set_config()
1404 # create bucket on the first of the rados zones
1405 zones[0].create_bucket(bucket_name)
1406 # wait for sync
1407 zone_meta_checkpoint(ps_zones[0].zone)
1408 # create notifications
1409 events = "OBJECT_CREATE,OBJECT_DELETE"
1410 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1411 topic_name,
1412 events)
1413 _, status = notification_conf.set_config()
1414 assert_equal(status/100, 2)
1415 # get notification
1416 result, _ = notification_conf.get_config()
1417 parsed_result = json.loads(result)
1418 assert_equal(len(parsed_result['topics']), 1)
1419 assert_equal(parsed_result['topics'][0]['topic']['name'],
1420 topic_name)
1421 assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
1422 # TODO add test for invalid event name
1423
1424 # cleanup
1425 notification_conf.del_config()
1426 topic_conf.del_config()
1427 zones[0].delete_bucket(bucket_name)
1428
1429
1430 def test_ps_subscription():
1431 """ test set/get/delete of subscription """
1432 zones, ps_zones = init_env()
1433 bucket_name = gen_bucket_name()
1434 topic_name = bucket_name+TOPIC_SUFFIX
1435
1436 # create topic
1437 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1438 topic_conf.set_config()
1439 # create bucket on the first of the rados zones
1440 bucket = zones[0].create_bucket(bucket_name)
1441 # wait for sync
1442 zone_meta_checkpoint(ps_zones[0].zone)
1443 # create notifications
1444 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1445 topic_name)
1446 _, status = notification_conf.set_config()
1447 assert_equal(status/100, 2)
1448 # create subscription
1449 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
1450 topic_name)
1451 _, status = sub_conf.set_config()
1452 assert_equal(status/100, 2)
1453 # get the subscription
1454 result, _ = sub_conf.get_config()
1455 parsed_result = json.loads(result)
1456 assert_equal(parsed_result['topic'], topic_name)
1457 # create objects in the bucket
1458 number_of_objects = 10
1459 for i in range(number_of_objects):
1460 key = bucket.new_key(str(i))
1461 key.set_contents_from_string('bar')
1462 # wait for sync
1463 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1464
1465 # get the create events from the subscription
1466 result, _ = sub_conf.get_events()
1467 events = json.loads(result)
1468 for event in events['events']:
1469 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1470 keys = list(bucket.list())
1471 # TODO: use exact match
1472 verify_events_by_elements(events, keys, exact_match=False)
1473 # delete objects from the bucket
1474 for key in bucket.list():
1475 key.delete()
1476 # wait for sync
1477 zone_meta_checkpoint(ps_zones[0].zone)
1478 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1479
1480 # get the delete events from the subscriptions
1481 result, _ = sub_conf.get_events()
1482 for event in events['events']:
1483 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1484 # TODO: check deletions
1485 # TODO: use exact match
1486 # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
1487 # we should see the creations as well as the deletions
1488 # delete subscription
1489 _, status = sub_conf.del_config()
1490 assert_equal(status/100, 2)
1491 result, status = sub_conf.get_config()
1492 parsed_result = json.loads(result)
1493 assert_equal(parsed_result['topic'], '')
1494 # TODO should return 404
1495 # assert_equal(status, 404)
1496
1497 # cleanup
1498 notification_conf.del_config()
1499 topic_conf.del_config()
1500 zones[0].delete_bucket(bucket_name)
1501
1502
1503 def test_ps_event_type_subscription():
1504 """ test subscriptions for different events """
1505 zones, ps_zones = init_env()
1506 bucket_name = gen_bucket_name()
1507
1508 # create topic for objects creation
1509 topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
1510 topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
1511 topic_create_conf.set_config()
1512 # create topic for objects deletion
1513 topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
1514 topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
1515 topic_delete_conf.set_config()
1516 # create topic for all events
1517 topic_name = bucket_name+TOPIC_SUFFIX+'_all'
1518 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1519 topic_conf.set_config()
1520 # create bucket on the first of the rados zones
1521 bucket = zones[0].create_bucket(bucket_name)
1522 # wait for sync
1523 zone_meta_checkpoint(ps_zones[0].zone)
1524 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1525 # create notifications for objects creation
1526 notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
1527 topic_create_name, "OBJECT_CREATE")
1528 _, status = notification_create_conf.set_config()
1529 assert_equal(status/100, 2)
1530 # create notifications for objects deletion
1531 notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
1532 topic_delete_name, "OBJECT_DELETE")
1533 _, status = notification_delete_conf.set_config()
1534 assert_equal(status/100, 2)
1535 # create notifications for all events
1536 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1537 topic_name, "OBJECT_DELETE,OBJECT_CREATE")
1538 _, status = notification_conf.set_config()
1539 assert_equal(status/100, 2)
1540 # create subscription for objects creation
1541 sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
1542 topic_create_name)
1543 _, status = sub_create_conf.set_config()
1544 assert_equal(status/100, 2)
1545 # create subscription for objects deletion
1546 sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
1547 topic_delete_name)
1548 _, status = sub_delete_conf.set_config()
1549 assert_equal(status/100, 2)
1550 # create subscription for all events
1551 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
1552 topic_name)
1553 _, status = sub_conf.set_config()
1554 assert_equal(status/100, 2)
1555 # create objects in the bucket
1556 number_of_objects = 10
1557 for i in range(number_of_objects):
1558 key = bucket.new_key(str(i))
1559 key.set_contents_from_string('bar')
1560 # wait for sync
1561 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1562
1563 # get the events from the creation subscription
1564 result, _ = sub_create_conf.get_events()
1565 events = json.loads(result)
1566 for event in events['events']:
1567 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
1568 '" type: "' + str(event['event']) + '"')
1569 keys = list(bucket.list())
1570 # TODO: use exact match
1571 verify_events_by_elements(events, keys, exact_match=False)
1572 # get the events from the deletions subscription
1573 result, _ = sub_delete_conf.get_events()
1574 events = json.loads(result)
1575 for event in events['events']:
1576 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1577 '" type: "' + str(event['event']) + '"')
1578 assert_equal(len(events['events']), 0)
1579 # get the events from the all events subscription
1580 result, _ = sub_conf.get_events()
1581 events = json.loads(result)
1582 for event in events['events']:
1583 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
1584 str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1585 # TODO: use exact match
1586 verify_events_by_elements(events, keys, exact_match=False)
1587 # delete objects from the bucket
1588 for key in bucket.list():
1589 key.delete()
1590 # wait for sync
1591 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1592 log.debug("Event (OBJECT_DELETE) synced")
1593
1594 # get the events from the creations subscription
1595 result, _ = sub_create_conf.get_events()
1596 events = json.loads(result)
1597 for event in events['events']:
1598 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
1599 '" type: "' + str(event['event']) + '"')
1600 # deletions should not change the creation events
1601 # TODO: use exact match
1602 verify_events_by_elements(events, keys, exact_match=False)
1603 # get the events from the deletions subscription
1604 result, _ = sub_delete_conf.get_events()
1605 events = json.loads(result)
1606 for event in events['events']:
1607 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1608 '" type: "' + str(event['event']) + '"')
1609 # only deletions should be listed here
1610 # TODO: use exact match
1611 verify_events_by_elements(events, keys, exact_match=False, deletions=True)
1612 # get the events from the all events subscription
1613 result, _ = sub_create_conf.get_events()
1614 events = json.loads(result)
1615 for event in events['events']:
1616 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1617 '" type: "' + str(event['event']) + '"')
1618 # both deletions and creations should be here
1619 # TODO: use exact match
1620 verify_events_by_elements(events, keys, exact_match=False, deletions=False)
1621 # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
1622 # TODO: (1) test deletions (2) test overall number of events
1623
1624 # test subscription deletion when topic is specified
1625 _, status = sub_create_conf.del_config(topic=True)
1626 assert_equal(status/100, 2)
1627 _, status = sub_delete_conf.del_config(topic=True)
1628 assert_equal(status/100, 2)
1629 _, status = sub_conf.del_config(topic=True)
1630 assert_equal(status/100, 2)
1631
1632 # cleanup
1633 notification_create_conf.del_config()
1634 notification_delete_conf.del_config()
1635 notification_conf.del_config()
1636 topic_create_conf.del_config()
1637 topic_delete_conf.del_config()
1638 topic_conf.del_config()
1639 zones[0].delete_bucket(bucket_name)
1640
1641
1642 def test_ps_event_fetching():
1643 """ test incremental fetching of events from a subscription """
1644 zones, ps_zones = init_env()
1645 bucket_name = gen_bucket_name()
1646 topic_name = bucket_name+TOPIC_SUFFIX
1647
1648 # create topic
1649 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1650 topic_conf.set_config()
1651 # create bucket on the first of the rados zones
1652 bucket = zones[0].create_bucket(bucket_name)
1653 # wait for sync
1654 zone_meta_checkpoint(ps_zones[0].zone)
1655 # create notifications
1656 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1657 topic_name)
1658 _, status = notification_conf.set_config()
1659 assert_equal(status/100, 2)
1660 # create subscription
1661 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
1662 topic_name)
1663 _, status = sub_conf.set_config()
1664 assert_equal(status/100, 2)
1665 # create objects in the bucket
1666 number_of_objects = 100
1667 for i in range(number_of_objects):
1668 key = bucket.new_key(str(i))
1669 key.set_contents_from_string('bar')
1670 # wait for sync
1671 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1672 max_events = 15
1673 total_events_count = 0
1674 next_marker = None
1675 all_events = []
1676 while True:
1677 # get the events from the subscription
1678 result, _ = sub_conf.get_events(max_events, next_marker)
1679 events = json.loads(result)
1680 total_events_count += len(events['events'])
1681 all_events.extend(events['events'])
1682 next_marker = events['next_marker']
1683 for event in events['events']:
1684 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1685 if next_marker == '':
1686 break
1687 keys = list(bucket.list())
1688 # TODO: use exact match
1689 verify_events_by_elements({'events': all_events}, keys, exact_match=False)
1690
1691 # cleanup
1692 sub_conf.del_config()
1693 notification_conf.del_config()
1694 topic_conf.del_config()
1695 for key in bucket.list():
1696 key.delete()
1697 zones[0].delete_bucket(bucket_name)
1698
1699
1700 def test_ps_event_acking():
1701 """ test acking of some events in a subscription """
1702 zones, ps_zones = init_env()
1703 bucket_name = gen_bucket_name()
1704 topic_name = bucket_name+TOPIC_SUFFIX
1705
1706 # create topic
1707 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1708 topic_conf.set_config()
1709 # create bucket on the first of the rados zones
1710 bucket = zones[0].create_bucket(bucket_name)
1711 # wait for sync
1712 zone_meta_checkpoint(ps_zones[0].zone)
1713 # create notifications
1714 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1715 topic_name)
1716 _, status = notification_conf.set_config()
1717 assert_equal(status/100, 2)
1718 # create subscription
1719 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
1720 topic_name)
1721 _, status = sub_conf.set_config()
1722 assert_equal(status/100, 2)
1723 # create objects in the bucket
1724 number_of_objects = 10
1725 for i in range(number_of_objects):
1726 key = bucket.new_key(str(i))
1727 key.set_contents_from_string('bar')
1728 # wait for sync
1729 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1730
1731 # get the create events from the subscription
1732 result, _ = sub_conf.get_events()
1733 events = json.loads(result)
1734 original_number_of_events = len(events)
1735 for event in events['events']:
1736 log.debug('Event (before ack) id: "' + str(event['id']) + '"')
1737 keys = list(bucket.list())
1738 # TODO: use exact match
1739 verify_events_by_elements(events, keys, exact_match=False)
1740 # ack half of the events
1741 events_to_ack = number_of_objects/2
1742 for event in events['events']:
1743 if events_to_ack == 0:
1744 break
1745 _, status = sub_conf.ack_events(event['id'])
1746 assert_equal(status/100, 2)
1747 events_to_ack -= 1
1748
1749 # verify that acked events are gone
1750 result, _ = sub_conf.get_events()
1751 events = json.loads(result)
1752 for event in events['events']:
1753 log.debug('Event (after ack) id: "' + str(event['id']) + '"')
1754 assert len(events) >= (original_number_of_events - number_of_objects/2)
1755
1756 # cleanup
1757 sub_conf.del_config()
1758 notification_conf.del_config()
1759 topic_conf.del_config()
1760 for key in bucket.list():
1761 key.delete()
1762 zones[0].delete_bucket(bucket_name)
1763
1764
1765 def test_ps_creation_triggers():
1766 """ test object creation notifications in using put/copy/post """
1767 zones, ps_zones = init_env()
1768 bucket_name = gen_bucket_name()
1769 topic_name = bucket_name+TOPIC_SUFFIX
1770
1771 # create topic
1772 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
1773 topic_conf.set_config()
1774 # create bucket on the first of the rados zones
1775 bucket = zones[0].create_bucket(bucket_name)
1776 # wait for sync
1777 zone_meta_checkpoint(ps_zones[0].zone)
1778 # create notifications
1779 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
1780 topic_name)
1781 _, status = notification_conf.set_config()
1782 assert_equal(status/100, 2)
1783 # create subscription
1784 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
1785 topic_name)
1786 _, status = sub_conf.set_config()
1787 assert_equal(status/100, 2)
1788 # create objects in the bucket using PUT
1789 key = bucket.new_key('put')
1790 key.set_contents_from_string('bar')
1791 # create objects in the bucket using COPY
1792 bucket.copy_key('copy', bucket.name, key.name)
1793 # create objects in the bucket using multi-part upload
1794 fp = tempfile.TemporaryFile(mode='w')
1795 fp.write('bar')
1796 fp.close()
1797 uploader = bucket.initiate_multipart_upload('multipart')
1798 fp = tempfile.TemporaryFile(mode='r')
1799 uploader.upload_part_from_file(fp, 1)
1800 uploader.complete_upload()
1801 fp.close()
1802 # wait for sync
1803 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
1804
1805 # get the create events from the subscription
1806 result, _ = sub_conf.get_events()
1807 events = json.loads(result)
1808 for event in events['events']:
1809 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1810
1811 # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
1812 assert len(events['events']) >= 3
1813 # cleanup
1814 sub_conf.del_config()
1815 notification_conf.del_config()
1816 topic_conf.del_config()
1817 for key in bucket.list():
1818 key.delete()
1819 zones[0].delete_bucket(bucket_name)
1820
1821
1822 def test_ps_s3_creation_triggers_on_master():
1823 """ test object creation s3 notifications in using put/copy/post on master"""
1824 if skip_push_tests:
1825 return SkipTest("PubSub push tests don't run in teuthology")
1826 hostname = get_ip()
1827 proc = init_rabbitmq()
1828 if proc is None:
1829 return SkipTest('end2end amqp tests require rabbitmq-server installed')
1830 zones, _ = init_env(require_ps=False)
1831 realm = get_realm()
1832 zonegroup = realm.master_zonegroup()
1833
1834 # create bucket
1835 bucket_name = gen_bucket_name()
1836 bucket = zones[0].create_bucket(bucket_name)
1837 topic_name = bucket_name + TOPIC_SUFFIX
1838
1839 # start amqp receiver
1840 exchange = 'ex1'
1841 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
1842 task.start()
1843
1844 # create s3 topic
1845 endpoint_address = 'amqp://' + hostname
1846 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
1847 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1848 topic_arn = topic_conf.set_config()
1849 # create s3 notification
1850 notification_name = bucket_name + NOTIFICATION_SUFFIX
1851 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
1852 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
1853 }]
1854
1855 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1856 response, status = s3_notification_conf.set_config()
1857 assert_equal(status/100, 2)
1858
1859 # create objects in the bucket using PUT
1860 key = bucket.new_key('put')
1861 key.set_contents_from_string('bar')
1862 # create objects in the bucket using COPY
1863 bucket.copy_key('copy', bucket.name, key.name)
1864 # create objects in the bucket using multi-part upload
1865 fp = tempfile.TemporaryFile(mode='w')
1866 fp.write('bar')
1867 fp.close()
1868 uploader = bucket.initiate_multipart_upload('multipart')
1869 fp = tempfile.TemporaryFile(mode='r')
1870 uploader.upload_part_from_file(fp, 1)
1871 uploader.complete_upload()
1872 fp.close()
1873
1874 print 'wait for 5sec for the messages...'
1875 time.sleep(5)
1876
1877 # check amqp receiver
1878 keys = list(bucket.list())
1879 receiver.verify_s3_events(keys, exact_match=True)
1880
1881 # cleanup
1882 stop_amqp_receiver(receiver, task)
1883 s3_notification_conf.del_config()
1884 topic_conf.del_config()
1885 for key in bucket.list():
1886 key.delete()
1887 # delete the bucket
1888 zones[0].delete_bucket(bucket_name)
1889 clean_rabbitmq(proc)
1890
1891
1892 def test_ps_s3_multipart_on_master():
1893 """ test multipart object upload on master"""
1894 if skip_push_tests:
1895 return SkipTest("PubSub push tests don't run in teuthology")
1896 hostname = get_ip()
1897 proc = init_rabbitmq()
1898 if proc is None:
1899 return SkipTest('end2end amqp tests require rabbitmq-server installed')
1900 zones, _ = init_env(require_ps=False)
1901 realm = get_realm()
1902 zonegroup = realm.master_zonegroup()
1903
1904 # create bucket
1905 bucket_name = gen_bucket_name()
1906 bucket = zones[0].create_bucket(bucket_name)
1907 topic_name = bucket_name + TOPIC_SUFFIX
1908
1909 # start amqp receivers
1910 exchange = 'ex1'
1911 task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
1912 task1.start()
1913 task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
1914 task2.start()
1915 task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
1916 task3.start()
1917
1918 # create s3 topics
1919 endpoint_address = 'amqp://' + hostname
1920 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
1921 topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
1922 topic_arn1 = topic_conf1.set_config()
1923 topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
1924 topic_arn2 = topic_conf2.set_config()
1925 topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
1926 topic_arn3 = topic_conf3.set_config()
1927
1928 # create s3 notifications
1929 notification_name = bucket_name + NOTIFICATION_SUFFIX
1930 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
1931 'Events': ['s3:ObjectCreated:*']
1932 },
1933 {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
1934 'Events': ['s3:ObjectCreated:Post']
1935 },
1936 {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
1937 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
1938 }]
1939 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
1940 response, status = s3_notification_conf.set_config()
1941 assert_equal(status/100, 2)
1942
1943 # create objects in the bucket using multi-part upload
1944 fp = tempfile.TemporaryFile(mode='w+b')
1945 content = bytearray(os.urandom(1024*1024))
1946 fp.write(content)
1947 fp.flush()
1948 fp.seek(0)
1949 uploader = bucket.initiate_multipart_upload('multipart')
1950 uploader.upload_part_from_file(fp, 1)
1951 uploader.complete_upload()
1952 fp.close()
1953
1954 print 'wait for 5sec for the messages...'
1955 time.sleep(5)
1956
1957 # check amqp receiver
1958 events = receiver1.get_and_reset_events()
1959 assert_equal(len(events), 3)
1960
1961 events = receiver2.get_and_reset_events()
1962 assert_equal(len(events), 1)
1963 assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
1964 assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')
1965
1966 events = receiver3.get_and_reset_events()
1967 assert_equal(len(events), 1)
1968 assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
1969 assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
1970
1971 # cleanup
1972 stop_amqp_receiver(receiver1, task1)
1973 stop_amqp_receiver(receiver2, task2)
1974 stop_amqp_receiver(receiver3, task3)
1975 s3_notification_conf.del_config()
1976 topic_conf1.del_config()
1977 topic_conf2.del_config()
1978 topic_conf3.del_config()
1979 for key in bucket.list():
1980 key.delete()
1981 # delete the bucket
1982 zones[0].delete_bucket(bucket_name)
1983 clean_rabbitmq(proc)
1984
1985
1986 def test_ps_versioned_deletion():
1987 """ test notification of deletion markers """
1988 zones, ps_zones = init_env()
1989 bucket_name = gen_bucket_name()
1990 topic_name = bucket_name+TOPIC_SUFFIX
1991
1992 # create topics
1993 topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
1994 _, status = topic_conf1.set_config()
1995 assert_equal(status/100, 2)
1996 topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
1997 _, status = topic_conf2.set_config()
1998 assert_equal(status/100, 2)
1999
2000 # create bucket on the first of the rados zones
2001 bucket = zones[0].create_bucket(bucket_name)
2002 bucket.configure_versioning(True)
2003
2004 # wait for sync
2005 zone_meta_checkpoint(ps_zones[0].zone)
2006
2007 # create notifications
2008 event_type1 = 'OBJECT_DELETE'
2009 notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
2010 topic_name+'_1',
2011 event_type1)
2012 _, status = notification_conf1.set_config()
2013 assert_equal(status/100, 2)
2014 event_type2 = 'DELETE_MARKER_CREATE'
2015 notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
2016 topic_name+'_2',
2017 event_type2)
2018 _, status = notification_conf2.set_config()
2019 assert_equal(status/100, 2)
2020
2021 # create subscriptions
2022 sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
2023 topic_name+'_1')
2024 _, status = sub_conf1.set_config()
2025 assert_equal(status/100, 2)
2026 sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
2027 topic_name+'_2')
2028 _, status = sub_conf2.set_config()
2029 assert_equal(status/100, 2)
2030
2031 # create objects in the bucket
2032 key = bucket.new_key('foo')
2033 key.set_contents_from_string('bar')
2034 v1 = key.version_id
2035 key.set_contents_from_string('kaboom')
2036 v2 = key.version_id
2037 # create deletion marker
2038 delete_marker_key = bucket.delete_key(key.name)
2039
2040 # wait for sync
2041 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2042
2043 # delete the deletion marker
2044 delete_marker_key.delete()
2045 # delete versions
2046 bucket.delete_key(key.name, version_id=v2)
2047 bucket.delete_key(key.name, version_id=v1)
2048
2049 # wait for sync
2050 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2051
2052 # get the delete events from the subscription
2053 result, _ = sub_conf1.get_events()
2054 events = json.loads(result)
2055 for event in events['events']:
2056 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2057 assert_equal(str(event['event']), event_type1)
2058
2059 result, _ = sub_conf2.get_events()
2060 events = json.loads(result)
2061 for event in events['events']:
2062 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2063 assert_equal(str(event['event']), event_type2)
2064
2065 # cleanup
2066 # follwing is needed for the cleanup in the case of 3-zones
2067 # see: http://tracker.ceph.com/issues/39142
2068 realm = get_realm()
2069 zonegroup = realm.master_zonegroup()
2070 zonegroup_conns = ZonegroupConns(zonegroup)
2071 try:
2072 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
2073 zones[0].delete_bucket(bucket_name)
2074 except:
2075 log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
2076 sub_conf1.del_config()
2077 sub_conf2.del_config()
2078 notification_conf1.del_config()
2079 notification_conf2.del_config()
2080 topic_conf1.del_config()
2081 topic_conf2.del_config()
2082
2083
2084 def test_ps_s3_metadata_on_master():
2085 """ test s3 notification of metadata on master """
2086 if skip_push_tests:
2087 return SkipTest("PubSub push tests don't run in teuthology")
2088 hostname = get_ip()
2089 proc = init_rabbitmq()
2090 if proc is None:
2091 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2092 zones, _ = init_env(require_ps=False)
2093 realm = get_realm()
2094 zonegroup = realm.master_zonegroup()
2095
2096 # create bucket
2097 bucket_name = gen_bucket_name()
2098 bucket = zones[0].create_bucket(bucket_name)
2099 topic_name = bucket_name + TOPIC_SUFFIX
2100
2101 # start amqp receiver
2102 exchange = 'ex1'
2103 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2104 task.start()
2105
2106 # create s3 topic
2107 endpoint_address = 'amqp://' + hostname
2108 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2109 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
2110 topic_arn = topic_conf.set_config()
2111 # create s3 notification
2112 notification_name = bucket_name + NOTIFICATION_SUFFIX
2113 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2114 'Events': ['s3:ObjectCreated:*']
2115 }]
2116
2117 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
2118 response, status = s3_notification_conf.set_config()
2119 assert_equal(status/100, 2)
2120
2121 # create objects in the bucket
2122 key = bucket.new_key('foo')
2123 key.set_metadata('meta1', 'This is my metadata value')
2124 key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
2125 keys = list(bucket.list())
2126 print 'wait for 5sec for the messages...'
2127 time.sleep(5)
2128 # check amqp receiver
2129 receiver.verify_s3_events(keys, exact_match=True)
2130
2131 # cleanup
2132 stop_amqp_receiver(receiver, task)
2133 s3_notification_conf.del_config()
2134 topic_conf.del_config()
2135 for key in bucket.list():
2136 key.delete()
2137 # delete the bucket
2138 zones[0].delete_bucket(bucket_name)
2139 clean_rabbitmq(proc)
2140
2141
2142 def test_ps_s3_versioned_deletion_on_master():
2143 """ test s3 notification of deletion markers on master """
2144 if skip_push_tests:
2145 return SkipTest("PubSub push tests don't run in teuthology")
2146 hostname = get_ip()
2147 proc = init_rabbitmq()
2148 if proc is None:
2149 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2150 zones, _ = init_env(require_ps=False)
2151 realm = get_realm()
2152 zonegroup = realm.master_zonegroup()
2153
2154 # create bucket
2155 bucket_name = gen_bucket_name()
2156 bucket = zones[0].create_bucket(bucket_name)
2157 bucket.configure_versioning(True)
2158 topic_name = bucket_name + TOPIC_SUFFIX
2159
2160 # start amqp receiver
2161 exchange = 'ex1'
2162 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2163 task.start()
2164
2165 # create s3 topic
2166 endpoint_address = 'amqp://' + hostname
2167 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2168 topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
2169 topic_arn = topic_conf.set_config()
2170 # create s3 notification
2171 notification_name = bucket_name + NOTIFICATION_SUFFIX
2172 # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
2173 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
2174 'Events': ['s3:ObjectRemoved:*']
2175 },
2176 {'Id': notification_name+'_2', 'TopicArn': topic_arn,
2177 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
2178 },
2179 {'Id': notification_name+'_3', 'TopicArn': topic_arn,
2180 'Events': ['s3:ObjectRemoved:Delete']
2181 }]
2182 s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
2183 response, status = s3_notification_conf.set_config()
2184 assert_equal(status/100, 2)
2185
2186 # create objects in the bucket
2187 key = bucket.new_key('foo')
2188 key.set_contents_from_string('bar')
2189 v1 = key.version_id
2190 key.set_contents_from_string('kaboom')
2191 v2 = key.version_id
2192 # create delete marker (non versioned deletion)
2193 delete_marker_key = bucket.delete_key(key.name)
2194
2195 time.sleep(1)
2196
2197 # versioned deletion
2198 bucket.delete_key(key.name, version_id=v2)
2199 bucket.delete_key(key.name, version_id=v1)
2200 delete_marker_key.delete()
2201
2202 print 'wait for 5sec for the messages...'
2203 time.sleep(5)
2204
2205 # check amqp receiver
2206 events = receiver.get_and_reset_events()
2207 delete_events = 0
2208 delete_marker_create_events = 0
2209 for event_list in events:
2210 for event in event_list['Records']:
2211 if event['eventName'] == 's3:ObjectRemoved:Delete':
2212 delete_events += 1
2213 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
2214 if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
2215 delete_marker_create_events += 1
2216 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
2217
2218 # 3 key versions were deleted (v1, v2 and the deletion marker)
2219 # notified over the same topic via 2 notifications (1,3)
2220 assert_equal(delete_events, 3*2)
2221 # 1 deletion marker was created
2222 # notified over the same topic over 2 notifications (1,2)
2223 assert_equal(delete_marker_create_events, 1*2)
2224
2225 # cleanup
2226 stop_amqp_receiver(receiver, task)
2227 s3_notification_conf.del_config()
2228 topic_conf.del_config()
2229 # delete the bucket
2230 zones[0].delete_bucket(bucket_name)
2231 clean_rabbitmq(proc)
2232
2233
2234 def test_ps_push_http():
2235 """ test pushing to http endpoint """
2236 if skip_push_tests:
2237 return SkipTest("PubSub push tests don't run in teuthology")
2238 zones, ps_zones = init_env()
2239 bucket_name = gen_bucket_name()
2240 topic_name = bucket_name+TOPIC_SUFFIX
2241
2242 # create random port for the http server
2243 host = get_ip()
2244 port = random.randint(10000, 20000)
2245 # start an http server in a separate thread
2246 http_server = StreamingHTTPServer(host, port)
2247
2248 # create topic
2249 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
2250 _, status = topic_conf.set_config()
2251 assert_equal(status/100, 2)
2252 # create bucket on the first of the rados zones
2253 bucket = zones[0].create_bucket(bucket_name)
2254 # wait for sync
2255 zone_meta_checkpoint(ps_zones[0].zone)
2256 # create notifications
2257 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
2258 topic_name)
2259 _, status = notification_conf.set_config()
2260 assert_equal(status/100, 2)
2261 # create subscription
2262 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
2263 topic_name, endpoint='http://'+host+':'+str(port))
2264 _, status = sub_conf.set_config()
2265 assert_equal(status/100, 2)
2266 # create objects in the bucket
2267 number_of_objects = 10
2268 for i in range(number_of_objects):
2269 key = bucket.new_key(str(i))
2270 key.set_contents_from_string('bar')
2271 # wait for sync
2272 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2273 # check http server
2274 keys = list(bucket.list())
2275 # TODO: use exact match
2276 http_server.verify_events(keys, exact_match=False)
2277
2278 # delete objects from the bucket
2279 for key in bucket.list():
2280 key.delete()
2281 # wait for sync
2282 zone_meta_checkpoint(ps_zones[0].zone)
2283 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2284 # check http server
2285 # TODO: use exact match
2286 http_server.verify_events(keys, deletions=True, exact_match=False)
2287
2288 # cleanup
2289 sub_conf.del_config()
2290 notification_conf.del_config()
2291 topic_conf.del_config()
2292 zones[0].delete_bucket(bucket_name)
2293 http_server.close()
2294
2295
2296 def test_ps_s3_push_http():
2297 """ test pushing to http endpoint s3 record format"""
2298 if skip_push_tests:
2299 return SkipTest("PubSub push tests don't run in teuthology")
2300 zones, ps_zones = init_env()
2301 bucket_name = gen_bucket_name()
2302 topic_name = bucket_name+TOPIC_SUFFIX
2303
2304 # create random port for the http server
2305 host = get_ip()
2306 port = random.randint(10000, 20000)
2307 # start an http server in a separate thread
2308 http_server = StreamingHTTPServer(host, port)
2309
2310 # create topic
2311 topic_conf = PSTopic(ps_zones[0].conn, topic_name,
2312 endpoint='http://'+host+':'+str(port))
2313 result, status = topic_conf.set_config()
2314 assert_equal(status/100, 2)
2315 parsed_result = json.loads(result)
2316 topic_arn = parsed_result['arn']
2317 # create bucket on the first of the rados zones
2318 bucket = zones[0].create_bucket(bucket_name)
2319 # wait for sync
2320 zone_meta_checkpoint(ps_zones[0].zone)
2321 # create s3 notification
2322 notification_name = bucket_name + NOTIFICATION_SUFFIX
2323 topic_conf_list = [{'Id': notification_name,
2324 'TopicArn': topic_arn,
2325 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
2326 }]
2327 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2328 _, status = s3_notification_conf.set_config()
2329 assert_equal(status/100, 2)
2330 # create objects in the bucket
2331 number_of_objects = 10
2332 for i in range(number_of_objects):
2333 key = bucket.new_key(str(i))
2334 key.set_contents_from_string('bar')
2335 # wait for sync
2336 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2337 # check http server
2338 keys = list(bucket.list())
2339 # TODO: use exact match
2340 http_server.verify_s3_events(keys, exact_match=False)
2341
2342 # delete objects from the bucket
2343 for key in bucket.list():
2344 key.delete()
2345 # wait for sync
2346 zone_meta_checkpoint(ps_zones[0].zone)
2347 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2348 # check http server
2349 # TODO: use exact match
2350 http_server.verify_s3_events(keys, deletions=True, exact_match=False)
2351
2352 # cleanup
2353 s3_notification_conf.del_config()
2354 topic_conf.del_config()
2355 zones[0].delete_bucket(bucket_name)
2356 http_server.close()
2357
2358
2359 def test_ps_push_amqp():
2360 """ test pushing to amqp endpoint """
2361 if skip_push_tests:
2362 return SkipTest("PubSub push tests don't run in teuthology")
2363 hostname = get_ip()
2364 proc = init_rabbitmq()
2365 if proc is None:
2366 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2367 zones, ps_zones = init_env()
2368 bucket_name = gen_bucket_name()
2369 topic_name = bucket_name+TOPIC_SUFFIX
2370
2371 # create topic
2372 exchange = 'ex1'
2373 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2374 task.start()
2375 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
2376 _, status = topic_conf.set_config()
2377 assert_equal(status/100, 2)
2378 # create bucket on the first of the rados zones
2379 bucket = zones[0].create_bucket(bucket_name)
2380 # wait for sync
2381 zone_meta_checkpoint(ps_zones[0].zone)
2382 # create notifications
2383 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
2384 topic_name)
2385 _, status = notification_conf.set_config()
2386 assert_equal(status/100, 2)
2387 # create subscription
2388 sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
2389 topic_name, endpoint='amqp://'+hostname,
2390 endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
2391 _, status = sub_conf.set_config()
2392 assert_equal(status/100, 2)
2393 # create objects in the bucket
2394 number_of_objects = 10
2395 for i in range(number_of_objects):
2396 key = bucket.new_key(str(i))
2397 key.set_contents_from_string('bar')
2398 # wait for sync
2399 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2400 # check amqp receiver
2401 keys = list(bucket.list())
2402 # TODO: use exact match
2403 receiver.verify_events(keys, exact_match=False)
2404
2405 # delete objects from the bucket
2406 for key in bucket.list():
2407 key.delete()
2408 # wait for sync
2409 zone_meta_checkpoint(ps_zones[0].zone)
2410 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2411 # check amqp receiver
2412 # TODO: use exact match
2413 receiver.verify_events(keys, deletions=True, exact_match=False)
2414
2415 # cleanup
2416 stop_amqp_receiver(receiver, task)
2417 sub_conf.del_config()
2418 notification_conf.del_config()
2419 topic_conf.del_config()
2420 zones[0].delete_bucket(bucket_name)
2421 clean_rabbitmq(proc)
2422
2423
2424 def test_ps_s3_push_amqp():
2425 """ test pushing to amqp endpoint s3 record format"""
2426 if skip_push_tests:
2427 return SkipTest("PubSub push tests don't run in teuthology")
2428 hostname = get_ip()
2429 proc = init_rabbitmq()
2430 if proc is None:
2431 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2432 zones, ps_zones = init_env()
2433 bucket_name = gen_bucket_name()
2434 topic_name = bucket_name+TOPIC_SUFFIX
2435
2436 # create topic
2437 exchange = 'ex1'
2438 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2439 task.start()
2440 topic_conf = PSTopic(ps_zones[0].conn, topic_name,
2441 endpoint='amqp://' + hostname,
2442 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2443 result, status = topic_conf.set_config()
2444 assert_equal(status/100, 2)
2445 parsed_result = json.loads(result)
2446 topic_arn = parsed_result['arn']
2447 # create bucket on the first of the rados zones
2448 bucket = zones[0].create_bucket(bucket_name)
2449 # wait for sync
2450 zone_meta_checkpoint(ps_zones[0].zone)
2451 # create s3 notification
2452 notification_name = bucket_name + NOTIFICATION_SUFFIX
2453 topic_conf_list = [{'Id': notification_name,
2454 'TopicArn': topic_arn,
2455 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
2456 }]
2457 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2458 _, status = s3_notification_conf.set_config()
2459 assert_equal(status/100, 2)
2460 # create objects in the bucket
2461 number_of_objects = 10
2462 for i in range(number_of_objects):
2463 key = bucket.new_key(str(i))
2464 key.set_contents_from_string('bar')
2465 # wait for sync
2466 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2467 # check amqp receiver
2468 keys = list(bucket.list())
2469 # TODO: use exact match
2470 receiver.verify_s3_events(keys, exact_match=False)
2471
2472 # delete objects from the bucket
2473 for key in bucket.list():
2474 key.delete()
2475 # wait for sync
2476 zone_meta_checkpoint(ps_zones[0].zone)
2477 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2478 # check amqp receiver
2479 # TODO: use exact match
2480 receiver.verify_s3_events(keys, deletions=True, exact_match=False)
2481
2482 # cleanup
2483 stop_amqp_receiver(receiver, task)
2484 s3_notification_conf.del_config()
2485 topic_conf.del_config()
2486 zones[0].delete_bucket(bucket_name)
2487 clean_rabbitmq(proc)
2488
2489
2490 def test_ps_delete_bucket():
2491 """ test notification status upon bucket deletion """
2492 zones, ps_zones = init_env()
2493 bucket_name = gen_bucket_name()
2494 # create bucket on the first of the rados zones
2495 bucket = zones[0].create_bucket(bucket_name)
2496 # wait for sync
2497 zone_meta_checkpoint(ps_zones[0].zone)
2498 topic_name = bucket_name + TOPIC_SUFFIX
2499 # create topic
2500 topic_name = bucket_name + TOPIC_SUFFIX
2501 topic_conf = PSTopic(ps_zones[0].conn, topic_name)
2502 response, status = topic_conf.set_config()
2503 assert_equal(status/100, 2)
2504 parsed_result = json.loads(response)
2505 topic_arn = parsed_result['arn']
2506 # create one s3 notification
2507 notification_name = bucket_name + NOTIFICATION_SUFFIX
2508 topic_conf_list = [{'Id': notification_name,
2509 'TopicArn': topic_arn,
2510 'Events': ['s3:ObjectCreated:*']
2511 }]
2512 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2513 response, status = s3_notification_conf.set_config()
2514 assert_equal(status/100, 2)
2515
2516 # create non-s3 notification
2517 notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
2518 topic_name)
2519 _, status = notification_conf.set_config()
2520 assert_equal(status/100, 2)
2521
2522 # create objects in the bucket
2523 number_of_objects = 10
2524 for i in range(number_of_objects):
2525 key = bucket.new_key(str(i))
2526 key.set_contents_from_string('bar')
2527 # wait for bucket sync
2528 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2529 keys = list(bucket.list())
2530 # delete objects from the bucket
2531 for key in bucket.list():
2532 key.delete()
2533 # wait for bucket sync
2534 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2535 # delete the bucket
2536 zones[0].delete_bucket(bucket_name)
2537 # wait for meta sync
2538 zone_meta_checkpoint(ps_zones[0].zone)
2539
2540 # get the events from the auto-generated subscription
2541 sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
2542 topic_name)
2543 result, _ = sub_conf.get_events()
2544 records = json.loads(result)
2545 # TODO: use exact match
2546 verify_s3_records_by_elements(records, keys, exact_match=False)
2547
2548 # s3 notification is deleted with bucket
2549 _, status = s3_notification_conf.get_config(notification=notification_name)
2550 assert_equal(status, 404)
2551 # non-s3 notification is deleted with bucket
2552 _, status = notification_conf.get_config()
2553 assert_equal(status, 404)
2554 # cleanup
2555 sub_conf.del_config()
2556 topic_conf.del_config()
2557
2558
2559 def test_ps_missing_topic():
2560 """ test creating a subscription when no topic info exists"""
2561 zones, ps_zones = init_env()
2562 bucket_name = gen_bucket_name()
2563 topic_name = bucket_name+TOPIC_SUFFIX
2564
2565 # create bucket on the first of the rados zones
2566 zones[0].create_bucket(bucket_name)
2567 # wait for sync
2568 zone_meta_checkpoint(ps_zones[0].zone)
2569 # create s3 notification
2570 notification_name = bucket_name + NOTIFICATION_SUFFIX
2571 topic_arn = 'arn:aws:sns:::' + topic_name
2572 topic_conf_list = [{'Id': notification_name,
2573 'TopicArn': topic_arn,
2574 'Events': ['s3:ObjectCreated:*']
2575 }]
2576 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2577 try:
2578 s3_notification_conf.set_config()
2579 except:
2580 log.info('missing topic is expected')
2581 else:
2582 assert 'missing topic is expected'
2583
2584 # cleanup
2585 zones[0].delete_bucket(bucket_name)
2586
2587
2588 def test_ps_s3_topic_update():
2589 """ test updating topic associated with a notification"""
2590 if skip_push_tests:
2591 return SkipTest("PubSub push tests don't run in teuthology")
2592 rabbit_proc = init_rabbitmq()
2593 if rabbit_proc is None:
2594 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2595 zones, ps_zones = init_env()
2596 bucket_name = gen_bucket_name()
2597 topic_name = bucket_name+TOPIC_SUFFIX
2598
2599 # create amqp topic
2600 hostname = get_ip()
2601 exchange = 'ex1'
2602 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2603 amqp_task.start()
2604 topic_conf = PSTopic(ps_zones[0].conn, topic_name,
2605 endpoint='amqp://' + hostname,
2606 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2607 result, status = topic_conf.set_config()
2608 assert_equal(status/100, 2)
2609 parsed_result = json.loads(result)
2610 topic_arn = parsed_result['arn']
2611 # get topic
2612 result, _ = topic_conf.get_config()
2613 # verify topic content
2614 parsed_result = json.loads(result)
2615 assert_equal(parsed_result['topic']['name'], topic_name)
2616 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
2617
2618 # create http server
2619 port = random.randint(10000, 20000)
2620 # start an http server in a separate thread
2621 http_server = StreamingHTTPServer(hostname, port)
2622
2623 # create bucket on the first of the rados zones
2624 bucket = zones[0].create_bucket(bucket_name)
2625 # wait for sync
2626 zone_meta_checkpoint(ps_zones[0].zone)
2627 # create s3 notification
2628 notification_name = bucket_name + NOTIFICATION_SUFFIX
2629 topic_conf_list = [{'Id': notification_name,
2630 'TopicArn': topic_arn,
2631 'Events': ['s3:ObjectCreated:*']
2632 }]
2633 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2634 _, status = s3_notification_conf.set_config()
2635 assert_equal(status/100, 2)
2636 # create objects in the bucket
2637 number_of_objects = 10
2638 for i in range(number_of_objects):
2639 key = bucket.new_key(str(i))
2640 key.set_contents_from_string('bar')
2641 # wait for sync
2642 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2643
2644 keys = list(bucket.list())
2645 # TODO: use exact match
2646 receiver.verify_s3_events(keys, exact_match=False)
2647
2648 # update the same topic with new endpoint
2649 topic_conf = PSTopic(ps_zones[0].conn, topic_name,
2650 endpoint='http://'+ hostname + ':' + str(port))
2651 _, status = topic_conf.set_config()
2652 assert_equal(status/100, 2)
2653 # get topic
2654 result, _ = topic_conf.get_config()
2655 # verify topic content
2656 parsed_result = json.loads(result)
2657 assert_equal(parsed_result['topic']['name'], topic_name)
2658 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
2659
2660 # delete current objects and create new objects in the bucket
2661 for key in bucket.list():
2662 key.delete()
2663 for i in range(number_of_objects):
2664 key = bucket.new_key(str(i+100))
2665 key.set_contents_from_string('bar')
2666 # wait for sync
2667 zone_meta_checkpoint(ps_zones[0].zone)
2668 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2669
2670 keys = list(bucket.list())
2671 # verify that notifications are still sent to amqp
2672 # TODO: use exact match
2673 receiver.verify_s3_events(keys, exact_match=False)
2674
2675 # update notification to update the endpoint from the topic
2676 topic_conf_list = [{'Id': notification_name,
2677 'TopicArn': topic_arn,
2678 'Events': ['s3:ObjectCreated:*']
2679 }]
2680 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2681 _, status = s3_notification_conf.set_config()
2682 assert_equal(status/100, 2)
2683
2684 # delete current objects and create new objects in the bucket
2685 for key in bucket.list():
2686 key.delete()
2687 for i in range(number_of_objects):
2688 key = bucket.new_key(str(i+200))
2689 key.set_contents_from_string('bar')
2690 # wait for sync
2691 zone_meta_checkpoint(ps_zones[0].zone)
2692 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2693
2694 keys = list(bucket.list())
2695 # check that updates switched to http
2696 # TODO: use exact match
2697 http_server.verify_s3_events(keys, exact_match=False)
2698
2699 # cleanup
2700 # delete objects from the bucket
2701 stop_amqp_receiver(receiver, amqp_task)
2702 for key in bucket.list():
2703 key.delete()
2704 s3_notification_conf.del_config()
2705 topic_conf.del_config()
2706 zones[0].delete_bucket(bucket_name)
2707 http_server.close()
2708 clean_rabbitmq(rabbit_proc)
2709
2710
2711 def test_ps_s3_notification_update():
2712 """ test updating the topic of a notification"""
2713 if skip_push_tests:
2714 return SkipTest("PubSub push tests don't run in teuthology")
2715 hostname = get_ip()
2716 rabbit_proc = init_rabbitmq()
2717 if rabbit_proc is None:
2718 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2719
2720 zones, ps_zones = init_env()
2721 bucket_name = gen_bucket_name()
2722 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
2723 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
2724
2725 # create topics
2726 # start amqp receiver in a separate thread
2727 exchange = 'ex1'
2728 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
2729 amqp_task.start()
2730 # create random port for the http server
2731 http_port = random.randint(10000, 20000)
2732 # start an http server in a separate thread
2733 http_server = StreamingHTTPServer(hostname, http_port)
2734
2735 topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
2736 endpoint='amqp://' + hostname,
2737 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2738 result, status = topic_conf1.set_config()
2739 parsed_result = json.loads(result)
2740 topic_arn1 = parsed_result['arn']
2741 assert_equal(status/100, 2)
2742 topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
2743 endpoint='http://'+hostname+':'+str(http_port))
2744 result, status = topic_conf2.set_config()
2745 parsed_result = json.loads(result)
2746 topic_arn2 = parsed_result['arn']
2747 assert_equal(status/100, 2)
2748
2749 # create bucket on the first of the rados zones
2750 bucket = zones[0].create_bucket(bucket_name)
2751 # wait for sync
2752 zone_meta_checkpoint(ps_zones[0].zone)
2753 # create s3 notification with topic1
2754 notification_name = bucket_name + NOTIFICATION_SUFFIX
2755 topic_conf_list = [{'Id': notification_name,
2756 'TopicArn': topic_arn1,
2757 'Events': ['s3:ObjectCreated:*']
2758 }]
2759 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2760 _, status = s3_notification_conf.set_config()
2761 assert_equal(status/100, 2)
2762 # create objects in the bucket
2763 number_of_objects = 10
2764 for i in range(number_of_objects):
2765 key = bucket.new_key(str(i))
2766 key.set_contents_from_string('bar')
2767 # wait for sync
2768 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2769
2770 keys = list(bucket.list())
2771 # TODO: use exact match
2772 receiver.verify_s3_events(keys, exact_match=False);
2773
2774 # update notification to use topic2
2775 topic_conf_list = [{'Id': notification_name,
2776 'TopicArn': topic_arn2,
2777 'Events': ['s3:ObjectCreated:*']
2778 }]
2779 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2780 _, status = s3_notification_conf.set_config()
2781 assert_equal(status/100, 2)
2782
2783 # delete current objects and create new objects in the bucket
2784 for key in bucket.list():
2785 key.delete()
2786 for i in range(number_of_objects):
2787 key = bucket.new_key(str(i+100))
2788 key.set_contents_from_string('bar')
2789 # wait for sync
2790 zone_meta_checkpoint(ps_zones[0].zone)
2791 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2792
2793 keys = list(bucket.list())
2794 # check that updates switched to http
2795 # TODO: use exact match
2796 http_server.verify_s3_events(keys, exact_match=False)
2797
2798 # cleanup
2799 # delete objects from the bucket
2800 stop_amqp_receiver(receiver, amqp_task)
2801 for key in bucket.list():
2802 key.delete()
2803 s3_notification_conf.del_config()
2804 topic_conf1.del_config()
2805 topic_conf2.del_config()
2806 zones[0].delete_bucket(bucket_name)
2807 http_server.close()
2808 clean_rabbitmq(rabbit_proc)
2809
2810
2811 def test_ps_s3_multiple_topics_notification():
2812 """ test notification creation with multiple topics"""
2813 if skip_push_tests:
2814 return SkipTest("PubSub push tests don't run in teuthology")
2815 hostname = get_ip()
2816 rabbit_proc = init_rabbitmq()
2817 if rabbit_proc is None:
2818 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2819
2820 zones, ps_zones = init_env()
2821 bucket_name = gen_bucket_name()
2822 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
2823 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
2824
2825 # create topics
2826 # start amqp receiver in a separate thread
2827 exchange = 'ex1'
2828 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
2829 amqp_task.start()
2830 # create random port for the http server
2831 http_port = random.randint(10000, 20000)
2832 # start an http server in a separate thread
2833 http_server = StreamingHTTPServer(hostname, http_port)
2834
2835 topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
2836 endpoint='amqp://' + hostname,
2837 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2838 result, status = topic_conf1.set_config()
2839 parsed_result = json.loads(result)
2840 topic_arn1 = parsed_result['arn']
2841 assert_equal(status/100, 2)
2842 topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
2843 endpoint='http://'+hostname+':'+str(http_port))
2844 result, status = topic_conf2.set_config()
2845 parsed_result = json.loads(result)
2846 topic_arn2 = parsed_result['arn']
2847 assert_equal(status/100, 2)
2848
2849 # create bucket on the first of the rados zones
2850 bucket = zones[0].create_bucket(bucket_name)
2851 # wait for sync
2852 zone_meta_checkpoint(ps_zones[0].zone)
2853 # create s3 notification
2854 notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
2855 notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
2856 topic_conf_list = [
2857 {
2858 'Id': notification_name1,
2859 'TopicArn': topic_arn1,
2860 'Events': ['s3:ObjectCreated:*']
2861 },
2862 {
2863 'Id': notification_name2,
2864 'TopicArn': topic_arn2,
2865 'Events': ['s3:ObjectCreated:*']
2866 }]
2867 s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
2868 _, status = s3_notification_conf.set_config()
2869 assert_equal(status/100, 2)
2870 result, _ = s3_notification_conf.get_config()
2871 assert_equal(len(result['TopicConfigurations']), 2)
2872 assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
2873 assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
2874
2875 # get auto-generated subscriptions
2876 sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
2877 topic_name1)
2878 _, status = sub_conf1.get_config()
2879 assert_equal(status/100, 2)
2880 sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
2881 topic_name2)
2882 _, status = sub_conf2.get_config()
2883 assert_equal(status/100, 2)
2884
2885 # create objects in the bucket
2886 number_of_objects = 10
2887 for i in range(number_of_objects):
2888 key = bucket.new_key(str(i))
2889 key.set_contents_from_string('bar')
2890 # wait for sync
2891 zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
2892
2893 # get the events from both of the subscription
2894 result, _ = sub_conf1.get_events()
2895 records = json.loads(result)
2896 for record in records['Records']:
2897 log.debug(record)
2898 keys = list(bucket.list())
2899 # TODO: use exact match
2900 verify_s3_records_by_elements(records, keys, exact_match=False)
2901 receiver.verify_s3_events(keys, exact_match=False)
2902
2903 result, _ = sub_conf2.get_events()
2904 parsed_result = json.loads(result)
2905 for record in parsed_result['Records']:
2906 log.debug(record)
2907 keys = list(bucket.list())
2908 # TODO: use exact match
2909 verify_s3_records_by_elements(records, keys, exact_match=False)
2910 http_server.verify_s3_events(keys, exact_match=False)
2911
2912 # cleanup
2913 stop_amqp_receiver(receiver, amqp_task)
2914 s3_notification_conf.del_config()
2915 topic_conf1.del_config()
2916 topic_conf2.del_config()
2917 # delete objects from the bucket
2918 for key in bucket.list():
2919 key.delete()
2920 zones[0].delete_bucket(bucket_name)
2921 http_server.close()
2922 clean_rabbitmq(rabbit_proc)