import logging
import json
import tempfile
import random
import threading
import subprocess
import socket
import time
import os
import string
import boto
from botocore.exceptions import ClientError
from http import server as http_server
from random import randint
import hashlib
from nose.plugins.attrib import attr
import boto3
import datetime
from cloudevents.http import from_http
from dateutil import parser

from boto.s3.connection import S3Connection

from . import (
    get_config_host,
    get_config_port,
    get_access_key,
    get_secret_key
    )

from .api import PSTopicS3, \
    PSNotificationS3, \
    delete_all_objects, \
    put_object_tagging, \
    admin

from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal, assert_in
import boto.s3.tagging

# configure logging for the tests module
log = logging.getLogger(__name__)

TOPIC_SUFFIX = "_topic"
NOTIFICATION_SUFFIX = "_notif"


num_buckets = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)


def set_contents_from_string(key, content):
    try:
        key.set_contents_from_string(content)
    except Exception as e:
        print('Error: ' + str(e))

class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
    """HTTP POST handler class storing the received events in its HTTP server"""
    def do_POST(self):
        """implementation of POST handler"""
        content_length = int(self.headers['Content-Length'])
        body = self.rfile.read(content_length)
        if self.server.cloudevents:
            event = from_http(self.headers, body)
            record = json.loads(body)['Records'][0]
            assert_equal(event['specversion'], '1.0')
            assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
            assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
            assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
            assert_equal(event['datacontenttype'], 'application/json')
            assert_equal(event['subject'], record['s3']['object']['key'])
            assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
        log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
        self.server.append(json.loads(body))
        if self.headers.get('Expect') == '100-continue':
            self.send_response(100)
        else:
            self.send_response(200)
        if self.server.delay > 0:
            time.sleep(self.server.delay)
        self.end_headers()


class HTTPServerWithEvents(http_server.HTTPServer):
    """HTTP server used by the handler to store events"""
    def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
        http_server.HTTPServer.__init__(self, addr, handler, False)
        self.worker_id = worker_id
        self.events = []
        self.delay = delay
        self.cloudevents = cloudevents

    def append(self, event):
        self.events.append(event)

class HTTPServerThread(threading.Thread):
    """thread for running the HTTP server. reusing the same socket for all threads"""
    def __init__(self, i, sock, addr, delay=0, cloudevents=False):
        threading.Thread.__init__(self)
        self.i = i
        self.daemon = True
        self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
        self.httpd.socket = sock
        # prevent the HTTP server from re-binding every handler
        self.httpd.server_bind = self.server_close = lambda self: None
        self.start()

    def run(self):
        try:
            log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
            self.httpd.serve_forever()
            log.info('HTTP Server (%d) ended', self.i)
        except Exception as error:
            # could happen if the server r/w to a closing socket during shutdown
            log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))

    def close(self):
        self.httpd.shutdown()

    def get_events(self):
        return self.httpd.events

    def reset_events(self):
        self.httpd.events = []

class StreamingHTTPServer:
    """multi-threaded HTTP server, also holding the list of events received by the handlers;
    each thread has its own server, and all servers share the same socket"""
    def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
        addr = (host, port)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        self.sock.listen(num_workers)
        self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]

    def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
        """verify stored s3 records against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)

    def get_and_reset_events(self):
        events = []
        for worker in self.workers:
            events += worker.get_events()
            worker.reset_events()
        return events

    def close(self):
        """close all workers in the http server and wait for them to finish"""
        # make sure that the shared socket is closed
        # this is needed in case one of the threads is blocked on the socket
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        # wait for server threads to finish
        for worker in self.workers:
            worker.close()
            worker.join()

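# illustrative sketch of how the tests below drive this server (the port and
# the 'keys' list are placeholders, not values used by any test):
#
#   http_server = StreamingHTTPServer(get_ip(), 11001, num_workers=10)
#   ... point a topic's push-endpoint at http://<host>:11001, upload objects ...
#   http_server.verify_s3_events(keys, exact_match=True)
#   http_server.close()
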
# AMQP endpoint functions

class AMQPReceiver(object):
    """class for receiving and storing messages on a topic from the AMQP broker"""
    def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
        import pika
        import ssl

        if ca_location:
            ssl_context = ssl.create_default_context()
            ssl_context.load_verify_locations(cafile=ca_location)
            ssl_options = pika.SSLOptions(ssl_context)
            rabbitmq_port = 5671
        else:
            rabbitmq_port = 5672
            ssl_options = None

        if external_endpoint_address:
            if ssl_options:
                # this is currently not working due to: https://github.com/pika/pika/issues/1192
                params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
            else:
                params = pika.URLParameters(external_endpoint_address)
        else:
            hostname = get_ip()
            params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)

        remaining_retries = 10
        while remaining_retries > 0:
            try:
                connection = pika.BlockingConnection(params)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to rabbitmq (remaining retries '
                    + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to rabbitmq - no retries left')

        self.channel = connection.channel()
        self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
        result = self.channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
        self.channel.basic_consume(queue=queue_name,
                                   on_message_callback=self.on_message,
                                   auto_ack=True)
        self.events = []
        self.topic = topic

    def on_message(self, ch, method, properties, body):
        """callback invoked when a new message arrives on the topic"""
        log.info('AMQP received event for topic %s:\n %s', self.topic, body)
        self.events.append(json.loads(body))

    # TODO create a base class for the AMQP and HTTP cases
    def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
        self.events = []

    def verify_events(self, keys, exact_match=False, deletions=False):
        """verify stored events against a list of keys"""
        verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
        self.events = []

    def get_and_reset_events(self):
        tmp = self.events
        self.events = []
        return tmp


def amqp_receiver_thread_runner(receiver):
    """main thread function for the amqp receiver"""
    try:
        log.info('AMQP receiver started')
        receiver.channel.start_consuming()
        log.info('AMQP receiver ended')
    except Exception as error:
        log.info('AMQP receiver ended unexpectedly: %s', str(error))


def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
    """create amqp receiver and thread"""
    receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
    task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver

def stop_amqp_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    try:
        receiver.channel.stop_consuming()
        log.info('stopping AMQP receiver')
    except Exception as error:
        log.info('failed to gracefully stop AMQP receiver: %s', str(error))
    task.join(5)

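# typical receiver lifecycle in the AMQP tests (sketch only; 'ex1', topic_name,
# and keys are placeholders):
#
#   task, receiver = create_amqp_receiver_thread('ex1', topic_name)
#   task.start()
#   ... upload objects to a bucket whose notification points at the broker ...
#   receiver.verify_s3_events(keys, exact_match=True)
#   stop_amqp_receiver(receiver, task)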

def init_rabbitmq():
    """ start a rabbitmq broker """
    hostname = get_ip()
    try:
        # first try to stop any existing process
        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
        time.sleep(5)
        proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
    except Exception as error:
        log.info('failed to execute rabbitmq-server: %s', str(error))
        print('failed to execute rabbitmq-server: %s' % str(error))
        return None
    # TODO add rabbitmq checkpoint instead of sleep
    time.sleep(5)
    return proc


def clean_rabbitmq(proc):
    """ stop the rabbitmq broker """
    try:
        subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
        time.sleep(5)
        proc.terminate()
    except:
        log.info('rabbitmq server already terminated')

def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """ verify there is at least one event per element """
    err = ''
    for key in keys:
        key_found = False
        if type(events) is list:
            for event_list in events:
                if key_found:
                    break
                for event in event_list['events']:
                    if event['info']['bucket']['name'] == key.bucket.name and \
                        event['info']['key']['name'] == key.name:
                        if deletions and event['event'] == 'OBJECT_DELETE':
                            key_found = True
                            break
                        elif not deletions and event['event'] == 'OBJECT_CREATE':
                            key_found = True
                            break
        else:
            for event in events['events']:
                if event['info']['bucket']['name'] == key.bucket.name and \
                    event['info']['key']['name'] == key.name:
                    if deletions and event['event'] == 'OBJECT_DELETE':
                        key_found = True
                        break
                    elif not deletions and event['event'] == 'OBJECT_CREATE':
                        key_found = True
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            assert False, err

    if len(events) != len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            assert False, err

META_PREFIX = 'x-amz-meta-'

def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
    """ verify there is at least one record per element """
    err = ''
    for key in keys:
        key_found = False
        object_size = 0
        if type(records) is list:
            for record_list in records:
                if key_found:
                    break
                for record in record_list['Records']:
                    assert_in('eTag', record['s3']['object'])
                    if record['s3']['bucket']['name'] == key.bucket.name and \
                        record['s3']['object']['key'] == key.name:
                        # Assertion Error needs to be fixed
                        #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
                        if etags:
                            assert_in(key.etag[1:-1], etags)
                        if len(record['s3']['object']['metadata']) > 0:
                            for meta in record['s3']['object']['metadata']:
                                assert(meta['key'].startswith(META_PREFIX))
                        if deletions and record['eventName'].startswith('ObjectRemoved'):
                            key_found = True
                            object_size = record['s3']['object']['size']
                            break
                        elif not deletions and record['eventName'].startswith('ObjectCreated'):
                            key_found = True
                            object_size = record['s3']['object']['size']
                            break
        else:
            for record in records['Records']:
                assert_in('eTag', record['s3']['object'])
                if record['s3']['bucket']['name'] == key.bucket.name and \
                    record['s3']['object']['key'] == key.name:
                    # boto keys carry the etag wrapped in double quotes - strip them before comparing
                    assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
                    if etags:
                        assert_in(key.etag[1:-1], etags)
                    if len(record['s3']['object']['metadata']) > 0:
                        for meta in record['s3']['object']['metadata']:
                            assert(meta['key'].startswith(META_PREFIX))
                    if deletions and record['eventName'].startswith('ObjectRemoved'):
                        key_found = True
                        object_size = record['s3']['object']['size']
                        break
                    elif not deletions and record['eventName'].startswith('ObjectCreated'):
                        key_found = True
                        object_size = record['s3']['object']['size']
                        break

        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            assert False, err
        elif expected_sizes:
            assert_equal(object_size, expected_sizes.get(key.name))

    if len(records) != len(keys):
        err = 'superfluous records are found'
        log.warning(err)
        if exact_match:
            for record_list in records:
                for record in record_list['Records']:
                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
            assert False, err


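# for reference, a single record as consumed by the verifier above has (at
# least) this shape; the values here are illustrative only:
#
#   {'Records': [{'eventName': 'ObjectCreated:Put',
#                 'awsRegion': 'default',
#                 's3': {'configurationId': '<notification id>',
#                        'bucket': {'name': '<bucket>', 'arn': '<bucket arn>'},
#                        'object': {'key': '<key>', 'size': 1024,
#                                   'eTag': '<hex md5>', 'metadata': []}}}]}
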
# Kafka endpoint functions

kafka_server = 'localhost'

class KafkaReceiver(object):
    """class for receiving and storing messages on a topic from the kafka broker"""
    def __init__(self, topic, security_type):
        from kafka import KafkaConsumer
        remaining_retries = 10
        port = 9092
        if security_type != 'PLAINTEXT':
            security_type = 'SSL'
            port = 9093
        while remaining_retries > 0:
            try:
                self.consumer = KafkaConsumer(topic,
                                              bootstrap_servers=kafka_server+':'+str(port),
                                              security_protocol=security_type,
                                              consumer_timeout_ms=16000)
                print('Kafka consumer created on topic: '+topic)
                break
            except Exception as error:
                remaining_retries -= 1
                print('failed to connect to kafka (remaining retries '
                    + str(remaining_retries) + '): ' + str(error))
                time.sleep(1)

        if remaining_retries == 0:
            raise Exception('failed to connect to kafka - no retries left')

        self.events = []
        self.topic = topic
        self.stop = False

    def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]):
        """verify stored s3 records against a list of keys"""
        verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
        self.events = []

def kafka_receiver_thread_runner(receiver):
    """main thread function for the kafka receiver"""
    try:
        log.info('Kafka receiver started')
        print('Kafka receiver started')
        while not receiver.stop:
            for msg in receiver.consumer:
                receiver.events.append(json.loads(msg.value))
            time.sleep(0.1)
        log.info('Kafka receiver ended')
        print('Kafka receiver ended')
    except Exception as error:
        log.info('Kafka receiver ended unexpectedly: %s', str(error))
        print('Kafka receiver ended unexpectedly: ' + str(error))


def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
    """create kafka receiver and thread"""
    receiver = KafkaReceiver(topic, security_type)
    task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
    task.daemon = True
    return task, receiver

def stop_kafka_receiver(receiver, task):
    """stop the receiver thread and wait for it to finish"""
    receiver.stop = True
    task.join(1)
    try:
        receiver.consumer.unsubscribe()
        receiver.consumer.close()
    except Exception as error:
        log.info('failed to gracefully stop Kafka receiver: %s', str(error))

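# the kafka tests follow the same receiver pattern as the AMQP ones (sketch;
# topic_name and keys are placeholders):
#
#   task, receiver = create_kafka_receiver_thread(topic_name)
#   task.start()
#   ... upload objects ...
#   receiver.verify_s3_events(keys, exact_match=True)
#   stop_kafka_receiver(receiver, task)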

def get_ip():
    return 'localhost'


def get_ip_http():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # address should not be reachable
        s.connect(('10.255.255.255', 1))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

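# note on get_ip_http(): connect() on a UDP socket sends no packets; it only
# selects the outgoing interface for the given address, so getsockname()
# returns a non-loopback IP of this host
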
def connection():
    hostname = get_config_host()
    port_no = get_config_port()
    vstart_access_key = get_access_key()
    vstart_secret_key = get_secret_key()

    conn = S3Connection(aws_access_key_id=vstart_access_key,
                        aws_secret_access_key=vstart_secret_key,
                        is_secure=False, port=port_no, host=hostname,
                        calling_format='boto.s3.connection.OrdinaryCallingFormat')

    return conn

def connection2():
    hostname = get_config_host()
    port_no = 8001
    vstart_access_key = get_access_key()
    vstart_secret_key = get_secret_key()

    conn = S3Connection(aws_access_key_id=vstart_access_key,
                        aws_secret_access_key=vstart_secret_key,
                        is_secure=False, port=port_no, host=hostname,
                        calling_format='boto.s3.connection.OrdinaryCallingFormat')

    return conn


def another_user(tenant=None):
    access_key = str(time.time())
    secret_key = str(time.time())
    uid = 'superman' + str(time.time())
    if tenant:
        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
    else:
        _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])

    assert_equal(result, 0)
    conn = S3Connection(aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key,
                        is_secure=False, port=get_config_port(), host=get_config_host(),
                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
    return conn

##############
# bucket notifications tests
##############


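# most tests below follow the same skeleton: create a bucket, create one or
# more s3 topics pointing at a push endpoint, attach notifications to the
# bucket, upload/delete objects, verify the received events, and clean up
# the notifications, topics, and bucket
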
@attr('basic_test')
def test_ps_s3_topic_on_master():
    """ test s3 topics set/get/delete on master """
    tenant = 'kaboom'
    conn = another_user(tenant)
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
    # clean all topics
    try:
        result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
        topics = []
        if result is not None:
            topics = result['member']
        for topic in topics:
            topic_conf1.del_config(topic_arn=topic['TopicArn'])
    except Exception as err:
        print('failed to do topic cleanup: ' + str(err))

    topic_arn = topic_conf1.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf3.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')

    # get topic 3
    result, status = topic_conf3.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    # Note that endpoint args may be ordered differently in the result
    # delete topic 1
    status = topic_conf1.del_config()
    assert_equal(status, 200)

    # try to get a deleted topic
    _, status = topic_conf1.get_config()
    assert_equal(status, 404)

    # get the remaining 2 topics
    result, status = topic_conf1.get_list()
    assert_equal(status, 200)
    assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)

    # delete topics
    status = topic_conf2.del_config()
    assert_equal(status, 200)
    status = topic_conf3.del_config()
    assert_equal(status, 200)

    # get topic list, make sure it is empty
    result, status = topic_conf1.get_list()
    assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)


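# the admin() helper imported from .api shells out to the radosgw-admin CLI
# (an assumption based on how it is used here: it returns the command output
# and exit code); the next test drives the 'topic' subcommands through it
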
@attr('basic_test')
def test_ps_s3_topic_admin_on_master():
    """ test s3 topics set/get/delete via the admin command on master """
    tenant = 'kaboom'
    conn = another_user(tenant)
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
    # clean all topics
    try:
        result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
        topics = []
        if result is not None:
            topics = result['member']
        for topic in topics:
            topic_conf1.del_config(topic_arn=topic['TopicArn'])
    except Exception as err:
        print('failed to do topic cleanup: ' + str(err))

    topic_arn1 = topic_conf1.set_config()
    assert_equal(topic_arn1,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')

    endpoint_address = 'http://127.0.0.1:9001'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    assert_equal(topic_arn2,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
    endpoint_address = 'http://127.0.0.1:9002'
    endpoint_args = 'push-endpoint='+endpoint_address
    topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
    topic_arn3 = topic_conf3.set_config()
    assert_equal(topic_arn3,
                 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')

    # get topic 3 via commandline
    result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
    parsed_result = json.loads(result[0])
    assert_equal(parsed_result['arn'], topic_arn3)

    # delete topic 3
    _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
    assert_equal(result, 0)

    # try to get a deleted topic
    _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
    print('"topic not found" error is expected')
    assert_equal(result, 2)

    # get the remaining 2 topics
    result = admin(['topic', 'list', '--tenant', tenant])
    parsed_result = json.loads(result[0])
    assert_equal(len(parsed_result['topics']), 2)

    # delete topics
    _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])
    assert_equal(result, 0)
    _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])
    assert_equal(result, 0)

    # get topic list, make sure it is empty
    result = admin(['topic', 'list', '--tenant', tenant])
    parsed_result = json.loads(result[0])
    assert_equal(len(parsed_result['topics']), 0)


@attr('basic_test')
def test_ps_s3_notification_configuration_admin_on_master():
    """ test s3 notification list/get/delete via the admin command on master """
    conn = connection()
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topics
    endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
    # clean all topics
    try:
        result = topic_conf.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
        topics = []
        if result is not None:
            topics = result['member']
        for topic in topics:
            topic_conf.del_config(topic_arn=topic['TopicArn'])
    except Exception as err:
        print('failed to do topic cleanup: ' + str(err))

    topic_arn = topic_conf.set_config()
    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1')
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # list notifications
    result = admin(['notification', 'list', '--bucket', bucket_name])
    parsed_result = json.loads(result[0])
    assert_equal(len(parsed_result['notifications']), 3)
    assert_equal(result[1], 0)

    # get notification 1
    result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
    parsed_result = json.loads(result[0])
    assert_equal(parsed_result['Id'], notification_name+'_1')
    assert_equal(result[1], 0)

    # remove notification 3
    _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
    assert_equal(result, 0)

    # list notifications
    result = admin(['notification', 'list', '--bucket', bucket_name])
    parsed_result = json.loads(result[0])
    assert_equal(len(parsed_result['notifications']), 2)
    assert_equal(result[1], 0)

    # delete all notifications
    _, result = admin(['notification', 'rm', '--bucket', bucket_name])
    assert_equal(result, 0)

    # list notifications, make sure the list is empty
    result = admin(['notification', 'list', '--bucket', bucket_name])
    parsed_result = json.loads(result[0])
    assert_equal(len(parsed_result['notifications']), 0)
    assert_equal(result[1], 0)


@attr('modification_required')
def test_ps_s3_topic_with_secret_on_master():
    """ test s3 topics with secret set/get/delete on master """
    return SkipTest('secure connection is needed to test topic with secrets')

    conn = connection1()
    if conn.secure_conn is None:
        return SkipTest('secure connection is needed to test topic with secrets')

    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # clean all topics
    delete_all_s3_topics(conn, zonegroup)

    # create s3 topics
    endpoint_address = 'amqp://user:password@127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    bad_topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    try:
        result = bad_topic_conf.set_config()
    except Exception as err:
        print('Error is expected: ' + str(err))
    else:
        assert False, 'user/password endpoint configuration should only be allowed over HTTPS'
    topic_conf = PSTopicS3(conn.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    assert_equal(topic_arn,
                 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name)

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    # get topic
    result, status = topic_conf.get_config()
    assert_equal(status, 200)
    assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
    assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])

    _, status = bad_topic_conf.get_config()
    assert_equal(status/100, 4)

    _, status = topic_conf.get_list()
    assert_equal(status/100, 2)

    # delete topics
    result = topic_conf.del_config()


@attr('basic_test')
def test_ps_s3_notification_on_master():
    """ test s3 notification set/get/delete on master """
    conn = connection()
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectRemoved:*']
                       },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # get notifications on a bucket
    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # delete specific notifications
    _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)

    # get the remaining 2 notifications on a bucket
    response, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    assert_equal(len(response['TopicConfigurations']), 2)
    assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
    assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)

    # delete remaining notifications
    _, status = s3_notification_conf.del_config()
    assert_equal(status/100, 2)

    # make sure that the notifications are now deleted
    _, status = s3_notification_conf.get_config()

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)


@attr('basic_test')
def test_ps_s3_notification_on_master_empty_config():
    """ test s3 notification set/get/delete on master with empty config """
    hostname = get_ip()

    conn = connection()

    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # get notifications on a bucket
    response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
    assert_equal(status/100, 2)
    assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)

    # set the notification configuration again, this time empty, to check that it deletes the existing one
    topic_conf_list = []

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # make sure that the notification is now deleted
    response, status = s3_notification_conf.get_config()
    try:
        check = response['NotificationConfiguration']
    except KeyError:
        assert_equal(status/100, 2)
    else:
        assert False

    # cleanup
    topic_conf.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)


@attr('amqp_test')
def test_ps_s3_notification_filter_on_master():
    """ test s3 notification filter on master """

    hostname = get_ip()

    conn = connection()
    ps_zone = conn

    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'

    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
                            }
                        }
                       },
                       {'Id': notification_name+'_2',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
                                                {'Name': 'suffix', 'Value': 'log'}]
                            }
                        }
                       },
                       {'Id': notification_name+'_3',
                        'TopicArn': topic_arn,
                        'Events': [],
                        'Filter': {
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
                            }
                        }
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    result, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    topic_conf_list = [{'Id': notification_name+'_4',
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                        'Filter': {
                            'Metadata': {
                                'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                                {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                            },
                            'Key': {
                                'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                            }
                        }
                       }]

    try:
        s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
        _, status = s3_notification_conf4.set_config()
        assert_equal(status/100, 2)
        skip_notif4 = False
    except Exception as error:
        print('note: metadata filter is not supported by boto3 - skipping test')
        skip_notif4 = True


    # get all notifications
    result, status = s3_notification_conf.get_config()
    assert_equal(status/100, 2)
    for conf in result['TopicConfigurations']:
        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name

    if not skip_notif4:
        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
        assert_equal(status/100, 2)
        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'

    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
    expected_in2 = ['world1.log', 'world2log', 'world3.log']
    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
    expected_in4 = ['foo', 'bar', 'hello', 'world']
    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
    # create objects in bucket
    for key_name in expected_in1:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in2:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in expected_in3:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    if not skip_notif4:
        for key_name in expected_in4:
            key = bucket.new_key(key_name)
            key.set_metadata('foo', 'bar')
            key.set_metadata('hello', 'world')
            key.set_metadata('goodbye', 'cruel world')
            key.set_contents_from_string('bar')
    for key_name in filtered:
        key = bucket.new_key(key_name)
        key.set_contents_from_string('bar')
    for key_name in filtered_with_attr:
        # create the key before setting its metadata
        key = bucket.new_key(key_name)
        key.set_metadata('foo', 'nobar')
        key.set_metadata('hello', 'noworld')
        key.set_metadata('goodbye', 'cruel world')
        key.set_contents_from_string('bar')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    found_in1 = []
    found_in2 = []
    found_in3 = []
    found_in4 = []

    for event in receiver.get_and_reset_events():
        notif_id = event['Records'][0]['s3']['configurationId']
        key_name = event['Records'][0]['s3']['object']['key']
        awsRegion = event['Records'][0]['awsRegion']
        assert_equal(awsRegion, zonegroup)
        bucket_arn = event['Records'][0]['s3']['bucket']['arn']
        assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name)
        if notif_id == notification_name+'_1':
            found_in1.append(key_name)
        elif notif_id == notification_name+'_2':
            found_in2.append(key_name)
        elif notif_id == notification_name+'_3':
            found_in3.append(key_name)
        elif not skip_notif4 and notif_id == notification_name+'_4':
            found_in4.append(key_name)
        else:
            assert False, 'invalid notification: ' + notif_id

    assert_equal(set(found_in1), set(expected_in1))
    assert_equal(set(found_in2), set(expected_in2))
    assert_equal(set(found_in3), set(expected_in3))
    if not skip_notif4:
        assert_equal(set(found_in4), set(expected_in4))

    # cleanup
    s3_notification_conf.del_config()
    if not skip_notif4:
        s3_notification_conf4.del_config()
    topic_conf.del_config()
    # delete the bucket
    for key in bucket.list():
        key.delete()
    conn.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)

1120@attr('basic_test')
1121def test_ps_s3_notification_errors_on_master():
1122 """ test s3 notification set/get/delete on master """
1123 conn = connection()
1124 zonegroup = 'default'
1125 bucket_name = gen_bucket_name()
1126 # create bucket
1127 bucket = conn.create_bucket(bucket_name)
1128 topic_name = bucket_name + TOPIC_SUFFIX
1129 # create s3 topic
1130 endpoint_address = 'amqp://127.0.0.1:7001'
1131 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
1132 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
1133 topic_arn = topic_conf.set_config()
1134
1135 # create s3 notification with invalid event name
1136 notification_name = bucket_name + NOTIFICATION_SUFFIX
1137 topic_conf_list = [{'Id': notification_name,
1138 'TopicArn': topic_arn,
1139 'Events': ['s3:ObjectCreated:Kaboom']
1140 }]
1141 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1142 try:
1143 result, status = s3_notification_conf.set_config()
1144 except Exception as error:
1145 print(str(error) + ' - is expected')
1146 else:
1147 assert False, 'invalid event name is expected to fail'
1148
1149 # create s3 notification with missing name
1150 topic_conf_list = [{'Id': '',
1151 'TopicArn': topic_arn,
1152 'Events': ['s3:ObjectCreated:Put']
1153 }]
1154 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1155 try:
1156 _, _ = s3_notification_conf.set_config()
1157 except Exception as error:
1158 print(str(error) + ' - is expected')
1159 else:
1160 assert False, 'missing notification name is expected to fail'
1161
1162 # create s3 notification with invalid topic ARN
1163 invalid_topic_arn = 'kaboom'
1164 topic_conf_list = [{'Id': notification_name,
1165 'TopicArn': invalid_topic_arn,
1166 'Events': ['s3:ObjectCreated:Put']
1167 }]
1168 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1169 try:
1170 _, _ = s3_notification_conf.set_config()
1171 except Exception as error:
1172 print(str(error) + ' - is expected')
1173 else:
1174 assert False, 'invalid ARN is expected to fail'
1175
1176 # create s3 notification with unknown topic ARN
1177 invalid_topic_arn = 'arn:aws:sns:a::kaboom'
1178 topic_conf_list = [{'Id': notification_name,
1179 'TopicArn': invalid_topic_arn ,
1180 'Events': ['s3:ObjectCreated:Put']
1181 }]
1182 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1183 try:
1184 _, _ = s3_notification_conf.set_config()
1185 except Exception as error:
1186 print(str(error) + ' - is expected')
1187 else:
1188 assert False, 'unknown topic is expected to fail'
1189
1190 # create s3 notification with wrong bucket
1191 topic_conf_list = [{'Id': notification_name,
1192 'TopicArn': topic_arn,
1193 'Events': ['s3:ObjectCreated:Put']
1194 }]
1195 s3_notification_conf = PSNotificationS3(conn, 'kaboom', topic_conf_list)
1196 try:
1197 _, _ = s3_notification_conf.set_config()
1198 except Exception as error:
1199 print(str(error) + ' - is expected')
1200 else:
1201 assert False, 'unknown bucket is expected to fail'
1202
1203 topic_conf.del_config()
1204
1205 status = topic_conf.del_config()
1206 # deleting an unknown notification is not considered an error
1207 assert_equal(status, 200)
1208
1209 _, status = topic_conf.get_config()
1210 assert_equal(status, 404)
1211
1212 # cleanup
1213 # delete the bucket
1214 conn.delete_bucket(bucket_name)
1215
1e59de90
TL
@attr('basic_test')
def test_ps_s3_notification_permissions():
    """ test s3 notification set/get/delete permissions """
    conn1 = connection()
    conn2 = another_user()
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    # create bucket
    bucket = conn1.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'amqp://127.0.0.1:7001'
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()

    # one user creates a notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
    _, status = s3_notification_conf1.set_config()
    assert_equal(status, 200)
    # another user tries to fetch it
    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
    try:
        _, _ = s3_notification_conf2.get_config()
        assert False, "'AccessDenied' error is expected"
    except ClientError as error:
        assert_equal(error.response['Error']['Code'], 'AccessDenied')
    # the other user tries to delete the notification
    _, status = s3_notification_conf2.del_config()
    assert_equal(status, 403)

    # a bucket policy is added by the 1st user
    client = boto3.client('s3',
                          endpoint_url='http://'+conn1.host+':'+str(conn1.port),
                          aws_access_key_id=conn1.aws_access_key_id,
                          aws_secret_access_key=conn1.aws_secret_access_key)
    bucket_policy = json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement",
                "Effect": "Allow",
                "Principal": "*",
                "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
                "Resource": f"arn:aws:s3:::{bucket_name}"
            }
        ]
    })
    response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
    assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2)
    result = client.get_bucket_policy(Bucket=bucket_name)
    print(result['Policy'])

    # the 2nd user tries to fetch it again
    _, status = s3_notification_conf2.get_config()
    assert_equal(status, 200)

    # the 2nd user tries to delete it again
    result, status = s3_notification_conf2.del_config()
    assert_equal(status, 200)

    # the 2nd user tries to add another notification
    topic_conf_list = [{'Id': notification_name+"2",
                        'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
    result, status = s3_notification_conf2.set_config()
    assert_equal(status, 200)

    # cleanup
    s3_notification_conf1.del_config()
    s3_notification_conf2.del_config()
    topic_conf.del_config()
    # delete the bucket
    conn1.delete_bucket(bucket_name)

@attr('amqp_test')
def test_ps_s3_notification_push_amqp_on_master():
    """ test pushing amqp s3 notification on master """

    hostname = get_ip()
    conn = connection()
    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
    topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'

    # start amqp receivers
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
    task1.start()
    task2.start()

    # create two s3 topics
    endpoint_address = 'amqp://' + hostname
    # with acks from the broker on delivery
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # with acks from the broker once the message is routable
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
    topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                       },
                       {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receivers
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)
    receiver2.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
    # check that amqp receiver 2 has no deletions
    try:
        receiver2.verify_s3_events(keys, exact_match=False, deletions=True)
    except:
        pass
    else:
        err = 'amqp receiver 2 should have no deletions'
        assert False, err

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    stop_amqp_receiver(receiver2, task2)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)


@attr('manual_test')
def test_ps_s3_notification_push_amqp_idleness_check():
    """ test pushing amqp s3 notification and checking for connection idleness """
    return SkipTest("only used in manual testing")
    hostname = get_ip()
    conn = connection()
    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'

    # start amqp receiver
    exchange = 'ex1'
    task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
    task1.start()

    # create s3 topic
    endpoint_address = 'amqp://' + hostname
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
                        'Events': []
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)

    print('waiting for 40sec for checking idleness')
    time.sleep(40)

    os.system("netstat -nnp | grep 5672")

    # repeat the upload and notification check after the idle period
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver
    keys = list(bucket.list())
    print('total number of objects: ' + str(len(keys)))
    receiver1.verify_s3_events(keys, exact_match=True)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)

    # check amqp receiver 1 for deletions
    receiver1.verify_s3_events(keys, exact_match=True, deletions=True)

    os.system("netstat -nnp | grep 5672")

    # cleanup
    stop_amqp_receiver(receiver1, task1)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)


1533@attr('kafka_test')
1534def test_ps_s3_notification_push_kafka_on_master():
1535 """ test pushing kafka s3 notification on master """
1536 conn = connection()
1537 zonegroup = 'default'
1538
1539 # create bucket
1540 bucket_name = gen_bucket_name()
1541 bucket = conn.create_bucket(bucket_name)
1542 # name is constant for manual testing
1543 topic_name = bucket_name+'_topic'
1544 # create consumer on the topic
1545
1546 try:
1547 s3_notification_conf = None
1548 topic_conf1 = None
1549 topic_conf2 = None
1550 receiver = None
1551 task, receiver = create_kafka_receiver_thread(topic_name+'_1')
1552 task.start()
1553
1554 # create s3 topic
1555 endpoint_address = 'kafka://' + kafka_server
1556 # without acks from broker
1557 endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
1558 topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
1559 topic_arn1 = topic_conf1.set_config()
1560 endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
1561 topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
1562 topic_arn2 = topic_conf2.set_config()
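# note: with kafka-ack-level=broker the gateway waits for the broker to
# acknowledge each message before acking the notification, while with
# kafka-ack-level=none the message is considered delivered as soon as it is
# handed to the kafka client; this test exercises both modes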
1563 # create s3 notification
1564 notification_name = bucket_name + NOTIFICATION_SUFFIX
1565 topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
1566 'Events': []
1567 },
1568 {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
1569 'Events': []
1570 }]
1571
1572 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1573 response, status = s3_notification_conf.set_config()
1574 assert_equal(status/100, 2)
1575
1576 # create objects in the bucket (async)
1577 number_of_objects = 10
1578 client_threads = []
1579 etags = []
1580 start_time = time.time()
1581 for i in range(number_of_objects):
1582 key = bucket.new_key(str(i))
1583 content = str(os.urandom(1024*1024))
1584 etag = hashlib.md5(content.encode()).hexdigest()
1585 etags.append(etag)
1586 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1587 thr.start()
1588 client_threads.append(thr)
1589 [thr.join() for thr in client_threads]
1590
1591 time_diff = time.time() - start_time
1592 print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1593
1594 print('wait for 5sec for the messages...')
1595 time.sleep(5)
1596 keys = list(bucket.list())
1597 receiver.verify_s3_events(keys, exact_match=True, etags=etags)
1598
1599 # delete objects from the bucket
1600 client_threads = []
1601 start_time = time.time()
1602 for key in bucket.list():
1603 thr = threading.Thread(target = key.delete, args=())
1604 thr.start()
1605 client_threads.append(thr)
1606 [thr.join() for thr in client_threads]
1607
1608 time_diff = time.time() - start_time
1609 print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1610
1611 print('wait for 5sec for the messages...')
1612 time.sleep(5)
1613 receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
1614 except Exception as e:
1615 print(e)
1616 assert False
1617 finally:
1618 # cleanup
1619 if s3_notification_conf is not None:
1620 s3_notification_conf.del_config()
1621 if topic_conf1 is not None:
1622 topic_conf1.del_config()
1623 if topic_conf2 is not None:
1624 topic_conf2.del_config()
1625 # delete the bucket
1626 for key in bucket.list():
1627 key.delete()
1628 conn.delete_bucket(bucket_name)
1629 if receiver is not None:
1630 stop_kafka_receiver(receiver, task)
1631
1632
1633@attr('http_test')
1634def test_ps_s3_notification_multi_delete_on_master():
1635 """ test deletion of multiple keys on master """
1636 hostname = get_ip()
1637 conn = connection()
1638 zonegroup = 'default'
1639
1640 # create random port for the http server
1641 host = get_ip()
1642 port = random.randint(10000, 20000)
1643 # start an http server in a separate thread
1644 number_of_objects = 10
1645 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1646
1647 # create bucket
1648 bucket_name = gen_bucket_name()
1649 bucket = conn.create_bucket(bucket_name)
1650 topic_name = bucket_name + TOPIC_SUFFIX
1651
1652 # create s3 topic
1653 endpoint_address = 'http://'+host+':'+str(port)
1654 endpoint_args = 'push-endpoint='+endpoint_address
1655 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
1656 topic_arn = topic_conf.set_config()
1657 # create s3 notification
1658 notification_name = bucket_name + NOTIFICATION_SUFFIX
1659 topic_conf_list = [{'Id': notification_name,
1660 'TopicArn': topic_arn,
1661 'Events': ['s3:ObjectRemoved:*']
1662 }]
1663 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1664 response, status = s3_notification_conf.set_config()
1665 assert_equal(status/100, 2)
1666
1667 # create objects in the bucket
1668 client_threads = []
1669 objects_size = {}
1670 for i in range(number_of_objects):
1671 content = str(os.urandom(randint(1, 1024)))
1672 object_size = len(content)
1673 key = bucket.new_key(str(i))
1674 objects_size[key.name] = object_size
1675 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1676 thr.start()
1677 client_threads.append(thr)
1678 [thr.join() for thr in client_threads]
1679
1680 keys = list(bucket.list())
1681
1682 start_time = time.time()
1683 delete_all_objects(conn, bucket_name)
1684 time_diff = time.time() - start_time
1685 print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1686
1687 print('wait for 5sec for the messages...')
1688 time.sleep(5)
1689
1690 # check http receiver
1691 http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
1692
1693 # cleanup
1694 topic_conf.del_config()
1695 s3_notification_conf.del_config(notification=notification_name)
1696 # delete the bucket
1697 conn.delete_bucket(bucket_name)
1698 http_server.close()
1699
1700
1701@attr('http_test')
1702def test_ps_s3_notification_push_http_on_master():
1703 """ test pushing http s3 notification on master """
1704 hostname = get_ip_http()
1705 conn = connection()
1706 zonegroup = 'default'
1707
1708 # create random port for the http server
1709 host = get_ip()
1710 port = random.randint(10000, 20000)
1711 # start an http server in a separate thread
1712 number_of_objects = 10
1713 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1714
1715 # create bucket
1716 bucket_name = gen_bucket_name()
1717 bucket = conn.create_bucket(bucket_name)
1718 topic_name = bucket_name + TOPIC_SUFFIX
1719
1720 # create s3 topic
1721 endpoint_address = 'http://'+host+':'+str(port)
1722 endpoint_args = 'push-endpoint='+endpoint_address
1723 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
1724 topic_arn = topic_conf.set_config()
1725 # create s3 notification
1726 notification_name = bucket_name + NOTIFICATION_SUFFIX
1727 topic_conf_list = [{'Id': notification_name,
1728 'TopicArn': topic_arn,
1729 'Events': []
1730 }]
1731 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1732 response, status = s3_notification_conf.set_config()
1733 assert_equal(status/100, 2)
1734
1735 # create objects in the bucket
1736 client_threads = []
1737 objects_size = {}
1738 start_time = time.time()
1739 for i in range(number_of_objects):
1740 content = str(os.urandom(randint(1, 1024)))
1741 object_size = len(content)
1742 key = bucket.new_key(str(i))
1743 objects_size[key.name] = object_size
1744 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1745 thr.start()
1746 client_threads.append(thr)
1747 [thr.join() for thr in client_threads]
1748
1749 time_diff = time.time() - start_time
1750 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1751
1752 print('wait for 5sec for the messages...')
1753 time.sleep(5)
1754
1755 # check http receiver
1756 keys = list(bucket.list())
1757 http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
1758
1759 # delete objects from the bucket
1760 client_threads = []
1761 start_time = time.time()
1762 for key in bucket.list():
1763 thr = threading.Thread(target = key.delete, args=())
1764 thr.start()
1765 client_threads.append(thr)
1766 [thr.join() for thr in client_threads]
1767
1768 time_diff = time.time() - start_time
1769 print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1770
1771 print('wait for 5sec for the messages...')
1772 time.sleep(5)
1773
1774 # check http receiver
1775 http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
1776
1777 # cleanup
1778 topic_conf.del_config()
1779 s3_notification_conf.del_config(notification=notification_name)
1780 # delete the bucket
1781 conn.delete_bucket(bucket_name)
1782 http_server.close()
1783
1784
1785@attr('http_test')
1786def test_ps_s3_notification_push_cloudevents_on_master():
1787 """ test pushing cloudevents notification on master """
1788 hostname = get_ip_http()
1789 conn = connection()
1790 zonegroup = 'default'
1791
1792 # create random port for the http server
1793 host = get_ip()
1794 port = random.randint(10000, 20000)
1795 # start an http server in a separate thread
1796 number_of_objects = 10
1797 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
1798
1799 # create bucket
1800 bucket_name = gen_bucket_name()
1801 bucket = conn.create_bucket(bucket_name)
1802 topic_name = bucket_name + TOPIC_SUFFIX
1803
1804 # create s3 topic
1805 endpoint_address = 'http://'+host+':'+str(port)
1806 endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
1807 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
1808 topic_arn = topic_conf.set_config()
1809 # create s3 notification
1810 notification_name = bucket_name + NOTIFICATION_SUFFIX
1811 topic_conf_list = [{'Id': notification_name,
1812 'TopicArn': topic_arn,
1813 'Events': []
1814 }]
1815 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1816 response, status = s3_notification_conf.set_config()
1817 assert_equal(status/100, 2)
1818
1819 # create objects in the bucket
1820 client_threads = []
1821 objects_size = {}
1822 start_time = time.time()
1823 for i in range(number_of_objects):
1824 content = str(os.urandom(randint(1, 1024)))
1825 object_size = len(content)
1826 key = bucket.new_key(str(i))
1827 objects_size[key.name] = object_size
1828 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1829 thr.start()
1830 client_threads.append(thr)
1831 [thr.join() for thr in client_threads]
1832
1833 time_diff = time.time() - start_time
1834 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1835
1836 print('wait for 5sec for the messages...')
1837 time.sleep(5)
1838
1839 # check http receiver
1840 keys = list(bucket.list())
1841 http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
1842
1843 # delete objects from the bucket
1844 client_threads = []
1845 start_time = time.time()
1846 for key in bucket.list():
1847 thr = threading.Thread(target = key.delete, args=())
1848 thr.start()
1849 client_threads.append(thr)
1850 [thr.join() for thr in client_threads]
1851
1852 time_diff = time.time() - start_time
1853 print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1854
1855 print('wait for 5sec for the messages...')
1856 time.sleep(5)
1857
1858 # check http receiver
1859 http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
1860
1861 # cleanup
1862 topic_conf.del_config()
1863 s3_notification_conf.del_config(notification=notification_name)
1864 # delete the bucket
1865 conn.delete_bucket(bucket_name)
1866 http_server.close()
1867
1868
1869@attr('http_test')
1870def test_ps_s3_opaque_data_on_master():
1871 """ test that opaque id set in topic, is sent in notification on master """
1872 hostname = get_ip()
1873 conn = connection()
1874 zonegroup = 'default'
1875
1876 # create random port for the http server
1877 host = get_ip()
1878 port = random.randint(10000, 20000)
1879 # start an http server in a separate thread
1880 number_of_objects = 10
1881 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1882
1883 # create bucket
1884 bucket_name = gen_bucket_name()
1885 bucket = conn.create_bucket(bucket_name)
1886 topic_name = bucket_name + TOPIC_SUFFIX
1887
1888 # create s3 topic
1889 endpoint_address = 'http://'+host+':'+str(port)
1890 endpoint_args = 'push-endpoint='+endpoint_address
1891 opaque_data = 'http://1.2.3.4:8888'
1892 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
1893 topic_arn = topic_conf.set_config()
1894 # create s3 notification
1895 notification_name = bucket_name + NOTIFICATION_SUFFIX
1896 topic_conf_list = [{'Id': notification_name,
1897 'TopicArn': topic_arn,
1898 'Events': []
1899 }]
1900 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1901 response, status = s3_notification_conf.set_config()
1902 assert_equal(status/100, 2)
1903
1904 # create objects in the bucket
1905 client_threads = []
1906 start_time = time.time()
1907 content = 'bar'
1908 for i in range(number_of_objects):
1909 key = bucket.new_key(str(i))
1910 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1911 thr.start()
1912 client_threads.append(thr)
1913 [thr.join() for thr in client_threads]
1914
1915 time_diff = time.time() - start_time
1916 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1917
1918 print('wait for 5sec for the messages...')
1919 time.sleep(5)
1920
1921 # check http receiver
1922 keys = list(bucket.list())
1923 print('total number of objects: ' + str(len(keys)))
1924 events = http_server.get_and_reset_events()
1925 for event in events:
1926 assert_equal(event['Records'][0]['opaqueData'], opaque_data)
1927
1928 # cleanup
1929 for key in keys:
1930 key.delete()
1931 [thr.join() for thr in client_threads]
1932 topic_conf.del_config()
1933 s3_notification_conf.del_config(notification=notification_name)
1934 # delete the bucket
1935 conn.delete_bucket(bucket_name)
1936 http_server.close()
1937
1938@attr('http_test')
1939def test_ps_s3_lifecycle_on_master():
1940 """ test that when object is deleted due to lifecycle policy, notification is sent on master """
1941 hostname = get_ip()
1942 conn = connection()
1943 zonegroup = 'default'
1944
1945 # create random port for the http server
1946 host = get_ip()
1947 port = random.randint(10000, 20000)
1948 # start an http server in a separate thread
1949 number_of_objects = 10
1950 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
1951
1952 # create bucket
1953 bucket_name = gen_bucket_name()
1954 bucket = conn.create_bucket(bucket_name)
1955 topic_name = bucket_name + TOPIC_SUFFIX
1956
1957 # create s3 topic
1958 endpoint_address = 'http://'+host+':'+str(port)
1959 endpoint_args = 'push-endpoint='+endpoint_address
1960 opaque_data = 'http://1.2.3.4:8888'
1961 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
1962 topic_arn = topic_conf.set_config()
1963 # create s3 notification
1964 notification_name = bucket_name + NOTIFICATION_SUFFIX
1965 topic_conf_list = [{'Id': notification_name,
1966 'TopicArn': topic_arn,
1967 'Events': ['s3:ObjectLifecycle:Expiration:*']
1968 }]
1969 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
1970 response, status = s3_notification_conf.set_config()
1971 assert_equal(status/100, 2)
1972
1973 # create objects in the bucket
1974 obj_prefix = 'ooo'
1975 client_threads = []
1976 start_time = time.time()
1977 content = 'bar'
1978 for i in range(number_of_objects):
1979 key = bucket.new_key(obj_prefix + str(i))
1980 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
1981 thr.start()
1982 client_threads.append(thr)
1983 [thr.join() for thr in client_threads]
1984
1985 time_diff = time.time() - start_time
1986 print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
1987
1988 # create lifecycle policy
1989 client = boto3.client('s3',
1990 endpoint_url='http://'+conn.host+':'+str(conn.port),
1991 aws_access_key_id=conn.aws_access_key_id,
1992 aws_secret_access_key=conn.aws_secret_access_key)
1993 yesterday = datetime.date.today() - datetime.timedelta(days=1)
1994 response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
1995 LifecycleConfiguration={'Rules': [
1996 {
1997 'ID': 'rule1',
1998 'Expiration': {'Date': yesterday.isoformat()},
1999 'Filter': {'Prefix': obj_prefix},
2000 'Status': 'Enabled',
2001 }
2002 ]
2003 }
2004 )
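# optional sanity check (a sketch, not part of the original flow): read the
# rule back with the standard boto3 call to confirm it was stored, e.g.
#   lc = client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
#   assert_equal(lc['Rules'][0]['ID'], 'rule1')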
2005
2006 # start lifecycle processing
2007 admin(['lc', 'process'])
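# 'lc process' runs 'radosgw-admin lc process' via the admin() helper
# imported above, forcing an immediate lifecycle pass instead of waiting for
# the scheduled lc window, so the rule dated yesterday fires right away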
2008 print('wait for 5sec for the messages...')
2009 time.sleep(5)
2010
2011 # check that the http receiver got the lifecycle expiration notifications
2012 keys = list(bucket.list())
2013 print('total number of objects: ' + str(len(keys)))
2014 event_keys = []
2015 events = http_server.get_and_reset_events()
2016 for event in events:
2017 assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
2018 event_keys.append(event['Records'][0]['s3']['object']['key'])
2019 for key in keys:
2020 key_found = False
2021 for event_key in event_keys:
2022 if event_key == key.name:
2023 key_found = True
2024 break
2025 if not key_found:
2026 err = 'no lifecycle event found for key: ' + str(key)
2027 log.error(events)
2028 assert False, err
2029
2030 # cleanup
2031 for key in keys:
2032 key.delete()
2033 [thr.join() for thr in client_threads]
2034 topic_conf.del_config()
2035 s3_notification_conf.del_config(notification=notification_name)
2036 # delete the bucket
2037 conn.delete_bucket(bucket_name)
2038 http_server.close()
2039
2040
2041def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
2042 """ test object creation s3 notifications in using put/copy/post on master"""
2043
2044 if not external_endpoint_address:
2045 hostname = 'localhost'
2046 proc = init_rabbitmq()
2047 if proc is None:
2048 return SkipTest('end2end amqp tests require rabbitmq-server installed')
2049 else:
2050 proc = None
2051
2052 conn = connection()
2053 hostname = 'localhost'
2054 zonegroup = 'default'
2055
2056 # create bucket
2057 bucket_name = gen_bucket_name()
2058 bucket = conn.create_bucket(bucket_name)
2059 topic_name = bucket_name + TOPIC_SUFFIX
2060
2061 # start amqp receiver
2062 exchange = 'ex1'
2063 task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
2064 task.start()
2065
2066 # create s3 topic
2067 if external_endpoint_address:
2068 endpoint_address = external_endpoint_address
2069 elif ca_location:
2070 endpoint_address = 'amqps://' + hostname
2071 else:
2072 endpoint_address = 'amqp://' + hostname
2073 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
2074 if ca_location:
2075 endpoint_args += '&ca-location={}'.format(ca_location)
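# note: verify-ssl controls whether the broker certificate is validated at
# all, and ca-location points the gateway at the CA bundle that signed the
# broker certificate; together they let an amqps:// endpoint trust the
# self-signed test CA generated by generate_private_key() below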
2076 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2077 topic_arn = topic_conf.set_config()
2078 # create s3 notification
2079 notification_name = bucket_name + NOTIFICATION_SUFFIX
2080 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2081 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload']
2082 }]
2083
2084 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2085 response, status = s3_notification_conf.set_config()
2086 assert_equal(status/100, 2)
2087
2088 objects_size = {}
2089 # create objects in the bucket using PUT
2090 content = str(os.urandom(randint(1, 1024)))
2091 key_name = 'put'
2092 key = bucket.new_key(key_name)
2093 objects_size[key_name] = len(content)
2094 key.set_contents_from_string(content)
2095 # create objects in the bucket using COPY
2096 key_name = 'copy'
2097 bucket.copy_key(key_name, bucket.name, key.name)
2098 objects_size[key_name] = len(content)
2099
2100 # create objects in the bucket using multi-part upload
2101 fp = tempfile.NamedTemporaryFile(mode='w+b')
2102 content = bytearray(os.urandom(10*1024*1024))
2103 key_name = 'multipart'
2104 objects_size[key_name] = len(content)
2105 fp.write(content)
2106 fp.flush()
2107 fp.seek(0)
2108 uploader = bucket.initiate_multipart_upload(key_name)
2109 uploader.upload_part_from_file(fp, 1)
2110 uploader.complete_upload()
2111 fp.close()
2112
2113 print('wait for 5sec for the messages...')
2114 time.sleep(5)
2115
2116 # check amqp receiver
2117 keys = list(bucket.list())
2118 receiver.verify_s3_events(keys, exact_match=True, expected_sizes=objects_size)
2119
2120 # cleanup
2121 stop_amqp_receiver(receiver, task)
2122 s3_notification_conf.del_config()
2123 topic_conf.del_config()
2124 for key in bucket.list():
2125 key.delete()
2126 # delete the bucket
2127 conn.delete_bucket(bucket_name)
2128 if proc:
2129 clean_rabbitmq(proc)
2130
2131
2132@attr('amqp_test')
2133def test_ps_s3_creation_triggers_on_master():
2134 ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672")
2135
2136
2137@attr('amqp_ssl_test')
2138def test_ps_s3_creation_triggers_on_master_external():
2139
2140 from distutils.util import strtobool
2141
2142 if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
2143 try:
2144 if strtobool(os.environ['AMQP_VERIFY_SSL']):
2145 verify_ssl = 'true'
2146 else:
2147 verify_ssl = 'false'
2148 except Exception as e:
2149 verify_ssl = 'true'
2150
2151 ps_s3_creation_triggers_on_master(
2152 external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
2153 verify_ssl=verify_ssl)
2154 else:
2155 return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
2156
2157
1e59de90 2158def generate_private_key(tempdir):
20effc67
TL
2159
2160 import datetime
2161 import stat
2162 from cryptography import x509
2163 from cryptography.x509.oid import NameOID
2164 from cryptography.hazmat.primitives import hashes
2165 from cryptography.hazmat.backends import default_backend
2166 from cryptography.hazmat.primitives import serialization
2167 from cryptography.hazmat.primitives.asymmetric import rsa
2168
2169 # modify permissions to ensure that the broker user can access them
2170 os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
2171 CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
2172 CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
2173 KEYFILE = os.path.join(tempdir, 'server_key.pem')
2174
2175 root_key = rsa.generate_private_key(
2176 public_exponent=65537,
2177 key_size=2048,
2178 backend=default_backend()
2179 )
2180 subject = issuer = x509.Name([
2181 x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
2182 x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
2183 x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
2184 x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
2185 x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
2186 ])
2187 root_cert = x509.CertificateBuilder().subject_name(
2188 subject
2189 ).issuer_name(
2190 issuer
2191 ).public_key(
2192 root_key.public_key()
2193 ).serial_number(
2194 x509.random_serial_number()
2195 ).not_valid_before(
2196 datetime.datetime.utcnow()
2197 ).not_valid_after(
2198 datetime.datetime.utcnow() + datetime.timedelta(days=3650)
2199 ).add_extension(
2200 x509.BasicConstraints(ca=True, path_length=None), critical=True
2201 ).sign(root_key, hashes.SHA256(), default_backend())
2202 with open(CACERTFILE, "wb") as f:
2203 f.write(root_cert.public_bytes(serialization.Encoding.PEM))
2204
2205 # Now we want to generate a cert from that root
2206 cert_key = rsa.generate_private_key(
2207 public_exponent=65537,
2208 key_size=2048,
2209 backend=default_backend(),
2210 )
2211 with open(KEYFILE, "wb") as f:
2212 f.write(cert_key.private_bytes(
2213 encoding=serialization.Encoding.PEM,
2214 format=serialization.PrivateFormat.PKCS8,
2215 encryption_algorithm=serialization.NoEncryption(),
2216 ))
2217 new_subject = x509.Name([
2218 x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
2219 x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
2220 x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
2221 x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
2222 ])
2223 cert = x509.CertificateBuilder().subject_name(
2224 new_subject
2225 ).issuer_name(
2226 root_cert.issuer
2227 ).public_key(
2228 cert_key.public_key()
2229 ).serial_number(
2230 x509.random_serial_number()
2231 ).not_valid_before(
2232 datetime.datetime.utcnow()
2233 ).not_valid_after(
2234 datetime.datetime.utcnow() + datetime.timedelta(days=30)
2235 ).add_extension(
2236 x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
2237 critical=False,
2238 ).sign(root_key, hashes.SHA256(), default_backend())
2239 # Write our certificate out to disk.
2240 with open(CERTFILE, "wb") as f:
2241 f.write(cert.public_bytes(serialization.Encoding.PEM))
2242
2243 print("\n\n********private key generated********")
2244 print(CACERTFILE, CERTFILE, KEYFILE)
2245 print("\n\n")
2246 return CACERTFILE, CERTFILE, KEYFILE
2247
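# a quick sanity check of the chain produced above (a sketch using the same
# cryptography package imported inside the function):
#   ca = x509.load_pem_x509_certificate(open(CACERTFILE, 'rb').read(), default_backend())
#   assert ca.subject == ca.issuer   # the root is self-signed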
2248
2249@attr('amqp_ssl_test')
2250def test_ps_s3_creation_triggers_on_master_ssl():
2251
2252 import textwrap
2253 from tempfile import TemporaryDirectory
2254
2255 with TemporaryDirectory() as tempdir:
2256 CACERTFILE, CERTFILE, KEYFILE = generate_private_key(tempdir)
2257 RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
2258 with open(RABBITMQ_CONF_FILE, "w") as f:
2259 # use the old style config format to ensure it also runs on older RabbitMQ versions.
2260 f.write(textwrap.dedent(f'''
2261 [
2262 {{rabbit, [
2263 {{ssl_listeners, [5671]}},
2264 {{ssl_options, [{{cacertfile, "{CACERTFILE}"}},
2265 {{certfile, "{CERTFILE}"}},
2266 {{keyfile, "{KEYFILE}"}},
2267 {{verify, verify_peer}},
2268 {{fail_if_no_peer_cert, false}}]}}]}}
2269 ].
2270 '''))
2271 os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
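# note: with the classic (pre-3.7) erlang-term config format used above,
# RABBITMQ_CONFIG_FILE is expected without the .config extension, which is
# why splitext() strips it before exporting the variable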
2272
2273 ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
2274
2275 del os.environ['RABBITMQ_CONFIG_FILE']
2276
2277
2278@attr('amqp_test')
2279def test_http_post_object_upload():
2280 """ test that uploads object using HTTP POST """
2281
2282 import boto3
2283 from collections import OrderedDict
2284 import requests
2285
2286 hostname = get_ip()
2287 zonegroup = 'default'
2288 conn = connection()
2289
2290 endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
2291
2292 conn1 = boto3.client(service_name='s3',
2293 aws_access_key_id=get_access_key(),
2294 aws_secret_access_key=get_secret_key(),
2295 endpoint_url=endpoint,
2296 )
2297
2298 bucket_name = gen_bucket_name()
2299 topic_name = bucket_name + TOPIC_SUFFIX
2300
2301 key_name = 'foo.txt'
2302
2303 resp = conn1.generate_presigned_post(Bucket=bucket_name, Key=key_name,)
2304
2305 url = resp['url']
2306
2307 bucket = conn1.create_bucket(ACL='public-read-write', Bucket=bucket_name)
2308
2309 # start amqp receiver
2310 exchange = 'ex1'
2311 task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
2312 task1.start()
2313
2314 # create s3 topics
2315 endpoint_address = 'amqp://' + hostname
2316 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
2317 topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
2318 topic_arn1 = topic_conf1.set_config()
2319
2320 # create s3 notifications
2321 notification_name = bucket_name + NOTIFICATION_SUFFIX
2322 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
2323 'Events': ['s3:ObjectCreated:Post']
2324 }]
2325 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2326 response, status = s3_notification_conf.set_config()
2327 assert_equal(status/100, 2)
2328
2329 payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
2330 ("Content-Type" , "text/plain"),('file', ('bar'))])
2331
2332 # POST upload
2333 r = requests.post(url, files=payload, verify=True)
2334 assert_equal(r.status_code, 204)
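# note: generate_presigned_post() also returns signed form fields in
# resp['fields']; a more typical upload (a sketch, not what this test
# exercises) would send them along with the file:
#   r = requests.post(resp['url'], data=resp['fields'],
#                     files={'file': ('foo.txt', 'bar')})
# the unsigned POST above succeeds, likely because the bucket was created
# with a public-read-write ACL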
2335
2336 # check amqp receiver
2337 events = receiver1.get_and_reset_events()
2338 assert_equal(len(events), 1)
2339
2340 # cleanup
2341 stop_amqp_receiver(receiver1, task1)
2342 s3_notification_conf.del_config()
2343 topic_conf1.del_config()
2344 conn1.delete_object(Bucket=bucket_name, Key=key_name)
2345 # delete the bucket
2346 conn1.delete_bucket(Bucket=bucket_name)
2347
2348
2349@attr('amqp_test')
2350def test_ps_s3_multipart_on_master():
2351 """ test multipart object upload on master"""
2352
2353 hostname = get_ip()
2354 conn = connection()
2355 zonegroup = 'default'
2356
2357 # create bucket
2358 bucket_name = gen_bucket_name()
2359 bucket = conn.create_bucket(bucket_name)
2360 topic_name = bucket_name + TOPIC_SUFFIX
2361
2362 # start amqp receivers
2363 exchange = 'ex1'
2364 task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
2365 task1.start()
2366 task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
2367 task2.start()
2368 task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
2369 task3.start()
2370
2371 # create s3 topics
2372 endpoint_address = 'amqp://' + hostname
2373 endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
2374 topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
2375 topic_arn1 = topic_conf1.set_config()
2376 topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
2377 topic_arn2 = topic_conf2.set_config()
2378 topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
2379 topic_arn3 = topic_conf3.set_config()
2380
2381 # create s3 notifications
2382 notification_name = bucket_name + NOTIFICATION_SUFFIX
2383 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
2384 'Events': ['s3:ObjectCreated:*']
2385 },
2386 {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
2387 'Events': ['s3:ObjectCreated:Post']
2388 },
2389 {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
2390 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
2391 }]
2392 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2393 response, status = s3_notification_conf.set_config()
2394 assert_equal(status/100, 2)
2395
2396 # create objects in the bucket using multi-part upload
2397 fp = tempfile.NamedTemporaryFile(mode='w+b')
2398 object_size = 1024
2399 content = bytearray(os.urandom(object_size))
2400 fp.write(content)
2401 fp.flush()
2402 fp.seek(0)
2403 uploader = bucket.initiate_multipart_upload('multipart')
2404 uploader.upload_part_from_file(fp, 1)
2405 uploader.complete_upload()
2406 fp.close()
2407
2408 print('wait for 5sec for the messages...')
2409 time.sleep(5)
2410
2411 # check amqp receiver
2412 events = receiver1.get_and_reset_events()
2413 assert_equal(len(events), 1)
2414
2415 events = receiver2.get_and_reset_events()
2416 assert_equal(len(events), 0)
2417
2418 events = receiver3.get_and_reset_events()
2419 assert_equal(len(events), 1)
2420 assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload')
2421 assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
2422 assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size)
2423 assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime'
2424
2425 # cleanup
2426 stop_amqp_receiver(receiver1, task1)
2427 stop_amqp_receiver(receiver2, task2)
2428 stop_amqp_receiver(receiver3, task3)
2429 s3_notification_conf.del_config()
2430 topic_conf1.del_config()
2431 topic_conf2.del_config()
2432 topic_conf3.del_config()
2433 for key in bucket.list():
2434 key.delete()
2435 # delete the bucket
2436 conn.delete_bucket(bucket_name)
2437
20effc67 2438@attr('amqp_test')
1e59de90 2439def test_ps_s3_metadata_filter_on_master():
20effc67
TL
2440 """ test s3 notification of metadata on master """
2441
2442 hostname = get_ip()
2443 conn = connection()
2444 zonegroup = 'default'
2445
2446 # create bucket
2447 bucket_name = gen_bucket_name()
2448 bucket = conn.create_bucket(bucket_name)
2449 topic_name = bucket_name + TOPIC_SUFFIX
2450 
2451 # start amqp receiver
2452 exchange = 'ex1'
2453 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2454 task.start()
2455
2456 # create s3 topic
2457 endpoint_address = 'amqp://' + hostname
2458 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
2459 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2460 topic_arn = topic_conf.set_config()
2461 # create s3 notification
2462 notification_name = bucket_name + NOTIFICATION_SUFFIX
2463 meta_key = 'meta1'
2464 meta_value = 'This is my metadata value'
2465 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
2466 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
2467 'Filter': {
2468 'Metadata': {
2469 'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}]
2470 }
2471 }
2472 }]
2473
2474 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2475 _, status = s3_notification_conf.set_config()
2476 assert_equal(status/100, 2)
2477
2478 expected_keys = []
2479 # create objects in the bucket
2480 key_name = 'foo'
2481 key = bucket.new_key(key_name)
2482 key.set_metadata(meta_key, meta_value)
2483 key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
2484 expected_keys.append(key_name)
2485
2486 # create objects in the bucket using COPY
2487 key_name = 'copy_of_foo'
2488 bucket.copy_key(key_name, bucket.name, key.name)
2489 expected_keys.append(key_name)
2490
2491 # create another object in the bucket using COPY
2492 # but override the metadata value
2493 key_name = 'another_copy_of_foo'
2494 bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
2495 # this key is not in the expected keys due to the different meta value
2496
2497 # create objects in the bucket using multi-part upload
2498 fp = tempfile.NamedTemporaryFile(mode='w+b')
2499 chunk_size = 1024*1024*5 # 5MB
2500 object_size = 10*chunk_size
2501 content = bytearray(os.urandom(object_size))
2502 fp.write(content)
2503 fp.flush()
2504 fp.seek(0)
2505 key_name = 'multipart_foo'
2506 uploader = bucket.initiate_multipart_upload(key_name,
2507 metadata={meta_key: meta_value})
2508 for i in range(1,5):
2509 uploader.upload_part_from_file(fp, i, size=chunk_size)
2510 fp.seek(i*chunk_size)
2511 uploader.complete_upload()
2512 fp.close()
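# note: only parts 1-4 (4 x 5MB) of the 50MB generated above are uploaded,
# so the completed object is 20MB; the filter only cares about the metadata,
# not the object size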
2513 expected_keys.append(key_name)
2514
2515 print('wait for 5sec for the messages...')
2516 time.sleep(5)
2517 # check amqp receiver
2518 events = receiver.get_and_reset_events()
2519 assert_equal(len(events), len(expected_keys))
2520 for event in events:
2521 assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
2522
2523 # delete objects
2524 for key in bucket.list():
2525 key.delete()
2526 print('wait for 5sec for the messages...')
2527 time.sleep(5)
2528 # check amqp receiver
2529 events = receiver.get_and_reset_events()
2530 assert_equal(len(events), len(expected_keys))
2531 for event in events:
2532 assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
2533
2534 # cleanup
2535 stop_amqp_receiver(receiver, task)
2536 s3_notification_conf.del_config()
2537 topic_conf.del_config()
2538 # delete the bucket
2539 conn.delete_bucket(bucket_name)
2540
2541
2542@attr('amqp_test')
2543def test_ps_s3_metadata_on_master():
2544 """ test s3 notification of metadata on master """
2545
2546 hostname = get_ip()
2547 conn = connection()
2548 zonegroup = 'default'
2549
2550 # create bucket
2551 bucket_name = gen_bucket_name()
2552 bucket = conn.create_bucket(bucket_name)
2553 topic_name = bucket_name + TOPIC_SUFFIX
2554
2555 # start amqp receiver
2556 exchange = 'ex1'
2557 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2558 task.start()
2559
2560 # create s3 topic
2561 endpoint_address = 'amqp://' + hostname
2562 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
2563 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2564 topic_arn = topic_conf.set_config()
2565 # create s3 notification
2566 notification_name = bucket_name + NOTIFICATION_SUFFIX
2567 meta_key = 'meta1'
2568 meta_value = 'This is my metadata value'
2569 meta_prefix = META_PREFIX
2570 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
2571 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
2572 }]
2573
2574 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2575 _, status = s3_notification_conf.set_config()
2576 assert_equal(status/100, 2)
2577
2578 # create objects in the bucket
2579 key_name = 'foo'
2580 key = bucket.new_key(key_name)
2581 key.set_metadata(meta_key, meta_value)
2582 key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
2583 # update the object
2584 another_meta_key = 'meta2'
2585 key.set_metadata(another_meta_key, meta_value)
2586 key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
2587
2588 # create objects in the bucket using COPY
2589 key_name = 'copy_of_foo'
2590 bucket.copy_key(key_name, bucket.name, key.name)
2591
2592 # create objects in the bucket using multi-part upload
2593 fp = tempfile.NamedTemporaryFile(mode='w+b')
2594 chunk_size = 1024*1024*5 # 5MB
2595 object_size = 10*chunk_size
2596 content = bytearray(os.urandom(object_size))
2597 fp.write(content)
2598 fp.flush()
2599 fp.seek(0)
2600 key_name = 'multipart_foo'
2601 uploader = bucket.initiate_multipart_upload(key_name,
2602 metadata={meta_key: meta_value})
2603 for i in range(1,5):
2604 uploader.upload_part_from_file(fp, i, size=chunk_size)
2605 fp.seek(i*chunk_size)
2606 uploader.complete_upload()
2607 fp.close()
2608
2609 print('wait for 5sec for the messages...')
2610 time.sleep(5)
2611 # check amqp receiver
2612 events = receiver.get_and_reset_events()
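# each record carries the user metadata as a list of {'key': ..., 'val': ...}
# pairs; a trimmed sketch of one record, assuming META_PREFIX is the usual
# 'x-amz-meta-' prefix:
#   {'s3': {'object': {'metadata': [
#       {'key': 'x-amz-meta-meta1', 'val': 'This is my metadata value'}]}}}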
2613 for event in events:
2614 value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
2615 assert_equal(value[0], meta_value)
2616
2617 # delete objects
2618 for key in bucket.list():
2619 key.delete()
2620 print('wait for 5sec for the messages...')
2621 time.sleep(5)
2622 # check amqp receiver
2623 events = receiver.get_and_reset_events()
2624 for event in events:
2625 value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
2626 assert_equal(value[0], meta_value)
2627
2628 # cleanup
2629 stop_amqp_receiver(receiver, task)
2630 s3_notification_conf.del_config()
2631 topic_conf.del_config()
2632 # delete the bucket
2633 conn.delete_bucket(bucket_name)
2634
2635
2636@attr('amqp_test')
2637def test_ps_s3_tags_on_master():
2638 """ test s3 notification of tags on master """
2639
2640 hostname = get_ip()
2641 conn = connection()
2642 zonegroup = 'default'
2643
2644 # create bucket
2645 bucket_name = gen_bucket_name()
2646 bucket = conn.create_bucket(bucket_name)
2647 topic_name = bucket_name + TOPIC_SUFFIX
2648
2649 # start amqp receiver
2650 exchange = 'ex1'
2651 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2652 task.start()
2653
2654 # create s3 topic
2655 endpoint_address = 'amqp://' + hostname
2656 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
2657 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2658 topic_arn = topic_conf.set_config()
2659 # create s3 notification
2660 notification_name = bucket_name + NOTIFICATION_SUFFIX
2661 topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
2662 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
2663 'Filter': {
2664 'Tags': {
2665 'FilterRules': [{'Name': 'hello', 'Value': 'world'}, {'Name': 'ka', 'Value': 'boom'}]
2666 }
2667 }
2668 }]
2669
2670 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2671 response, status = s3_notification_conf.set_config()
2672 assert_equal(status/100, 2)
2673
2674 expected_keys = []
2675 # create objects in the bucket with tags
2676 # key 1 has all the tags in the filter
2677 tags = 'hello=world&ka=boom&hello=helloworld'
2678 key_name1 = 'key1'
2679 put_object_tagging(conn, bucket_name, key_name1, tags)
2680 expected_keys.append(key_name1)
2681 # key 2 has an additional tag not in the filter
2682 tags = 'hello=world&foo=bar&ka=boom&hello=helloworld'
2683 key_name = 'key2'
2684 put_object_tagging(conn, bucket_name, key_name, tags)
2685 expected_keys.append(key_name)
2686 # key 3 has no tags
2687 key_name3 = 'key3'
2688 key = bucket.new_key(key_name3)
2689 key.set_contents_from_string('bar')
2690 # key 4 has the wrong value for the multi-value tag
2691 tags = 'hello=helloworld&ka=boom'
2692 key_name = 'key4'
2693 put_object_tagging(conn, bucket_name, key_name, tags)
2694 # key 5 has the right value for the multi-value tag
2695 tags = 'hello=world&ka=boom'
2696 key_name = 'key5'
2697 put_object_tagging(conn, bucket_name, key_name, tags)
2698 expected_keys.append(key_name)
2699 # key 6 is missing a tag
2700 tags = 'hello=world'
2701 key_name = 'key6'
2702 put_object_tagging(conn, bucket_name, key_name, tags)
2703 # create objects in the bucket using COPY
2704 key_name = 'copy_of_'+key_name1
2705 bucket.copy_key(key_name, bucket.name, key_name1)
2706 expected_keys.append(key_name)
2707
2708 print('wait for 5sec for the messages...')
2709 time.sleep(5)
2710 event_count = 0
2711 expected_tags1 = [{'key': 'hello', 'val': 'world'}, {'key': 'hello', 'val': 'helloworld'}, {'key': 'ka', 'val': 'boom'}]
2712 expected_tags1 = sorted(expected_tags1, key=lambda k: k['key']+k['val'])
2713 for event in receiver.get_and_reset_events():
2714 key = event['Records'][0]['s3']['object']['key']
2715 if (key == key_name1):
2716 obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
2717 assert_equal(obj_tags, expected_tags1)
2718 event_count += 1
2719 assert(key in expected_keys)
2720
2721 assert_equal(event_count, len(expected_keys))
2722
2723 # delete the objects
2724 for key in bucket.list():
2725 key.delete()
2726 print('wait for 5sec for the messages...')
2727 time.sleep(5)
2728 event_count = 0
2729 # check amqp receiver
2730 for event in receiver.get_and_reset_events():
2731 key = event['Records'][0]['s3']['object']['key']
2732 if (key == key_name1):
2733 obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
2734 assert_equal(obj_tags, expected_tags1)
2735 event_count += 1
2736 assert(key in expected_keys)
2737
2738 assert(event_count == len(expected_keys))
2739
2740 # cleanup
2741 stop_amqp_receiver(receiver, task)
2742 s3_notification_conf.del_config()
2743 topic_conf.del_config()
2744 # delete the bucket
2745 conn.delete_bucket(bucket_name)
2746
2747@attr('amqp_test')
2748def test_ps_s3_versioning_on_master():
2749 """ test s3 notification of object versions """
2750
2751 hostname = get_ip()
2752 conn = connection()
2753 zonegroup = 'default'
2754
2755 # create bucket
2756 bucket_name = gen_bucket_name()
2757 bucket = conn.create_bucket(bucket_name)
2758 bucket.configure_versioning(True)
2759 topic_name = bucket_name + TOPIC_SUFFIX
2760
2761 # start amqp receiver
2762 exchange = 'ex1'
2763 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2764 task.start()
2765
2766 # create s3 topic
2767 endpoint_address = 'amqp://' + hostname
2768 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2769 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2770 topic_arn = topic_conf.set_config()
2771 # create notification
2772 notification_name = bucket_name + NOTIFICATION_SUFFIX
2773 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
2774 'Events': []
2775 }]
2776 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2777 _, status = s3_notification_conf.set_config()
2778 assert_equal(status/100, 2)
2779
2780 # create objects in the bucket
2781 key_name = 'foo'
2782 key = bucket.new_key(key_name)
2783 key.set_contents_from_string('hello')
2784 ver1 = key.version_id
2785 key.set_contents_from_string('world')
2786 ver2 = key.version_id
2787 copy_of_key = bucket.copy_key('copy_of_foo', bucket.name, key_name, src_version_id=ver1)
2788 ver3 = copy_of_key.version_id
2789 versions = [ver1, ver2, ver3]
2790
2791 print('wait for 5sec for the messages...')
2792 time.sleep(5)
2793
2794 # check amqp receiver
2795 events = receiver.get_and_reset_events()
2796 num_of_versions = 0
2797 for event_list in events:
2798 for event in event_list['Records']:
2799 assert event['s3']['object']['key'] in (key_name, copy_of_key.name)
2800 version = event['s3']['object']['versionId']
2801 num_of_versions += 1
2802 if version not in versions:
2803 print('version mismatch: '+version+' not in: '+str(versions))
2804 # TODO: copy_key() does not return the version of the copied object
2805 #assert False
2806 else:
2807 print('version ok: '+version+' in: '+str(versions))
2808
2809 assert_equal(num_of_versions, 3)
2810
2811 # cleanup
2812 stop_amqp_receiver(receiver, task)
2813 s3_notification_conf.del_config()
2814 topic_conf.del_config()
2815 # delete the bucket
2816 bucket.delete_key(copy_of_key, version_id=ver3)
2817 bucket.delete_key(key.name, version_id=ver2)
2818 bucket.delete_key(key.name, version_id=ver1)
2819 #conn.delete_bucket(bucket_name)
2820
2821
2822@attr('amqp_test')
2823def test_ps_s3_versioned_deletion_on_master():
2824 """ test s3 notification of deletion markers on master """
2825
2826 hostname = get_ip()
2827 conn = connection()
2828 zonegroup = 'default'
2829
2830 # create bucket
2831 bucket_name = gen_bucket_name()
2832 bucket = conn.create_bucket(bucket_name)
2833 bucket.configure_versioning(True)
2834 topic_name = bucket_name + TOPIC_SUFFIX
2835
2836 # start amqp receiver
2837 exchange = 'ex1'
2838 task, receiver = create_amqp_receiver_thread(exchange, topic_name)
2839 task.start()
2840
2841 # create s3 topic
2842 endpoint_address = 'amqp://' + hostname
2843 endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
2844 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
2845 topic_arn = topic_conf.set_config()
2846 # create s3 notification
2847 notification_name = bucket_name + NOTIFICATION_SUFFIX
2848 topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
2849 'Events': ['s3:ObjectRemoved:*']
2850 },
2851 {'Id': notification_name+'_2', 'TopicArn': topic_arn,
2852 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
2853 },
2854 {'Id': notification_name+'_3', 'TopicArn': topic_arn,
2855 'Events': ['s3:ObjectRemoved:Delete']
2856 }]
2857 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
2858 response, status = s3_notification_conf.set_config()
2859 assert_equal(status/100, 2)
2860
2861 # create objects in the bucket
2862 key = bucket.new_key('foo')
2863 content = str(os.urandom(512))
2864 size1 = len(content)
2865 key.set_contents_from_string(content)
2866 ver1 = key.version_id
2867 content = str(os.urandom(511))
2868 size2 = len(content)
2869 key.set_contents_from_string(content)
2870 ver2 = key.version_id
2871 # create delete marker (non versioned deletion)
2872 delete_marker_key = bucket.delete_key(key.name)
2873 versions = [ver1, ver2, delete_marker_key.version_id]
2874
2875 time.sleep(1)
2876
2877 # versioned deletion
2878 bucket.delete_key(key.name, version_id=ver2)
2879 bucket.delete_key(key.name, version_id=ver1)
2880
2881 print('wait for 5sec for the messages...')
2882 time.sleep(5)
2883
2884 # check amqp receiver
2885 events = receiver.get_and_reset_events()
2886 delete_events = 0
2887 delete_marker_create_events = 0
2888 for event_list in events:
2889 for event in event_list['Records']:
2890 version = event['s3']['object']['versionId']
2891 size = event['s3']['object']['size']
2892 if version not in versions:
2893 print('version mismatch: '+version+' not in: '+str(versions))
2894 assert False
2895 else:
2896 print('version ok: '+version+' in: '+str(versions))
2897 if event['eventName'] == 'ObjectRemoved:Delete':
2898 delete_events += 1
2899 assert size in [size1, size2]
2900 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
2901 if event['eventName'] == 'ObjectRemoved:DeleteMarkerCreated':
2902 delete_marker_create_events += 1
2903 assert size == size2
2904 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
2905
2906 # 2 key versions were deleted
2907 # notified over the same topic via 2 notifications (1,3)
2908 assert_equal(delete_events, 2*2)
2909 # 1 deletion marker was created
2910 # notified over the same topic over 2 notifications (1,2)
2911 assert_equal(delete_marker_create_events, 1*2)
2912
2913 # cleanup
2914 delete_marker_key.delete()
2915 stop_amqp_receiver(receiver, task)
2916 s3_notification_conf.del_config()
2917 topic_conf.del_config()
2918 # delete the bucket
2919 conn.delete_bucket(bucket_name)
2920
2921
2922@attr('manual_test')
2923def test_ps_s3_persistent_cleanup():
2924 """ test reservation cleanup after gateway crash """
2925 return SkipTest("only used in manual testing")
2926 conn = connection()
2927 zonegroup = 'default'
2928
2929 # create random port for the http server
2930 host = get_ip()
2931 port = random.randint(10000, 20000)
2932 # start an http server in a separate thread
2933 number_of_objects = 200
2934 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
2935
2936 gw = conn
2937
2938 # create bucket
2939 bucket_name = gen_bucket_name()
2940 bucket = gw.create_bucket(bucket_name)
2941 topic_name = bucket_name + TOPIC_SUFFIX
2942
2943 # create s3 topic
2944 endpoint_address = 'http://'+host+':'+str(port)
2945 endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
2946 topic_conf = PSTopicS3(gw, topic_name, zonegroup, endpoint_args=endpoint_args)
2947 topic_arn = topic_conf.set_config()
2948
2949 # create s3 notification
2950 notification_name = bucket_name + NOTIFICATION_SUFFIX
2951 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
2952 'Events': ['s3:ObjectCreated:Put']
2953 }]
2954 s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list)
2955 response, status = s3_notification_conf.set_config()
2956 assert_equal(status/100, 2)
2957
2958 client_threads = []
2959 start_time = time.time()
2960 for i in range(number_of_objects):
2961 key = bucket.new_key(str(i))
2962 content = str(os.urandom(1024*1024))
2963 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
2964 thr.start()
2965 client_threads.append(thr)
2966 # stop gateway while clients are sending
2967 os.system("killall -9 radosgw")
2968 print('wait for 10 sec before restarting the gateway')
2969 time.sleep(10)
2970 # TODO: start the radosgw
2971 [thr.join() for thr in client_threads]
2972
2973 keys = list(bucket.list())
2974
2975 # delete objects from the bucket
2976 client_threads = []
2977 start_time = time.time()
2978 for key in bucket.list():
2979 thr = threading.Thread(target = key.delete, args=())
2980 thr.start()
2981 client_threads.append(thr)
2982 [thr.join() for thr in client_threads]
2983
2984 # check http receiver
2985 events = http_server.get_and_reset_events()
2986
2987 print(str(len(events) ) + " events found out of " + str(number_of_objects))
2988
2989 # make sure that things are working now
2990 client_threads = []
2991 start_time = time.time()
2992 for i in range(number_of_objects):
2993 key = bucket.new_key(str(i))
2994 content = str(os.urandom(1024*1024))
2995 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
2996 thr.start()
2997 client_threads.append(thr)
2998 [thr.join() for thr in client_threads]
2999
3000 keys = list(bucket.list())
3001
3002 # delete objects from the bucket
3003 client_threads = []
3004 start_time = time.time()
3005 for key in bucket.list():
3006 thr = threading.Thread(target = key.delete, args=())
3007 thr.start()
3008 client_threads.append(thr)
3009 [thr.join() for thr in client_threads]
3010
3011 print('wait for 180 sec for reservations to be stale before queue deletion')
3012 time.sleep(180)
3013
3014 # check http receiver
3015 events = http_server.get_and_reset_events()
3016
3017 print(str(len(events)) + " events found out of " + str(number_of_objects))
3018
3019 # cleanup
3020 s3_notification_conf.del_config()
3021 topic_conf.del_config()
3022 gw.delete_bucket(bucket_name)
3023 http_server.close()
3024
3025
3026@attr('manual_test')
3027def test_ps_s3_persistent_notification_pushback():
3028 """ test pushing persistent notification pushback """
3029 return SkipTest("only used in manual testing")
3030 conn = connection()
3031 zonegroup = 'default'
3032
3033 # create random port for the http server
3034 host = get_ip()
3035 port = random.randint(10000, 20000)
3036 # start an http server in a separate thread
3037 http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5)
3038
3039 # create bucket
3040 bucket_name = gen_bucket_name()
3041 bucket = conn.create_bucket(bucket_name)
3042 topic_name = bucket_name + TOPIC_SUFFIX
3043
3044 # create s3 topic
3045 endpoint_address = 'http://'+host+':'+str(port)
3046 endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
3047 topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
3048 topic_arn = topic_conf.set_config()
3049 # create s3 notification
3050 notification_name = bucket_name + NOTIFICATION_SUFFIX
3051 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
3052 'Events': []
3053 }]
3054
3055 s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
3056 response, status = s3_notification_conf.set_config()
3057 assert_equal(status/100, 2)
3058
3059 # create objects in the bucket (async)
3060 for j in range(100):
3061 number_of_objects = randint(500, 1000)
3062 client_threads = []
3063 start_time = time.time()
3064 for i in range(number_of_objects):
3065 key = bucket.new_key(str(j)+'-'+str(i))
3066 content = str(os.urandom(1024*1024))
3067 thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
3068 thr.start()
3069 client_threads.append(thr)
3070 [thr.join() for thr in client_threads]
3071 time_diff = time.time() - start_time
3072 print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
3073
3074 keys = list(bucket.list())
3075
3076 delay = 30
3077 print('wait for '+str(delay)+'sec for the messages...')
3078 time.sleep(delay)
3079
3080 # delete objects from the bucket
3081 client_threads = []
3082 start_time = time.time()
3083 count = 0
3084 for key in bucket.list():
3085 count += 1
3086 thr = threading.Thread(target = key.delete, args=())
3087 thr.start()
3088 client_threads.append(thr)
3089 if count%100 == 0:
3090 [thr.join() for thr in client_threads]
3091 time_diff = time.time() - start_time
3092 print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
3093 client_threads = []
3094 start_time = time.time()
3095
3096 print('wait for '+str(delay)+'sec for the messages...')
3097 time.sleep(delay)
3098
3099 # cleanup
3100 s3_notification_conf.del_config()
3101 topic_conf.del_config()
3102 # delete the bucket
3103 conn.delete_bucket(bucket_name)
3104 time.sleep(delay)
3105 http_server.close()
3106
3107
1e59de90 3108@attr('kafka_test')
20effc67
TL
3109def test_ps_s3_notification_kafka_idle_behaviour():
3110 """ test pushing kafka s3 notification idle behaviour check """
3111 # TODO: convert this into an actual running test by changing the
3112 # os.system call to verify the process idleness
3113 conn = connection()
3114 zonegroup = 'default'
3115
3116 # create bucket
3117 bucket_name = gen_bucket_name()
3118 bucket = conn.create_bucket(bucket_name)
3119 # name is constant for manual testing
3120 topic_name = bucket_name+'_topic'
3121 # create consumer on the topic
3122
3123 task, receiver = create_kafka_receiver_thread(topic_name+'_1')
3124 task.start()

    # create s3 topic
    endpoint_address = 'kafka://' + kafka_server
    # with acks from broker
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
                        'Events': []
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    etags = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        etag = hashlib.md5(content.encode()).hexdigest()
        etags.append(etag)
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True, etags=etags)
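    # passing the expected etags lets the receiver verify the payload of every
    # notification, not just that an event arrived for each key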

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)

    is_idle = False

    while not is_idle:
        print('waiting for 10sec for checking idleness')
        time.sleep(10)
        cmd = "netstat -nnp | grep 9092 | grep radosgw"
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        out = proc.communicate()[0]
        if len(out) == 0:
            is_idle = True
        else:
            print("radosgw<->kafka connection is not idle")
            print(out.decode('utf-8'))
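    # the loop above treats the absence of a radosgw connection to the kafka
    # port (9092) as proof of idleness; this assumes netstat is installed and
    # that the broker uses the default port. on systems without netstat,
    # something like "ss -tnp | grep 9092 | grep radosgw" should be an
    # equivalent (untested) alternative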

    # upload objects and check for the notifications again
    number_of_objects = 10
    client_threads = []
    etags = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        etag = hashlib.md5(content.encode()).hexdigest()
        etags.append(etag)
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    keys = list(bucket.list())
    receiver.verify_s3_events(keys, exact_match=True, etags=etags)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for 5sec for the messages...')
    time.sleep(5)
    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)
    stop_kafka_receiver(receiver, task)


@attr('modification_required')
def test_ps_s3_persistent_gateways_recovery():
    """ test gateway recovery of persistent notifications """
    return SkipTest('This test requires two gateways.')

    conn = connection()
    zonegroup = 'default'
    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
    gw1 = conn
    gw2 = connection2()
    # create bucket
    bucket_name = gen_bucket_name()
    bucket = gw1.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create two s3 topics
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
    topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw1')
    topic_arn1 = topic_conf1.set_config()
    topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw2')
    topic_arn2 = topic_conf2.set_config()
    # create two s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:Put']
                       }]
    s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectRemoved:Delete']
                       }]
    s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    # manual test: stop gateway 2 here
    print('stopping gateway2...')
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
    keys = list(bucket.list())
    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
    print('wait for 60 sec before restarting the gateway')
    time.sleep(60)
    # check http receiver
    events = http_server.get_and_reset_events()
    for key in keys:
        creations = 0
        deletions = 0
        for event in events:
            if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
                    key.name == event['Records'][0]['s3']['object']['key']:
                creations += 1
            elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
                    key.name == event['Records'][0]['s3']['object']['key']:
                deletions += 1
        assert_equal(creations, 1)
        assert_equal(deletions, 1)
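    # because the topics are persistent, events queued while gateway2 was down
    # are expected to be delivered once it is restarted, so every key should
    # produce exactly one creation and one deletion event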
    # cleanup
    s3_notification_conf1.del_config()
    topic_conf1.del_config()
    gw1.delete_bucket(bucket_name)
    time.sleep(10)
    s3_notification_conf2.del_config()
    topic_conf2.del_config()
    http_server.close()


@attr('modification_required')
def test_ps_s3_persistent_multiple_gateways():
    """ test pushing persistent notification via two gateways """
    return SkipTest('This test requires two gateways.')

    conn = connection()
    zonegroup = 'default'
    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
    gw1 = conn
    gw2 = connection2()
    # create bucket
    bucket_name = gen_bucket_name()
    bucket1 = gw1.create_bucket(bucket_name)
    bucket2 = gw2.get_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create two s3 topics
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
    topic1_opaque = 'fromgw1'
    topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque)
    topic_arn1 = topic_conf1.set_config()
    topic2_opaque = 'fromgw2'
    topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque)
    topic_arn2 = topic_conf2.set_config()
    # create two s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
                        'Events': []
                       }]
    s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
                        'Events': []
                       }]
    s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket1.new_key('gw1_'+str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
        key = bucket2.new_key('gw2_'+str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
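    # objects are written through both gateways, and both persistent topics push
    # to the same http endpoint, so each object should yield one event per topic,
    # distinguishable by the OpaqueData attribute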
    keys = list(bucket1.list())
    delay = 30
    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)
    events = http_server.get_and_reset_events()
    for key in keys:
        topic1_count = 0
        topic2_count = 0
        for event in events:
            if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
                    key.name == event['Records'][0]['s3']['object']['key'] and \
                    topic1_opaque == event['Records'][0]['opaqueData']:
                topic1_count += 1
            elif event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
                    key.name == event['Records'][0]['s3']['object']['key'] and \
                    topic2_opaque == event['Records'][0]['opaqueData']:
                topic2_count += 1
        assert_equal(topic1_count, 1)
        assert_equal(topic2_count, 1)
    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket1.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)
    events = http_server.get_and_reset_events()
    for key in keys:
        topic1_count = 0
        topic2_count = 0
        for event in events:
            if event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
                    key.name == event['Records'][0]['s3']['object']['key'] and \
                    topic1_opaque == event['Records'][0]['opaqueData']:
                topic1_count += 1
            elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
                    key.name == event['Records'][0]['s3']['object']['key'] and \
                    topic2_opaque == event['Records'][0]['opaqueData']:
                topic2_count += 1
        assert_equal(topic1_count, 1)
        assert_equal(topic2_count, 1)
    # cleanup
    s3_notification_conf1.del_config()
    topic_conf1.del_config()
    s3_notification_conf2.del_config()
    topic_conf2.del_config()
    gw1.delete_bucket(bucket_name)
    http_server.close()


@attr('http_test')
def test_ps_s3_persistent_multiple_endpoints():
    """ test pushing persistent notifications when one of the endpoints has an error """
    conn = connection()
    zonegroup = 'default'

    # create random port for the http server
    host = get_ip()
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    number_of_objects = 10
    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    # create two s3 topics
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
    topic_arn1 = topic_conf1.set_config()
    endpoint_address = 'http://kaboom:9999'
    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
    topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
    topic_arn2 = topic_conf2.set_config()
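    # 'kaboom' is not a resolvable host, so deliveries to the second topic are
    # expected to fail and remain queued; they should not block delivery of
    # events to the first, working endpoint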

    # create two s3 notifications
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
                        'Events': []
                       }]
    s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf1.set_config()
    assert_equal(status/100, 2)
    notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
                        'Events': []
                       }]
    s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf2.set_config()
    assert_equal(status/100, 2)

    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    keys = list(bucket.list())

    delay = 30
    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    http_server.verify_s3_events(keys, exact_match=False, deletions=False)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    http_server.verify_s3_events(keys, exact_match=False, deletions=True)

    # cleanup
    s3_notification_conf1.del_config()
    topic_conf1.del_config()
    s3_notification_conf2.del_config()
    topic_conf2.del_config()
    conn.delete_bucket(bucket_name)
    http_server.close()


def persistent_notification(endpoint_type):
    """ test pushing persistent notification """
    conn = connection()
    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    receiver = {}
    host = get_ip()
    if endpoint_type == 'http':
        # create random port for the http server
        host = get_ip_http()
        port = random.randint(10000, 20000)
        # start an http server in a separate thread
        receiver = StreamingHTTPServer(host, port, num_workers=10)
        endpoint_address = 'http://'+host+':'+str(port)
        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
        # the http server does not guarantee ordering or deduplication,
        # so exact matching of the events is disabled
        exact_match = False
    elif endpoint_type == 'amqp':
        # start amqp receiver
        exchange = 'ex1'
        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
        task.start()
        endpoint_address = 'amqp://' + host
        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
        # the amqp broker guarantees ordering
        exact_match = True
    elif endpoint_type == 'kafka':
        # start kafka receiver
        task, receiver = create_kafka_receiver_thread(topic_name)
        task.start()
        endpoint_address = 'kafka://' + host
        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
        # the kafka broker guarantees ordering
        exact_match = True
    else:
        return SkipTest('Unknown endpoint type: ' + endpoint_type)

    # create s3 topic
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + async ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    keys = list(bucket.list())

    delay = 40
    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + async ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)
    if endpoint_type == 'http':
        receiver.close()
    elif endpoint_type == 'kafka':
        stop_kafka_receiver(receiver, task)
    else:
        stop_amqp_receiver(receiver, task)
3630
3631@attr('http_test')
3632def test_ps_s3_persistent_notification_http():
3633 """ test pushing persistent notification http """
3634 persistent_notification('http')
3635
3636
3637@attr('amqp_test')
3638def test_ps_s3_persistent_notification_amqp():
3639 """ test pushing persistent notification amqp """
20effc67
TL
3640 persistent_notification('amqp')
3641
1e59de90 3642
20effc67
TL
3643@attr('kafka_test')
3644def test_ps_s3_persistent_notification_kafka():
1e59de90 3645 """ test pushing persistent notification kafka """
20effc67 3646 persistent_notification('kafka')
1e59de90 3647
20effc67
TL
3648
3649def random_string(length):
3650 import string
3651 letters = string.ascii_letters
3652 return ''.join(random.choice(letters) for i in range(length))
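# e.g. random_string(6) could return something like 'aQzLbD'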


@attr('amqp_test')
def test_ps_s3_persistent_notification_large():
    """ test pushing large persistent notifications """

    conn = connection()
    zonegroup = 'default'

    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX

    receiver = {}
    host = get_ip()
    # start amqp receiver
    exchange = 'ex1'
    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    task.start()
    endpoint_address = 'amqp://' + host
    opaque_data = random_string(1024*2)
    endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
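    # attaching 2KB of opaque data to the topic makes every event record large,
    # exercising the persistent queue with oversized entries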
    # the amqp broker guarantees ordering
    exact_match = True

    # create s3 topic
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]

    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)

    # create objects in the bucket (async)
    number_of_objects = 100
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key_value = random_string(63)
        key = bucket.new_key(key_value)
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for creation + async amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    keys = list(bucket.list())

    delay = 40
    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)

    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    for key in bucket.list():
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]

    time_diff = time.time() - start_time
    print('average time for deletion + async amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')

    print('wait for '+str(delay)+'sec for the messages...')
    time.sleep(delay)

    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)

    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)
    stop_amqp_receiver(receiver, task)


@attr('modification_required')
def test_ps_s3_topic_update():
    """ test updating topic associated with a notification """
    return SkipTest('This test is yet to be modified.')

    conn = connection()
    ps_zone = None
    zonegroup = 'default'
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX
    # create amqp topic
    hostname = get_ip()
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
    amqp_task.start()
    #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='push-endpoint=amqp://' + hostname + '&amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_arn = topic_conf.set_config()
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
    # create http server
    port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, port)
    # create bucket on the first of the rados zones
    bucket = conn.create_bucket(bucket_name)
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)
    # update the same topic with a new endpoint
    #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='http://'+ hostname + ':' + str(port))
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='push-endpoint=http://'+ hostname + ':' + str(port))
    _, status = topic_conf.set_config()
    assert_equal(status/100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_meta_checkpoint(ps_zone.zone)
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # verify that notifications are still sent to amqp
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)
    # re-create the notification so that it picks up the updated endpoint from the topic
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+200))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_meta_checkpoint(ps_zone.zone)
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)
    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf.del_config()
    conn.delete_bucket(bucket_name)
    http_server.close()


@attr('modification_required')
def test_ps_s3_notification_update():
    """ test updating the topic of a notification """
    return SkipTest('This test is yet to be modified.')

    hostname = get_ip()
    conn = connection()
    ps_zone = None
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
    zonegroup = 'default'
    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)
    #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='push-endpoint=amqp://' + hostname + '&amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_arn1 = topic_conf1.set_config()
    #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
    topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='push-endpoint=http://'+hostname+':'+str(http_port))
    topic_arn2 = topic_conf2.set_config()
    # create bucket on the first of the rados zones
    bucket = conn.create_bucket(bucket_name)
    # wait for sync
    #zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification with topic1
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn1,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # TODO: use exact match
    receiver.verify_s3_events(keys, exact_match=False)
    # update notification to use topic2
    topic_conf_list = [{'Id': notification_name,
                        'TopicArn': topic_arn2,
                        'Events': ['s3:ObjectCreated:*']
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # delete current objects and create new objects in the bucket
    for key in bucket.list():
        key.delete()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i+100))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_meta_checkpoint(ps_zone.zone)
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    keys = list(bucket.list())
    # check that updates switched to http
    # TODO: use exact match
    http_server.verify_s3_events(keys, exact_match=False)
    # cleanup
    # delete objects from the bucket
    stop_amqp_receiver(receiver, amqp_task)
    for key in bucket.list():
        key.delete()
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    conn.delete_bucket(bucket_name)
    http_server.close()


@attr('modification_required')
def test_ps_s3_multiple_topics_notification():
    """ test notification creation with multiple topics """
    return SkipTest('This test is yet to be modified.')

    hostname = get_ip()
    zonegroup = 'default'
    conn = connection()
    ps_zone = None
    bucket_name = gen_bucket_name()
    topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
    topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
    # create topics
    # start amqp receiver in a separate thread
    exchange = 'ex1'
    amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
    amqp_task.start()
    # create random port for the http server
    http_port = random.randint(10000, 20000)
    # start an http server in a separate thread
    http_server = StreamingHTTPServer(hostname, http_port)
    #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='push-endpoint=amqp://' + hostname + '&amqp-exchange=' + exchange + '&amqp-ack-level=none')
    topic_arn1 = topic_conf1.set_config()
    #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
    topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='push-endpoint=http://'+hostname+':'+str(http_port))
    topic_arn2 = topic_conf2.set_config()
    # create bucket on the first of the rados zones
    bucket = conn.create_bucket(bucket_name)
    # wait for sync
    #zone_meta_checkpoint(ps_zone.zone)
    # create s3 notification
    notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
    notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
    topic_conf_list = [
        {
            'Id': notification_name1,
            'TopicArn': topic_arn1,
            'Events': ['s3:ObjectCreated:*']
        },
        {
            'Id': notification_name2,
            'TopicArn': topic_arn2,
            'Events': ['s3:ObjectCreated:*']
        }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    _, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    result, _ = s3_notification_conf.get_config()
    assert_equal(len(result['TopicConfigurations']), 2)
    assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
    assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
    # get auto-generated subscriptions (legacy pubsub API; part of the needed modification)
    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                               topic_name1)
    _, status = sub_conf1.get_config()
    assert_equal(status/100, 2)
    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                               topic_name2)
    _, status = sub_conf2.get_config()
    assert_equal(status/100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
    # get the events from both of the subscriptions
    result, _ = sub_conf1.get_events()
    records = json.loads(result)
    for record in records['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    receiver.verify_s3_events(keys, exact_match=False)
    result, _ = sub_conf2.get_events()
    parsed_result = json.loads(result)
    for record in parsed_result['Records']:
        log.debug(record)
    keys = list(bucket.list())
    # TODO: use exact match
    verify_s3_records_by_elements(records, keys, exact_match=False)
    http_server.verify_s3_events(keys, exact_match=False)
    # cleanup
    stop_amqp_receiver(receiver, amqp_task)
    s3_notification_conf.del_config()
    topic_conf1.del_config()
    topic_conf2.del_config()
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    conn.delete_bucket(bucket_name)
    http_server.close()


def kafka_security(security_type):
    """ test pushing kafka s3 notifications securely to master """
    conn = connection()
    zonegroup = 'default'
    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    # name is constant for manual testing
    topic_name = bucket_name+'_topic'
    # create s3 topic
    if security_type == 'SSL_SASL':
        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
    elif security_type == 'SSL':
        endpoint_address = 'kafka://' + kafka_server + ':9093'
    else:
        assert False, 'unknown security method '+security_type

    KAFKA_DIR = os.environ['KAFKA_DIR']
    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
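    # use-ssl enables TLS on the kafka connection and ca-location points at the
    # CA certificate that signed the broker certificate; this assumes a broker
    # configured with SSL on port 9093 and SASL_SSL on 9094, and a test CA
    # stored as y-ca.crt under KAFKA_DIR (as done by the kafka setup scripts)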

    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)

    # create consumer on the topic
    task, receiver = create_kafka_receiver_thread(topic_name)
    task.start()

    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    s3_notification_conf.set_config()
    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
    time_diff = time.time() - start_time
    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
    try:
        print('wait for 5sec for the messages...')
        time.sleep(5)
        keys = list(bucket.list())
        receiver.verify_s3_events(keys, exact_match=True)
        # delete objects from the bucket
        client_threads = []
        start_time = time.time()
        for key in bucket.list():
            thr = threading.Thread(target = key.delete, args=())
            thr.start()
            client_threads.append(thr)
        [thr.join() for thr in client_threads]
        time_diff = time.time() - start_time
        print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
        print('wait for 5sec for the messages...')
        time.sleep(5)
        receiver.verify_s3_events(keys, exact_match=True, deletions=True)
    except Exception as err:
        assert False, str(err)
    finally:
        # cleanup
        s3_notification_conf.del_config()
        topic_conf.del_config()
        # delete the bucket
        for key in bucket.list():
            key.delete()
        conn.delete_bucket(bucket_name)
        stop_kafka_receiver(receiver, task)


@attr('kafka_ssl_test')
def test_ps_s3_notification_push_kafka_security_ssl():
    kafka_security('SSL')


@attr('kafka_ssl_test')
def test_ps_s3_notification_push_kafka_security_ssl_sasl():
    kafka_security('SSL_SASL')