import logging
import json
import tempfile
import BaseHTTPServer
import SocketServer
import random
import threading
import subprocess
import socket
import time
import os
from random import randint
from .tests import get_realm, \
    ZonegroupConns, \
    zonegroup_meta_checkpoint, \
    zone_meta_checkpoint, \
    zone_bucket_checkpoint, \
    zone_data_checkpoint, \
    zonegroup_bucket_checkpoint, \
    check_bucket_eq, \
    gen_bucket_name, \
    get_user, \
    get_tenant
from .zone_ps import PSTopic, \
    PSTopicS3, \
    PSNotification, \
    PSSubscription, \
    PSNotificationS3, \
    print_connection_info, \
    delete_all_s3_topics, \
    put_object_tagging, \
    get_object_tagging, \
    delete_all_objects
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
import boto.s3.tagging

# configure logging for the tests module
log = logging.getLogger(__name__)

skip_push_tests = True

####################################
# utility functions for pubsub tests
####################################

def set_contents_from_string(key, content):
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print('Error: ' + str(e))


# HTTP endpoint functions
# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/

class HTTPPostHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""
    def do_POST(self):
        """implementation of POST handler"""
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            log.error('HTTP Server received empty or malformed event')
            self.send_response(400)
        else:
            self.send_response(100)
        finally:
            self.end_headers()


class HTTPServerWithEvents(BaseHTTPServer.HTTPServer):
    """HTTP server used by the handler to store events"""
    def __init__(self, addr, handler, worker_id):
        BaseHTTPServer.HTTPServer.__init__(self, addr, handler, False)
        self.worker_id = worker_id
        self.events = []

    def append(self, event):
        self.events.append(event)


class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reusing the same socket for all threads"""
    def __init__(self, i, sock, addr):
        threading.Thread.__init__(self)
        self.i = i
        self.daemon = True
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i)
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding or closing the shared socket
        self.httpd.server_bind = self.httpd.server_close = lambda self: None
        self.start()

    def run(self):
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def close(self):
        self.httpd.shutdown()

    def get_events(self):
        return self.httpd.events

    def reset_events(self):
        self.httpd.events = []


class StreamingHTTPServer:
    """multi-threaded http server class also holding list of events received into the handler
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100):
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for it to finish"""
        # make sure that the shared socket is closed
        # this is needed in case that one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.close()
            worker.join()

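# a minimal usage sketch for the streaming HTTP endpoint above (illustrative
# only; the port number and the 'keys' variable are hypothetical, and a real
# test would also create a topic/notification pointing at this endpoint, as
# done in the tests below):
#
#   host = get_ip()
#   http_server = StreamingHTTPServer(host, 8999)
#   # ... create a topic with push-endpoint=http://<host>:8999, upload keys ...
#   http_server.verify_s3_events(keys, exact_match=True)
#   http_server.close()
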
# AMQP endpoint functions

rabbitmq_port = 5672

class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic):
        import pika
        hostname = get_ip()
        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                # wait before retrying
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrives on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        tmp = self.events
        self.events = []
        return tmp


def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver"""
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))


def create_amqp_receiver_thread(exchange, topic):
    """create amqp receiver and thread"""
    receiver = AMQPReceiver(exchange, topic)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver


def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefully stop AMQP receiver: %s', str(error))
    task.join(5)

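# a minimal usage sketch for the AMQP receiver above (illustrative only; it
# assumes a broker was started via init_rabbitmq(), and the exchange/topic
# names and the 'keys' variable are hypothetical):
#
#   task, receiver = create_amqp_receiver_thread('ex1', 'mytopic')
#   task.start()
#   # ... create a topic with an amqp:// push-endpoint, upload keys ...
#   receiver.verify_s3_events(keys, exact_match=False)
#   stop_amqp_receiver(receiver, task)
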
def check_ps_configured():
    """check if at least one pubsub zone exists"""
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    ps_zones = zonegroup.zones_by_type.get("pubsub")
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")


def is_ps_zone(zone_conn):
    """check if a specific zone is a pubsub zone"""
    if not zone_conn:
        return False
    return zone_conn.zone.tier_type() == "pubsub"


def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element """
    err = ''
    for key in keys:
        key_found = False
        if type(events) is list:
            for event_list in events:
                if key_found:
                    break
                for event in event_list['events']:
                    if event['info']['bucket']['name'] == key.bucket.name and \
                            event['info']['key']['name'] == key.name:
                        if deletions and event['event'] == 'OBJECT_DELETE':
                            key_found = True
                            break
                        elif not deletions and event['event'] == 'OBJECT_CREATE':
                            key_found = True
                            break
        else:
            for event in events['events']:
                if event['info']['bucket']['name'] == key.bucket.name and \
                        event['info']['key']['name'] == key.name:
                    if deletions and event['event'] == 'OBJECT_DELETE':
                        key_found = True
                        break
                    elif not deletions and event['event'] == 'OBJECT_CREATE':
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            assert False, err

    if len(events) != len(keys):
        err = 'superfluous events were found'
        log.debug(err)
        if exact_match:
            log.error(events)
            assert False, err


def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
    """ verify there is at least one record per element """
    err = ''
    for key in keys:
        key_found = False
        if type(records) is list:
            for record_list in records:
                if key_found:
                    break
                for record in record_list['Records']:
                    if record['s3']['bucket']['name'] == key.bucket.name and \
                            record['s3']['object']['key'] == key.name:
                        if deletions and 'ObjectRemoved' in record['eventName']:
                            key_found = True
                            break
                        elif not deletions and 'ObjectCreated' in record['eventName']:
                            key_found = True
                            break
        else:
            for record in records['Records']:
                if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name:
                    if deletions and 'ObjectRemoved' in record['eventName']:
                        key_found = True
                        break
                    elif not deletions and 'ObjectCreated' in record['eventName']:
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err

    if len(records) != len(keys):
        err = 'superfluous records were found'
        log.warning(err)
        if exact_match:
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err


def init_rabbitmq():
    """ start a rabbitmq broker """
    hostname = get_ip()
    #port = str(random.randint(20000, 30000))
    #data_dir = './' + port + '_data'
    #log_dir = './' + port + '_log'
    #print('')
    #try:
    #    os.mkdir(data_dir)
    #    os.mkdir(log_dir)
    #except:
    #    print('rabbitmq directories already exist')
    #env = {'RABBITMQ_NODE_PORT': port,
    #       'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
    #       'RABBITMQ_USE_LONGNAME': 'true',
    #       'RABBITMQ_MNESIA_BASE': data_dir,
    #       'RABBITMQ_LOG_BASE': log_dir}
    # TODO: support multiple brokers per host using env
    # make sure we don't collide with the default
    try:
        proc = subprocess.Popen('rabbitmq-server')
    except Exception as error:
        log.info('failed to execute rabbitmq-server: %s', str(error))
        print('failed to execute rabbitmq-server: %s' % str(error))
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return proc #, port, data_dir, log_dir


def clean_rabbitmq(proc): #, data_dir, log_dir)
    """ stop the rabbitmq broker """
    try:
        subprocess.call(['rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except Exception:
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup once multiple brokers are supported
    #try:
    #    os.rmdir(data_dir)
    #    os.rmdir(log_dir)
    #except:
    #    log.info('rabbitmq directories already removed')


# Kafka endpoint functions

kafka_server = 'localhost'

class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""
    def __init__(self, topic, security_type):
        from kafka import KafkaConsumer
        remaining_retries = 10
        port = 9092
        if security_type != 'PLAINTEXT':
            security_type = 'SSL'
            port = 9093
        while remaining_retries > 0:
            try:
                self.consumer = KafkaConsumer(topic, bootstrap_servers=kafka_server+':'+str(port), security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to kafka (remaining retries '
                      + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []


def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver"""
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))


def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create kafka receiver and thread"""
    receiver = KafkaReceiver(topic, security_type)
    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver


def stop_kafka_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    receiver.stop = True
    task.join(1)
    try:
        receiver.consumer.close()
    except Exception as error:
        log.info('failed to gracefully stop Kafka receiver: %s', str(error))

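# a minimal usage sketch for the kafka receiver above (illustrative only; it
# assumes a broker was started via init_kafka(), and the topic name and the
# 'keys' variable are hypothetical):
#
#   task, receiver = create_kafka_receiver_thread('mytopic')
#   task.start()
#   # ... create a topic with a kafka:// push-endpoint, upload keys ...
#   receiver.verify_s3_events(keys, exact_match=True)
#   stop_kafka_receiver(receiver, task)
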
# follow the instructions here to create and sign a broker certificate:
# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka

# the generated broker certificate should be stored in the java keystore for the use of the server
# assuming the jks files were copied to $KAFKA_DIR and the broker name is "localhost"
# the following lines must be added to $KAFKA_DIR/config/server.properties
# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
# sasl.enabled.mechanisms=PLAIN
# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
# ssl.keystore.password = abcdefgh
# ssl.key.password = abcdefgh
# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
# ssl.truststore.password = abcdefgh

# notes:
# (1) we don't test client authentication, hence, no need to generate client keys
# (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above
#     should be copied to: $KAFKA_DIR

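# a minimal sketch of the endpoint-args string used for an SSL-secured kafka
# topic under the setup above (illustrative only; kafka_security() below shows
# the real construction, including the SASL user/password variant):
#
#   endpoint_args = 'push-endpoint=kafka://' + kafka_server + ':9093' + \
#                   '&kafka-ack-level=none&use-ssl=true' + \
#                   '&ca-location=' + os.environ['KAFKA_DIR'] + 'rootCA.crt'
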
def init_kafka():
    """ start kafka/zookeeper """
    try:
        KAFKA_DIR = os.environ['KAFKA_DIR']
    except KeyError:
        KAFKA_DIR = ''

    if KAFKA_DIR == '':
        log.info('KAFKA_DIR must be set to where kafka is installed')
        print('KAFKA_DIR must be set to where kafka is installed')
        return None, None, None

    DEVNULL = open(os.devnull, 'wb')

    print('\nStarting zookeeper...')
    try:
        zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL)
    except Exception as error:
        log.info('failed to execute zookeeper: %s', str(error))
        print('failed to execute zookeeper: %s' % str(error))
        return None, None, None

    time.sleep(5)
    if zk_proc.poll() is not None:
        print('zookeeper failed to start')
        return None, None, None
    print('Zookeeper started')
    print('Starting kafka...')
    kafka_log = open('./kafka.log', 'w')
    try:
        kafka_env = os.environ.copy()
        kafka_env['KAFKA_OPTS'] = '-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
        kafka_proc = subprocess.Popen([
            KAFKA_DIR+'bin/kafka-server-start.sh',
            KAFKA_DIR+'config/server.properties'],
            stdout=kafka_log,
            env=kafka_env)
    except Exception as error:
        log.info('failed to execute kafka: %s', str(error))
        print('failed to execute kafka: %s' % str(error))
        zk_proc.terminate()
        kafka_log.close()
        return None, None, None

    # TODO add kafka checkpoint instead of sleep
    time.sleep(15)
    if kafka_proc.poll() is not None:
        zk_proc.terminate()
        print('kafka failed to start. details in: ./kafka.log')
        kafka_log.close()
        return None, None, None

    print('Kafka started')
    return kafka_proc, zk_proc, kafka_log


def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper """
    try:
        kafka_log.close()
        print('Shutdown Kafka...')
        kafka_proc.terminate()
        time.sleep(5)
        if kafka_proc.poll() is None:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutdown zookeeper...')
        zk_proc.terminate()
        time.sleep(5)
        if zk_proc.poll() is None:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        log.info('kafka/zookeeper already terminated')


def init_env(require_ps=True):
    """initialize the environment"""
    if require_ps:
        check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zone = None
    master_zone = None
    for conn in zonegroup_conns.zones:
        if conn.zone == zonegroup.master_zone:
            master_zone = conn
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zone = conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone


def get_ip():
    """ This method returns the "primary" IP on the local box (the one with a default route)
    source: https://stackoverflow.com/a/28950776/711085
    this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # address should not be reachable
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


TOPIC_SUFFIX = "_topic"
SUB_SUFFIX = "_sub"
NOTIFICATION_SUFFIX = "_notif"

##############
# pubsub tests
##############

def test_ps_info():
    """ log information for manual testing """
    return SkipTest("only used in manual testing")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    print('Zonegroup: ' + zonegroup.name)
    print('user: ' + get_user())
    print('tenant: ' + get_tenant())
    print('Master Zone')
    print_connection_info(master_zone.conn)
    print('PubSub Zone')
    print_connection_info(ps_zone.conn)
    print('Bucket: ' + bucket_name)


def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status/100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status/100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status/100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification_records():
    """ test s3 records fetching """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # get all notifications on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notification on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status/100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status/100, 2)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_topic_on_master():
    """ test s3 topics set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')

    # get topic 3
    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
    # Note that endpoint args may be ordered differently in the result

    # delete topic 1
    status = topic_conf1.del_config()
    assert_equal(status, 200)

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result, status = topic_conf1.get_list()
    assert_equal(status, 200)
    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)

    # delete topics
    result = topic_conf2.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)
    result = topic_conf3.del_config()
    # TODO: should be 200OK
    # assert_equal(status, 200)

    # get topic list, make sure it is empty
    result, status = topic_conf1.get_list()
    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)


def test_ps_s3_topic_with_secret_on_master():
    """ test s3 topics with secret set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    if master_zone.secure_conn is None:
        return SkipTest('secure connection is needed to test topic with secrets')

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(master_zone, zonegroup.name)

    # create s3 topics
    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    try:
        result = bad_topic_conf.set_config()
    except Exception as err:
        print('Error is expected: ' + str(err))
    else:
        assert False, 'user/password topic configuration should only be allowed over HTTPS'

    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    # get topic
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    _, status = topic_conf.get_list()
    assert_equal(status/100, 2)

    # delete topics
    result = topic_conf.del_config()


def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # get notifications on a bucket
    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete specific notifications
    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)

    # get the remaining 2 notifications on a bucket
    response, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # delete remaining notifications
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # make sure that the notifications are now deleted
    _, status = s3_notification_conf.get_config()

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def ps_s3_notification_filter(on_master):
    """ test s3 notification filter """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    if on_master:
        master_zone, _ = init_env(require_ps=False)
        ps_zone = master_zone
    else:
        master_zone, ps_zone = init_env(require_ps=True)

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    if on_master:
        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
        topic_arn = topic_conf.set_config()
    else:
        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
        result, _ = topic_conf.set_config()
        parsed_result = json.loads(result)
        topic_arn = parsed_result['arn']
        zone_meta_checkpoint(ps_zone.zone)

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    if on_master:
        topic_conf_list = [{'Id': notification_name+'_4',
                            'TopicArn': topic_arn,
                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                            'Filter': {
                                'Metadata': {
                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                                },
                                'Key': {
                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                                }
                            }
                            }]

        try:
            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
            _, status = s3_notification_conf4.set_config()
            assert_equal(status/100, 2)
            skip_notif4 = False
        except Exception as error:
            print('note: metadata filter is not supported by boto3 - skipping test')
            skip_notif4 = True
    else:
        print('filtering by attributes is only supported on the master zone')
        skip_notif4 = True

    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status/100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    for key_name in expected_in1:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in2:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # create the key before setting its metadata, so the metadata is actually attached
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    if on_master:
        print('wait for 5sec for the messages...')
        time.sleep(5)
    else:
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)


def test_ps_s3_notification_filter_on_master():
    ps_s3_notification_filter(on_master=True)


def test_ps_s3_notification_filter():
    ps_s3_notification_filter(on_master=False)


def test_ps_s3_notification_errors_on_master():
    """ test s3 notification set/get/delete on master """
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # create s3 notification with invalid event name
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Kaboom']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        result, status = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'invalid event name is expected to fail'

    # create s3 notification with missing name
    topic_conf_list = [{'Id': '',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'missing notification name is expected to fail'

    # create s3 notification with invalid topic ARN
    invalid_topic_arn = 'kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'invalid ARN is expected to fail'

    # create s3 notification with unknown topic ARN
    invalid_topic_arn = 'arn:aws:sns:a::kaboom'
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': invalid_topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'unknown topic is expected to fail'

    # create s3 notification with wrong bucket
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:Put']
                        }]
    s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
    try:
        _, _ = s3_notification_conf.set_config()
    except Exception as error:
        print(str(error) + ' - is expected')
    else:
        assert False, 'unknown bucket is expected to fail'

    topic_conf.del_config()

    status = topic_conf.del_config()
    # deleting an unknown topic is not considered an error
    assert_equal(status, 200)

    _, status = topic_conf.get_config()
    assert_equal(status, 404)

    # cleanup
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def test_object_timing():
    return SkipTest("only used in manual testing")
    master_zone, _ = init_env(require_ps=False)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        return SkipTest('end2end amqp tests require rabbitmq-server installed')
    master_zone, _ = init_env(require_ps=False)
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topics
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # without acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                        },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                        }]

    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check amqp receiver 2 has no deletions
    try:
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except AssertionError:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    clean_rabbitmq(proc)


def test_ps_s3_notification_push_kafka():
    """ test pushing kafka s3 notification from a pubsub zone """
    if skip_push_tests:
        return SkipTest("PubSub push tests don't run in teuthology")
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None or zk_proc is None:
        return SkipTest('end2end kafka tests require kafka/zookeeper installed')

    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint='kafka://' + kafka_server,
                         endpoint_args='kafka-ack-level=broker')
    result, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)
    clean_kafka(kafka_proc, zk_proc, kafka_log)


1529 def test_ps_s3_notification_push_kafka_on_master():
1530 """ test pushing kafka s3 notification on master """
1531 if skip_push_tests:
1532 return SkipTest("PubSub push tests don't run in teuthology")
1533 kafka_proc, zk_proc, kafka_log = init_kafka()
1534 if kafka_proc is None or zk_proc is None:
1535 return SkipTest('end2end kafka tests require kafka/zookeeper installed')
1536 master_zone, _ = init_env(require_ps=False)
1537 realm = get_realm()
1538 zonegroup = realm.master_zonegroup()
1539
1540 # create bucket
1541 bucket_name = gen_bucket_name()
1542 bucket = master_zone.create_bucket(bucket_name)
1543 # name is constant for manual testing
1544 topic_name = bucket_name+'_topic'
1545 # create consumer on the topic
1546 task, receiver = create_kafka_receiver_thread(topic_name+'_1')
1547 task.start()
1548
1549 # create s3 topic
1550 endpoint_address = 'kafka://' + kafka_server
1551 # without acks from broker
1552 endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
1553 topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
1554 topic_arn1 = topic_conf1.set_config()
1555 endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
1556 topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
1557 topic_arn2 = topic_conf2.set_config()
1558 # create s3 notification
1559 notification_name = bucket_name + NOTIFICATION_SUFFIX
1560 topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
1561 'Events': []
1562 },
1563 {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
1564 'Events': []
1565 }]
1566
1567 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
1568 response, status = s3_notification_conf.set_config()
1569 assert_equal(status/100, 2)
1570
1571 # create objects in the bucket (async)
1572 number_of_objects = 10
1573 client_threads = []
1574 start_time = time.time()
1575 for i in range(number_of_objects):
1576 key = bucket.new_key(str(i))
1577 content = str(os.urandom(1024*1024))
1578 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1579 thr.start()
1580 client_threads.append(thr)
1581 [thr.join() for thr in client_threads]
1582
1583 time_diff = time.time() - start_time
1584 print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1585
1586 print('wait for 5sec for the messages...')
1587 time.sleep(5)
1588 keys = list(bucket.list())
1589 receiver.verify_s3_events(keys, exact_match=True)
1590
1591 # delete objects from the bucket
1592 client_threads = []
1593 start_time = time.time()
1594 for key in bucket.list():
1595 thr = threading.Thread(target = key.delete, args=())
1596 thr.start()
1597 client_threads.append(thr)
1598 [thr.join() for thr in client_threads]
1599
1600 time_diff = time.time() - start_time
1601 print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1602
1603 print('wait for 5sec for the messages...')
1604 time.sleep(5)
1605 receiver.verify_s3_events(keys, exact_match=True, deletions=True)
1606
1607 # cleanup
1608 s3_notification_conf.del_config()
1609 topic_conf1.del_config()
1610 topic_conf2.del_config()
1611 # delete the bucket
1612 master_zone.delete_bucket(bucket_name)
1613 stop_kafka_receiver(receiver, task)
1614 clean_kafka(kafka_proc, zk_proc, kafka_log)
1615
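# A note on the kafka-ack-level values used above (sketch of the two endpoint
# strings; the host is this module's kafka_server fixture):
#
#   'push-endpoint=kafka://<host>&kafka-ack-level=broker'  # wait for broker ack
#   'push-endpoint=kafka://<host>&kafka-ack-level=none'    # fire-and-forget
#
# With 'broker', the push endpoint only considers a notification delivered
# once the kafka broker acknowledges it; with 'none' it does not wait.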
1616
1617 def kafka_security(security_type):
1618 """ test pushing kafka s3 notification on master """
1619 if skip_push_tests:
1620 raise SkipTest("PubSub push tests don't run in teuthology")
1621 master_zone, _ = init_env(require_ps=False)
1622 if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
1623 raise SkipTest("secure connection is needed to test SASL_SSL security")
1624 kafka_proc, zk_proc, kafka_log = init_kafka()
1625 if kafka_proc is None or zk_proc is None:
1626 raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
1627 realm = get_realm()
1628 zonegroup = realm.master_zonegroup()
1629
1630 # create bucket
1631 bucket_name = gen_bucket_name()
1632 bucket = master_zone.create_bucket(bucket_name)
1633 # topic name is derived from the bucket name, so it is predictable for manual testing
1634 topic_name = bucket_name+'_topic'
1635 # create consumer on the topic
1636 task, receiver = create_kafka_receiver_thread(topic_name)
1637 task.start()
1638
1639 # create s3 topic
1640 if security_type == 'SSL_SASL':
1641 endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
1642 else:
1643 # ssl only
1644 endpoint_address = 'kafka://' + kafka_server + ':9093'
1645
1646 KAFKA_DIR = os.environ['KAFKA_DIR']
1647
1648 # without acks from broker, with root CA
1649 endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
1650
1651 if security_type == 'SSL_SASL':
1652 topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1653 else:
1654 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1655
1656 topic_arn = topic_conf.set_config()
1657 # create s3 notification
1658 notification_name = bucket_name + NOTIFICATION_SUFFIX
1659 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
1660 'Events': []
1661 }]
1662
1663 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
1664 s3_notification_conf.set_config()
1665
1666 # create objects in the bucket (async)
1667 number_of_objects = 10
1668 client_threads = []
1669 start_time = time.time()
1670 for i in range(number_of_objects):
1671 key = bucket.new_key(str(i))
1672 content = str(os.urandom(1024*1024))
1673 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1674 thr.start()
1675 client_threads.append(thr)
1676 [thr.join() for thr in client_threads]
1677
1678 time_diff = time.time() - start_time
1679 print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1680
1681 try:
1682 print('wait for 5sec for the messages...')
1683 time.sleep(5)
1684 keys = list(bucket.list())
1685 receiver.verify_s3_events(keys, exact_match=True)
1686
1687 # delete objects from the bucket
1688 client_threads = []
1689 start_time = time.time()
1690 for key in bucket.list():
1691 thr = threading.Thread(target = key.delete, args=())
1692 thr.start()
1693 client_threads.append(thr)
1694 [thr.join() for thr in client_threads]
1695
1696 time_diff = time.time() - start_time
1697 print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1698
1699 print('wait for 5sec for the messages...')
1700 time.sleep(5)
1701 receiver.verify_s3_events(keys, exact_match=True, deletions=True)
1702 except Exception as err:
1703 assert False, str(err)
1704 finally:
1705 # cleanup
1706 s3_notification_conf.del_config()
1707 topic_conf.del_config()
1708 # delete the bucket
1709 for key in bucket.list():
1710 key.delete()
1711 master_zone.delete_bucket(bucket_name)
1712 stop_kafka_receiver(receiver, task)
1713 clean_kafka(kafka_proc, zk_proc, kafka_log)
1714
1715
1716 def test_ps_s3_notification_push_kafka_security_ssl():
1717 kafka_security('SSL')
1718
1719 def test_ps_s3_notification_push_kafka_security_ssl_sasl():
1720 kafka_security('SSL_SASL')
1721
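# Endpoint formats exercised by kafka_security() above, for reference
# (ports and credentials are the test's own fixtures, not defaults):
#
#   SSL only: 'kafka://<host>:9093' plus '&use-ssl=true&ca-location=<rootCA.crt>'
#   SSL+SASL: 'kafka://alice:alice-secret@<host>:9094' plus the same SSL arguments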
1722
1723 def test_ps_s3_notification_multi_delete_on_master():
1724 """ test deletion of multiple keys on master """
1725 if skip_push_tests:
1726 raise SkipTest("PubSub push tests don't run in teuthology")
1727 hostname = get_ip()
1728 master_zone, _ = init_env(require_ps=False)
1729 realm = get_realm()
1730 zonegroup = realm.master_zonegroup()
1731
1732 # create random port for the http server
1733 host = get_ip()
1734 port = random.randint(10000, 20000)
1735 # start an http server in a separate thread
1736 number_of_objects = 10
1737 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1738
1739 # create bucket
1740 bucket_name = gen_bucket_name()
1741 bucket = master_zone.create_bucket(bucket_name)
1742 topic_name = bucket_name + TOPIC_SUFFIX
1743
1744 # create s3 topic
1745 endpoint_address = 'http://'+host+':'+str(port)
1746 endpoint_args = 'push-endpoint='+endpoint_address
1747 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1748 topic_arn = topic_conf.set_config()
1749 # create s3 notification
1750 notification_name = bucket_name + NOTIFICATION_SUFFIX
1751 topic_conf_list = [{'Id': notification_name,
1752 'TopicArn': topic_arn,
1753 'Events': ['s3:ObjectRemoved:*']
1754 }]
1755 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
1756 response, status = s3_notification_conf.set_config()
1757 assert_equal(status/100, 2)
1758
1759 # create objects in the bucket
1760 client_threads = []
1761 for i in range(number_of_objects):
1762 obj_size = randint(1, 1024)
1763 content = str(os.urandom(obj_size))
1764 key = bucket.new_key(str(i))
1765 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1766 thr.start()
1767 client_threads.append(thr)
1768 [thr.join() for thr in client_threads]
1769
1770 keys = list(bucket.list())
1771
1772 start_time = time.time()
1773 delete_all_objects(master_zone.conn, bucket_name)
1774 time_diff = time.time() - start_time
1775 print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1776
1777 print('wait for 5sec for the messages...')
1778 time.sleep(5)
1779
1780 # check http receiver
1781 http_server.verify_s3_events(keys, exact_match=True, deletions=True)
1782
1783 # cleanup
1784 topic_conf.del_config()
1785 s3_notification_conf.del_config(notification=notification_name)
1786 # delete the bucket
1787 master_zone.delete_bucket(bucket_name)
1788 http_server.close()
1789
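# delete_all_objects() above exercises the S3 multi-object delete API
# (a single 'POST /?delete' request), so one client request is expected to
# fan out to one ObjectRemoved notification per key. A rough boto equivalent,
# assuming a boto bucket object (illustrative only):
#
#   bucket.delete_keys([key.name for key in bucket.list()])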
1790
1791 def test_ps_s3_notification_push_http_on_master():
1792 """ test pushing http s3 notification on master """
1793 if skip_push_tests:
1794 raise SkipTest("PubSub push tests don't run in teuthology")
1795 hostname = get_ip()
1796 master_zone, _ = init_env(require_ps=False)
1797 realm = get_realm()
1798 zonegroup = realm.master_zonegroup()
1799
1800 # create random port for the http server
1801 host = get_ip()
1802 port = random.randint(10000, 20000)
1803 # start an http server in a separate thread
1804 number_of_objects = 10
1805 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1806
1807 # create bucket
1808 bucket_name = gen_bucket_name()
1809 bucket = master_zone.create_bucket(bucket_name)
1810 topic_name = bucket_name + TOPIC_SUFFIX
1811
1812 # create s3 topic
1813 endpoint_address = 'http://'+host+':'+str(port)
1814 endpoint_args = 'push-endpoint='+endpoint_address
1815 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
1816 topic_arn = topic_conf.set_config()
1817 # create s3 notification
1818 notification_name = bucket_name + NOTIFICATION_SUFFIX
1819 topic_conf_list = [{'Id': notification_name,
1820 'TopicArn': topic_arn,
1821 'Events': []
1822 }]
1823 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
1824 response, status = s3_notification_conf.set_config()
1825 assert_equal(status/100, 2)
1826
1827 # create objects in the bucket
1828 client_threads = []
1829 start_time = time.time()
1830 content = 'bar'
1831 for i in range(number_of_objects):
1832 key = bucket.new_key(str(i))
1833 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1834 thr.start()
1835 client_threads.append(thr)
1836 [thr.join() for thr in client_threads]
1837
1838 time_diff = time.time() - start_time
1839 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1840
1841 print('wait for 5sec for the messages...')
1842 time.sleep(5)
1843
1844 # check http receiver
1845 keys = list(bucket.list())
1846 print('total number of objects: ' + str(len(keys)))
1847 http_server.verify_s3_events(keys, exact_match=True)
1848
1849 # delete objects from the bucket
1850 client_threads = []
1851 start_time = time.time()
1852 for key in bucket.list():
1853 thr = threading.Thread(target = key.delete, args=())
1854 thr.start()
1855 client_threads.append(thr)
1856 [thr.join() for thr in client_threads]
1857
1858 time_diff = time.time() - start_time
1859 print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1860
1861 print('wait for 5sec for the messages...')
1862 time.sleep(5)
1863
1864 # check http receiver
1865 http_server.verify_s3_events(keys, exact_match=True, deletions=True)
1866
1867 # cleanup
1868 topic_conf.del_config()
1869 s3_notification_conf.del_config(notification=notification_name)
1870 # delete the bucket
1871 master_zone.delete_bucket(bucket_name)
1872 http_server.close()
1873
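# The fixed 'time.sleep(5)' waits in the push tests are heuristic. An
# event-driven alternative would poll the receiver until enough events
# arrive; a minimal sketch, assuming a receiver that exposes the
# get_and_reset_events() accessor used elsewhere in this module (note that
# polling this way consumes the events):
#
# def wait_for_events(receiver, count, timeout=30):
#     deadline = time.time() + timeout
#     collected = []
#     while len(collected) < count and time.time() < deadline:
#         collected.extend(receiver.get_and_reset_events())
#         time.sleep(1)
#     return collected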
1874
1875 def test_ps_s3_opaque_data():
1876 """ test that opaque id set in topic, is sent in notification """
1877 if skip_push_tests:
1878 raise SkipTest("PubSub push tests don't run in teuthology")
1879 hostname = get_ip()
1880 master_zone, ps_zone = init_env()
1881 realm = get_realm()
1882 zonegroup = realm.master_zonegroup()
1883
1884 # create random port for the http server
1885 host = get_ip()
1886 port = random.randint(10000, 20000)
1887 # start an http server in a separate thread
1888 number_of_objects = 10
1889 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1890
1891 # create bucket
1892 bucket_name = gen_bucket_name()
1893 bucket = master_zone.create_bucket(bucket_name)
1894 topic_name = bucket_name + TOPIC_SUFFIX
1895 # wait for sync
1896 zone_meta_checkpoint(ps_zone.zone)
1897
1898 # create s3 topic
1899 endpoint_address = 'http://'+host+':'+str(port)
1900 opaque_data = 'http://1.2.3.4:8888'
1901 endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
1902 topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
1903 result, status = topic_conf.set_config()
1904 assert_equal(status/100, 2)
1905 parsed_result = json.loads(result)
1906 topic_arn = parsed_result['arn']
1907 # create s3 notification
1908 notification_name = bucket_name + NOTIFICATION_SUFFIX
1909 topic_conf_list = [{'Id': notification_name,
1910 'TopicArn': topic_arn,
1911 'Events': []
1912 }]
1913 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
1914 response, status = s3_notification_conf.set_config()
1915 assert_equal(status/100, 2)
1916
1917 # create objects in the bucket
1918 client_threads = []
1919 content = 'bar'
1920 for i in range(number_of_objects):
1921 key = bucket.new_key(str(i))
1922 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1923 thr.start()
1924 client_threads.append(thr)
1925 [thr.join() for thr in client_threads]
1926
1927 # wait for sync
1928 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1929
1930 # check http receiver
1931 keys = list(bucket.list())
1932 print('total number of objects: ' + str(len(keys)))
1933 events = http_server.get_and_reset_events()
1934 for event in events:
1935 assert_equal(event['Records'][0]['opaqueData'], opaque_data)
1936
1937 # cleanup
1938 for key in keys:
1939 key.delete()
1940 [thr.join() for thr in client_threads]
1941 topic_conf.del_config()
1942 s3_notification_conf.del_config(notification=notification_name)
1943 # delete the bucket
1944 master_zone.delete_bucket(bucket_name)
1945 http_server.close()
1946
1947
1948 def test_ps_s3_opaque_data_on_master():
1949 """ test that opaque id set in topic, is sent in notification on master """
1950 if skip_push_tests:
1951 raise SkipTest("PubSub push tests don't run in teuthology")
1952 hostname = get_ip()
1953 master_zone, _ = init_env(require_ps=False)
1954 realm = get_realm()
1955 zonegroup = realm.master_zonegroup()
1956
1957 # create random port for the http server
1958 host = get_ip()
1959 port = random.randint(10000, 20000)
1960 # start an http server in a separate thread
1961 number_of_objects = 10
1962 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1963
1964 # create bucket
1965 bucket_name = gen_bucket_name()
1966 bucket = master_zone.create_bucket(bucket_name)
1967 topic_name = bucket_name + TOPIC_SUFFIX
1968
1969 # create s3 topic
1970 endpoint_address = 'http://'+host+':'+str(port)
1971 endpoint_args = 'push-endpoint='+endpoint_address
1972 opaque_data = 'http://1.2.3.4:8888'
1973 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
1974 topic_arn = topic_conf.set_config()
1975 # create s3 notification
1976 notification_name = bucket_name + NOTIFICATION_SUFFIX
1977 topic_conf_list = [{'Id': notification_name,
1978 'TopicArn': topic_arn,
1979 'Events': []
1980 }]
1981 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
1982 response, status = s3_notification_conf.set_config()
1983 assert_equal(status/100, 2)
1984
1985 # create objects in the bucket
1986 client_threads = []
1987 start_time = time.time()
1988 content = 'bar'
1989 for i in range(number_of_objects):
1990 key = bucket.new_key(str(i))
1991 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1992 thr.start()
1993 client_threads.append(thr)
1994 [thr.join() for thr in client_threads]
1995
1996 time_diff = time.time() - start_time
1997 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1998
1999 print('wait for 5sec for the messages...')
2000 time.sleep(5)
2001
2002 # check http receiver
2003 keys = list(bucket.list())
2004 print('total number of objects: ' + str(len(keys)))
2005 events = http_server.get_and_reset_events()
2006 for event in events:
2007 assert_equal(event['Records'][0]['opaqueData'], opaque_data)
2008
2009 # cleanup
2010 for key in keys:
2011 key.delete()
2012 [thr.join() for thr in client_threads]
2013 topic_conf.del_config()
2014 s3_notification_conf.del_config(notification=notification_name)
2015 # delete the bucket
2016 master_zone.delete_bucket(bucket_name)
2017 http_server.close()
2018
2019 def test_ps_topic():
2020 """ test set/get/delete of topic """
2021 _, ps_zone = init_env()
2022 realm = get_realm()
2023 zonegroup = realm.master_zonegroup()
2024 bucket_name = gen_bucket_name()
2025 topic_name = bucket_name+TOPIC_SUFFIX
2026
2027 # create topic
2028 topic_conf = PSTopic(ps_zone.conn, topic_name)
2029 _, status = topic_conf.set_config()
2030 assert_equal(status/100, 2)
2031 # get topic
2032 result, _ = topic_conf.get_config()
2033 # verify topic content
2034 parsed_result = json.loads(result)
2035 assert_equal(parsed_result['topic']['name'], topic_name)
2036 assert_equal(len(parsed_result['subs']), 0)
2037 assert_equal(parsed_result['topic']['arn'],
2038 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
2039 # delete topic
2040 _, status = topic_conf.del_config()
2041 assert_equal(status/100, 2)
2042 # verify topic is deleted
2043 result, status = topic_conf.get_config()
2044 assert_equal(status, 404)
2045 parsed_result = json.loads(result)
2046 assert_equal(parsed_result['Code'], 'NoSuchKey')
2047
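# Topic ARNs returned by the pubsub API follow the SNS convention asserted
# above: 'arn:aws:sns:<zonegroup>:<tenant>:<topic_name>'.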
2048
2049 def test_ps_topic_with_endpoint():
2050 """ test set topic with endpoint"""
2051 _, ps_zone = init_env()
2052 bucket_name = gen_bucket_name()
2053 topic_name = bucket_name+TOPIC_SUFFIX
2054
2055 # create topic
2056 dest_endpoint = 'amqp://localhost:7001'
2057 dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
2058 topic_conf = PSTopic(ps_zone.conn, topic_name,
2059 endpoint=dest_endpoint,
2060 endpoint_args=dest_args)
2061 _, status = topic_conf.set_config()
2062 assert_equal(status/100, 2)
2063 # get topic
2064 result, _ = topic_conf.get_config()
2065 # verify topic content
2066 parsed_result = json.loads(result)
2067 assert_equal(parsed_result['topic']['name'], topic_name)
2068 assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
2069 # cleanup
2070 topic_conf.del_config()
2071
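# AMQP endpoints are addressed as 'amqp://<host>:<port>' and require an
# exchange name in the endpoint args, e.g.
# 'amqp-exchange=amqp.direct&amqp-ack-level=none' as set above.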
2072
2073 def test_ps_notification():
2074 """ test set/get/delete of notification """
2075 master_zone, ps_zone = init_env()
2076 bucket_name = gen_bucket_name()
2077 topic_name = bucket_name+TOPIC_SUFFIX
2078
2079 # create topic
2080 topic_conf = PSTopic(ps_zone.conn, topic_name)
2081 topic_conf.set_config()
2082 # create bucket on the first of the rados zones
2083 master_zone.create_bucket(bucket_name)
2084 # wait for sync
2085 zone_meta_checkpoint(ps_zone.zone)
2086 # create notifications
2087 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2088 topic_name)
2089 _, status = notification_conf.set_config()
2090 assert_equal(status/100, 2)
2091 # get notification
2092 result, _ = notification_conf.get_config()
2093 parsed_result = json.loads(result)
2094 assert_equal(len(parsed_result['topics']), 1)
2095 assert_equal(parsed_result['topics'][0]['topic']['name'],
2096 topic_name)
2097 # delete notification
2098 _, status = notification_conf.del_config()
2099 assert_equal(status/100, 2)
2100 result, status = notification_conf.get_config()
2101 parsed_result = json.loads(result)
2102 assert_equal(len(parsed_result['topics']), 0)
2103 # TODO should return 404
2104 # assert_equal(status, 404)
2105
2106 # cleanup
2107 topic_conf.del_config()
2108 master_zone.delete_bucket(bucket_name)
2109
2110
2111 def test_ps_notification_events():
2112 """ test set/get/delete of notification on specific events"""
2113 master_zone, ps_zone = init_env()
2114 bucket_name = gen_bucket_name()
2115 topic_name = bucket_name+TOPIC_SUFFIX
2116
2117 # create topic
2118 topic_conf = PSTopic(ps_zone.conn, topic_name)
2119 topic_conf.set_config()
2120 # create bucket on the first of the rados zones
2121 master_zone.create_bucket(bucket_name)
2122 # wait for sync
2123 zone_meta_checkpoint(ps_zone.zone)
2124 # create notifications
2125 events = "OBJECT_CREATE,OBJECT_DELETE"
2126 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2127 topic_name,
2128 events)
2129 _, status = notification_conf.set_config()
2130 assert_equal(status/100, 2)
2131 # get notification
2132 result, _ = notification_conf.get_config()
2133 parsed_result = json.loads(result)
2134 assert_equal(len(parsed_result['topics']), 1)
2135 assert_equal(parsed_result['topics'][0]['topic']['name'],
2136 topic_name)
2137 assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
2138 # TODO add test for invalid event name
2139
2140 # cleanup
2141 notification_conf.del_config()
2142 topic_conf.del_config()
2143 master_zone.delete_bucket(bucket_name)
2144
2145
2146 def test_ps_subscription():
2147 """ test set/get/delete of subscription """
2148 master_zone, ps_zone = init_env()
2149 bucket_name = gen_bucket_name()
2150 topic_name = bucket_name+TOPIC_SUFFIX
2151
2152 # create topic
2153 topic_conf = PSTopic(ps_zone.conn, topic_name)
2154 topic_conf.set_config()
2155 # create bucket on the first of the rados zones
2156 bucket = master_zone.create_bucket(bucket_name)
2157 # wait for sync
2158 zone_meta_checkpoint(ps_zone.zone)
2159 # create notifications
2160 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2161 topic_name)
2162 _, status = notification_conf.set_config()
2163 assert_equal(status/100, 2)
2164 # create subscription
2165 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2166 topic_name)
2167 _, status = sub_conf.set_config()
2168 assert_equal(status/100, 2)
2169 # get the subscription
2170 result, _ = sub_conf.get_config()
2171 parsed_result = json.loads(result)
2172 assert_equal(parsed_result['topic'], topic_name)
2173 # create objects in the bucket
2174 number_of_objects = 10
2175 for i in range(number_of_objects):
2176 key = bucket.new_key(str(i))
2177 key.set_contents_from_string('bar')
2178 # wait for sync
2179 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2180
2181 # get the create events from the subscription
2182 result, _ = sub_conf.get_events()
2183 events = json.loads(result)
2184 for event in events['events']:
2185 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2186 keys = list(bucket.list())
2187 # TODO: use exact match
2188 verify_events_by_elements(events, keys, exact_match=False)
2189 # delete objects from the bucket
2190 for key in bucket.list():
2191 key.delete()
2192 # wait for sync
2193 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2194
2195 # get the delete events from the subscriptions
2196 #result, _ = sub_conf.get_events()
2197 #for event in events['events']:
2198 # log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2199 # TODO: check deletions
2200 # TODO: use exact match
2201 # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
2202 # we should see the creations as well as the deletions
2203 # delete subscription
2204 _, status = sub_conf.del_config()
2205 assert_equal(status/100, 2)
2206 result, status = sub_conf.get_config()
2207 parsed_result = json.loads(result)
2208 assert_equal(parsed_result['topic'], '')
2209 # TODO should return 404
2210 # assert_equal(status, 404)
2211
2212 # cleanup
2213 notification_conf.del_config()
2214 topic_conf.del_config()
2215 master_zone.delete_bucket(bucket_name)
2216
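# The pull-mode flow exercised above is: topic -> notification -> subscription,
# with events fetched (and optionally acked) explicitly. Sketch, using the
# same PSSubscription API (ack_events() is exercised in test_ps_event_acking
# below):
#
#   result, _ = sub_conf.get_events()
#   for event in json.loads(result)['events']:
#       sub_conf.ack_events(event['id'])
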
2217 def test_ps_incremental_sync():
2218 """ test that events are only sent on incremental sync """
2219 master_zone, ps_zone = init_env()
2220 bucket_name = gen_bucket_name()
2221 topic_name = bucket_name+TOPIC_SUFFIX
2222
2223 # create topic
2224 topic_conf = PSTopic(ps_zone.conn, topic_name)
2225 topic_conf.set_config()
2226 # create bucket on the first of the rados zones
2227 bucket = master_zone.create_bucket(bucket_name)
2228 # create objects in the bucket
2229 number_of_objects = 10
2230 for i in range(0, number_of_objects):
2231 key = bucket.new_key(str(i))
2232 key.set_contents_from_string('foo')
2233 # wait for sync
2234 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2235 # create notifications
2236 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2237 topic_name)
2238 _, status = notification_conf.set_config()
2239 assert_equal(status/100, 2)
2240 # create subscription
2241 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2242 topic_name)
2243 _, status = sub_conf.set_config()
2244 assert_equal(status/100, 2)
2245
2246 # create more objects in the bucket
2247 for i in range(number_of_objects, 2*number_of_objects):
2248 key = bucket.new_key(str(i))
2249 key.set_contents_from_string('bar')
2250 # wait for sync
2251 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2252
2253 # get the create events from the subscription
2254 result, _ = sub_conf.get_events()
2255 events = json.loads(result)
2256 count = 0
2257 for event in events['events']:
2258 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2259 count += 1
2260
2261 # make sure we have 10 and not 20 events
2262 assert_equal(count, number_of_objects)
2263
2264 # cleanup
2265 for key in bucket.list():
2266 key.delete()
2267 sub_conf.del_config()
2268 notification_conf.del_config()
2269 topic_conf.del_config()
2270 master_zone.delete_bucket(bucket_name)
2271
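# Note: in the test above the first batch of objects is written and fully
# synced before the notification and subscription exist, so only the second
# batch is expected to generate events; full sync is not replayed into pubsub.
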
2272 def test_ps_event_type_subscription():
2273 """ test subscriptions for different events """
2274 master_zone, ps_zone = init_env()
2275 bucket_name = gen_bucket_name()
2276
2277 # create topic for objects creation
2278 topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
2279 topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
2280 topic_create_conf.set_config()
2281 # create topic for objects deletion
2282 topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
2283 topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
2284 topic_delete_conf.set_config()
2285 # create topic for all events
2286 topic_name = bucket_name+TOPIC_SUFFIX+'_all'
2287 topic_conf = PSTopic(ps_zone.conn, topic_name)
2288 topic_conf.set_config()
2289 # create bucket on the first of the rados zones
2290 bucket = master_zone.create_bucket(bucket_name)
2291 # wait for sync
2292 zone_meta_checkpoint(ps_zone.zone)
2293 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2294 # create notifications for objects creation
2295 notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
2296 topic_create_name, "OBJECT_CREATE")
2297 _, status = notification_create_conf.set_config()
2298 assert_equal(status/100, 2)
2299 # create notifications for objects deletion
2300 notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
2301 topic_delete_name, "OBJECT_DELETE")
2302 _, status = notification_delete_conf.set_config()
2303 assert_equal(status/100, 2)
2304 # create notifications for all events
2305 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2306 topic_name, "OBJECT_DELETE,OBJECT_CREATE")
2307 _, status = notification_conf.set_config()
2308 assert_equal(status/100, 2)
2309 # create subscription for objects creation
2310 sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
2311 topic_create_name)
2312 _, status = sub_create_conf.set_config()
2313 assert_equal(status/100, 2)
2314 # create subscription for objects deletion
2315 sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
2316 topic_delete_name)
2317 _, status = sub_delete_conf.set_config()
2318 assert_equal(status/100, 2)
2319 # create subscription for all events
2320 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
2321 topic_name)
2322 _, status = sub_conf.set_config()
2323 assert_equal(status/100, 2)
2324 # create objects in the bucket
2325 number_of_objects = 10
2326 for i in range(number_of_objects):
2327 key = bucket.new_key(str(i))
2328 key.set_contents_from_string('bar')
2329 # wait for sync
2330 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2331
2332 # get the events from the creation subscription
2333 result, _ = sub_create_conf.get_events()
2334 events = json.loads(result)
2335 for event in events['events']:
2336 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
2337 '" type: "' + str(event['event']) + '"')
2338 keys = list(bucket.list())
2339 # TODO: use exact match
2340 verify_events_by_elements(events, keys, exact_match=False)
2341 # get the events from the deletions subscription
2342 result, _ = sub_delete_conf.get_events()
2343 events = json.loads(result)
2344 for event in events['events']:
2345 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
2346 '" type: "' + str(event['event']) + '"')
2347 assert_equal(len(events['events']), 0)
2348 # get the events from the all events subscription
2349 result, _ = sub_conf.get_events()
2350 events = json.loads(result)
2351 for event in events['events']:
2352 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
2353 str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2354 # TODO: use exact match
2355 verify_events_by_elements(events, keys, exact_match=False)
2356 # delete objects from the bucket
2357 for key in bucket.list():
2358 key.delete()
2359 # wait for sync
2360 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2361 log.debug("Event (OBJECT_DELETE) synced")
2362
2363 # get the events from the creation subscription
2364 result, _ = sub_create_conf.get_events()
2365 events = json.loads(result)
2366 for event in events['events']:
2367 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
2368 '" type: "' + str(event['event']) + '"')
2369 # deletions should not change the creation events
2370 # TODO: use exact match
2371 verify_events_by_elements(events, keys, exact_match=False)
2372 # get the events from the deletions subscription
2373 result, _ = sub_delete_conf.get_events()
2374 events = json.loads(result)
2375 for event in events['events']:
2376 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
2377 '" type: "' + str(event['event']) + '"')
2378 # only deletions should be listed here
2379 # TODO: use exact match
2380 verify_events_by_elements(events, keys, exact_match=False, deletions=True)
2381 # get the events from the all events subscription
2382 result, _ = sub_conf.get_events()
2383 events = json.loads(result)
2384 for event in events['events']:
2385 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
2386 '" type: "' + str(event['event']) + '"')
2387 # both deletions and creations should be here
2388 # TODO: use exact match
2389 verify_events_by_elements(events, keys, exact_match=False, deletions=False)
2390 # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
2391 # TODO: (1) test deletions (2) test overall number of events
2392
2393 # test subscription deletion when topic is specified
2394 _, status = sub_create_conf.del_config(topic=True)
2395 assert_equal(status/100, 2)
2396 _, status = sub_delete_conf.del_config(topic=True)
2397 assert_equal(status/100, 2)
2398 _, status = sub_conf.del_config(topic=True)
2399 assert_equal(status/100, 2)
2400
2401 # cleanup
2402 notification_create_conf.del_config()
2403 notification_delete_conf.del_config()
2404 notification_conf.del_config()
2405 topic_create_conf.del_config()
2406 topic_delete_conf.del_config()
2407 topic_conf.del_config()
2408 master_zone.delete_bucket(bucket_name)
2409
2410
2411 def test_ps_event_fetching():
2412 """ test incremental fetching of events from a subscription """
2413 master_zone, ps_zone = init_env()
2414 bucket_name = gen_bucket_name()
2415 topic_name = bucket_name+TOPIC_SUFFIX
2416
2417 # create topic
2418 topic_conf = PSTopic(ps_zone.conn, topic_name)
2419 topic_conf.set_config()
2420 # create bucket on the first of the rados zones
2421 bucket = master_zone.create_bucket(bucket_name)
2422 # wait for sync
2423 zone_meta_checkpoint(ps_zone.zone)
2424 # create notifications
2425 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2426 topic_name)
2427 _, status = notification_conf.set_config()
2428 assert_equal(status/100, 2)
2429 # create subscription
2430 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2431 topic_name)
2432 _, status = sub_conf.set_config()
2433 assert_equal(status/100, 2)
2434 # create objects in the bucket
2435 number_of_objects = 100
2436 for i in range(number_of_objects):
2437 key = bucket.new_key(str(i))
2438 key.set_contents_from_string('bar')
2439 # wait for sync
2440 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2441 max_events = 15
2442 total_events_count = 0
2443 next_marker = None
2444 all_events = []
2445 while True:
2446 # get the events from the subscription
2447 result, _ = sub_conf.get_events(max_events, next_marker)
2448 events = json.loads(result)
2449 total_events_count += len(events['events'])
2450 all_events.extend(events['events'])
2451 next_marker = events['next_marker']
2452 for event in events['events']:
2453 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2454 if next_marker == '':
2455 break
2456 keys = list(bucket.list())
2457 # TODO: use exact match
2458 verify_events_by_elements({'events': all_events}, keys, exact_match=False)
2459
2460 # cleanup
2461 sub_conf.del_config()
2462 notification_conf.del_config()
2463 topic_conf.del_config()
2464 for key in bucket.list():
2465 key.delete()
2466 master_zone.delete_bucket(bucket_name)
2467
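# Pagination contract exercised above: get_events(max, marker) returns at
# most 'max' events together with 'next_marker'; passing None as the marker
# starts from the beginning, and an empty next_marker ends the listing.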
2468
2469 def test_ps_event_acking():
2470 """ test acking of some events in a subscription """
2471 master_zone, ps_zone = init_env()
2472 bucket_name = gen_bucket_name()
2473 topic_name = bucket_name+TOPIC_SUFFIX
2474
2475 # create topic
2476 topic_conf = PSTopic(ps_zone.conn, topic_name)
2477 topic_conf.set_config()
2478 # create bucket on the first of the rados zones
2479 bucket = master_zone.create_bucket(bucket_name)
2480 # wait for sync
2481 zone_meta_checkpoint(ps_zone.zone)
2482 # create notifications
2483 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2484 topic_name)
2485 _, status = notification_conf.set_config()
2486 assert_equal(status/100, 2)
2487 # create subscription
2488 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2489 topic_name)
2490 _, status = sub_conf.set_config()
2491 assert_equal(status/100, 2)
2492 # create objects in the bucket
2493 number_of_objects = 10
2494 for i in range(number_of_objects):
2495 key = bucket.new_key(str(i))
2496 key.set_contents_from_string('bar')
2497 # wait for sync
2498 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2499
2500 # get the create events from the subscription
2501 result, _ = sub_conf.get_events()
2502 events = json.loads(result)
2503 original_number_of_events = len(events['events'])
2504 for event in events['events']:
2505 log.debug('Event (before ack) id: "' + str(event['id']) + '"')
2506 keys = list(bucket.list())
2507 # TODO: use exact match
2508 verify_events_by_elements(events, keys, exact_match=False)
2509 # ack half of the events
2510 events_to_ack = number_of_objects/2
2511 for event in events['events']:
2512 if events_to_ack == 0:
2513 break
2514 _, status = sub_conf.ack_events(event['id'])
2515 assert_equal(status/100, 2)
2516 events_to_ack -= 1
2517
2518 # verify that acked events are gone
2519 result, _ = sub_conf.get_events()
2520 events = json.loads(result)
2521 for event in events['events']:
2522 log.debug('Event (after ack) id: "' + str(event['id']) + '"')
2523 assert len(events['events']) >= (original_number_of_events - number_of_objects/2)
2524
2525 # cleanup
2526 sub_conf.del_config()
2527 notification_conf.del_config()
2528 topic_conf.del_config()
2529 for key in bucket.list():
2530 key.delete()
2531 master_zone.delete_bucket(bucket_name)
2532
2533
2534 def test_ps_creation_triggers():
2535 """ test object creation notifications in using put/copy/post """
2536 master_zone, ps_zone = init_env()
2537 bucket_name = gen_bucket_name()
2538 topic_name = bucket_name+TOPIC_SUFFIX
2539
2540 # create topic
2541 topic_conf = PSTopic(ps_zone.conn, topic_name)
2542 topic_conf.set_config()
2543 # create bucket on the first of the rados zones
2544 bucket = master_zone.create_bucket(bucket_name)
2545 # wait for sync
2546 zone_meta_checkpoint(ps_zone.zone)
2547 # create notifications
2548 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2549 topic_name)
2550 _, status = notification_conf.set_config()
2551 assert_equal(status/100, 2)
2552 # create subscription
2553 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2554 topic_name)
2555 _, status = sub_conf.set_config()
2556 assert_equal(status/100, 2)
2557 # create objects in the bucket using PUT
2558 key = bucket.new_key('put')
2559 key.set_contents_from_string('bar')
2560 # create objects in the bucket using COPY
2561 bucket.copy_key('copy', bucket.name, key.name)
2562 # create objects in the bucket using multi-part upload
2563 fp = tempfile.TemporaryFile(mode='w')
2564 fp.write('bar')
2565 fp.close()
2566 uploader = bucket.initiate_multipart_upload('multipart')
2567 fp = tempfile.TemporaryFile(mode='r')
2568 uploader.upload_part_from_file(fp, 1)
2569 uploader.complete_upload()
2570 fp.close()
2571 # wait for sync
2572 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2573
2574 # get the create events from the subscription
2575 result, _ = sub_conf.get_events()
2576 events = json.loads(result)
2577 for event in events['events']:
2578 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2579
2580 # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
2581 assert len(events['events']) >= 3
2582 # cleanup
2583 sub_conf.del_config()
2584 notification_conf.del_config()
2585 topic_conf.del_config()
2586 for key in bucket.list():
2587 key.delete()
2588 master_zone.delete_bucket(bucket_name)
2589
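# Note on the multipart step above: the first TemporaryFile is closed (which
# discards it) before the upload, so the part actually uploaded from the
# second, freshly opened TemporaryFile is empty. That is fine here, since the
# tests only check that the creation event fires, not the payload. The same
# pattern appears in the multipart steps of the tests below.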
2590
2591 def test_ps_s3_creation_triggers_on_master():
2592 """ test object creation s3 notifications in using put/copy/post on master"""
2593 if skip_push_tests:
2594 raise SkipTest("PubSub push tests don't run in teuthology")
2595 hostname = get_ip()
2596 proc = init_rabbitmq()
2597 if proc is None:
2598 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2599 master_zone, _ = init_env(require_ps=False)
2600 realm = get_realm()
2601 zonegroup = realm.master_zonegroup()
2602
2603 # create bucket
2604 bucket_name = gen_bucket_name()
2605 bucket = master_zone.create_bucket(bucket_name)
2606 topic_name = bucket_name + TOPIC_SUFFIX
2607
2608 # start amqp receiver
2609 exchange = 'ex1'
2610 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2611 task.start()
2612
2613 # create s3 topic
2614 endpoint_address = 'amqp://' + hostname
2615 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2616 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
2617 topic_arn = topic_conf.set_config()
2618 # create s3 notification
2619 notification_name = bucket_name + NOTIFICATION_SUFFIX
2620 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2621 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
2622 }]
2623
2624 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
2625 response, status = s3_notification_conf.set_config()
2626 assert_equal(status/100, 2)
2627
2628 # create objects in the bucket using PUT
2629 key = bucket.new_key('put')
2630 key.set_contents_from_string('bar')
2631 # create objects in the bucket using COPY
2632 bucket.copy_key('copy', bucket.name, key.name)
2633 # create objects in the bucket using multi-part upload
2634 fp = tempfile.TemporaryFile(mode='w')
2635 fp.write('bar')
2636 fp.close()
2637 uploader = bucket.initiate_multipart_upload('multipart')
2638 fp = tempfile.TemporaryFile(mode='r')
2639 uploader.upload_part_from_file(fp, 1)
2640 uploader.complete_upload()
2641 fp.close()
2642
2643 print('wait for 5sec for the messages...')
2644 time.sleep(5)
2645
2646 # check amqp receiver
2647 keys = list(bucket.list())
2648 receiver.verify_s3_events(keys, exact_match=True)
2649
2650 # cleanup
2651 stop_amqp_receiver(receiver, task)
2652 s3_notification_conf.del_config()
2653 topic_conf.del_config()
2654 for key in bucket.list():
2655 key.delete()
2656 # delete the bucket
2657 master_zone.delete_bucket(bucket_name)
2658 clean_rabbitmq(proc)
2659
2660
2661 def test_ps_s3_multipart_on_master():
2662 """ test multipart object upload on master"""
2663 if skip_push_tests:
2664 raise SkipTest("PubSub push tests don't run in teuthology")
2665 hostname = get_ip()
2666 proc = init_rabbitmq()
2667 if proc is None:
2668 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2669 master_zone, _ = init_env(require_ps=False)
2670 realm = get_realm()
2671 zonegroup = realm.master_zonegroup()
2672
2673 # create bucket
2674 bucket_name = gen_bucket_name()
2675 bucket = master_zone.create_bucket(bucket_name)
2676 topic_name = bucket_name + TOPIC_SUFFIX
2677
2678 # start amqp receivers
2679 exchange = 'ex1'
2680 task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
2681 task1.start()
2682 task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
2683 task2.start()
2684 task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
2685 task3.start()
2686
2687 # create s3 topics
2688 endpoint_address = 'amqp://' + hostname
2689 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
2690 topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
2691 topic_arn1 = topic_conf1.set_config()
2692 topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
2693 topic_arn2 = topic_conf2.set_config()
2694 topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
2695 topic_arn3 = topic_conf3.set_config()
2696
2697 # create s3 notifications
2698 notification_name = bucket_name + NOTIFICATION_SUFFIX
2699 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
2700 'Events': ['s3:ObjectCreated:*']
2701 },
2702 {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
2703 'Events': ['s3:ObjectCreated:Post']
2704 },
2705 {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
2706 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
2707 }]
2708 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
2709 response, status = s3_notification_conf.set_config()
2710 assert_equal(status/100, 2)
2711
2712 # create objects in the bucket using multi-part upload
2713 fp = tempfile.TemporaryFile(mode='w+b')
2714 content = bytearray(os.urandom(1024*1024))
2715 fp.write(content)
2716 fp.flush()
2717 fp.seek(0)
2718 uploader = bucket.initiate_multipart_upload('multipart')
2719 uploader.upload_part_from_file(fp, 1)
2720 uploader.complete_upload()
2721 fp.close()
2722
2723 print('wait for 5sec for the messages...')
2724 time.sleep(5)
2725
2726 # check amqp receiver
2727 events = receiver1.get_and_reset_events()
2728 assert_equal(len(events), 3)
2729
2730 events = receiver2.get_and_reset_events()
2731 assert_equal(len(events), 1)
2732 assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post')
2733 assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2')
2734
2735 events = receiver3.get_and_reset_events()
2736 assert_equal(len(events), 1)
2737 assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
2738 assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
2739
2740 # cleanup
2741 stop_amqp_receiver(receiver1, task1)
2742 stop_amqp_receiver(receiver2, task2)
2743 stop_amqp_receiver(receiver3, task3)
2744 s3_notification_conf.del_config()
2745 topic_conf1.del_config()
2746 topic_conf2.del_config()
2747 topic_conf3.del_config()
2748 for key in bucket.list():
2749 key.delete()
2750 # delete the bucket
2751 master_zone.delete_bucket(bucket_name)
2752 clean_rabbitmq(proc)
2753
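# Fan-out checked above: the wildcard notification ('s3:ObjectCreated:*')
# matches all three creation events generated by the multipart upload, while
# the Post and CompleteMultipartUpload notifications each match exactly one
# event, even though all three notifications hang off the same bucket.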
2754
2755 def test_ps_versioned_deletion():
2756 """ test notification of deletion markers """
2757 master_zone, ps_zone = init_env()
2758 bucket_name = gen_bucket_name()
2759 topic_name = bucket_name+TOPIC_SUFFIX
2760
2761 # create topics
2762 topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
2763 _, status = topic_conf1.set_config()
2764 assert_equal(status/100, 2)
2765 topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
2766 _, status = topic_conf2.set_config()
2767 assert_equal(status/100, 2)
2768
2769 # create bucket on the first of the rados zones
2770 bucket = master_zone.create_bucket(bucket_name)
2771 bucket.configure_versioning(True)
2772
2773 # wait for sync
2774 zone_meta_checkpoint(ps_zone.zone)
2775
2776 # create notifications
2777 event_type1 = 'OBJECT_DELETE'
2778 notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
2779 topic_name+'_1',
2780 event_type1)
2781 _, status = notification_conf1.set_config()
2782 assert_equal(status/100, 2)
2783 event_type2 = 'DELETE_MARKER_CREATE'
2784 notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
2785 topic_name+'_2',
2786 event_type2)
2787 _, status = notification_conf2.set_config()
2788 assert_equal(status/100, 2)
2789
2790 # create subscriptions
2791 sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
2792 topic_name+'_1')
2793 _, status = sub_conf1.set_config()
2794 assert_equal(status/100, 2)
2795 sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
2796 topic_name+'_2')
2797 _, status = sub_conf2.set_config()
2798 assert_equal(status/100, 2)
2799
2800 # create objects in the bucket
2801 key = bucket.new_key('foo')
2802 key.set_contents_from_string('bar')
2803 v1 = key.version_id
2804 key.set_contents_from_string('kaboom')
2805 v2 = key.version_id
2806 # create deletion marker
2807 delete_marker_key = bucket.delete_key(key.name)
2808
2809 # wait for sync
2810 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2811
2812 # delete the deletion marker
2813 delete_marker_key.delete()
2814 # delete versions
2815 bucket.delete_key(key.name, version_id=v2)
2816 bucket.delete_key(key.name, version_id=v1)
2817
2818 # wait for sync
2819 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2820
2821 # get the delete events from the subscription
2822 result, _ = sub_conf1.get_events()
2823 events = json.loads(result)
2824 for event in events['events']:
2825 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2826 assert_equal(str(event['event']), event_type1)
2827
2828 result, _ = sub_conf2.get_events()
2829 events = json.loads(result)
2830 for event in events['events']:
2831 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
2832 assert_equal(str(event['event']), event_type2)
2833
2834 # cleanup
2835 # the following is needed for cleanup in the case of 3 zones
2836 # see: http://tracker.ceph.com/issues/39142
2837 realm = get_realm()
2838 zonegroup = realm.master_zonegroup()
2839 zonegroup_conns = ZonegroupConns(zonegroup)
2840 try:
2841 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
2842 master_zone.delete_bucket(bucket_name)
2843 except:
2844 log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
2845 sub_conf1.del_config()
2846 sub_conf2.del_config()
2847 notification_conf1.del_config()
2848 notification_conf2.del_config()
2849 topic_conf1.del_config()
2850 topic_conf2.del_config()
2851
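# Event-type mapping exercised above: deleting a key in a versioned bucket
# without a version id creates a delete marker (DELETE_MARKER_CREATE), while
# deleting a specific version id removes that version (OBJECT_DELETE).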
2852
2853 def test_ps_s3_metadata_on_master():
2854 """ test s3 notification of metadata on master """
2855 if skip_push_tests:
2856 raise SkipTest("PubSub push tests don't run in teuthology")
2857 hostname = get_ip()
2858 proc = init_rabbitmq()
2859 if proc is None:
2860 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2861 master_zone, _ = init_env(require_ps=False)
2862 realm = get_realm()
2863 zonegroup = realm.master_zonegroup()
2864
2865 # create bucket
2866 bucket_name = gen_bucket_name()
2867 bucket = master_zone.create_bucket(bucket_name)
2868 topic_name = bucket_name + TOPIC_SUFFIX
2869
2870 # start amqp receiver
2871 exchange = 'ex1'
2872 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2873 task.start()
2874
2875 # create s3 topic
2876 endpoint_address = 'amqp://' + hostname
2877 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2878 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
2879 topic_arn = topic_conf.set_config()
2880 # create s3 notification
2881 notification_name = bucket_name + NOTIFICATION_SUFFIX
2882 meta_key = 'meta1'
2883 meta_value = 'This is my metadata value'
2884 meta_prefix = 'x-amz-meta-'
2885 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2886 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
2887 'Filter': {
2888 'Metadata': {
2889 'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
2890 }
2891 }
2892 }]
2893
2894 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
2895 response, status = s3_notification_conf.set_config()
2896 assert_equal(status/100, 2)
2897
2898 # create objects in the bucket
2899 key_name = 'foo'
2900 key = bucket.new_key(key_name)
2901 key.set_metadata(meta_key, meta_value)
2902 key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
2903
2904 # create objects in the bucket using COPY
2905 bucket.copy_key('copy_of_foo', bucket.name, key.name)
2906 # create objects in the bucket using multi-part upload
2907 fp = tempfile.TemporaryFile(mode='w')
2908 fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
2909 fp.close()
2910 uploader = bucket.initiate_multipart_upload('multipart_foo',
2911 metadata={meta_key: meta_value})
2912 fp = tempfile.TemporaryFile(mode='r')
2913 uploader.upload_part_from_file(fp, 1)
2914 uploader.complete_upload()
2915 fp.close()
2916 print('wait for 5sec for the messages...')
2917 time.sleep(5)
2918 # check amqp receiver
2919 event_count = 0
2920 for event in receiver.get_and_reset_events():
2921 s3_event = event['Records'][0]['s3']
2922 assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
2923 assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
2924 event_count +=1
2925
2926 # only PUT and POST have the metadata value
2927 assert_equal(event_count, 2)
2928
2929 # delete objects
2930 for key in bucket.list():
2931 key.delete()
2932 print('wait for 5sec for the messages...')
2933 time.sleep(5)
2934 # check amqp receiver
2935 event_count = 0
2936 for event in receiver.get_and_reset_events():
2937 s3_event = event['Records'][0]['s3']
2938 assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
2939 assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
2940 event_count +=1
2941
2942 # all 3 objects have metadata when deleted
2943 assert_equal(event_count, 3)
2944
2945 # cleanup
2946 stop_amqp_receiver(receiver, task)
2947 s3_notification_conf.del_config()
2948 topic_conf.del_config()
2949 # delete the bucket
2950 master_zone.delete_bucket(bucket_name)
2951 clean_rabbitmq(proc)
2952
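# Note: metadata filter rules match against the full header name, which is
# why the 'x-amz-meta-' prefix is prepended to the 'meta1' key in the
# FilterRules above.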
2953
2954 def test_ps_s3_tags_on_master():
2955 """ test s3 notification of tags on master """
2956 if skip_push_tests:
2957 raise SkipTest("PubSub push tests don't run in teuthology")
2958 hostname = get_ip()
2959 proc = init_rabbitmq()
2960 if proc is None:
2961 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2962 master_zone, _ = init_env(require_ps=False)
2963 realm = get_realm()
2964 zonegroup = realm.master_zonegroup()
2965
2966 # create bucket
2967 bucket_name = gen_bucket_name()
2968 bucket = master_zone.create_bucket(bucket_name)
2969 topic_name = bucket_name + TOPIC_SUFFIX
2970
2971 # start amqp receiver
2972 exchange = 'ex1'
2973 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2974 task.start()
2975
2976 # create s3 topic
2977 endpoint_address = 'amqp://' + hostname
2978 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2979 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
2980 topic_arn = topic_conf.set_config()
2981 # create s3 notification
2982 notification_name = bucket_name + NOTIFICATION_SUFFIX
2983 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2984 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
2985 'Filter': {
2986 'Tags': {
2987 'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
2988 }
2989 }
2990 }]
2991
2992 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
2993 response, status = s3_notification_conf.set_config()
2994 assert_equal(status/100, 2)
2995
2996 # create objects in the bucket with tags
2997 tags = 'hello=world&ka=boom'
2998 key_name1 = 'key1'
2999 put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
3000 tags = 'foo=bar&ka=boom'
3001 key_name2 = 'key2'
3002 put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
3003 key_name3 = 'key3'
3004 key = bucket.new_key(key_name3)
3005 key.set_contents_from_string('bar')
3006 # create objects in the bucket using COPY
3007 bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
3008 print('wait for 5sec for the messages...')
3009 time.sleep(5)
3010 expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
3011 # check amqp receiver
3012 for event in receiver.get_and_reset_events():
3013 obj_tags = event['Records'][0]['s3']['object']['tags']
3014 assert_equal(obj_tags[0], expected_tags[0])
3015
3016 # delete the objects
3017 for key in bucket.list():
3018 key.delete()
3019 print('wait for 5sec for the messages...')
3020 time.sleep(5)
3021 # check amqp receiver
3022 for event in receiver.get_and_reset_events():
3023 obj_tags = event['Records'][0]['s3']['object']['tags']
3024 assert_equal(obj_tags[0], expected_tags[0])
3025
3026 # cleanup
3027 stop_amqp_receiver(receiver, task)
3028 s3_notification_conf.del_config()
3029 topic_conf.del_config()
3030 # delete the bucket
3031 master_zone.delete_bucket(bucket_name)
3032 clean_rabbitmq(proc)
3033
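# The tag filter above ('hello=world') admits key1 and its copy; key2 and
# key3 carry no matching tag, which is why every received record is checked
# for the 'hello=world' tag.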
3034
3035 def test_ps_s3_versioned_deletion_on_master():
3036 """ test s3 notification of deletion markers on master """
3037 if skip_push_tests:
3038 raise SkipTest("PubSub push tests don't run in teuthology")
3039 hostname = get_ip()
3040 proc = init_rabbitmq()
3041 if proc is None:
3042 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3043 master_zone, _ = init_env(require_ps=False)
3044 realm = get_realm()
3045 zonegroup = realm.master_zonegroup()
3046
3047 # create bucket
3048 bucket_name = gen_bucket_name()
3049 bucket = master_zone.create_bucket(bucket_name)
3050 bucket.configure_versioning(True)
3051 topic_name = bucket_name + TOPIC_SUFFIX
3052
3053 # start amqp receiver
3054 exchange = 'ex1'
3055 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
3056 task.start()
3057
3058 # create s3 topic
3059 endpoint_address = 'amqp://' + hostname
3060 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
3061 topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
3062 topic_arn = topic_conf.set_config()
3063 # create s3 notification
3064 notification_name = bucket_name + NOTIFICATION_SUFFIX
3065 # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
3066 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
3067 'Events': ['s3:ObjectRemoved:*']
3068 },
3069 {'Id': notification_name+'_2', 'TopicArn': topic_arn,
3070 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
3071 },
3072 {'Id': notification_name+'_3', 'TopicArn': topic_arn,
3073 'Events': ['s3:ObjectRemoved:Delete']
3074 }]
3075 s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
3076 response, status = s3_notification_conf.set_config()
3077 assert_equal(status/100, 2)
3078
3079 # create objects in the bucket
3080 key = bucket.new_key('foo')
3081 key.set_contents_from_string('bar')
3082 v1 = key.version_id
3083 key.set_contents_from_string('kaboom')
3084 v2 = key.version_id
3085 # create delete marker (non versioned deletion)
3086 delete_marker_key = bucket.delete_key(key.name)
3087
3088 time.sleep(1)
3089
3090 # versioned deletion
3091 bucket.delete_key(key.name, version_id=v2)
3092 bucket.delete_key(key.name, version_id=v1)
3093 delete_marker_key.delete()
3094
3095 print('wait for 5sec for the messages...')
3096 time.sleep(5)
3097
3098 # check amqp receiver
3099 events = receiver.get_and_reset_events()
3100 delete_events = 0
3101 delete_marker_create_events = 0
3102 for event_list in events:
3103 for event in event_list['Records']:
3104 if event['eventName'] == 's3:ObjectRemoved:Delete':
3105 delete_events += 1
3106 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
3107 if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
3108 delete_marker_create_events += 1
3109 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
3110
3111 # 3 key versions were deleted (v1, v2 and the deletion marker)
3112 # notified over the same topic via 2 notifications (1,3)
3113 assert_equal(delete_events, 3*2)
3114 # 1 deletion marker was created
3115 # notified over the same topic via 2 notifications (1,2)
3116 assert_equal(delete_marker_create_events, 1*2)
3117
3118 # cleanup
3119 stop_amqp_receiver(receiver, task)
3120 s3_notification_conf.del_config()
3121 topic_conf.del_config()
3122 # delete the bucket
3123 master_zone.delete_bucket(bucket_name)
3124 clean_rabbitmq(proc)
3125
3126
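# Hedged sketch (not part of the upstream suite): the hand-rolled counters in
# the test above can be generalized; tallying eventName occurrences in one
# pass keeps the assertions short if more event types are added later.
def count_event_names(events):
    """tally eventName occurrences across all received event records"""
    names = {}
    for event_list in events:
        for event in event_list['Records']:
            names[event['eventName']] = names.get(event['eventName'], 0) + 1
    return names
# usage (hypothetical): assert_equal(count_event_names(events).get('s3:ObjectRemoved:Delete', 0), 3*2)
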
3127 def test_ps_push_http():
3128 """ test pushing to http endpoint """
3129 if skip_push_tests:
3130 return SkipTest("PubSub push tests don't run in teuthology")
3131 master_zone, ps_zone = init_env()
3132 bucket_name = gen_bucket_name()
3133 topic_name = bucket_name+TOPIC_SUFFIX
3134
3135 # create random port for the http server
3136 host = get_ip()
3137 port = random.randint(10000, 20000)
3138 # start an http server in a separate thread
3139 http_server = StreamingHTTPServer(host, port)
3140
3141 # create topic
3142 topic_conf = PSTopic(ps_zone.conn, topic_name)
3143 _, status = topic_conf.set_config()
3144 assert_equal(status/100, 2)
3145 # create bucket on the first of the rados zones
3146 bucket = master_zone.create_bucket(bucket_name)
3147 # wait for sync
3148 zone_meta_checkpoint(ps_zone.zone)
3149 # create notifications
3150 notification_conf = PSNotification(ps_zone.conn, bucket_name,
3151 topic_name)
3152 _, status = notification_conf.set_config()
3153 assert_equal(status/100, 2)
3154 # create subscription
3155 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
3156 topic_name, endpoint='http://'+host+':'+str(port))
3157 _, status = sub_conf.set_config()
3158 assert_equal(status/100, 2)
3159 # create objects in the bucket
3160 number_of_objects = 10
3161 for i in range(number_of_objects):
3162 key = bucket.new_key(str(i))
3163 key.set_contents_from_string('bar')
3164 # wait for sync
3165 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3166 # check http server
3167 keys = list(bucket.list())
3168 # TODO: use exact match
3169 http_server.verify_events(keys, exact_match=False)
3170
3171 # delete objects from the bucket
3172 for key in bucket.list():
3173 key.delete()
3174 # wait for sync
3175 zone_meta_checkpoint(ps_zone.zone)
3176 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3177 # check http server
3178 # TODO: use exact match
3179 http_server.verify_events(keys, deletions=True, exact_match=False)
3180
3181 # cleanup
3182 sub_conf.del_config()
3183 notification_conf.del_config()
3184 topic_conf.del_config()
3185 master_zone.delete_bucket(bucket_name)
3186 http_server.close()
3187
3188
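# Hedged sketch (not part of the upstream suite): random.randint(10000, 20000)
# can collide with a port that is already in use; binding to port 0 asks the
# kernel for a free port instead (there is a small race between close() and
# reuse, which is acceptable for tests).
def get_free_port(host):
    """reserve an unused TCP port on the given host and return its number"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, 0))
    port = s.getsockname()[1]
    s.close()
    return port
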
3189 def test_ps_s3_push_http():
3190 """ test pushing to http endpoint s3 record format"""
3191 if skip_push_tests:
3192 return SkipTest("PubSub push tests don't run in teuthology")
3193 master_zone, ps_zone = init_env()
3194 bucket_name = gen_bucket_name()
3195 topic_name = bucket_name+TOPIC_SUFFIX
3196
3197 # create random port for the http server
3198 host = get_ip()
3199 port = random.randint(10000, 20000)
3200 # start an http server in a separate thread
3201 http_server = StreamingHTTPServer(host, port)
3202
3203 # create topic
3204 topic_conf = PSTopic(ps_zone.conn, topic_name,
3205 endpoint='http://'+host+':'+str(port))
3206 result, status = topic_conf.set_config()
3207 assert_equal(status/100, 2)
3208 parsed_result = json.loads(result)
3209 topic_arn = parsed_result['arn']
3210 # create bucket on the first of the rados zones
3211 bucket = master_zone.create_bucket(bucket_name)
3212 # wait for sync
3213 zone_meta_checkpoint(ps_zone.zone)
3214 # create s3 notification
3215 notification_name = bucket_name + NOTIFICATION_SUFFIX
3216 topic_conf_list = [{'Id': notification_name,
3217 'TopicArn': topic_arn,
3218 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
3219 }]
3220 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3221 _, status = s3_notification_conf.set_config()
3222 assert_equal(status/100, 2)
3223 # create objects in the bucket
3224 number_of_objects = 10
3225 for i in range(number_of_objects):
3226 key = bucket.new_key(str(i))
3227 key.set_contents_from_string('bar')
3228 # wait for sync
3229 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3230 # check http server
3231 keys = list(bucket.list())
3232 # TODO: use exact match
3233 http_server.verify_s3_events(keys, exact_match=False)
3234
3235 # delete objects from the bucket
3236 for key in bucket.list():
3237 key.delete()
3238 # wait for sync
3239 zone_meta_checkpoint(ps_zone.zone)
3240 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3241 # check http server
3242 # TODO: use exact match
3243 http_server.verify_s3_events(keys, deletions=True, exact_match=False)
3244
3245 # cleanup
3246 s3_notification_conf.del_config()
3247 topic_conf.del_config()
3248 master_zone.delete_bucket(bucket_name)
3249 http_server.close()
3250
3251
3252 def test_ps_push_amqp():
3253 """ test pushing to amqp endpoint """
3254 if skip_push_tests:
3255 return SkipTest("PubSub push tests don't run in teuthology")
3256 hostname = get_ip()
3257 proc = init_rabbitmq()
3258 if proc is None:
3259 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3260 master_zone, ps_zone = init_env()
3261 bucket_name = gen_bucket_name()
3262 topic_name = bucket_name+TOPIC_SUFFIX
3263
3264 # create topic
3265 exchange = 'ex1'
3266 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
3267 task.start()
3268 topic_conf = PSTopic(ps_zone.conn, topic_name)
3269 _, status = topic_conf.set_config()
3270 assert_equal(status/100, 2)
3271 # create bucket on the first of the rados zones
3272 bucket = master_zone.create_bucket(bucket_name)
3273 # wait for sync
3274 zone_meta_checkpoint(ps_zone.zone)
3275 # create notifications
3276 notification_conf = PSNotification(ps_zone.conn, bucket_name,
3277 topic_name)
3278 _, status = notification_conf.set_config()
3279 assert_equal(status/100, 2)
3280 # create subscription
3281 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
3282 topic_name, endpoint='amqp://'+hostname,
3283 endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
3284 _, status = sub_conf.set_config()
3285 assert_equal(status/100, 2)
3286 # create objects in the bucket
3287 number_of_objects = 10
3288 for i in range(number_of_objects):
3289 key = bucket.new_key(str(i))
3290 key.set_contents_from_string('bar')
3291 # wait for sync
3292 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3293 # check amqp receiver
3294 keys = list(bucket.list())
3295 # TODO: use exact match
3296 receiver.verify_events(keys, exact_match=False)
3297
3298 # delete objects from the bucket
3299 for key in bucket.list():
3300 key.delete()
3301 # wait for sync
3302 zone_meta_checkpoint(ps_zone.zone)
3303 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3304 # check amqp receiver
3305 # TODO: use exact match
3306 receiver.verify_events(keys, deletions=True, exact_match=False)
3307
3308 # cleanup
3309 stop_amqp_receiver(receiver, task)
3310 sub_conf.del_config()
3311 notification_conf.del_config()
3312 topic_conf.del_config()
3313 master_zone.delete_bucket(bucket_name)
3314 clean_rabbitmq(proc)
3315
3316
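# Hedged sketch (not part of the upstream suite): the amqp endpoint_args
# strings are assembled by hand throughout these tests; a tiny formatter makes
# the exchange/ack-level combinations harder to mistype.
def amqp_endpoint_args(exchange, ack_level='broker'):
    """format the endpoint_args string passed to PSTopic/PSSubscription"""
    return 'amqp-exchange=' + exchange + '&amqp-ack-level=' + ack_level
# usage (hypothetical): endpoint_args=amqp_endpoint_args('ex1', ack_level='none')
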
3317 def test_ps_s3_push_amqp():
3318 """ test pushing to amqp endpoint s3 record format"""
3319 if skip_push_tests:
3320 return SkipTest("PubSub push tests don't run in teuthology")
3321 hostname = get_ip()
3322 proc = init_rabbitmq()
3323 if proc is None:
3324 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3325 master_zone, ps_zone = init_env()
3326 bucket_name = gen_bucket_name()
3327 topic_name = bucket_name+TOPIC_SUFFIX
3328
3329 # create topic
3330 exchange = 'ex1'
3331 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
3332 task.start()
3333 topic_conf = PSTopic(ps_zone.conn, topic_name,
3334 endpoint='amqp://' + hostname,
3335 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
3336 result, status = topic_conf.set_config()
3337 assert_equal(status/100, 2)
3338 parsed_result = json.loads(result)
3339 topic_arn = parsed_result['arn']
3340 # create bucket on the first of the rados zones
3341 bucket = master_zone.create_bucket(bucket_name)
3342 # wait for sync
3343 zone_meta_checkpoint(ps_zone.zone)
3344 # create s3 notification
3345 notification_name = bucket_name + NOTIFICATION_SUFFIX
3346 topic_conf_list = [{'Id': notification_name,
3347 'TopicArn': topic_arn,
3348 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
3349 }]
3350 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3351 _, status = s3_notification_conf.set_config()
3352 assert_equal(status/100, 2)
3353 # create objects in the bucket
3354 number_of_objects = 10
3355 for i in range(number_of_objects):
3356 key = bucket.new_key(str(i))
3357 key.set_contents_from_string('bar')
3358 # wait for sync
3359 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3360 # check amqp receiver
3361 keys = list(bucket.list())
3362 # TODO: use exact match
3363 receiver.verify_s3_events(keys, exact_match=False)
3364
3365 # delete objects from the bucket
3366 for key in bucket.list():
3367 key.delete()
3368 # wait for sync
3369 zone_meta_checkpoint(ps_zone.zone)
3370 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3371 # check amqp receiver
3372 # TODO: use exact match
3373 receiver.verify_s3_events(keys, deletions=True, exact_match=False)
3374
3375 # cleanup
3376 stop_amqp_receiver(receiver, task)
3377 s3_notification_conf.del_config()
3378 topic_conf.del_config()
3379 master_zone.delete_bucket(bucket_name)
3380 clean_rabbitmq(proc)
3381
3382
3383 def test_ps_delete_bucket():
3384 """ test notification status upon bucket deletion """
3385 master_zone, ps_zone = init_env()
3386 bucket_name = gen_bucket_name()
3387 # create bucket on the first of the rados zones
3388 bucket = master_zone.create_bucket(bucket_name)
3389 # wait for sync
3390 zone_meta_checkpoint(ps_zone.zone)
3391 # create topic
3392 topic_name = bucket_name + TOPIC_SUFFIX
3394 topic_conf = PSTopic(ps_zone.conn, topic_name)
3395 response, status = topic_conf.set_config()
3396 assert_equal(status/100, 2)
3397 parsed_result = json.loads(response)
3398 topic_arn = parsed_result['arn']
3399 # create one s3 notification
3400 notification_name = bucket_name + NOTIFICATION_SUFFIX
3401 topic_conf_list = [{'Id': notification_name,
3402 'TopicArn': topic_arn,
3403 'Events': ['s3:ObjectCreated:*']
3404 }]
3405 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3406 response, status = s3_notification_conf.set_config()
3407 assert_equal(status/100, 2)
3408
3409 # create non-s3 notification
3410 notification_conf = PSNotification(ps_zone.conn, bucket_name,
3411 topic_name)
3412 _, status = notification_conf.set_config()
3413 assert_equal(status/100, 2)
3414
3415 # create objects in the bucket
3416 number_of_objects = 10
3417 for i in range(number_of_objects):
3418 key = bucket.new_key(str(i))
3419 key.set_contents_from_string('bar')
3420 # wait for bucket sync
3421 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3422 keys = list(bucket.list())
3423 # delete objects from the bucket
3424 for key in bucket.list():
3425 key.delete()
3426 # wait for bucket sync
3427 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3428 # delete the bucket
3429 master_zone.delete_bucket(bucket_name)
3430 # wait for meta sync
3431 zone_meta_checkpoint(ps_zone.zone)
3432
3433 # get the events from the auto-generated subscription
3434 sub_conf = PSSubscription(ps_zone.conn, notification_name,
3435 topic_name)
3436 result, _ = sub_conf.get_events()
3437 records = json.loads(result)
3438 # TODO: use exact match
3439 verify_s3_records_by_elements(records, keys, exact_match=False)
3440
3441 # s3 notification is deleted with bucket
3442 _, status = s3_notification_conf.get_config(notification=notification_name)
3443 assert_equal(status, 404)
3444 # non-s3 notification is deleted with bucket
3445 _, status = notification_conf.get_config()
3446 assert_equal(status, 404)
3447 # cleanup
3448 sub_conf.del_config()
3449 topic_conf.del_config()
3450
3451
3452 def test_ps_missing_topic():
3453 """ test creating a subscription when no topic info exists"""
3454 master_zone, ps_zone = init_env()
3455 bucket_name = gen_bucket_name()
3456 topic_name = bucket_name+TOPIC_SUFFIX
3457
3458 # create bucket on the first of the rados zones
3459 master_zone.create_bucket(bucket_name)
3460 # wait for sync
3461 zone_meta_checkpoint(ps_zone.zone)
3462 # create s3 notification
3463 notification_name = bucket_name + NOTIFICATION_SUFFIX
3464 topic_arn = 'arn:aws:sns:::' + topic_name
3465 topic_conf_list = [{'Id': notification_name,
3466 'TopicArn': topic_arn,
3467 'Events': ['s3:ObjectCreated:*']
3468 }]
3469 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3470 try:
3471 s3_notification_conf.set_config()
3472 except Exception:
3473 log.info('missing topic is expected')
3474 else:
3475 assert False, 'missing topic is expected'
3476
3477 # cleanup
3478 master_zone.delete_bucket(bucket_name)
3479
3480
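# Hedged sketch (not part of the upstream suite): the try/except/else pattern
# above is easy to get wrong; a small helper states the intent directly.
def expect_failure(fn, *args, **kwargs):
    """return True if fn raises any exception, False if it succeeds"""
    try:
        fn(*args, **kwargs)
    except Exception:
        return True
    return False
# usage (hypothetical):
# assert expect_failure(s3_notification_conf.set_config), 'missing topic is expected'
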
3481 def test_ps_s3_topic_update():
3482 """ test updating topic associated with a notification"""
3483 if skip_push_tests:
3484 return SkipTest("PubSub push tests don't run in teuthology")
3485 rabbit_proc = init_rabbitmq()
3486 if rabbit_proc is None:
3487 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3488 master_zone, ps_zone = init_env()
3489 bucket_name = gen_bucket_name()
3490 topic_name = bucket_name+TOPIC_SUFFIX
3491
3492 # create amqp topic
3493 hostname = get_ip()
3494 exchange = 'ex1'
3495 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
3496 amqp_task.start()
3497 topic_conf = PSTopic(ps_zone.conn, topic_name,
3498 endpoint='amqp://' + hostname,
3499 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
3500 result, status = topic_conf.set_config()
3501 assert_equal(status/100, 2)
3502 parsed_result = json.loads(result)
3503 topic_arn = parsed_result['arn']
3504 # get topic
3505 result, _ = topic_conf.get_config()
3506 # verify topic content
3507 parsed_result = json.loads(result)
3508 assert_equal(parsed_result['topic']['name'], topic_name)
3509 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
3510
3511 # create http server
3512 port = random.randint(10000, 20000)
3513 # start an http server in a separate thread
3514 http_server = StreamingHTTPServer(hostname, port)
3515
3516 # create bucket on the first of the rados zones
3517 bucket = master_zone.create_bucket(bucket_name)
3518 # wait for sync
3519 zone_meta_checkpoint(ps_zone.zone)
3520 # create s3 notification
3521 notification_name = bucket_name + NOTIFICATION_SUFFIX
3522 topic_conf_list = [{'Id': notification_name,
3523 'TopicArn': topic_arn,
3524 'Events': ['s3:ObjectCreated:*']
3525 }]
3526 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3527 _, status = s3_notification_conf.set_config()
3528 assert_equal(status/100, 2)
3529 # create objects in the bucket
3530 number_of_objects = 10
3531 for i in range(number_of_objects):
3532 key = bucket.new_key(str(i))
3533 key.set_contents_from_string('bar')
3534 # wait for sync
3535 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3536
3537 keys = list(bucket.list())
3538 # TODO: use exact match
3539 receiver.verify_s3_events(keys, exact_match=False)
3540
3541 # update the same topic with a new endpoint
3542 topic_conf = PSTopic(ps_zone.conn, topic_name,
3543 endpoint='http://'+ hostname + ':' + str(port))
3544 _, status = topic_conf.set_config()
3545 assert_equal(status/100, 2)
3546 # get topic
3547 result, _ = topic_conf.get_config()
3548 # verify topic content
3549 parsed_result = json.loads(result)
3550 assert_equal(parsed_result['topic']['name'], topic_name)
3551 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
3552
3553 # delete current objects and create new objects in the bucket
3554 for key in bucket.list():
3555 key.delete()
3556 for i in range(number_of_objects):
3557 key = bucket.new_key(str(i+100))
3558 key.set_contents_from_string('bar')
3559 # wait for sync
3560 zone_meta_checkpoint(ps_zone.zone)
3561 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3562
3563 keys = list(bucket.list())
3564 # verify that notifications are still sent to amqp
3565 # TODO: use exact match
3566 receiver.verify_s3_events(keys, exact_match=False)
3567
3568 # re-set the notification so that the endpoint is taken from the updated topic
3569 topic_conf_list = [{'Id': notification_name,
3570 'TopicArn': topic_arn,
3571 'Events': ['s3:ObjectCreated:*']
3572 }]
3573 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3574 _, status = s3_notification_conf.set_config()
3575 assert_equal(status/100, 2)
3576
3577 # delete current objects and create new objects in the bucket
3578 for key in bucket.list():
3579 key.delete()
3580 for i in range(number_of_objects):
3581 key = bucket.new_key(str(i+200))
3582 key.set_contents_from_string('bar')
3583 # wait for sync
3584 zone_meta_checkpoint(ps_zone.zone)
3585 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3586
3587 keys = list(bucket.list())
3588 # check that notifications switched to the http endpoint
3589 # TODO: use exact match
3590 http_server.verify_s3_events(keys, exact_match=False)
3591
3592 # cleanup
3593 # delete objects from the bucket
3594 stop_amqp_receiver(receiver, amqp_task)
3595 for key in bucket.list():
3596 key.delete()
3597 s3_notification_conf.del_config()
3598 topic_conf.del_config()
3599 master_zone.delete_bucket(bucket_name)
3600 http_server.close()
3601 clean_rabbitmq(rabbit_proc)
3602
3603
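# Hedged sketch (not part of the upstream suite): the get-and-verify steps in
# the test above are repeated after every topic change; folding them into one
# helper keeps the test focused on the endpoint switches.
def verify_topic_endpoint(topic_conf, topic_name):
    """fetch the topic and check its name and push endpoint against the local config"""
    result, _ = topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'],
                 topic_conf.parameters['push-endpoint'])
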
3604 def test_ps_s3_notification_update():
3605 """ test updating the topic of a notification"""
3606 if skip_push_tests:
3607 return SkipTest("PubSub push tests don't run in teuthology")
3608 hostname = get_ip()
3609 rabbit_proc = init_rabbitmq()
3610 if rabbit_proc is None:
3611 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3612
3613 master_zone, ps_zone = init_env()
3614 bucket_name = gen_bucket_name()
3615 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
3616 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
3617
3618 # create topics
3619 # start amqp receiver in a separate thread
3620 exchange = 'ex1'
3621 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
3622 amqp_task.start()
3623 # create random port for the http server
3624 http_port = random.randint(10000, 20000)
3625 # start an http server in a separate thread
3626 http_server = StreamingHTTPServer(hostname, http_port)
3627
3628 topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
3629 endpoint='amqp://' + hostname,
3630 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
3631 result, status = topic_conf1.set_config()
3632 parsed_result = json.loads(result)
3633 topic_arn1 = parsed_result['arn']
3634 assert_equal(status/100, 2)
3635 topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
3636 endpoint='http://'+hostname+':'+str(http_port))
3637 result, status = topic_conf2.set_config()
3638 parsed_result = json.loads(result)
3639 topic_arn2 = parsed_result['arn']
3640 assert_equal(status/100, 2)
3641
3642 # create bucket on the first of the rados zones
3643 bucket = master_zone.create_bucket(bucket_name)
3644 # wait for sync
3645 zone_meta_checkpoint(ps_zone.zone)
3646 # create s3 notification with topic1
3647 notification_name = bucket_name + NOTIFICATION_SUFFIX
3648 topic_conf_list = [{'Id': notification_name,
3649 'TopicArn': topic_arn1,
3650 'Events': ['s3:ObjectCreated:*']
3651 }]
3652 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3653 _, status = s3_notification_conf.set_config()
3654 assert_equal(status/100, 2)
3655 # create objects in the bucket
3656 number_of_objects = 10
3657 for i in range(number_of_objects):
3658 key = bucket.new_key(str(i))
3659 key.set_contents_from_string('bar')
3660 # wait for sync
3661 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3662
3663 keys = list(bucket.list())
3664 # TODO: use exact match
3665 receiver.verify_s3_events(keys, exact_match=False)
3666
3667 # update notification to use topic2
3668 topic_conf_list = [{'Id': notification_name,
3669 'TopicArn': topic_arn2,
3670 'Events': ['s3:ObjectCreated:*']
3671 }]
3672 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3673 _, status = s3_notification_conf.set_config()
3674 assert_equal(status/100, 2)
3675
3676 # delete current objects and create new objects in the bucket
3677 for key in bucket.list():
3678 key.delete()
3679 for i in range(number_of_objects):
3680 key = bucket.new_key(str(i+100))
3681 key.set_contents_from_string('bar')
3682 # wait for sync
3683 zone_meta_checkpoint(ps_zone.zone)
3684 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3685
3686 keys = list(bucket.list())
3687 # check that notifications switched to the http endpoint
3688 # TODO: use exact match
3689 http_server.verify_s3_events(keys, exact_match=False)
3690
3691 # cleanup
3692 # delete objects from the bucket
3693 stop_amqp_receiver(receiver, amqp_task)
3694 for key in bucket.list():
3695 key.delete()
3696 s3_notification_conf.del_config()
3697 topic_conf1.del_config()
3698 topic_conf2.del_config()
3699 master_zone.delete_bucket(bucket_name)
3700 http_server.close()
3701 clean_rabbitmq(rabbit_proc)
3702
3703
3704 def test_ps_s3_multiple_topics_notification():
3705 """ test notification creation with multiple topics"""
3706 if skip_push_tests:
3707 return SkipTest("PubSub push tests don't run in teuthology")
3708 hostname = get_ip()
3709 rabbit_proc = init_rabbitmq()
3710 if rabbit_proc is None:
3711 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
3712
3713 master_zone, ps_zone = init_env()
3714 bucket_name = gen_bucket_name()
3715 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
3716 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
3717
3718 # create topics
3719 # start amqp receiver in a separate thread
3720 exchange = 'ex1'
3721 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
3722 amqp_task.start()
3723 # create random port for the http server
3724 http_port = random.randint(10000, 20000)
3725 # start an http server in a separate thread
3726 http_server = StreamingHTTPServer(hostname, http_port)
3727
3728 topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
3729 endpoint='amqp://' + hostname,
3730 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
3731 result, status = topic_conf1.set_config()
3732 parsed_result = json.loads(result)
3733 topic_arn1 = parsed_result['arn']
3734 assert_equal(status/100, 2)
3735 topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
3736 endpoint='http://'+hostname+':'+str(http_port))
3737 result, status = topic_conf2.set_config()
3738 parsed_result = json.loads(result)
3739 topic_arn2 = parsed_result['arn']
3740 assert_equal(status/100, 2)
3741
3742 # create bucket on the first of the rados zones
3743 bucket = master_zone.create_bucket(bucket_name)
3744 # wait for sync
3745 zone_meta_checkpoint(ps_zone.zone)
3746 # create s3 notification
3747 notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
3748 notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
3749 topic_conf_list = [
3750 {
3751 'Id': notification_name1,
3752 'TopicArn': topic_arn1,
3753 'Events': ['s3:ObjectCreated:*']
3754 },
3755 {
3756 'Id': notification_name2,
3757 'TopicArn': topic_arn2,
3758 'Events': ['s3:ObjectCreated:*']
3759 }]
3760 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
3761 _, status = s3_notification_conf.set_config()
3762 assert_equal(status/100, 2)
3763 result, _ = s3_notification_conf.get_config()
3764 assert_equal(len(result['TopicConfigurations']), 2)
3765 assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
3766 assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
3767
3768 # get auto-generated subscriptions
3769 sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
3770 topic_name1)
3771 _, status = sub_conf1.get_config()
3772 assert_equal(status/100, 2)
3773 sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
3774 topic_name2)
3775 _, status = sub_conf2.get_config()
3776 assert_equal(status/100, 2)
3777
3778 # create objects in the bucket
3779 number_of_objects = 10
3780 for i in range(number_of_objects):
3781 key = bucket.new_key(str(i))
3782 key.set_contents_from_string('bar')
3783 # wait for sync
3784 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
3785
3786 # get the events from both of the subscriptions
3787 result, _ = sub_conf1.get_events()
3788 records = json.loads(result)
3789 for record in records['Records']:
3790 log.debug(record)
3791 keys = list(bucket.list())
3792 # TODO: use exact match
3793 verify_s3_records_by_elements(records, keys, exact_match=False)
3794 receiver.verify_s3_events(keys, exact_match=False)
3795
3796 result, _ = sub_conf2.get_events()
3797 parsed_result = json.loads(result)
3798 for record in parsed_result['Records']:
3799 log.debug(record)
3800 keys = list(bucket.list())
3801 # TODO: use exact match
3802 verify_s3_records_by_elements(parsed_result, keys, exact_match=False)
3803 http_server.verify_s3_events(keys, exact_match=False)
3804
3805 # cleanup
3806 stop_amqp_receiver(receiver, amqp_task)
3807 s3_notification_conf.del_config()
3808 topic_conf1.del_config()
3809 topic_conf2.del_config()
3810 # delete objects from the bucket
3811 for key in bucket.list():
3812 key.delete()
3813 master_zone.delete_bucket(bucket_name)
3814 http_server.close()
3815 clean_rabbitmq(rabbit_proc)