import logging
import json
import tempfile
import random
import threading
import subprocess
import socket
import time
import os
from http import server as http_server
from random import randint
from .tests import get_realm, \
    ZonegroupConns, \
    zonegroup_meta_checkpoint, \
    zone_meta_checkpoint, \
    zone_bucket_checkpoint, \
    zone_data_checkpoint, \
    zonegroup_bucket_checkpoint, \
    check_bucket_eq, \
    gen_bucket_name, \
    get_user, \
    get_tenant
from .zone_ps import PSTopic, \
    PSTopicS3, \
    PSNotification, \
    PSSubscription, \
    PSNotificationS3, \
    print_connection_info, \
    get_object_tagging
from .multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
import boto.s3.tagging

# configure logging for the tests module
log = logging.getLogger(__name__)

skip_push_tests = True

####################################
# utility functions for pubsub tests
####################################

def set_contents_from_string(key, content):
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print('Error: ' + str(e))


# HTTP endpoint functions
# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/

class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its http server"""
    def do_POST(self):
        """implementation of POST handler"""
        try:
            content_length = int(self.headers['Content-Length'])
            body = self.rfile.read(content_length)
            log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
            self.server.append(json.loads(body))
        except Exception:
            log.error('HTTP Server received empty or malformed event')
            self.send_response(400)
        else:
            if self.headers.get('Expect') == '100-continue':
                self.send_response(100)
            else:
                self.send_response(200)
        finally:
            if self.server.delay > 0:
                time.sleep(self.server.delay)
            self.end_headers()


class HTTPServerWithEvents(http_server.HTTPServer):
    """HTTP server used by the handler to store events"""
    def __init__(self, addr, handler, worker_id, delay=0):
        http_server.HTTPServer.__init__(self, addr, handler, False)
        self.worker_id = worker_id
        self.events = []
        self.delay = delay

    def append(self, event):
        self.events.append(event)


class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reuses the same socket for all threads"""
    def __init__(self, i, sock, addr, delay=0):
        threading.Thread.__init__(self)
        self.i = i
        self.daemon = True
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay)
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding every handler
        self.httpd.server_bind = self.server_close = lambda self: None
        self.start()

    def run(self):
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server reads/writes to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def close(self):
        self.httpd.shutdown()

    def get_events(self):
        return self.httpd.events

    def reset_events(self):
        self.httpd.events = []


class StreamingHTTPServer:
    """multi-threaded http server class also holding the list of events received by the handler
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100, delay=0):
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
        """verify stored s3 records against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for them to finish"""
        # make sure that the shared socket is closed
        # this is needed in case one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.close()
            worker.join()

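# A minimal usage sketch of the HTTP helpers above (illustrative only, not
# collected by nose; the worker count and the notification setup are assumptions):
def _example_http_receiver_usage():
    host = get_ip()
    port = random.randint(10000, 20000)
    server = StreamingHTTPServer(host, port, num_workers=10)
    # ... configure a topic with push-endpoint=http://<host>:<port> and
    # trigger notifications by creating objects in the bucket ...
    events = server.get_and_reset_events()
    server.close()
    return events
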

# AMQP endpoint functions


class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
        import pika
        import ssl

        if ca_location:
            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(cafile=ca_location)
            ssl_options = pika.SSLOptions(ssl_context)
            rabbitmq_port = 5671
        else:
            rabbitmq_port = 5672
            ssl_options = None

        if external_endpoint_address:
            # pika.URLParameters() accepts only the URL; set the SSL options separately
            params = pika.URLParameters(external_endpoint_address)
            if ssl_options:
                params.ssl_options = ssl_options
        else:
            hostname = get_ip()
            params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(params)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries ' +
                      str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrives on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        tmp = self.events
        self.events = []
        return tmp


def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver"""
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))


def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
    """create amqp receiver and thread"""
    receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver


def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefully stop AMQP receiver: %s', str(error))
    task.join(5)

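# A minimal usage sketch of the AMQP helpers above (illustrative only, not
# collected by nose; the exchange/topic names are assumptions):
def _example_amqp_receiver_usage():
    task, receiver = create_amqp_receiver_thread('ex1', 'my_topic')
    task.start()
    # ... configure a topic with push-endpoint=amqp://<broker> using this
    # exchange and trigger notifications by creating objects in the bucket ...
    events = receiver.get_and_reset_events()
    stop_amqp_receiver(receiver, task)
    return events
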
def check_ps_configured():
    """check if at least one pubsub zone exists"""
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    ps_zones = zonegroup.zones_by_type.get("pubsub")
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")


def is_ps_zone(zone_conn):
    """check if a specific zone is a pubsub zone"""
    if not zone_conn:
        return False
    return zone_conn.zone.tier_type() == "pubsub"


def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element """
    err = ''
    for key in keys:
        key_found = False
        if type(events) is list:
            for event_list in events:
                if key_found:
                    break
                for event in event_list['events']:
                    if event['info']['bucket']['name'] == key.bucket.name and \
                            event['info']['key']['name'] == key.name:
                        if deletions and event['event'] == 'OBJECT_DELETE':
                            key_found = True
                            break
                        elif not deletions and event['event'] == 'OBJECT_CREATE':
                            key_found = True
                            break
        else:
            for event in events['events']:
                if event['info']['bucket']['name'] == key.bucket.name and \
                        event['info']['key']['name'] == key.name:
                    if deletions and event['event'] == 'OBJECT_DELETE':
                        key_found = True
                        break
                    elif not deletions and event['event'] == 'OBJECT_CREATE':
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            assert False, err

    if len(events) != len(keys):
        err = 'superfluous events were found'
        log.debug(err)
        if exact_match:
            log.error(events)
            assert False, err

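# For reference, a single pubsub event, as accessed by
# verify_events_by_elements() above, has this rough shape (a sketch inferred
# from the field accesses; actual events carry additional fields):
#
# {'events': [{'event': 'OBJECT_CREATE',            # or 'OBJECT_DELETE'
#              'info': {'bucket': {'name': '<bucket name>'},
#                       'key': {'name': '<object name>'}}}]}
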

def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}):
    """ verify there is at least one record per element """
    err = ''
    for key in keys:
        key_found = False
        object_size = 0
        if type(records) is list:
            for record_list in records:
                if key_found:
                    break
                for record in record_list['Records']:
                    if record['s3']['bucket']['name'] == key.bucket.name and \
                            record['s3']['object']['key'] == key.name:
                        if deletions and record['eventName'].startswith('ObjectRemoved'):
                            key_found = True
                            object_size = record['s3']['object']['size']
                            break
                        elif not deletions and record['eventName'].startswith('ObjectCreated'):
                            key_found = True
                            object_size = record['s3']['object']['size']
                            break
        else:
            for record in records['Records']:
                if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name:
                    if deletions and record['eventName'].startswith('ObjectRemoved'):
                        key_found = True
                        object_size = record['s3']['object']['size']
                        break
                    elif not deletions and record['eventName'].startswith('ObjectCreated'):
                        key_found = True
                        object_size = record['s3']['object']['size']
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            assert False, err
        elif expected_sizes:
            assert_equal(object_size, expected_sizes.get(key.name))

    if len(records) != len(keys):
        err = 'superfluous records were found'
        log.warning(err)
        if exact_match:
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err

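# For reference, a single S3 notification record, as accessed by
# verify_s3_records_by_elements() above, has this rough shape (a sketch
# inferred from the field accesses; actual records carry additional fields):
#
# {'Records': [{'eventName': 'ObjectCreated:Put',   # or 'ObjectRemoved:*'
#               's3': {'bucket': {'name': '<bucket name>'},
#                      'object': {'key': '<object name>', 'size': 3}}}]}
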

def init_rabbitmq():
    """ start a rabbitmq broker """
    hostname = get_ip()
    #port = str(random.randint(20000, 30000))
    #data_dir = './' + port + '_data'
    #log_dir = './' + port + '_log'
    #print('')
    #try:
    #    os.mkdir(data_dir)
    #    os.mkdir(log_dir)
    #except:
    #    print('rabbitmq directories already exist')
    #env = {'RABBITMQ_NODE_PORT': port,
    #       'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname,
    #       'RABBITMQ_USE_LONGNAME': 'true',
    #       'RABBITMQ_MNESIA_BASE': data_dir,
    #       'RABBITMQ_LOG_BASE': log_dir}
    # TODO: support multiple brokers per host using env
    # make sure we don't collide with the default
    try:
        proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
    except Exception as error:
        log.info('failed to execute rabbitmq-server: %s', str(error))
        print('failed to execute rabbitmq-server: %s' % str(error))
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return proc  #, port, data_dir, log_dir


def clean_rabbitmq(proc):  #, data_dir, log_dir)
    """ stop the rabbitmq broker """
    try:
        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except Exception:
        log.info('rabbitmq server already terminated')
    # TODO: add directory cleanup once multiple brokers are supported
    #try:
    #    os.rmdir(data_dir)
    #    os.rmdir(log_dir)
    #except:
    #    log.info('rabbitmq directories already removed')

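# A minimal lifecycle sketch for the rabbitmq helpers above (illustrative
# only, not collected by nose):
def _example_rabbitmq_lifecycle():
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')
    try:
        pass  # ... run tests that push notifications over AMQP ...
    finally:
        clean_rabbitmq(proc)
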

# Kafka endpoint functions

kafka_server = 'localhost'

class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""
    def __init__(self, topic, security_type):
        from kafka import KafkaConsumer
        remaining_retries = 10
        port = 9092
        if security_type != 'PLAINTEXT':
            security_type = 'SSL'
            port = 9093
        while remaining_retries > 0:
            try:
                self.consumer = KafkaConsumer(topic, bootstrap_servers=kafka_server+':'+str(port), security_protocol=security_type)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to kafka (remaining retries ' +
                      str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []


def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver"""
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))


def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create kafka receiver and thread"""
    receiver = KafkaReceiver(topic, security_type)
    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver

def stop_kafka_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    receiver.stop = True
    task.join(1)
    try:
        receiver.consumer.close()
    except Exception as error:
        log.info('failed to gracefully stop Kafka receiver: %s', str(error))

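# A minimal usage sketch of the kafka helpers above (illustrative only, not
# collected by nose; the topic name and keys are assumptions):
def _example_kafka_receiver_usage(keys):
    task, receiver = create_kafka_receiver_thread('my_topic')
    task.start()
    # ... configure a topic with push-endpoint=kafka://<broker> and trigger
    # notifications by creating objects in the bucket ...
    receiver.verify_s3_events(keys, exact_match=False)
    stop_kafka_receiver(receiver, task)
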

# follow the instructions here to create and sign a broker certificate:
# https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka

# the generated broker certificate should be stored in the java keystore for the use of the server
# assuming the jks files were copied to $KAFKA_DIR and the broker name is "localhost",
# the following lines must be added to $KAFKA_DIR/config/server.properties
# listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
# sasl.enabled.mechanisms=PLAIN
# ssl.keystore.location = $KAFKA_DIR/server.keystore.jks
# ssl.keystore.password = abcdefgh
# ssl.key.password = abcdefgh
# ssl.truststore.location = $KAFKA_DIR/server.truststore.jks
# ssl.truststore.password = abcdefgh

# notes:
# (1) we don't test client authentication, hence, there is no need to generate client keys
# (2) our client does not use the keystore, so the "rootCA.crt" file generated in the process above
#     should be copied to: $KAFKA_DIR

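# For example, a consumer could then connect over SSL like this (a sketch,
# assuming kafka-python is installed and rootCA.crt was copied as described):
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer('my_topic',
#                            bootstrap_servers='localhost:9093',
#                            security_protocol='SSL',
#                            ssl_cafile=os.environ['KAFKA_DIR'] + 'rootCA.crt')
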
def init_kafka():
    """ start kafka/zookeeper """
    try:
        KAFKA_DIR = os.environ['KAFKA_DIR']
    except KeyError:
        KAFKA_DIR = ''

    if KAFKA_DIR == '':
        log.info('KAFKA_DIR must be set to where kafka is installed')
        print('KAFKA_DIR must be set to where kafka is installed')
        return None, None, None

    DEVNULL = open(os.devnull, 'wb')

    print('\nStarting zookeeper...')
    try:
        zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL)
    except Exception as error:
        log.info('failed to execute zookeeper: %s', str(error))
        print('failed to execute zookeeper: %s' % str(error))
        return None, None, None

    time.sleep(5)
    if zk_proc.poll() is not None:
        print('zookeeper failed to start')
        return None, None, None
    print('Zookeeper started')
    print('Starting kafka...')
    kafka_log = open('./kafka.log', 'w')
    try:
        kafka_env = os.environ.copy()
        kafka_env['KAFKA_OPTS'] = '-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf'
        kafka_proc = subprocess.Popen([
            KAFKA_DIR+'bin/kafka-server-start.sh',
            KAFKA_DIR+'config/server.properties'],
            stdout=kafka_log,
            env=kafka_env)
    except Exception as error:
        log.info('failed to execute kafka: %s', str(error))
        print('failed to execute kafka: %s' % str(error))
        zk_proc.terminate()
        kafka_log.close()
        return None, None, None

    # TODO add kafka checkpoint instead of sleep
    time.sleep(15)
    if kafka_proc.poll() is not None:
        zk_proc.terminate()
        print('kafka failed to start. details in: ./kafka.log')
        kafka_log.close()
        return None, None, None

    print('Kafka started')
    return kafka_proc, zk_proc, kafka_log


def clean_kafka(kafka_proc, zk_proc, kafka_log):
    """ stop kafka/zookeeper """
    try:
        kafka_log.close()
        print('Shutting down Kafka...')
        kafka_proc.terminate()
        time.sleep(5)
        if kafka_proc.poll() is None:
            print('Failed to shutdown Kafka... killing')
            kafka_proc.kill()
        print('Shutting down zookeeper...')
        zk_proc.terminate()
        time.sleep(5)
        if zk_proc.poll() is None:
            print('Failed to shutdown zookeeper... killing')
            zk_proc.kill()
    except Exception:
        log.info('kafka/zookeeper already terminated')

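# A minimal lifecycle sketch for the kafka helpers above (illustrative only,
# not collected by nose):
def _example_kafka_lifecycle():
    kafka_proc, zk_proc, kafka_log = init_kafka()
    if kafka_proc is None:
        raise SkipTest('end2end kafka tests require kafka/zookeeper installed')
    try:
        pass  # ... run tests that push notifications to kafka ...
    finally:
        clean_kafka(kafka_proc, zk_proc, kafka_log)
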

def init_env(require_ps=True):
    """initialize the environment"""
    if require_ps:
        check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zone = None
    master_zone = None
    for conn in zonegroup_conns.zones:
        if conn.zone == zonegroup.master_zone:
            master_zone = conn
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zone = conn

    assert_not_equal(master_zone, None)
    if require_ps:
        assert_not_equal(ps_zone, None)
    return master_zone, ps_zone


def get_ip():
    """ This method returns the "primary" IP on the local box (the one with a default route)
    source: https://stackoverflow.com/a/28950776/711085
    this is needed because on the teuthology machines socket.getfqdn()/socket.gethostname() return 127.0.0.1 """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # address should not be reachable
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip


TOPIC_SUFFIX = "_topic"
SUB_SUFFIX = "_sub"
NOTIFICATION_SUFFIX = "_notif"

##############
# pubsub tests
##############

def test_ps_info():
    """ log information for manual testing """
    raise SkipTest("only used in manual testing")
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    print('Zonegroup: ' + zonegroup.name)
    print('user: ' + get_user())
    print('tenant: ' + get_tenant())
    print('Master Zone')
    print_connection_info(master_zone.conn)
    print('PubSub Zone')
    print_connection_info(ps_zone.conn)
    print('Bucket: ' + bucket_name)


def test_ps_s3_notification_low_level():
    """ test low level implementation of s3 notifications """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    generated_topic_name = notification_name+'_'+topic_name
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated topic
    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
    result, status = generated_topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic']['name'], generated_topic_name)
    # get auto-generated notification
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       generated_topic_name)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(len(parsed_result['topics']), 1)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              generated_topic_name)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(status//100, 2)
    assert_equal(parsed_result['topic'], generated_topic_name)
    # delete s3 notification
    _, status = s3_notification_conf.del_config(notification=notification_name)
    assert_equal(status//100, 2)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)

    # verify low-level cleanup
    _, status = generated_topic_conf.get_config()
    assert_equal(status, 404)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification_records():
    """ test s3 records fetching """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    result, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)
    # get auto-generated subscription
    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                              topic_name)
    _, status = sub_conf.get_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the events from the subscription
    result, _ = sub_conf.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)

    # cleanup
    _, status = s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the keys
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification():
    """ test s3 notification set/get/delete """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create topic
    topic_name = bucket_name + TOPIC_SUFFIX
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    response, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(response)
    topic_arn = parsed_result['arn']
    # create one s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    topic_conf_list = [{'Id': notification_name1,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                        }]
    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status//100, 2)
    # create another s3 notification with the same topic
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [{'Id': notification_name2,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status//100, 2)
    zone_meta_checkpoint(ps_zone.zone)

    # get all notifications on a bucket
    response, status = s3_notification_conf1.get_config()
    assert_equal(status//100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # get specific notifications on a bucket
    response, status = s3_notification_conf1.get_config(notification=notification_name1)
    assert_equal(status//100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1)
    response, status = s3_notification_conf2.get_config(notification=notification_name2)
    assert_equal(status//100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2)

    # delete specific notifications
    _, status = s3_notification_conf1.del_config(notification=notification_name1)
    assert_equal(status//100, 2)
    _, status = s3_notification_conf2.del_config(notification=notification_name2)
    assert_equal(status//100, 2)

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_notification_filter():
    """ test s3 notification filter on master """
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    proc = init_rabbitmq()
    if proc is None:
        raise SkipTest('end2end amqp tests require rabbitmq-server installed')

    master_zone, ps_zone = init_env(require_ps=True)

    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receivers
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'

    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
    result, _ = topic_conf.set_config()
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    zone_meta_checkpoint(ps_zone.zone)

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                        },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                        }]

    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # filtering by attributes is only supported on the master zone
    print('filtering by attributes only supported on master zone')
    skip_notif4 = True

    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status//100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status//100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    for key_name in expected_in1:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in2:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # the key must be created before its metadata can be set
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    master_zone.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)
    clean_rabbitmq(proc)


def test_object_timing():
    raise SkipTest("only used in manual testing")
    master_zone, _ = init_env(require_ps=False)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket (async)
    print('creating objects...')
    number_of_objects = 1000
    client_threads = []
    start_time = time.time()
    content = str(bytearray(os.urandom(1024*1024)))
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('total number of objects: ' + str(len(list(bucket.list()))))

    print('deleting objects...')
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target=key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    # cleanup
    master_zone.delete_bucket(bucket_name)


def test_ps_s3_opaque_data():
    """ test that an opaque id set in the topic is sent in the notification """
    if skip_push_tests:
        raise SkipTest("PubSub push tests don't run in teuthology")
    hostname = get_ip()
    master_zone, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = master_zone.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)

    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    opaque_data = 'http://1.2.3.4:8888'
    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
    result, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    parsed_result = json.loads(result)
    topic_arn = parsed_result['arn']
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                        }]
    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status//100, 2)

    # create objects in the bucket
    client_threads = []
    content = 'bar'
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # check http receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    events = http_server.get_and_reset_events()
    for event in events:
        assert_equal(event['Records'][0]['opaqueData'], opaque_data)

    # cleanup
    for key in keys:
        key.delete()
    [thr.join() for thr in client_threads]
    topic_conf.del_config()
    s3_notification_conf.del_config(notification=notification_name)
    # delete the bucket
    master_zone.delete_bucket(bucket_name)
    http_server.close()


def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zone = init_env()
    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    assert_equal(parsed_result['topic']['arn'],
                 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)
    # verify topic is deleted
    result, status = topic_conf.get_config()
    assert_equal(status, 404)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')


def test_ps_topic_with_endpoint():
    """ test set topic with endpoint """
    _, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    dest_endpoint = 'amqp://localhost:7001'
    dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopic(ps_zone.conn, topic_name,
                         endpoint=dest_endpoint,
                         endpoint_args=dest_args)
    _, status = topic_conf.set_config()
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint)
    # cleanup
    topic_conf.del_config()


def test_ps_notification():
    """ test set/get/delete of notification """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status//100, 2)
    result, status = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 0)
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)


def test_ps_notification_events():
    """ test set/get/delete of notification on specific events """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)


def test_ps_subscription():
    """ test set/get/delete of subscription """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    keys = list(bucket.list())
    # TODO: use exact match
    verify_events_by_elements(events, keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the delete events from the subscriptions
    #result, _ = sub_conf.get_events()
    #for event in events['events']:
    #    log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
    # TODO: check deletions
    # TODO: use exact match
    # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status//100, 2)
    result, status = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return 404
    # assert_equal(status, 404)

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)


def test_ps_admin():
    """ test radosgw-admin commands """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    result, status = ps_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert len(parsed_result['topics']) > 0
    result, status = ps_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
    topic_conf_s3.set_config()
    result, status = topic_conf_s3.get_config()
    assert_equal(status, 200)
    assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name)
    result, status = master_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + master_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert len(parsed_result['topics']) > 0
    result, status = master_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)

    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zone.zone)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    result, status = ps_zone.zone.cluster.admin(['subscription', 'get', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    assert_equal(parsed_result['name'], bucket_name+SUB_SUFFIX)
    # create objects in the bucket
    number_of_objects = 110
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # get events from the subscription
    result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    marker = parsed_result['next_marker']
    events1 = parsed_result['events']
    result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--marker', marker]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    parsed_result = json.loads(result)
    events2 = parsed_result['events']

    keys = list(bucket.list())
    verify_events_by_elements({"events": events1+events2}, keys, exact_match=False)

    # ack an event in the subscription
    result, status = ps_zone.zone.cluster.admin(['subscription', 'ack', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--event-id', events2[0]['id']]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)

    # remove the subscription
    result, status = ps_zone.zone.cluster.admin(['subscription', 'rm', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)

    # remove the topics
    result, status = ps_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
                                                + ps_zone.zone.zone_arg())
    assert_equal(status, 0)
    result, status = master_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name]
                                                    + master_zone.zone.zone_arg())
    assert_equal(status, 0)

    # cleanup
    for key in bucket.list():
        key.delete()
    notification_conf.del_config()
    master_zone.delete_bucket(bucket_name)


def test_ps_incremental_sync():
    """ test that events are only sent on incremental sync """
    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zone.conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = master_zone.create_bucket(bucket_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(0, number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('foo')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # create notifications
    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)

    # create more objects in the bucket
    for i in range(number_of_objects, 2*number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    events = json.loads(result)
    count = 0
    for event in events['events']:
        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
        count += 1

    # make sure we have 10 and not 20 events
    assert_equal(count, number_of_objects)

    # cleanup
    for key in bucket.list():
        key.delete()
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    master_zone.delete_bucket(bucket_name)


1484 def test_ps_event_type_subscription():
1485 """ test subscriptions for different events """
1486 master_zone, ps_zone = init_env()
1487 bucket_name = gen_bucket_name()
1488
1489 # create topic for objects creation
1490 topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
1491 topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
1492 topic_create_conf.set_config()
1493 # create topic for objects deletion
1494 topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
1495 topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
1496 topic_delete_conf.set_config()
1497 # create topic for all events
1498 topic_name = bucket_name+TOPIC_SUFFIX+'_all'
1499 topic_conf = PSTopic(ps_zone.conn, topic_name)
1500 topic_conf.set_config()
1501 # create bucket on the first of the rados zones
1502 bucket = master_zone.create_bucket(bucket_name)
1503 # wait for sync
1504 zone_meta_checkpoint(ps_zone.zone)
1505 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1506 # create notifications for objects creation
1507 notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
1508 topic_create_name, "OBJECT_CREATE")
1509 _, status = notification_create_conf.set_config()
1510 assert_equal(status/100, 2)
1511 # create notifications for objects deletion
1512 notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
1513 topic_delete_name, "OBJECT_DELETE")
1514 _, status = notification_delete_conf.set_config()
1515 assert_equal(status/100, 2)
1516 # create notifications for all events
1517 notification_conf = PSNotification(ps_zone.conn, bucket_name,
1518 topic_name, "OBJECT_DELETE,OBJECT_CREATE")
1519 _, status = notification_conf.set_config()
1520 assert_equal(status/100, 2)
1521 # create subscription for objects creation
1522 sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
1523 topic_create_name)
1524 _, status = sub_create_conf.set_config()
1525 assert_equal(status/100, 2)
1526 # create subscription for objects deletion
1527 sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
1528 topic_delete_name)
1529 _, status = sub_delete_conf.set_config()
1530 assert_equal(status/100, 2)
1531 # create subscription for all events
1532 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
1533 topic_name)
1534 _, status = sub_conf.set_config()
1535 assert_equal(status/100, 2)
1536 # create objects in the bucket
1537 number_of_objects = 10
1538 for i in range(number_of_objects):
1539 key = bucket.new_key(str(i))
1540 key.set_contents_from_string('bar')
1541 # wait for sync
1542 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1543
1544 # get the events from the creation subscription
1545 result, _ = sub_create_conf.get_events()
1546 events = json.loads(result)
1547 for event in events['events']:
1548 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
1549 '" type: "' + str(event['event']) + '"')
1550 keys = list(bucket.list())
1551 # TODO: use exact match
1552 verify_events_by_elements(events, keys, exact_match=False)
1553 # get the events from the deletions subscription
1554 result, _ = sub_delete_conf.get_events()
1555 events = json.loads(result)
1556 for event in events['events']:
1557 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1558 '" type: "' + str(event['event']) + '"')
1559 assert_equal(len(events['events']), 0)
1560 # get the events from the all events subscription
1561 result, _ = sub_conf.get_events()
1562 events = json.loads(result)
1563 for event in events['events']:
1564 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
1565 str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1566 # TODO: use exact match
1567 verify_events_by_elements(events, keys, exact_match=False)
1568 # delete objects from the bucket
1569 for key in bucket.list():
1570 key.delete()
1571 # wait for sync
1572 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1573 log.debug("Event (OBJECT_DELETE) synced")
1574
1575 # get the events from the creations subscription
1576 result, _ = sub_create_conf.get_events()
1577 events = json.loads(result)
1578 for event in events['events']:
1579 log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
1580 '" type: "' + str(event['event']) + '"')
1581 # deletions should not change the creation events
1582 # TODO: use exact match
1583 verify_events_by_elements(events, keys, exact_match=False)
1584 # get the events from the deletions subscription
1585 result, _ = sub_delete_conf.get_events()
1586 events = json.loads(result)
1587 for event in events['events']:
1588 log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1589 '" type: "' + str(event['event']) + '"')
1590 # only deletions should be listed here
1591 # TODO: use exact match
1592 verify_events_by_elements(events, keys, exact_match=False, deletions=True)
1593 # get the events from the all events subscription
1594 result, _ = sub_conf.get_events()
1595 events = json.loads(result)
1596 for event in events['events']:
1597 log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
1598 '" type: "' + str(event['event']) + '"')
1599 # both deletions and creations should be here
1600 # TODO: use exact match
1601 verify_events_by_elements(events, keys, exact_match=False, deletions=False)
1602 # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
1603 # TODO: (1) test deletions (2) test overall number of events
1604
1605 # test subscription deletion when topic is specified
1606 _, status = sub_create_conf.del_config(topic=True)
1607 assert_equal(status//100, 2)
1608 _, status = sub_delete_conf.del_config(topic=True)
1609 assert_equal(status//100, 2)
1610 _, status = sub_conf.del_config(topic=True)
1611 assert_equal(status//100, 2)
1612
1613 # cleanup
1614 notification_create_conf.del_config()
1615 notification_delete_conf.del_config()
1616 notification_conf.del_config()
1617 topic_create_conf.del_config()
1618 topic_delete_conf.del_config()
1619 topic_conf.del_config()
1620 master_zone.delete_bucket(bucket_name)
1621
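# a note on the recurring 'TODO: use exact match': multisite sync may
# deliver duplicate events, so these tests only assert containment with
# exact_match=False; once duplicates are eliminated the intent is
# presumably verify_events_by_elements(events, keys, exact_match=True)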
1622
1623 def test_ps_event_fetching():
1624 """ test incremental fetching of events from a subscription """
1625 master_zone, ps_zone = init_env()
1626 bucket_name = gen_bucket_name()
1627 topic_name = bucket_name+TOPIC_SUFFIX
1628
1629 # create topic
1630 topic_conf = PSTopic(ps_zone.conn, topic_name)
1631 topic_conf.set_config()
1632 # create bucket on the first of the rados zones
1633 bucket = master_zone.create_bucket(bucket_name)
1634 # wait for sync
1635 zone_meta_checkpoint(ps_zone.zone)
1636 # create notifications
1637 notification_conf = PSNotification(ps_zone.conn, bucket_name,
1638 topic_name)
1639 _, status = notification_conf.set_config()
1640 assert_equal(status//100, 2)
1641 # create subscription
1642 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
1643 topic_name)
1644 _, status = sub_conf.set_config()
1645 assert_equal(status//100, 2)
1646 # create objects in the bucket
1647 number_of_objects = 100
1648 for i in range(number_of_objects):
1649 key = bucket.new_key(str(i))
1650 key.set_contents_from_string('bar')
1651 # wait for sync
1652 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1653 max_events = 15
1654 total_events_count = 0
1655 next_marker = None
1656 all_events = []
1657 while True:
1658 # get the events from the subscription
1659 result, _ = sub_conf.get_events(max_events, next_marker)
1660 events = json.loads(result)
1661 total_events_count += len(events['events'])
1662 all_events.extend(events['events'])
1663 next_marker = events['next_marker']
1664 for event in events['events']:
1665 log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1666 if next_marker == '':
1667 break
1668 keys = list(bucket.list())
1669 # TODO: use exact match
1670 verify_events_by_elements({'events': all_events}, keys, exact_match=False)
1671
1672 # cleanup
1673 sub_conf.del_config()
1674 notification_conf.del_config()
1675 topic_conf.del_config()
1676 for key in bucket.list():
1677 key.delete()
1678 master_zone.delete_bucket(bucket_name)
1679
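# a minimal sketch of the pagination idiom exercised above, assuming the
# same PSSubscription.get_events(max_events, marker) signature; the helper
# name is ours, for illustration only, as the test inlines this loop
def fetch_all_events(sub_conf, page_size=15):
    """fetch all events from a subscription, one page at a time"""
    all_events = []
    next_marker = None
    while True:
        result, _ = sub_conf.get_events(page_size, next_marker)
        events = json.loads(result)
        all_events.extend(events['events'])
        next_marker = events['next_marker']
        # an empty marker signals that the last page was fetched
        if next_marker == '':
            return all_events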
1680
1681 def test_ps_event_acking():
1682 """ test acking of some events in a subscription """
1683 master_zone, ps_zone = init_env()
1684 bucket_name = gen_bucket_name()
1685 topic_name = bucket_name+TOPIC_SUFFIX
1686
1687 # create topic
1688 topic_conf = PSTopic(ps_zone.conn, topic_name)
1689 topic_conf.set_config()
1690 # create bucket on the first of the rados zones
1691 bucket = master_zone.create_bucket(bucket_name)
1692 # wait for sync
1693 zone_meta_checkpoint(ps_zone.zone)
1694 # create notifications
1695 notification_conf = PSNotification(ps_zone.conn, bucket_name,
1696 topic_name)
1697 _, status = notification_conf.set_config()
1698 assert_equal(status//100, 2)
1699 # create subscription
1700 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
1701 topic_name)
1702 _, status = sub_conf.set_config()
1703 assert_equal(status//100, 2)
1704 # create objects in the bucket
1705 number_of_objects = 10
1706 for i in range(number_of_objects):
1707 key = bucket.new_key(str(i))
1708 key.set_contents_from_string('bar')
1709 # wait for sync
1710 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1711
1712 # get the create events from the subscription
1713 result, _ = sub_conf.get_events()
1714 events = json.loads(result)
1715 original_number_of_events = len(events['events'])
1716 for event in events['events']:
1717 log.debug('Event (before ack) id: "' + str(event['id']) + '"')
1718 keys = list(bucket.list())
1719 # TODO: use exact match
1720 verify_events_by_elements(events, keys, exact_match=False)
1721 # ack half of the events
1722 events_to_ack = number_of_objects//2
1723 for event in events['events']:
1724 if events_to_ack == 0:
1725 break
1726 _, status = sub_conf.ack_events(event['id'])
1727 assert_equal(status//100, 2)
1728 events_to_ack -= 1
1729
1730 # verify that acked events are gone
1731 result, _ = sub_conf.get_events()
1732 events = json.loads(result)
1733 for event in events['events']:
1734 log.debug('Event (after ack) id: "' + str(event['id']) + '"')
1735 assert len(events['events']) >= (original_number_of_events - number_of_objects//2)
1736
1737 # cleanup
1738 sub_conf.del_config()
1739 notification_conf.del_config()
1740 topic_conf.del_config()
1741 for key in bucket.list():
1742 key.delete()
1743 master_zone.delete_bucket(bucket_name)
1744
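# a hedged sketch of draining a subscription by acking everything it holds,
# built from the same PSSubscription calls used above; the helper name is
# ours, for illustration only
def ack_all_events(sub_conf):
    """ack every event currently held by a subscription"""
    result, _ = sub_conf.get_events()
    for event in json.loads(result)['events']:
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)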
1745
1746 def test_ps_creation_triggers():
1747 """ test object creation notifications in using put/copy/post """
1748 master_zone, ps_zone = init_env()
1749 bucket_name = gen_bucket_name()
1750 topic_name = bucket_name+TOPIC_SUFFIX
1751
1752 # create topic
1753 topic_conf = PSTopic(ps_zone.conn, topic_name)
1754 topic_conf.set_config()
1755 # create bucket on the first of the rados zones
1756 bucket = master_zone.create_bucket(bucket_name)
1757 # wait for sync
1758 zone_meta_checkpoint(ps_zone.zone)
1759 # create notifications
1760 notification_conf = PSNotification(ps_zone.conn, bucket_name,
1761 topic_name)
1762 _, status = notification_conf.set_config()
1763 assert_equal(status//100, 2)
1764 # create subscription
1765 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
1766 topic_name)
1767 _, status = sub_conf.set_config()
1768 assert_equal(status//100, 2)
1769 # create objects in the bucket using PUT
1770 key = bucket.new_key('put')
1771 key.set_contents_from_string('bar')
1772 # create objects in the bucket using COPY
1773 bucket.copy_key('copy', bucket.name, key.name)
1774
1775 # create objects in the bucket using multi-part upload
1776 fp = tempfile.NamedTemporaryFile(mode='w+b')
1777 object_size = 1024
1778 content = bytearray(os.urandom(object_size))
1779 fp.write(content)
1780 fp.flush()
1781 fp.seek(0)
1782 uploader = bucket.initiate_multipart_upload('multipart')
1783 uploader.upload_part_from_file(fp, 1)
1784 uploader.complete_upload()
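# only the completed multipart upload is expected to emit a creation event;
# the individual part uploads should not generate notifications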
1785 fp.close()
1786
1787 # wait for sync
1788 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1789
1790 # get the create events from the subscription
1791 result, _ = sub_conf.get_events()
1792 events = json.loads(result)
1793 for event in events['events']:
1794 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1795
1796 # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
1797 assert len(events['events']) >= 3
1798 # cleanup
1799 sub_conf.del_config()
1800 notification_conf.del_config()
1801 topic_conf.del_config()
1802 for key in bucket.list():
1803 key.delete()
1804 master_zone.delete_bucket(bucket_name)
1805
1806
1807 def test_ps_versioned_deletion():
1808 """ test notification of deletion markers """
1809 master_zone, ps_zone = init_env()
1810 bucket_name = gen_bucket_name()
1811 topic_name = bucket_name+TOPIC_SUFFIX
1812
1813 # create topics
1814 topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
1815 _, status = topic_conf1.set_config()
1816 assert_equal(status//100, 2)
1817 topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
1818 _, status = topic_conf2.set_config()
1819 assert_equal(status//100, 2)
1820
1821 # create bucket on the first of the rados zones
1822 bucket = master_zone.create_bucket(bucket_name)
1823 bucket.configure_versioning(True)
1824
1825 # wait for sync
1826 zone_meta_checkpoint(ps_zone.zone)
1827
1828 # create notifications
1829 event_type1 = 'OBJECT_DELETE'
1830 notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
1831 topic_name+'_1',
1832 event_type1)
1833 _, status = notification_conf1.set_config()
1834 assert_equal(status//100, 2)
1835 event_type2 = 'DELETE_MARKER_CREATE'
1836 notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
1837 topic_name+'_2',
1838 event_type2)
1839 _, status = notification_conf2.set_config()
1840 assert_equal(status//100, 2)
1841
1842 # create subscriptions
1843 sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
1844 topic_name+'_1')
1845 _, status = sub_conf1.set_config()
1846 assert_equal(status//100, 2)
1847 sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
1848 topic_name+'_2')
1849 _, status = sub_conf2.set_config()
1850 assert_equal(status//100, 2)
1851
1852 # create objects in the bucket
1853 key = bucket.new_key('foo')
1854 key.set_contents_from_string('bar')
1855 v1 = key.version_id
1856 key.set_contents_from_string('kaboom')
1857 v2 = key.version_id
1858 # create deletion marker
1859 delete_marker_key = bucket.delete_key(key.name)
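# in a versioned bucket, a delete without a version id only creates a
# delete marker (DELETE_MARKER_CREATE); deleting a specific version is
# what emits OBJECT_DELETE, which the two subscriptions distinguish below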
1860
1861 # wait for sync
1862 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1863
1864 # delete the deletion marker
1865 delete_marker_key.delete()
1866 # delete versions
1867 bucket.delete_key(key.name, version_id=v2)
1868 bucket.delete_key(key.name, version_id=v1)
1869
1870 # wait for sync
1871 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1872
1873 # get the delete events from the subscription
1874 result, _ = sub_conf1.get_events()
1875 events = json.loads(result)
1876 for event in events['events']:
1877 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1878 assert_equal(str(event['event']), event_type1)
1879
1880 result, _ = sub_conf2.get_events()
1881 events = json.loads(result)
1882 for event in events['events']:
1883 log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
1884 assert_equal(str(event['event']), event_type2)
1885
1886 # cleanup
1887 # the following is needed for cleanup in the 3-zone case
1888 # see: http://tracker.ceph.com/issues/39142
1889 realm = get_realm()
1890 zonegroup = realm.master_zonegroup()
1891 zonegroup_conns = ZonegroupConns(zonegroup)
1892 try:
1893 zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
1894 master_zone.delete_bucket(bucket_name)
1895 except:
1896 log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
1897 sub_conf1.del_config()
1898 sub_conf2.del_config()
1899 notification_conf1.del_config()
1900 notification_conf2.del_config()
1901 topic_conf1.del_config()
1902 topic_conf2.del_config()
1903
1904
1905 def test_ps_push_http():
1906 """ test pushing to http endpoint """
1907 if skip_push_tests:
1908 raise SkipTest("PubSub push tests don't run in teuthology")
1909 master_zone, ps_zone = init_env()
1910 bucket_name = gen_bucket_name()
1911 topic_name = bucket_name+TOPIC_SUFFIX
1912
1913 # create random port for the http server
1914 host = get_ip()
1915 port = random.randint(10000, 20000)
1916 # start an http server in a separate thread
1917 http_server = StreamingHTTPServer(host, port)
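# the server stores every POSTed event in memory so that verify_events()
# below can match them against the bucket's keys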
1918
1919 # create topic
1920 topic_conf = PSTopic(ps_zone.conn, topic_name)
1921 _, status = topic_conf.set_config()
1922 assert_equal(status//100, 2)
1923 # create bucket on the first of the rados zones
1924 bucket = master_zone.create_bucket(bucket_name)
1925 # wait for sync
1926 zone_meta_checkpoint(ps_zone.zone)
1927 # create notifications
1928 notification_conf = PSNotification(ps_zone.conn, bucket_name,
1929 topic_name)
1930 _, status = notification_conf.set_config()
1931 assert_equal(status//100, 2)
1932 # create subscription
1933 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
1934 topic_name, endpoint='http://'+host+':'+str(port))
1935 _, status = sub_conf.set_config()
1936 assert_equal(status//100, 2)
1937 # create objects in the bucket
1938 number_of_objects = 10
1939 for i in range(number_of_objects):
1940 key = bucket.new_key(str(i))
1941 key.set_contents_from_string('bar')
1942 # wait for sync
1943 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1944 # check http server
1945 keys = list(bucket.list())
1946 # TODO: use exact match
1947 http_server.verify_events(keys, exact_match=False)
1948
1949 # delete objects from the bucket
1950 for key in bucket.list():
1951 key.delete()
1952 # wait for sync
1953 zone_meta_checkpoint(ps_zone.zone)
1954 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
1955 # check http server
1956 # TODO: use exact match
1957 http_server.verify_events(keys, deletions=True, exact_match=False)
1958
1959 # cleanup
1960 sub_conf.del_config()
1961 notification_conf.del_config()
1962 topic_conf.del_config()
1963 master_zone.delete_bucket(bucket_name)
1964 http_server.close()
1965
1966
1967 def test_ps_s3_push_http():
1968 """ test pushing to http endpoint s3 record format"""
1969 if skip_push_tests:
1970 raise SkipTest("PubSub push tests don't run in teuthology")
1971 master_zone, ps_zone = init_env()
1972 bucket_name = gen_bucket_name()
1973 topic_name = bucket_name+TOPIC_SUFFIX
1974
1975 # create random port for the http server
1976 host = get_ip()
1977 port = random.randint(10000, 20000)
1978 # start an http server in a separate thread
1979 http_server = StreamingHTTPServer(host, port)
1980
1981 # create topic
1982 topic_conf = PSTopic(ps_zone.conn, topic_name,
1983 endpoint='http://'+host+':'+str(port))
1984 result, status = topic_conf.set_config()
1985 assert_equal(status//100, 2)
1986 parsed_result = json.loads(result)
1987 topic_arn = parsed_result['arn']
1988 # create bucket on the first of the rados zones
1989 bucket = master_zone.create_bucket(bucket_name)
1990 # wait for sync
1991 zone_meta_checkpoint(ps_zone.zone)
1992 # create s3 notification
1993 notification_name = bucket_name + NOTIFICATION_SUFFIX
1994 topic_conf_list = [{'Id': notification_name,
1995 'TopicArn': topic_arn,
1996 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
1997 }]
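# each entry binds one notification id to a topic ARN and a list of event
# types; 's3:ObjectCreated:*' and 's3:ObjectRemoved:*' are wildcards that
# match all creation and removal variants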
1998 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
1999 _, status = s3_notification_conf.set_config()
2000 assert_equal(status//100, 2)
2001 # create objects in the bucket
2002 number_of_objects = 10
2003 for i in range(number_of_objects):
2004 key = bucket.new_key(str(i))
2005 key.set_contents_from_string('bar')
2006 # wait for sync
2007 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2008 # check http server
2009 keys = list(bucket.list())
2010 # TODO: use exact match
2011 http_server.verify_s3_events(keys, exact_match=False)
2012
2013 # delete objects from the bucket
2014 for key in bucket.list():
2015 key.delete()
2016 # wait for sync
2017 zone_meta_checkpoint(ps_zone.zone)
2018 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2019 # check http server
2020 # TODO: use exact match
2021 http_server.verify_s3_events(keys, deletions=True, exact_match=False)
2022
2023 # cleanup
2024 s3_notification_conf.del_config()
2025 topic_conf.del_config()
2026 master_zone.delete_bucket(bucket_name)
2027 http_server.close()
2028
2029
2030 def test_ps_push_amqp():
2031 """ test pushing to amqp endpoint """
2032 if skip_push_tests:
2033 raise SkipTest("PubSub push tests don't run in teuthology")
2034 hostname = get_ip()
2035 proc = init_rabbitmq()
2036 if proc is None:
2037 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2038 master_zone, ps_zone = init_env()
2039 bucket_name = gen_bucket_name()
2040 topic_name = bucket_name+TOPIC_SUFFIX
2041
2042 # create topic
2043 exchange = 'ex1'
2044 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2045 task.start()
2046 topic_conf = PSTopic(ps_zone.conn, topic_name)
2047 _, status = topic_conf.set_config()
2048 assert_equal(status//100, 2)
2049 # create bucket on the first of the rados zones
2050 bucket = master_zone.create_bucket(bucket_name)
2051 # wait for sync
2052 zone_meta_checkpoint(ps_zone.zone)
2053 # create notifications
2054 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2055 topic_name)
2056 _, status = notification_conf.set_config()
2057 assert_equal(status//100, 2)
2058 # create subscription
2059 sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
2060 topic_name, endpoint='amqp://'+hostname,
2061 endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
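# 'amqp-ack-level=broker' waits for the broker to ack each pushed message,
# while 'none' (used in the s3 variant below) is fire-and-forget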
2062 _, status = sub_conf.set_config()
2063 assert_equal(status//100, 2)
2064 # create objects in the bucket
2065 number_of_objects = 10
2066 for i in range(number_of_objects):
2067 key = bucket.new_key(str(i))
2068 key.set_contents_from_string('bar')
2069 # wait for sync
2070 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2071 # check amqp receiver
2072 keys = list(bucket.list())
2073 # TODO: use exact match
2074 receiver.verify_events(keys, exact_match=False)
2075
2076 # delete objects from the bucket
2077 for key in bucket.list():
2078 key.delete()
2079 # wait for sync
2080 zone_meta_checkpoint(ps_zone.zone)
2081 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2082 # check amqp receiver
2083 # TODO: use exact match
2084 receiver.verify_events(keys, deletions=True, exact_match=False)
2085
2086 # cleanup
2087 stop_amqp_receiver(receiver, task)
2088 sub_conf.del_config()
2089 notification_conf.del_config()
2090 topic_conf.del_config()
2091 master_zone.delete_bucket(bucket_name)
2092 clean_rabbitmq(proc)
2093
2094
2095 def test_ps_s3_push_amqp():
2096 """ test pushing to amqp endpoint s3 record format"""
2097 if skip_push_tests:
2098 raise SkipTest("PubSub push tests don't run in teuthology")
2099 hostname = get_ip()
2100 proc = init_rabbitmq()
2101 if proc is None:
2102 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2103 master_zone, ps_zone = init_env()
2104 bucket_name = gen_bucket_name()
2105 topic_name = bucket_name+TOPIC_SUFFIX
2106
2107 # create topic
2108 exchange = 'ex1'
2109 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2110 task.start()
2111 topic_conf = PSTopic(ps_zone.conn, topic_name,
2112 endpoint='amqp://' + hostname,
2113 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2114 result, status = topic_conf.set_config()
2115 assert_equal(status//100, 2)
2116 parsed_result = json.loads(result)
2117 topic_arn = parsed_result['arn']
2118 # create bucket on the first of the rados zones
2119 bucket = master_zone.create_bucket(bucket_name)
2120 # wait for sync
2121 zone_meta_checkpoint(ps_zone.zone)
2122 # create s3 notification
2123 notification_name = bucket_name + NOTIFICATION_SUFFIX
2124 topic_conf_list = [{'Id': notification_name,
2125 'TopicArn': topic_arn,
2126 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
2127 }]
2128 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2129 _, status = s3_notification_conf.set_config()
2130 assert_equal(status//100, 2)
2131 # create objects in the bucket
2132 number_of_objects = 10
2133 for i in range(number_of_objects):
2134 key = bucket.new_key(str(i))
2135 key.set_contents_from_string('bar')
2136 # wait for sync
2137 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2138 # check amqp receiver
2139 keys = list(bucket.list())
2140 # TODO: use exact match
2141 receiver.verify_s3_events(keys, exact_match=False)
2142
2143 # delete objects from the bucket
2144 for key in bucket.list():
2145 key.delete()
2146 # wait for sync
2147 zone_meta_checkpoint(ps_zone.zone)
2148 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2149 # check amqp receiver
2150 # TODO: use exact match
2151 receiver.verify_s3_events(keys, deletions=True, exact_match=False)
2152
2153 # cleanup
2154 stop_amqp_receiver(receiver, task)
2155 s3_notification_conf.del_config()
2156 topic_conf.del_config()
2157 master_zone.delete_bucket(bucket_name)
2158 clean_rabbitmq(proc)
2159
2160
2161 def test_ps_delete_bucket():
2162 """ test notification status upon bucket deletion """
2163 master_zone, ps_zone = init_env()
2164 bucket_name = gen_bucket_name()
2165 # create bucket on the first of the rados zones
2166 bucket = master_zone.create_bucket(bucket_name)
2167 # wait for sync
2168 zone_meta_checkpoint(ps_zone.zone)
2169 # create topic
2170 topic_name = bucket_name + TOPIC_SUFFIX
2172 topic_conf = PSTopic(ps_zone.conn, topic_name)
2173 response, status = topic_conf.set_config()
2174 assert_equal(status//100, 2)
2175 parsed_result = json.loads(response)
2176 topic_arn = parsed_result['arn']
2177 # create one s3 notification
2178 notification_name = bucket_name + NOTIFICATION_SUFFIX
2179 topic_conf_list = [{'Id': notification_name,
2180 'TopicArn': topic_arn,
2181 'Events': ['s3:ObjectCreated:*']
2182 }]
2183 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2184 response, status = s3_notification_conf.set_config()
2185 assert_equal(status//100, 2)
2186
2187 # create non-s3 notification
2188 notification_conf = PSNotification(ps_zone.conn, bucket_name,
2189 topic_name)
2190 _, status = notification_conf.set_config()
2191 assert_equal(status//100, 2)
2192
2193 # create objects in the bucket
2194 number_of_objects = 10
2195 for i in range(number_of_objects):
2196 key = bucket.new_key(str(i))
2197 key.set_contents_from_string('bar')
2198 # wait for bucket sync
2199 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2200 keys = list(bucket.list())
2201 # delete objects from the bucket
2202 for key in bucket.list():
2203 key.delete()
2204 # wait for bucket sync
2205 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2206 # delete the bucket
2207 master_zone.delete_bucket(bucket_name)
2208 # wait for meta sync
2209 zone_meta_checkpoint(ps_zone.zone)
2210
2211 # get the events from the auto-generated subscription
2212 sub_conf = PSSubscription(ps_zone.conn, notification_name,
2213 topic_name)
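# the subscription was auto-generated, named after the s3 notification,
# when the notification was created; no explicit set_config() is needed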
2214 result, _ = sub_conf.get_events()
2215 records = json.loads(result)
2216 # TODO: use exact match
2217 verify_s3_records_by_elements(records, keys, exact_match=False)
2218
2219 # s3 notification is deleted with bucket
2220 _, status = s3_notification_conf.get_config(notification=notification_name)
2221 assert_equal(status, 404)
2222 # non-s3 notification is deleted with bucket
2223 _, status = notification_conf.get_config()
2224 assert_equal(status, 404)
2225 # cleanup
2226 sub_conf.del_config()
2227 topic_conf.del_config()
2228
2229
2230 def test_ps_missing_topic():
2231 """ test creating a subscription when no topic info exists"""
2232 master_zone, ps_zone = init_env()
2233 bucket_name = gen_bucket_name()
2234 topic_name = bucket_name+TOPIC_SUFFIX
2235
2236 # create bucket on the first of the rados zones
2237 master_zone.create_bucket(bucket_name)
2238 # wait for sync
2239 zone_meta_checkpoint(ps_zone.zone)
2240 # create s3 notification
2241 notification_name = bucket_name + NOTIFICATION_SUFFIX
2242 topic_arn = 'arn:aws:sns:::' + topic_name
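# hand-crafted ARN (arn:aws:sns:<region>:<account>:<topic>) pointing at a
# topic that was never created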
2243 topic_conf_list = [{'Id': notification_name,
2244 'TopicArn': topic_arn,
2245 'Events': ['s3:ObjectCreated:*']
2246 }]
2247 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2248 try:
2249 s3_notification_conf.set_config()
2250 except:
2251 log.info('missing topic is expected')
2252 else:
2253 assert False, 'missing topic is expected'
2254
2255 # cleanup
2256 master_zone.delete_bucket(bucket_name)
2257
2258
2259 def test_ps_s3_topic_update():
2260 """ test updating topic associated with a notification"""
2261 if skip_push_tests:
2262 raise SkipTest("PubSub push tests don't run in teuthology")
2263 rabbit_proc = init_rabbitmq()
2264 if rabbit_proc is None:
2265 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2266 master_zone, ps_zone = init_env()
2267 bucket_name = gen_bucket_name()
2268 topic_name = bucket_name+TOPIC_SUFFIX
2269
2270 # create amqp topic
2271 hostname = get_ip()
2272 exchange = 'ex1'
2273 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2274 amqp_task.start()
2275 topic_conf = PSTopic(ps_zone.conn, topic_name,
2276 endpoint='amqp://' + hostname,
2277 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2278 result, status = topic_conf.set_config()
2279 assert_equal(status//100, 2)
2280 parsed_result = json.loads(result)
2281 topic_arn = parsed_result['arn']
2282 # get topic
2283 result, _ = topic_conf.get_config()
2284 # verify topic content
2285 parsed_result = json.loads(result)
2286 assert_equal(parsed_result['topic']['name'], topic_name)
2287 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
2288
2289 # create http server
2290 port = random.randint(10000, 20000)
2291 # start an http server in a separate thread
2292 http_server = StreamingHTTPServer(hostname, port)
2293
2294 # create bucket on the first of the rados zones
2295 bucket = master_zone.create_bucket(bucket_name)
2296 # wait for sync
2297 zone_meta_checkpoint(ps_zone.zone)
2298 # create s3 notification
2299 notification_name = bucket_name + NOTIFICATION_SUFFIX
2300 topic_conf_list = [{'Id': notification_name,
2301 'TopicArn': topic_arn,
2302 'Events': ['s3:ObjectCreated:*']
2303 }]
2304 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2305 _, status = s3_notification_conf.set_config()
2306 assert_equal(status//100, 2)
2307 # create objects in the bucket
2308 number_of_objects = 10
2309 for i in range(number_of_objects):
2310 key = bucket.new_key(str(i))
2311 key.set_contents_from_string('bar')
2312 # wait for sync
2313 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2314
2315 keys = list(bucket.list())
2316 # TODO: use exact match
2317 receiver.verify_s3_events(keys, exact_match=False)
2318
2319 # update the same topic with new endpoint
2320 topic_conf = PSTopic(ps_zone.conn, topic_name,
2321 endpoint='http://'+ hostname + ':' + str(port))
2322 _, status = topic_conf.set_config()
2323 assert_equal(status//100, 2)
2324 # get topic
2325 result, _ = topic_conf.get_config()
2326 # verify topic content
2327 parsed_result = json.loads(result)
2328 assert_equal(parsed_result['topic']['name'], topic_name)
2329 assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
2330
2331 # delete current objects and create new objects in the bucket
2332 for key in bucket.list():
2333 key.delete()
2334 for i in range(number_of_objects):
2335 key = bucket.new_key(str(i+100))
2336 key.set_contents_from_string('bar')
2337 # wait for sync
2338 zone_meta_checkpoint(ps_zone.zone)
2339 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2340
2341 keys = list(bucket.list())
2342 # verify that notifications are still sent to amqp
2343 # TODO: use exact match
2344 receiver.verify_s3_events(keys, exact_match=False)
2345
2346 # re-create the notification so it picks up the new endpoint from the updated topic
2347 topic_conf_list = [{'Id': notification_name,
2348 'TopicArn': topic_arn,
2349 'Events': ['s3:ObjectCreated:*']
2350 }]
2351 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2352 _, status = s3_notification_conf.set_config()
2353 assert_equal(status//100, 2)
2354
2355 # delete current objects and create new objects in the bucket
2356 for key in bucket.list():
2357 key.delete()
2358 for i in range(number_of_objects):
2359 key = bucket.new_key(str(i+200))
2360 key.set_contents_from_string('bar')
2361 # wait for sync
2362 zone_meta_checkpoint(ps_zone.zone)
2363 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2364
2365 keys = list(bucket.list())
2366 # check that notifications now arrive at the http endpoint
2367 # TODO: use exact match
2368 http_server.verify_s3_events(keys, exact_match=False)
2369
2370 # cleanup
2371 stop_amqp_receiver(receiver, amqp_task)
2372 # delete objects from the bucket
2373 for key in bucket.list():
2374 key.delete()
2375 s3_notification_conf.del_config()
2376 topic_conf.del_config()
2377 master_zone.delete_bucket(bucket_name)
2378 http_server.close()
2379 clean_rabbitmq(rabbit_proc)
2380
2381
2382 def test_ps_s3_notification_update():
2383 """ test updating the topic of a notification"""
2384 if skip_push_tests:
2385 raise SkipTest("PubSub push tests don't run in teuthology")
2386 hostname = get_ip()
2387 rabbit_proc = init_rabbitmq()
2388 if rabbit_proc is None:
2389 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2390
2391 master_zone, ps_zone = init_env()
2392 bucket_name = gen_bucket_name()
2393 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
2394 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
2395
2396 # create topics
2397 # start amqp receiver in a separate thread
2398 exchange = 'ex1'
2399 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
2400 amqp_task.start()
2401 # create random port for the http server
2402 http_port = random.randint(10000, 20000)
2403 # start an http server in a separate thread
2404 http_server = StreamingHTTPServer(hostname, http_port)
2405
2406 topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
2407 endpoint='amqp://' + hostname,
2408 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2409 result, status = topic_conf1.set_config()
2410 parsed_result = json.loads(result)
2411 topic_arn1 = parsed_result['arn']
2412 assert_equal(status//100, 2)
2413 topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
2414 endpoint='http://'+hostname+':'+str(http_port))
2415 result, status = topic_conf2.set_config()
2416 parsed_result = json.loads(result)
2417 topic_arn2 = parsed_result['arn']
2418 assert_equal(status//100, 2)
2419
2420 # create bucket on the first of the rados zones
2421 bucket = master_zone.create_bucket(bucket_name)
2422 # wait for sync
2423 zone_meta_checkpoint(ps_zone.zone)
2424 # create s3 notification with topic1
2425 notification_name = bucket_name + NOTIFICATION_SUFFIX
2426 topic_conf_list = [{'Id': notification_name,
2427 'TopicArn': topic_arn1,
2428 'Events': ['s3:ObjectCreated:*']
2429 }]
2430 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2431 _, status = s3_notification_conf.set_config()
2432 assert_equal(status//100, 2)
2433 # create objects in the bucket
2434 number_of_objects = 10
2435 for i in range(number_of_objects):
2436 key = bucket.new_key(str(i))
2437 key.set_contents_from_string('bar')
2438 # wait for sync
2439 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2440
2441 keys = list(bucket.list())
2442 # TODO: use exact match
2443 receiver.verify_s3_events(keys, exact_match=False)
2444
2445 # update notification to use topic2
2446 topic_conf_list = [{'Id': notification_name,
2447 'TopicArn': topic_arn2,
2448 'Events': ['s3:ObjectCreated:*']
2449 }]
2450 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2451 _, status = s3_notification_conf.set_config()
2452 assert_equal(status//100, 2)
2453
2454 # delete current objects and create new objects in the bucket
2455 for key in bucket.list():
2456 key.delete()
2457 for i in range(number_of_objects):
2458 key = bucket.new_key(str(i+100))
2459 key.set_contents_from_string('bar')
2460 # wait for sync
2461 zone_meta_checkpoint(ps_zone.zone)
2462 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2463
2464 keys = list(bucket.list())
2465 # check that notifications now arrive at the http endpoint
2466 # TODO: use exact match
2467 http_server.verify_s3_events(keys, exact_match=False)
2468
2469 # cleanup
2470 stop_amqp_receiver(receiver, amqp_task)
2471 # delete objects from the bucket
2472 for key in bucket.list():
2473 key.delete()
2474 s3_notification_conf.del_config()
2475 topic_conf1.del_config()
2476 topic_conf2.del_config()
2477 master_zone.delete_bucket(bucket_name)
2478 http_server.close()
2479 clean_rabbitmq(rabbit_proc)
2480
2481
2482 def test_ps_s3_multiple_topics_notification():
2483 """ test notification creation with multiple topics"""
2484 if skip_push_tests:
2485 raise SkipTest("PubSub push tests don't run in teuthology")
2486 hostname = get_ip()
2487 rabbit_proc = init_rabbitmq()
2488 if rabbit_proc is None:
2489 raise SkipTest('end2end amqp tests require rabbitmq-server installed')
2490
2491 master_zone, ps_zone = init_env()
2492 bucket_name = gen_bucket_name()
2493 topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
2494 topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
2495
2496 # create topics
2497 # start amqp receiver in a separate thread
2498 exchange = 'ex1'
2499 amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
2500 amqp_task.start()
2501 # create random port for the http server
2502 http_port = random.randint(10000, 20000)
2503 # start an http server in a separate thread
2504 http_server = StreamingHTTPServer(hostname, http_port)
2505
2506 topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
2507 endpoint='amqp://' + hostname,
2508 endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
2509 result, status = topic_conf1.set_config()
2510 parsed_result = json.loads(result)
2511 topic_arn1 = parsed_result['arn']
2512 assert_equal(status//100, 2)
2513 topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
2514 endpoint='http://'+hostname+':'+str(http_port))
2515 result, status = topic_conf2.set_config()
2516 parsed_result = json.loads(result)
2517 topic_arn2 = parsed_result['arn']
2518 assert_equal(status//100, 2)
2519
2520 # create bucket on the first of the rados zones
2521 bucket = master_zone.create_bucket(bucket_name)
2522 # wait for sync
2523 zone_meta_checkpoint(ps_zone.zone)
2524 # create s3 notification
2525 notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
2526 notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
2527 topic_conf_list = [
2528 {
2529 'Id': notification_name1,
2530 'TopicArn': topic_arn1,
2531 'Events': ['s3:ObjectCreated:*']
2532 },
2533 {
2534 'Id': notification_name2,
2535 'TopicArn': topic_arn2,
2536 'Events': ['s3:ObjectCreated:*']
2537 }]
2538 s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
2539 _, status = s3_notification_conf.set_config()
2540 assert_equal(status//100, 2)
2541 result, _ = s3_notification_conf.get_config()
2542 assert_equal(len(result['TopicConfigurations']), 2)
2543 assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
2544 assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
2545
2546 # get auto-generated subscriptions
2547 sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
2548 topic_name1)
2549 _, status = sub_conf1.get_config()
2550 assert_equal(status//100, 2)
2551 sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
2552 topic_name2)
2553 _, status = sub_conf2.get_config()
2554 assert_equal(status//100, 2)
2555
2556 # create objects in the bucket
2557 number_of_objects = 10
2558 for i in range(number_of_objects):
2559 key = bucket.new_key(str(i))
2560 key.set_contents_from_string('bar')
2561 # wait for sync
2562 zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
2563
2564 # get the events from both of the subscriptions
2565 result, _ = sub_conf1.get_events()
2566 records = json.loads(result)
2567 for record in records['Records']:
2568 log.debug(record)
2569 keys = list(bucket.list())
2570 # TODO: use exact match
2571 verify_s3_records_by_elements(records, keys, exact_match=False)
2572 receiver.verify_s3_events(keys, exact_match=False)
2573
2574 result, _ = sub_conf2.get_events()
2575 parsed_result = json.loads(result)
2576 for record in parsed_result['Records']:
2577 log.debug(record)
2578 keys = list(bucket.list())
2579 # TODO: use exact match
2580 verify_s3_records_by_elements(parsed_result, keys, exact_match=False)
2581 http_server.verify_s3_events(keys, exact_match=False)
2582
2583 # cleanup
2584 stop_amqp_receiver(receiver, amqp_task)
2585 s3_notification_conf.del_config()
2586 topic_conf1.del_config()
2587 topic_conf2.del_config()
2588 # delete objects from the bucket
2589 for key in bucket.list():
2590 key.delete()
2591 master_zone.delete_bucket(bucket_name)
2592 http_server.close()
2593 clean_rabbitmq(rabbit_proc)