]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | import logging |
2 | import json | |
3 | import tempfile | |
4 | import random | |
5 | import threading | |
6 | import subprocess | |
7 | import socket | |
8 | import time | |
9 | import os | |
10 | import string | |
11 | import boto | |
1e59de90 | 12 | from botocore.exceptions import ClientError |
20effc67 TL |
13 | from http import server as http_server |
14 | from random import randint | |
15 | import hashlib | |
16 | from nose.plugins.attrib import attr | |
17 | import boto3 | |
18 | import datetime | |
19 | from cloudevents.http import from_http | |
20 | from dateutil import parser | |
21 | ||
22 | from boto.s3.connection import S3Connection | |
23 | ||
24 | from . import( | |
25 | get_config_host, | |
26 | get_config_port, | |
27 | get_access_key, | |
28 | get_secret_key | |
29 | ) | |
30 | ||
31 | from .api import PSTopicS3, \ | |
32 | PSNotificationS3, \ | |
20effc67 TL |
33 | delete_all_objects, \ |
34 | put_object_tagging, \ | |
35 | admin | |
36 | ||
37 | from nose import SkipTest | |
38 | from nose.tools import assert_not_equal, assert_equal, assert_in | |
39 | import boto.s3.tagging | |
40 | ||
41 | # configure logging for the tests module | |
42 | log = logging.getLogger(__name__) | |
43 | ||
20effc67 TL |
44 | TOPIC_SUFFIX = "_topic" |
45 | NOTIFICATION_SUFFIX = "_notif" | |
46 | ||
47 | ||
48 | num_buckets = 0 | |
49 | run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) | |
50 | ||
51 | def gen_bucket_name(): | |
52 | global num_buckets | |
53 | ||
54 | num_buckets += 1 | |
55 | return run_prefix + '-' + str(num_buckets) | |
56 | ||
57 | ||
58 | def set_contents_from_string(key, content): | |
59 | try: | |
60 | key.set_contents_from_string(content) | |
61 | except Exception as e: | |
62 | print('Error: ' + str(e)) | |
63 | ||
64 | ||
65 | class HTTPPostHandler(http_server.BaseHTTPRequestHandler): | |
66 | """HTTP POST hanler class storing the received events in its http server""" | |
67 | def do_POST(self): | |
68 | """implementation of POST handler""" | |
69 | content_length = int(self.headers['Content-Length']) | |
70 | body = self.rfile.read(content_length) | |
71 | if self.server.cloudevents: | |
72 | event = from_http(self.headers, body) | |
73 | record = json.loads(body)['Records'][0] | |
74 | assert_equal(event['specversion'], '1.0') | |
75 | assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2']) | |
76 | assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name']) | |
77 | assert_equal(event['type'], 'com.amazonaws.' + record['eventName']) | |
78 | assert_equal(event['datacontenttype'], 'application/json') | |
79 | assert_equal(event['subject'], record['s3']['object']['key']) | |
80 | assert_equal(parser.parse(event['time']), parser.parse(record['eventTime'])) | |
81 | log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body)) | |
82 | self.server.append(json.loads(body)) | |
83 | if self.headers.get('Expect') == '100-continue': | |
84 | self.send_response(100) | |
85 | else: | |
86 | self.send_response(200) | |
87 | if self.server.delay > 0: | |
88 | time.sleep(self.server.delay) | |
89 | self.end_headers() | |
90 | ||
91 | ||
92 | class HTTPServerWithEvents(http_server.HTTPServer): | |
93 | """HTTP server used by the handler to store events""" | |
94 | def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False): | |
95 | http_server.HTTPServer.__init__(self, addr, handler, False) | |
96 | self.worker_id = worker_id | |
97 | self.events = [] | |
98 | self.delay = delay | |
99 | self.cloudevents = cloudevents | |
100 | ||
101 | def append(self, event): | |
102 | self.events.append(event) | |
103 | ||
104 | class HTTPServerThread(threading.Thread): | |
105 | """thread for running the HTTP server. reusing the same socket for all threads""" | |
106 | def __init__(self, i, sock, addr, delay=0, cloudevents=False): | |
107 | threading.Thread.__init__(self) | |
108 | self.i = i | |
109 | self.daemon = True | |
110 | self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents) | |
111 | self.httpd.socket = sock | |
112 | # prevent the HTTP server from re-binding every handler | |
113 | self.httpd.server_bind = self.server_close = lambda self: None | |
114 | self.start() | |
115 | ||
116 | def run(self): | |
117 | try: | |
118 | log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address) | |
119 | self.httpd.serve_forever() | |
120 | log.info('HTTP Server (%d) ended', self.i) | |
121 | except Exception as error: | |
122 | # could happen if the server r/w to a closing socket during shutdown | |
123 | log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error)) | |
124 | ||
125 | def close(self): | |
126 | self.httpd.shutdown() | |
127 | ||
128 | def get_events(self): | |
129 | return self.httpd.events | |
130 | ||
131 | def reset_events(self): | |
132 | self.httpd.events = [] | |
133 | ||
134 | class StreamingHTTPServer: | |
135 | """multi-threaded http server class also holding list of events received into the handler | |
136 | each thread has its own server, and all servers share the same socket""" | |
137 | def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False): | |
138 | addr = (host, port) | |
139 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
140 | self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
141 | self.sock.bind(addr) | |
142 | self.sock.listen(num_workers) | |
143 | self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)] | |
144 | ||
145 | def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): | |
146 | """verify stored s3 records agains a list of keys""" | |
147 | events = [] | |
148 | for worker in self.workers: | |
149 | events += worker.get_events() | |
150 | worker.reset_events() | |
151 | verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) | |
152 | ||
153 | def verify_events(self, keys, exact_match=False, deletions=False): | |
154 | """verify stored events agains a list of keys""" | |
155 | events = [] | |
156 | for worker in self.workers: | |
157 | events += worker.get_events() | |
158 | worker.reset_events() | |
159 | verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions) | |
160 | ||
161 | def get_and_reset_events(self): | |
162 | events = [] | |
163 | for worker in self.workers: | |
164 | events += worker.get_events() | |
165 | worker.reset_events() | |
166 | return events | |
167 | ||
168 | def close(self): | |
169 | """close all workers in the http server and wait for it to finish""" | |
170 | # make sure that the shared socket is closed | |
171 | # this is needed in case that one of the threads is blocked on the socket | |
172 | self.sock.shutdown(socket.SHUT_RDWR) | |
173 | self.sock.close() | |
174 | # wait for server threads to finish | |
175 | for worker in self.workers: | |
176 | worker.close() | |
177 | worker.join() | |
178 | ||
179 | # AMQP endpoint functions | |
180 | ||
181 | class AMQPReceiver(object): | |
182 | """class for receiving and storing messages on a topic from the AMQP broker""" | |
183 | def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None): | |
184 | import pika | |
185 | import ssl | |
186 | ||
187 | if ca_location: | |
188 | ssl_context = ssl.create_default_context() | |
189 | ssl_context.load_verify_locations(cafile=ca_location) | |
190 | ssl_options = pika.SSLOptions(ssl_context) | |
191 | rabbitmq_port = 5671 | |
192 | else: | |
193 | rabbitmq_port = 5672 | |
194 | ssl_options = None | |
195 | ||
196 | if external_endpoint_address: | |
1e59de90 TL |
197 | if ssl_options: |
198 | # this is currently not working due to: https://github.com/pika/pika/issues/1192 | |
199 | params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) | |
200 | else: | |
201 | params = pika.URLParameters(external_endpoint_address) | |
20effc67 TL |
202 | else: |
203 | hostname = get_ip() | |
204 | params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options) | |
1e59de90 | 205 | |
20effc67 TL |
206 | remaining_retries = 10 |
207 | while remaining_retries > 0: | |
208 | try: | |
209 | connection = pika.BlockingConnection(params) | |
210 | break | |
211 | except Exception as error: | |
212 | remaining_retries -= 1 | |
213 | print('failed to connect to rabbitmq (remaining retries ' | |
214 | + str(remaining_retries) + '): ' + str(error)) | |
215 | time.sleep(1) | |
216 | ||
217 | if remaining_retries == 0: | |
218 | raise Exception('failed to connect to rabbitmq - no retries left') | |
219 | ||
220 | self.channel = connection.channel() | |
221 | self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True) | |
222 | result = self.channel.queue_declare('', exclusive=True) | |
223 | queue_name = result.method.queue | |
224 | self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic) | |
225 | self.channel.basic_consume(queue=queue_name, | |
226 | on_message_callback=self.on_message, | |
227 | auto_ack=True) | |
228 | self.events = [] | |
229 | self.topic = topic | |
230 | ||
231 | def on_message(self, ch, method, properties, body): | |
232 | """callback invoked when a new message arrive on the topic""" | |
233 | log.info('AMQP received event for topic %s:\n %s', self.topic, body) | |
234 | self.events.append(json.loads(body)) | |
235 | ||
236 | # TODO create a base class for the AMQP and HTTP cases | |
237 | def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): | |
238 | """verify stored s3 records agains a list of keys""" | |
239 | verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) | |
240 | self.events = [] | |
241 | ||
242 | def verify_events(self, keys, exact_match=False, deletions=False): | |
243 | """verify stored events agains a list of keys""" | |
244 | verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) | |
245 | self.events = [] | |
246 | ||
247 | def get_and_reset_events(self): | |
248 | tmp = self.events | |
249 | self.events = [] | |
250 | return tmp | |
251 | ||
252 | ||
253 | def amqp_receiver_thread_runner(receiver): | |
254 | """main thread function for the amqp receiver""" | |
255 | try: | |
256 | log.info('AMQP receiver started') | |
257 | receiver.channel.start_consuming() | |
258 | log.info('AMQP receiver ended') | |
259 | except Exception as error: | |
260 | log.info('AMQP receiver ended unexpectedly: %s', str(error)) | |
261 | ||
262 | ||
263 | def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None): | |
264 | """create amqp receiver and thread""" | |
265 | receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location) | |
266 | task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,)) | |
267 | task.daemon = True | |
268 | return task, receiver | |
269 | ||
270 | def stop_amqp_receiver(receiver, task): | |
271 | """stop the receiver thread and wait for it to finis""" | |
272 | try: | |
273 | receiver.channel.stop_consuming() | |
274 | log.info('stopping AMQP receiver') | |
275 | except Exception as error: | |
276 | log.info('failed to gracefuly stop AMQP receiver: %s', str(error)) | |
277 | task.join(5) | |
278 | ||
279 | ||
280 | def init_rabbitmq(): | |
281 | """ start a rabbitmq broker """ | |
282 | hostname = get_ip() | |
283 | try: | |
284 | # first try to stop any existing process | |
285 | subprocess.call(['sudo', 'rabbitmqctl', 'stop']) | |
286 | time.sleep(5) | |
287 | proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server']) | |
288 | except Exception as error: | |
289 | log.info('failed to execute rabbitmq-server: %s', str(error)) | |
290 | print('failed to execute rabbitmq-server: %s' % str(error)) | |
291 | return None | |
292 | # TODO add rabbitmq checkpoint instead of sleep | |
293 | time.sleep(5) | |
294 | return proc | |
295 | ||
296 | ||
297 | def clean_rabbitmq(proc): | |
298 | """ stop the rabbitmq broker """ | |
299 | try: | |
300 | subprocess.call(['sudo', 'rabbitmqctl', 'stop']) | |
301 | time.sleep(5) | |
302 | proc.terminate() | |
303 | except: | |
304 | log.info('rabbitmq server already terminated') | |
305 | ||
306 | ||
307 | def verify_events_by_elements(events, keys, exact_match=False, deletions=False): | |
308 | """ verify there is at least one event per element """ | |
309 | err = '' | |
310 | for key in keys: | |
311 | key_found = False | |
312 | if type(events) is list: | |
313 | for event_list in events: | |
314 | if key_found: | |
315 | break | |
316 | for event in event_list['events']: | |
317 | if event['info']['bucket']['name'] == key.bucket.name and \ | |
318 | event['info']['key']['name'] == key.name: | |
319 | if deletions and event['event'] == 'OBJECT_DELETE': | |
320 | key_found = True | |
321 | break | |
322 | elif not deletions and event['event'] == 'OBJECT_CREATE': | |
323 | key_found = True | |
324 | break | |
325 | else: | |
326 | for event in events['events']: | |
327 | if event['info']['bucket']['name'] == key.bucket.name and \ | |
328 | event['info']['key']['name'] == key.name: | |
329 | if deletions and event['event'] == 'OBJECT_DELETE': | |
330 | key_found = True | |
331 | break | |
332 | elif not deletions and event['event'] == 'OBJECT_CREATE': | |
333 | key_found = True | |
334 | break | |
335 | ||
336 | if not key_found: | |
337 | err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) | |
338 | log.error(events) | |
339 | assert False, err | |
340 | ||
341 | if not len(events) == len(keys): | |
342 | err = 'superfluous events are found' | |
343 | log.debug(err) | |
344 | if exact_match: | |
345 | log.error(events) | |
346 | assert False, err | |
347 | ||
aee94f69 TL |
348 | META_PREFIX = 'x-amz-meta-' |
349 | ||
20effc67 TL |
350 | def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): |
351 | """ verify there is at least one record per element """ | |
352 | err = '' | |
353 | for key in keys: | |
354 | key_found = False | |
355 | object_size = 0 | |
356 | if type(records) is list: | |
357 | for record_list in records: | |
358 | if key_found: | |
359 | break | |
360 | for record in record_list['Records']: | |
361 | assert_in('eTag', record['s3']['object']) | |
362 | if record['s3']['bucket']['name'] == key.bucket.name and \ | |
363 | record['s3']['object']['key'] == key.name: | |
364 | # Assertion Error needs to be fixed | |
365 | #assert_equal(key.etag[1:-1], record['s3']['object']['eTag']) | |
366 | if etags: | |
367 | assert_in(key.etag[1:-1], etags) | |
aee94f69 TL |
368 | if len(record['s3']['object']['metadata']) > 0: |
369 | for meta in record['s3']['object']['metadata']: | |
370 | assert(meta['key'].startswith(META_PREFIX)) | |
20effc67 TL |
371 | if deletions and record['eventName'].startswith('ObjectRemoved'): |
372 | key_found = True | |
373 | object_size = record['s3']['object']['size'] | |
374 | break | |
375 | elif not deletions and record['eventName'].startswith('ObjectCreated'): | |
376 | key_found = True | |
377 | object_size = record['s3']['object']['size'] | |
378 | break | |
379 | else: | |
380 | for record in records['Records']: | |
381 | assert_in('eTag', record['s3']['object']) | |
382 | if record['s3']['bucket']['name'] == key.bucket.name and \ | |
383 | record['s3']['object']['key'] == key.name: | |
384 | assert_equal(key.etag, record['s3']['object']['eTag']) | |
385 | if etags: | |
386 | assert_in(key.etag[1:-1], etags) | |
aee94f69 TL |
387 | if len(record['s3']['object']['metadata']) > 0: |
388 | for meta in record['s3']['object']['metadata']: | |
389 | assert(meta['key'].startswith(META_PREFIX)) | |
20effc67 TL |
390 | if deletions and record['eventName'].startswith('ObjectRemoved'): |
391 | key_found = True | |
392 | object_size = record['s3']['object']['size'] | |
393 | break | |
394 | elif not deletions and record['eventName'].startswith('ObjectCreated'): | |
395 | key_found = True | |
396 | object_size = record['s3']['object']['size'] | |
397 | break | |
398 | ||
399 | if not key_found: | |
400 | err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) | |
401 | assert False, err | |
402 | elif expected_sizes: | |
403 | assert_equal(object_size, expected_sizes.get(key.name)) | |
404 | ||
405 | if not len(records) == len(keys): | |
406 | err = 'superfluous records are found' | |
407 | log.warning(err) | |
408 | if exact_match: | |
409 | for record_list in records: | |
410 | for record in record_list['Records']: | |
411 | log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) | |
412 | assert False, err | |
413 | ||
414 | ||
415 | # Kafka endpoint functions | |
416 | ||
417 | kafka_server = 'localhost' | |
418 | ||
419 | class KafkaReceiver(object): | |
420 | """class for receiving and storing messages on a topic from the kafka broker""" | |
421 | def __init__(self, topic, security_type): | |
422 | from kafka import KafkaConsumer | |
423 | remaining_retries = 10 | |
424 | port = 9092 | |
425 | if security_type != 'PLAINTEXT': | |
426 | security_type = 'SSL' | |
427 | port = 9093 | |
428 | while remaining_retries > 0: | |
429 | try: | |
1e59de90 TL |
430 | self.consumer = KafkaConsumer(topic, |
431 | bootstrap_servers = kafka_server+':'+str(port), | |
432 | security_protocol=security_type, | |
433 | consumer_timeout_ms=16000) | |
20effc67 TL |
434 | print('Kafka consumer created on topic: '+topic) |
435 | break | |
436 | except Exception as error: | |
437 | remaining_retries -= 1 | |
438 | print('failed to connect to kafka (remaining retries ' | |
439 | + str(remaining_retries) + '): ' + str(error)) | |
440 | time.sleep(1) | |
441 | ||
442 | if remaining_retries == 0: | |
443 | raise Exception('failed to connect to kafka - no retries left') | |
444 | ||
445 | self.events = [] | |
446 | self.topic = topic | |
447 | self.stop = False | |
448 | ||
449 | def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]): | |
450 | """verify stored s3 records agains a list of keys""" | |
451 | verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags) | |
452 | self.events = [] | |
453 | ||
454 | def kafka_receiver_thread_runner(receiver): | |
455 | """main thread function for the kafka receiver""" | |
456 | try: | |
457 | log.info('Kafka receiver started') | |
458 | print('Kafka receiver started') | |
459 | while not receiver.stop: | |
460 | for msg in receiver.consumer: | |
461 | receiver.events.append(json.loads(msg.value)) | |
1e59de90 | 462 | time.sleep(0.1) |
20effc67 TL |
463 | log.info('Kafka receiver ended') |
464 | print('Kafka receiver ended') | |
465 | except Exception as error: | |
466 | log.info('Kafka receiver ended unexpectedly: %s', str(error)) | |
467 | print('Kafka receiver ended unexpectedly: ' + str(error)) | |
468 | ||
469 | ||
470 | def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'): | |
471 | """create kafka receiver and thread""" | |
472 | receiver = KafkaReceiver(topic, security_type) | |
473 | task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,)) | |
474 | task.daemon = True | |
475 | return task, receiver | |
476 | ||
477 | def stop_kafka_receiver(receiver, task): | |
478 | """stop the receiver thread and wait for it to finis""" | |
479 | receiver.stop = True | |
480 | task.join(1) | |
481 | try: | |
1e59de90 | 482 | receiver.consumer.unsubscribe() |
20effc67 TL |
483 | receiver.consumer.close() |
484 | except Exception as error: | |
485 | log.info('failed to gracefuly stop Kafka receiver: %s', str(error)) | |
486 | ||
487 | ||
488 | def get_ip(): | |
489 | return 'localhost' | |
490 | ||
1e59de90 | 491 | |
20effc67 TL |
492 | def get_ip_http(): |
493 | s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
494 | try: | |
495 | # address should not be reachable | |
496 | s.connect(('10.255.255.255', 1)) | |
497 | ip = s.getsockname()[0] | |
498 | finally: | |
499 | s.close() | |
500 | return ip | |
501 | ||
1e59de90 | 502 | |
20effc67 TL |
503 | def connection(): |
504 | hostname = get_config_host() | |
505 | port_no = get_config_port() | |
506 | vstart_access_key = get_access_key() | |
507 | vstart_secret_key = get_secret_key() | |
508 | ||
509 | conn = S3Connection(aws_access_key_id=vstart_access_key, | |
510 | aws_secret_access_key=vstart_secret_key, | |
511 | is_secure=False, port=port_no, host=hostname, | |
512 | calling_format='boto.s3.connection.OrdinaryCallingFormat') | |
513 | ||
514 | return conn | |
515 | ||
1e59de90 | 516 | |
20effc67 | 517 | def connection2(): |
1e59de90 TL |
518 | hostname = get_config_host() |
519 | port_no = 8001 | |
520 | vstart_access_key = get_access_key() | |
521 | vstart_secret_key = get_secret_key() | |
20effc67 TL |
522 | |
523 | conn = S3Connection(aws_access_key_id=vstart_access_key, | |
1e59de90 TL |
524 | aws_secret_access_key=vstart_secret_key, |
525 | is_secure=False, port=port_no, host=hostname, | |
20effc67 TL |
526 | calling_format='boto.s3.connection.OrdinaryCallingFormat') |
527 | ||
528 | return conn | |
529 | ||
530 | ||
1e59de90 TL |
531 | def another_user(tenant=None): |
532 | access_key = str(time.time()) | |
533 | secret_key = str(time.time()) | |
534 | uid = 'superman' + str(time.time()) | |
535 | if tenant: | |
536 | _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"']) | |
537 | else: | |
538 | _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"']) | |
539 | ||
540 | assert_equal(result, 0) | |
541 | conn = S3Connection(aws_access_key_id=access_key, | |
542 | aws_secret_access_key=secret_key, | |
543 | is_secure=False, port=get_config_port(), host=get_config_host(), | |
544 | calling_format='boto.s3.connection.OrdinaryCallingFormat') | |
545 | return conn | |
546 | ||
20effc67 TL |
547 | ############## |
548 | # bucket notifications tests | |
549 | ############## | |
550 | ||
551 | ||
1e59de90 | 552 | @attr('basic_test') |
20effc67 TL |
553 | def test_ps_s3_topic_on_master(): |
554 | """ test s3 topics set/get/delete on master """ | |
1e59de90 TL |
555 | tenant = 'kaboom' |
556 | conn = another_user(tenant) | |
20effc67 TL |
557 | zonegroup = 'default' |
558 | bucket_name = gen_bucket_name() | |
20effc67 TL |
559 | topic_name = bucket_name + TOPIC_SUFFIX |
560 | ||
20effc67 TL |
561 | # create s3 topics |
562 | endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' | |
563 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
564 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
1e59de90 TL |
565 | # clean all topics |
566 | try: | |
567 | result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics'] | |
568 | topics = [] | |
569 | if result is not None: | |
570 | topics = result['member'] | |
571 | for topic in topics: | |
572 | topic_conf1.del_config(topic_arn=topic['TopicArn']) | |
573 | except Exception as err: | |
574 | print('failed to do topic cleanup: ' + str(err)) | |
575 | ||
20effc67 TL |
576 | topic_arn = topic_conf1.set_config() |
577 | assert_equal(topic_arn, | |
1e59de90 | 578 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1') |
20effc67 TL |
579 | |
580 | endpoint_address = 'http://127.0.0.1:9001' | |
581 | endpoint_args = 'push-endpoint='+endpoint_address | |
582 | topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) | |
583 | topic_arn = topic_conf2.set_config() | |
584 | assert_equal(topic_arn, | |
1e59de90 | 585 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2') |
20effc67 TL |
586 | endpoint_address = 'http://127.0.0.1:9002' |
587 | endpoint_args = 'push-endpoint='+endpoint_address | |
588 | topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) | |
589 | topic_arn = topic_conf3.set_config() | |
590 | assert_equal(topic_arn, | |
1e59de90 | 591 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') |
20effc67 TL |
592 | |
593 | # get topic 3 | |
594 | result, status = topic_conf3.get_config() | |
595 | assert_equal(status, 200) | |
596 | assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) | |
597 | assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) | |
598 | ||
599 | # Note that endpoint args may be ordered differently in the result | |
600 | # delete topic 1 | |
601 | result = topic_conf1.del_config() | |
602 | assert_equal(status, 200) | |
603 | ||
604 | # try to get a deleted topic | |
605 | _, status = topic_conf1.get_config() | |
606 | assert_equal(status, 404) | |
607 | ||
608 | # get the remaining 2 topics | |
609 | result, status = topic_conf1.get_list() | |
610 | assert_equal(status, 200) | |
611 | assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2) | |
612 | ||
613 | # delete topics | |
614 | result = topic_conf2.del_config() | |
1e59de90 | 615 | assert_equal(status, 200) |
20effc67 | 616 | result = topic_conf3.del_config() |
1e59de90 | 617 | assert_equal(status, 200) |
20effc67 TL |
618 | |
619 | # get topic list, make sure it is empty | |
620 | result, status = topic_conf1.get_list() | |
621 | assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None) | |
622 | ||
623 | ||
1e59de90 TL |
624 | @attr('basic_test') |
625 | def test_ps_s3_topic_admin_on_master(): | |
626 | """ test s3 topics set/get/delete on master """ | |
627 | tenant = 'kaboom' | |
628 | conn = another_user(tenant) | |
629 | zonegroup = 'default' | |
630 | bucket_name = gen_bucket_name() | |
631 | topic_name = bucket_name + TOPIC_SUFFIX | |
632 | ||
633 | # create s3 topics | |
634 | endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' | |
635 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
636 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
637 | # clean all topics | |
638 | try: | |
639 | result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics'] | |
640 | topics = [] | |
641 | if result is not None: | |
642 | topics = result['member'] | |
643 | for topic in topics: | |
644 | topic_conf1.del_config(topic_arn=topic['TopicArn']) | |
645 | except Exception as err: | |
646 | print('failed to do topic cleanup: ' + str(err)) | |
647 | ||
648 | topic_arn1 = topic_conf1.set_config() | |
649 | assert_equal(topic_arn1, | |
650 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1') | |
651 | ||
652 | endpoint_address = 'http://127.0.0.1:9001' | |
653 | endpoint_args = 'push-endpoint='+endpoint_address | |
654 | topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) | |
655 | topic_arn2 = topic_conf2.set_config() | |
656 | assert_equal(topic_arn2, | |
657 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2') | |
658 | endpoint_address = 'http://127.0.0.1:9002' | |
659 | endpoint_args = 'push-endpoint='+endpoint_address | |
660 | topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) | |
661 | topic_arn3 = topic_conf3.set_config() | |
662 | assert_equal(topic_arn3, | |
663 | 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3') | |
664 | ||
665 | # get topic 3 via commandline | |
666 | result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant]) | |
667 | parsed_result = json.loads(result[0]) | |
668 | assert_equal(parsed_result['arn'], topic_arn3) | |
669 | ||
670 | # delete topic 3 | |
671 | _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant]) | |
672 | assert_equal(result, 0) | |
673 | ||
674 | # try to get a deleted topic | |
675 | _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant]) | |
676 | print('"topic not found" error is expected') | |
677 | assert_equal(result, 2) | |
678 | ||
679 | # get the remaining 2 topics | |
680 | result = admin(['topic', 'list', '--tenant', tenant]) | |
681 | parsed_result = json.loads(result[0]) | |
682 | assert_equal(len(parsed_result['topics']), 2) | |
683 | ||
684 | # delete topics | |
685 | _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant]) | |
686 | assert_equal(result, 0) | |
687 | _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant]) | |
688 | assert_equal(result, 0) | |
689 | ||
690 | # get topic list, make sure it is empty | |
691 | result = admin(['topic', 'list', '--tenant', tenant]) | |
692 | parsed_result = json.loads(result[0]) | |
693 | assert_equal(len(parsed_result['topics']), 0) | |
694 | ||
695 | ||
aee94f69 TL |
696 | @attr('basic_test') |
697 | def test_ps_s3_notification_configuration_admin_on_master(): | |
698 | """ test s3 notification list/get/delete on master """ | |
699 | conn = connection() | |
700 | zonegroup = 'default' | |
701 | bucket_name = gen_bucket_name() | |
702 | bucket = conn.create_bucket(bucket_name) | |
703 | topic_name = bucket_name + TOPIC_SUFFIX | |
704 | ||
705 | # create s3 topics | |
706 | endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' | |
707 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
708 | topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
709 | # clean all topics | |
710 | try: | |
711 | result = topic_conf.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics'] | |
712 | topics = [] | |
713 | if result is not None: | |
714 | topics = result['member'] | |
715 | for topic in topics: | |
716 | topic_conf.del_config(topic_arn=topic['TopicArn']) | |
717 | except Exception as err: | |
718 | print('failed to do topic cleanup: ' + str(err)) | |
719 | ||
720 | topic_arn = topic_conf.set_config() | |
721 | assert_equal(topic_arn, | |
722 | 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1') | |
723 | # create s3 notification | |
724 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
725 | topic_conf_list = [{'Id': notification_name+'_1', | |
726 | 'TopicArn': topic_arn, | |
727 | 'Events': ['s3:ObjectCreated:*'] | |
728 | }, | |
729 | {'Id': notification_name+'_2', | |
730 | 'TopicArn': topic_arn, | |
731 | 'Events': ['s3:ObjectRemoved:*'] | |
732 | }, | |
733 | {'Id': notification_name+'_3', | |
734 | 'TopicArn': topic_arn, | |
735 | 'Events': [] | |
736 | }] | |
737 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
738 | _, status = s3_notification_conf.set_config() | |
739 | assert_equal(status/100, 2) | |
740 | ||
741 | # list notification | |
742 | result = admin(['notification', 'list', '--bucket', bucket_name]) | |
743 | parsed_result = json.loads(result[0]) | |
744 | assert_equal(len(parsed_result['notifications']), 3) | |
745 | assert_equal(result[1], 0) | |
746 | ||
747 | # get notification 1 | |
748 | result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1']) | |
749 | parsed_result = json.loads(result[0]) | |
750 | assert_equal(parsed_result['Id'], notification_name+'_1') | |
751 | assert_equal(result[1], 0) | |
752 | ||
753 | # remove notification 3 | |
754 | _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3']) | |
755 | assert_equal(result, 0) | |
756 | ||
757 | # list notification | |
758 | result = admin(['notification', 'list', '--bucket', bucket_name]) | |
759 | parsed_result = json.loads(result[0]) | |
760 | assert_equal(len(parsed_result['notifications']), 2) | |
761 | assert_equal(result[1], 0) | |
762 | ||
763 | # delete notifications | |
764 | _, result = admin(['notification', 'rm', '--bucket', bucket_name]) | |
765 | assert_equal(result, 0) | |
766 | ||
767 | # list notification, make sure it is empty | |
768 | result = admin(['notification', 'list', '--bucket', bucket_name]) | |
769 | parsed_result = json.loads(result[0]) | |
770 | assert_equal(len(parsed_result['notifications']), 0) | |
771 | assert_equal(result[1], 0) | |
772 | ||
773 | ||
20effc67 TL |
774 | @attr('modification_required') |
775 | def test_ps_s3_topic_with_secret_on_master(): | |
776 | """ test s3 topics with secret set/get/delete on master """ | |
777 | return SkipTest('secure connection is needed to test topic with secrets') | |
778 | ||
779 | conn = connection1() | |
780 | if conn.secure_conn is None: | |
781 | return SkipTest('secure connection is needed to test topic with secrets') | |
782 | ||
783 | zonegroup = 'default' | |
784 | bucket_name = gen_bucket_name() | |
785 | topic_name = bucket_name + TOPIC_SUFFIX | |
786 | ||
787 | # clean all topics | |
788 | delete_all_s3_topics(conn, zonegroup) | |
789 | ||
790 | # create s3 topics | |
791 | endpoint_address = 'amqp://user:password@127.0.0.1:7001' | |
792 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
793 | bad_topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
794 | try: | |
795 | result = bad_topic_conf.set_config() | |
796 | except Exception as err: | |
797 | print('Error is expected: ' + str(err)) | |
798 | else: | |
799 | assert False, 'user password configuration set allowed only over HTTPS' | |
800 | topic_conf = PSTopicS3(conn.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
801 | topic_arn = topic_conf.set_config() | |
802 | ||
803 | assert_equal(topic_arn, | |
804 | 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name) | |
805 | ||
806 | _, status = bad_topic_conf.get_config() | |
807 | assert_equal(status/100, 4) | |
808 | ||
809 | # get topic | |
810 | result, status = topic_conf.get_config() | |
811 | assert_equal(status, 200) | |
812 | assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) | |
813 | assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) | |
814 | ||
815 | _, status = bad_topic_conf.get_config() | |
816 | assert_equal(status/100, 4) | |
817 | ||
818 | _, status = topic_conf.get_list() | |
819 | assert_equal(status/100, 2) | |
820 | ||
821 | # delete topics | |
822 | result = topic_conf.del_config() | |
823 | ||
824 | ||
825 | @attr('basic_test') | |
826 | def test_ps_s3_notification_on_master(): | |
827 | """ test s3 notification set/get/delete on master """ | |
828 | conn = connection() | |
829 | zonegroup = 'default' | |
830 | bucket_name = gen_bucket_name() | |
831 | # create bucket | |
832 | bucket = conn.create_bucket(bucket_name) | |
833 | topic_name = bucket_name + TOPIC_SUFFIX | |
834 | # create s3 topic | |
835 | endpoint_address = 'amqp://127.0.0.1:7001' | |
836 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
837 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
838 | topic_arn = topic_conf.set_config() | |
839 | # create s3 notification | |
840 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
841 | topic_conf_list = [{'Id': notification_name+'_1', | |
842 | 'TopicArn': topic_arn, | |
843 | 'Events': ['s3:ObjectCreated:*'] | |
844 | }, | |
845 | {'Id': notification_name+'_2', | |
846 | 'TopicArn': topic_arn, | |
847 | 'Events': ['s3:ObjectRemoved:*'] | |
848 | }, | |
849 | {'Id': notification_name+'_3', | |
850 | 'TopicArn': topic_arn, | |
851 | 'Events': [] | |
852 | }] | |
853 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
854 | _, status = s3_notification_conf.set_config() | |
855 | assert_equal(status/100, 2) | |
856 | ||
857 | # get notifications on a bucket | |
858 | response, status = s3_notification_conf.get_config(notification=notification_name+'_1') | |
859 | assert_equal(status/100, 2) | |
860 | assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) | |
861 | ||
862 | # delete specific notifications | |
863 | _, status = s3_notification_conf.del_config(notification=notification_name+'_1') | |
864 | assert_equal(status/100, 2) | |
865 | ||
866 | # get the remaining 2 notifications on a bucket | |
867 | response, status = s3_notification_conf.get_config() | |
868 | assert_equal(status/100, 2) | |
869 | assert_equal(len(response['TopicConfigurations']), 2) | |
870 | assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn) | |
871 | assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn) | |
872 | ||
873 | # delete remaining notifications | |
874 | _, status = s3_notification_conf.del_config() | |
875 | assert_equal(status/100, 2) | |
876 | ||
877 | # make sure that the notifications are now deleted | |
878 | _, status = s3_notification_conf.get_config() | |
879 | ||
880 | # cleanup | |
881 | topic_conf.del_config() | |
882 | # delete the bucket | |
883 | conn.delete_bucket(bucket_name) | |
884 | ||
885 | ||
886 | @attr('basic_test') | |
887 | def test_ps_s3_notification_on_master_empty_config(): | |
888 | """ test s3 notification set/get/delete on master with empty config """ | |
889 | hostname = get_ip() | |
890 | ||
891 | conn = connection() | |
892 | ||
893 | zonegroup = 'default' | |
894 | ||
895 | # create bucket | |
896 | bucket_name = gen_bucket_name() | |
897 | bucket = conn.create_bucket(bucket_name) | |
898 | topic_name = bucket_name + TOPIC_SUFFIX | |
899 | ||
900 | # create s3 topic | |
901 | endpoint_address = 'amqp://127.0.0.1:7001' | |
902 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
903 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
904 | topic_arn = topic_conf.set_config() | |
905 | ||
906 | # create s3 notification | |
907 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
908 | topic_conf_list = [{'Id': notification_name+'_1', | |
909 | 'TopicArn': topic_arn, | |
910 | 'Events': [] | |
911 | }] | |
912 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
913 | _, status = s3_notification_conf.set_config() | |
914 | assert_equal(status/100, 2) | |
915 | ||
916 | # get notifications on a bucket | |
917 | response, status = s3_notification_conf.get_config(notification=notification_name+'_1') | |
918 | assert_equal(status/100, 2) | |
919 | assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) | |
920 | ||
921 | # create s3 notification again with empty configuration to check if it deletes or not | |
922 | topic_conf_list = [] | |
923 | ||
924 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
925 | _, status = s3_notification_conf.set_config() | |
926 | assert_equal(status/100, 2) | |
927 | ||
928 | # make sure that the notification is now deleted | |
929 | response, status = s3_notification_conf.get_config() | |
930 | try: | |
931 | check = response['NotificationConfiguration'] | |
932 | except KeyError as e: | |
933 | assert_equal(status/100, 2) | |
934 | else: | |
935 | assert False | |
936 | ||
937 | # cleanup | |
938 | topic_conf.del_config() | |
939 | # delete the bucket | |
940 | conn.delete_bucket(bucket_name) | |
941 | ||
942 | ||
943 | @attr('amqp_test') | |
944 | def test_ps_s3_notification_filter_on_master(): | |
945 | """ test s3 notification filter on master """ | |
946 | ||
947 | hostname = get_ip() | |
948 | ||
949 | conn = connection() | |
950 | ps_zone = conn | |
951 | ||
952 | zonegroup = 'default' | |
953 | ||
954 | # create bucket | |
955 | bucket_name = gen_bucket_name() | |
956 | bucket = conn.create_bucket(bucket_name) | |
957 | topic_name = bucket_name + TOPIC_SUFFIX | |
958 | ||
959 | # start amqp receivers | |
960 | exchange = 'ex1' | |
961 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
962 | task.start() | |
963 | ||
964 | # create s3 topic | |
965 | endpoint_address = 'amqp://' + hostname | |
966 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' | |
967 | ||
968 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
969 | topic_arn = topic_conf.set_config() | |
970 | ||
971 | # create s3 notification | |
972 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
973 | topic_conf_list = [{'Id': notification_name+'_1', | |
974 | 'TopicArn': topic_arn, | |
975 | 'Events': ['s3:ObjectCreated:*'], | |
976 | 'Filter': { | |
977 | 'Key': { | |
978 | 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}] | |
979 | } | |
980 | } | |
981 | }, | |
982 | {'Id': notification_name+'_2', | |
983 | 'TopicArn': topic_arn, | |
984 | 'Events': ['s3:ObjectCreated:*'], | |
985 | 'Filter': { | |
986 | 'Key': { | |
987 | 'FilterRules': [{'Name': 'prefix', 'Value': 'world'}, | |
988 | {'Name': 'suffix', 'Value': 'log'}] | |
989 | } | |
990 | } | |
991 | }, | |
992 | {'Id': notification_name+'_3', | |
993 | 'TopicArn': topic_arn, | |
994 | 'Events': [], | |
995 | 'Filter': { | |
996 | 'Key': { | |
997 | 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}] | |
998 | } | |
999 | } | |
1000 | }] | |
1001 | ||
1002 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1003 | result, status = s3_notification_conf.set_config() | |
1004 | assert_equal(status/100, 2) | |
1005 | ||
1006 | topic_conf_list = [{'Id': notification_name+'_4', | |
1007 | 'TopicArn': topic_arn, | |
1008 | 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], | |
1009 | 'Filter': { | |
1010 | 'Metadata': { | |
1011 | 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'}, | |
1012 | {'Name': 'x-amz-meta-hello', 'Value': 'world'}] | |
1013 | }, | |
1014 | 'Key': { | |
1015 | 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}] | |
1016 | } | |
1017 | } | |
1018 | }] | |
1019 | ||
1020 | try: | |
1021 | s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1022 | _, status = s3_notification_conf4.set_config() | |
1023 | assert_equal(status/100, 2) | |
1024 | skip_notif4 = False | |
1025 | except Exception as error: | |
1026 | print('note: metadata filter is not supported by boto3 - skipping test') | |
1027 | skip_notif4 = True | |
1028 | ||
1029 | ||
1030 | # get all notifications | |
1031 | result, status = s3_notification_conf.get_config() | |
1032 | assert_equal(status/100, 2) | |
1033 | for conf in result['TopicConfigurations']: | |
1034 | filter_name = conf['Filter']['Key']['FilterRules'][0]['Name'] | |
1035 | assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name | |
1036 | ||
1037 | if not skip_notif4: | |
1038 | result, status = s3_notification_conf4.get_config(notification=notification_name+'_4') | |
1039 | assert_equal(status/100, 2) | |
1040 | filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name'] | |
1041 | assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello' | |
1042 | ||
1043 | expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello'] | |
1044 | expected_in2 = ['world1.log', 'world2log', 'world3.log'] | |
1045 | expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt'] | |
1046 | expected_in4 = ['foo', 'bar', 'hello', 'world'] | |
1047 | filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt'] | |
1048 | filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld'] | |
1049 | # create objects in bucket | |
1050 | for key_name in expected_in1: | |
1051 | key = bucket.new_key(key_name) | |
1052 | key.set_contents_from_string('bar') | |
1053 | for key_name in expected_in2: | |
1054 | key = bucket.new_key(key_name) | |
1055 | key.set_contents_from_string('bar') | |
1056 | for key_name in expected_in3: | |
1057 | key = bucket.new_key(key_name) | |
1058 | key.set_contents_from_string('bar') | |
1059 | if not skip_notif4: | |
1060 | for key_name in expected_in4: | |
1061 | key = bucket.new_key(key_name) | |
1062 | key.set_metadata('foo', 'bar') | |
1063 | key.set_metadata('hello', 'world') | |
1064 | key.set_metadata('goodbye', 'cruel world') | |
1065 | key.set_contents_from_string('bar') | |
1066 | for key_name in filtered: | |
1067 | key = bucket.new_key(key_name) | |
1068 | key.set_contents_from_string('bar') | |
1069 | for key_name in filtered_with_attr: | |
1070 | key.set_metadata('foo', 'nobar') | |
1071 | key.set_metadata('hello', 'noworld') | |
1072 | key.set_metadata('goodbye', 'cruel world') | |
1073 | key = bucket.new_key(key_name) | |
1074 | key.set_contents_from_string('bar') | |
1075 | ||
1076 | print('wait for 5sec for the messages...') | |
1077 | time.sleep(5) | |
1078 | ||
1079 | found_in1 = [] | |
1080 | found_in2 = [] | |
1081 | found_in3 = [] | |
1082 | found_in4 = [] | |
1083 | ||
1084 | for event in receiver.get_and_reset_events(): | |
1085 | notif_id = event['Records'][0]['s3']['configurationId'] | |
1086 | key_name = event['Records'][0]['s3']['object']['key'] | |
1087 | awsRegion = event['Records'][0]['awsRegion'] | |
1e59de90 TL |
1088 | assert_equal(awsRegion, zonegroup) |
1089 | bucket_arn = event['Records'][0]['s3']['bucket']['arn'] | |
1090 | assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name) | |
20effc67 TL |
1091 | if notif_id == notification_name+'_1': |
1092 | found_in1.append(key_name) | |
1093 | elif notif_id == notification_name+'_2': | |
1094 | found_in2.append(key_name) | |
1095 | elif notif_id == notification_name+'_3': | |
1096 | found_in3.append(key_name) | |
1097 | elif not skip_notif4 and notif_id == notification_name+'_4': | |
1098 | found_in4.append(key_name) | |
1099 | else: | |
1100 | assert False, 'invalid notification: ' + notif_id | |
1101 | ||
1102 | assert_equal(set(found_in1), set(expected_in1)) | |
1103 | assert_equal(set(found_in2), set(expected_in2)) | |
1104 | assert_equal(set(found_in3), set(expected_in3)) | |
20effc67 TL |
1105 | if not skip_notif4: |
1106 | assert_equal(set(found_in4), set(expected_in4)) | |
1107 | ||
1108 | # cleanup | |
1109 | s3_notification_conf.del_config() | |
1110 | if not skip_notif4: | |
1111 | s3_notification_conf4.del_config() | |
1112 | topic_conf.del_config() | |
1113 | # delete the bucket | |
1114 | for key in bucket.list(): | |
1115 | key.delete() | |
1116 | conn.delete_bucket(bucket_name) | |
1117 | stop_amqp_receiver(receiver, task) | |
1118 | ||
1119 | ||
1120 | @attr('basic_test') | |
1121 | def test_ps_s3_notification_errors_on_master(): | |
1122 | """ test s3 notification set/get/delete on master """ | |
1123 | conn = connection() | |
1124 | zonegroup = 'default' | |
1125 | bucket_name = gen_bucket_name() | |
1126 | # create bucket | |
1127 | bucket = conn.create_bucket(bucket_name) | |
1128 | topic_name = bucket_name + TOPIC_SUFFIX | |
1129 | # create s3 topic | |
1130 | endpoint_address = 'amqp://127.0.0.1:7001' | |
1131 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
1132 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
1133 | topic_arn = topic_conf.set_config() | |
1134 | ||
1135 | # create s3 notification with invalid event name | |
1136 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1137 | topic_conf_list = [{'Id': notification_name, | |
1138 | 'TopicArn': topic_arn, | |
1139 | 'Events': ['s3:ObjectCreated:Kaboom'] | |
1140 | }] | |
1141 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1142 | try: | |
1143 | result, status = s3_notification_conf.set_config() | |
1144 | except Exception as error: | |
1145 | print(str(error) + ' - is expected') | |
1146 | else: | |
1147 | assert False, 'invalid event name is expected to fail' | |
1148 | ||
1149 | # create s3 notification with missing name | |
1150 | topic_conf_list = [{'Id': '', | |
1151 | 'TopicArn': topic_arn, | |
1152 | 'Events': ['s3:ObjectCreated:Put'] | |
1153 | }] | |
1154 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1155 | try: | |
1156 | _, _ = s3_notification_conf.set_config() | |
1157 | except Exception as error: | |
1158 | print(str(error) + ' - is expected') | |
1159 | else: | |
1160 | assert False, 'missing notification name is expected to fail' | |
1161 | ||
1162 | # create s3 notification with invalid topic ARN | |
1163 | invalid_topic_arn = 'kaboom' | |
1164 | topic_conf_list = [{'Id': notification_name, | |
1165 | 'TopicArn': invalid_topic_arn, | |
1166 | 'Events': ['s3:ObjectCreated:Put'] | |
1167 | }] | |
1168 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1169 | try: | |
1170 | _, _ = s3_notification_conf.set_config() | |
1171 | except Exception as error: | |
1172 | print(str(error) + ' - is expected') | |
1173 | else: | |
1174 | assert False, 'invalid ARN is expected to fail' | |
1175 | ||
1176 | # create s3 notification with unknown topic ARN | |
1177 | invalid_topic_arn = 'arn:aws:sns:a::kaboom' | |
1178 | topic_conf_list = [{'Id': notification_name, | |
1179 | 'TopicArn': invalid_topic_arn , | |
1180 | 'Events': ['s3:ObjectCreated:Put'] | |
1181 | }] | |
1182 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1183 | try: | |
1184 | _, _ = s3_notification_conf.set_config() | |
1185 | except Exception as error: | |
1186 | print(str(error) + ' - is expected') | |
1187 | else: | |
1188 | assert False, 'unknown topic is expected to fail' | |
1189 | ||
1190 | # create s3 notification with wrong bucket | |
1191 | topic_conf_list = [{'Id': notification_name, | |
1192 | 'TopicArn': topic_arn, | |
1193 | 'Events': ['s3:ObjectCreated:Put'] | |
1194 | }] | |
1195 | s3_notification_conf = PSNotificationS3(conn, 'kaboom', topic_conf_list) | |
1196 | try: | |
1197 | _, _ = s3_notification_conf.set_config() | |
1198 | except Exception as error: | |
1199 | print(str(error) + ' - is expected') | |
1200 | else: | |
1201 | assert False, 'unknown bucket is expected to fail' | |
1202 | ||
1203 | topic_conf.del_config() | |
1204 | ||
1205 | status = topic_conf.del_config() | |
1206 | # deleting an unknown notification is not considered an error | |
1207 | assert_equal(status, 200) | |
1208 | ||
1209 | _, status = topic_conf.get_config() | |
1210 | assert_equal(status, 404) | |
1211 | ||
1212 | # cleanup | |
1213 | # delete the bucket | |
1214 | conn.delete_bucket(bucket_name) | |
1215 | ||
1e59de90 TL |
1216 | @attr('basic_test') |
1217 | def test_ps_s3_notification_permissions(): | |
1218 | """ test s3 notification set/get/delete permissions """ | |
1219 | conn1 = connection() | |
1220 | conn2 = another_user() | |
1221 | zonegroup = 'default' | |
1222 | bucket_name = gen_bucket_name() | |
1223 | # create bucket | |
1224 | bucket = conn1.create_bucket(bucket_name) | |
1225 | topic_name = bucket_name + TOPIC_SUFFIX | |
1226 | # create s3 topic | |
1227 | endpoint_address = 'amqp://127.0.0.1:7001' | |
1228 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' | |
1229 | topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args) | |
1230 | topic_arn = topic_conf.set_config() | |
1231 | ||
1232 | # one user create a notification | |
1233 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1234 | topic_conf_list = [{'Id': notification_name, | |
1235 | 'TopicArn': topic_arn, | |
1236 | 'Events': [] | |
1237 | }] | |
1238 | s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list) | |
1239 | _, status = s3_notification_conf1.set_config() | |
1240 | assert_equal(status, 200) | |
1241 | # another user try to fetch it | |
1242 | s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) | |
1243 | try: | |
1244 | _, _ = s3_notification_conf2.get_config() | |
1245 | assert False, "'AccessDenied' error is expected" | |
1246 | except ClientError as error: | |
1247 | assert_equal(error.response['Error']['Code'], 'AccessDenied') | |
1248 | # other user try to delete the notification | |
1249 | _, status = s3_notification_conf2.del_config() | |
1250 | assert_equal(status, 403) | |
1251 | ||
1252 | # bucket policy is added by the 1st user | |
1253 | client = boto3.client('s3', | |
1254 | endpoint_url='http://'+conn1.host+':'+str(conn1.port), | |
1255 | aws_access_key_id=conn1.aws_access_key_id, | |
1256 | aws_secret_access_key=conn1.aws_secret_access_key) | |
1257 | bucket_policy = json.dumps({ | |
1258 | "Version": "2012-10-17", | |
1259 | "Statement": [ | |
1260 | { | |
1261 | "Sid": "Statement", | |
1262 | "Effect": "Allow", | |
1263 | "Principal": "*", | |
1264 | "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"], | |
1265 | "Resource": f"arn:aws:s3:::{bucket_name}" | |
1266 | } | |
1267 | ] | |
1268 | }) | |
1269 | response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy) | |
1270 | assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2) | |
1271 | result = client.get_bucket_policy(Bucket=bucket_name) | |
1272 | print(result['Policy']) | |
1273 | ||
1274 | # 2nd user try to fetch it again | |
1275 | _, status = s3_notification_conf2.get_config() | |
1276 | assert_equal(status, 200) | |
1277 | ||
1278 | # 2nd user try to delete it again | |
1279 | result, status = s3_notification_conf2.del_config() | |
1280 | assert_equal(status, 200) | |
1281 | ||
1282 | # 2nd user try to add another notification | |
1283 | topic_conf_list = [{'Id': notification_name+"2", | |
1284 | 'TopicArn': topic_arn, | |
1285 | 'Events': [] | |
1286 | }] | |
1287 | s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) | |
1288 | result, status = s3_notification_conf2.set_config() | |
1289 | assert_equal(status, 200) | |
1290 | ||
1291 | # cleanup | |
1292 | s3_notification_conf1.del_config() | |
1293 | s3_notification_conf2.del_config() | |
1294 | topic_conf.del_config() | |
1295 | # delete the bucket | |
1296 | conn1.delete_bucket(bucket_name) | |
20effc67 TL |
1297 | |
1298 | @attr('amqp_test') | |
1299 | def test_ps_s3_notification_push_amqp_on_master(): | |
1300 | """ test pushing amqp s3 notification on master """ | |
1301 | ||
1302 | hostname = get_ip() | |
1303 | conn = connection() | |
1304 | zonegroup = 'default' | |
1305 | ||
1306 | # create bucket | |
1307 | bucket_name = gen_bucket_name() | |
1308 | bucket = conn.create_bucket(bucket_name) | |
1309 | topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' | |
1310 | topic_name2 = bucket_name + TOPIC_SUFFIX + '_2' | |
1311 | ||
1312 | # start amqp receivers | |
1313 | exchange = 'ex1' | |
1314 | task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) | |
1315 | task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2) | |
1316 | task1.start() | |
1317 | task2.start() | |
1318 | ||
1319 | # create two s3 topic | |
1320 | endpoint_address = 'amqp://' + hostname | |
1321 | # with acks from broker | |
1322 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' | |
1323 | topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args) | |
1324 | topic_arn1 = topic_conf1.set_config() | |
1325 | # without acks from broker | |
1326 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' | |
1327 | topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args) | |
1328 | topic_arn2 = topic_conf2.set_config() | |
1329 | # create s3 notification | |
1330 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1331 | topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, | |
1332 | 'Events': [] | |
1333 | }, | |
1334 | {'Id': notification_name+'_2', 'TopicArn': topic_arn2, | |
1335 | 'Events': ['s3:ObjectCreated:*'] | |
1336 | }] | |
1337 | ||
1338 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1339 | response, status = s3_notification_conf.set_config() | |
1340 | assert_equal(status/100, 2) | |
1341 | ||
1342 | # create objects in the bucket (async) | |
1343 | number_of_objects = 100 | |
1344 | client_threads = [] | |
1345 | start_time = time.time() | |
1346 | for i in range(number_of_objects): | |
1347 | key = bucket.new_key(str(i)) | |
1348 | content = str(os.urandom(1024*1024)) | |
1349 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1350 | thr.start() | |
1351 | client_threads.append(thr) | |
1352 | [thr.join() for thr in client_threads] | |
1353 | ||
1354 | time_diff = time.time() - start_time | |
1355 | print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1356 | ||
1357 | print('wait for 5sec for the messages...') | |
1358 | time.sleep(5) | |
1359 | ||
1360 | # check amqp receiver | |
1361 | keys = list(bucket.list()) | |
1362 | print('total number of objects: ' + str(len(keys))) | |
1363 | receiver1.verify_s3_events(keys, exact_match=True) | |
1364 | receiver2.verify_s3_events(keys, exact_match=True) | |
1365 | ||
1366 | # delete objects from the bucket | |
1367 | client_threads = [] | |
1368 | start_time = time.time() | |
1369 | for key in bucket.list(): | |
1370 | thr = threading.Thread(target = key.delete, args=()) | |
1371 | thr.start() | |
1372 | client_threads.append(thr) | |
1373 | [thr.join() for thr in client_threads] | |
1374 | ||
1375 | time_diff = time.time() - start_time | |
1376 | print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1377 | ||
1378 | print('wait for 5sec for the messages...') | |
1379 | time.sleep(5) | |
1380 | ||
1381 | # check amqp receiver 1 for deletions | |
1382 | receiver1.verify_s3_events(keys, exact_match=True, deletions=True) | |
1383 | # check amqp receiver 2 has no deletions | |
1384 | try: | |
1385 | receiver1.verify_s3_events(keys, exact_match=False, deletions=True) | |
1386 | except: | |
1387 | pass | |
1388 | else: | |
1389 | err = 'amqp receiver 2 should have no deletions' | |
1390 | assert False, err | |
1391 | ||
1392 | # cleanup | |
1393 | stop_amqp_receiver(receiver1, task1) | |
1394 | stop_amqp_receiver(receiver2, task2) | |
1395 | s3_notification_conf.del_config() | |
1396 | topic_conf1.del_config() | |
1397 | topic_conf2.del_config() | |
1398 | # delete the bucket | |
1399 | conn.delete_bucket(bucket_name) | |
1400 | ||
1401 | ||
1402 | @attr('manual_test') | |
1403 | def test_ps_s3_notification_push_amqp_idleness_check(): | |
1404 | """ test pushing amqp s3 notification and checking for connection idleness """ | |
1405 | return SkipTest("only used in manual testing") | |
1406 | hostname = get_ip() | |
1407 | conn = connection() | |
1408 | zonegroup = 'default' | |
1409 | ||
1410 | # create bucket | |
1411 | bucket_name = gen_bucket_name() | |
1412 | bucket = conn.create_bucket(bucket_name) | |
1413 | topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' | |
1414 | ||
1415 | # start amqp receivers | |
1416 | exchange = 'ex1' | |
1417 | task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) | |
1418 | task1.start() | |
1419 | ||
1420 | # create two s3 topic | |
1421 | endpoint_address = 'amqp://' + hostname | |
1422 | # with acks from broker | |
1423 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' | |
1424 | topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args) | |
1425 | topic_arn1 = topic_conf1.set_config() | |
1426 | # create s3 notification | |
1427 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1428 | topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, | |
1429 | 'Events': [] | |
1430 | }] | |
1431 | ||
1432 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1433 | response, status = s3_notification_conf.set_config() | |
1434 | assert_equal(status/100, 2) | |
1435 | ||
1436 | # create objects in the bucket (async) | |
1437 | number_of_objects = 10 | |
1438 | client_threads = [] | |
1439 | start_time = time.time() | |
1440 | for i in range(number_of_objects): | |
1441 | key = bucket.new_key(str(i)) | |
1442 | content = str(os.urandom(1024*1024)) | |
1443 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1444 | thr.start() | |
1445 | client_threads.append(thr) | |
1446 | [thr.join() for thr in client_threads] | |
1447 | ||
1448 | time_diff = time.time() - start_time | |
1449 | print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1450 | ||
1451 | print('wait for 5sec for the messages...') | |
1452 | time.sleep(5) | |
1453 | ||
1454 | # check amqp receiver | |
1455 | keys = list(bucket.list()) | |
1456 | print('total number of objects: ' + str(len(keys))) | |
1457 | receiver1.verify_s3_events(keys, exact_match=True) | |
1458 | ||
1459 | # delete objects from the bucket | |
1460 | client_threads = [] | |
1461 | start_time = time.time() | |
1462 | for key in bucket.list(): | |
1463 | thr = threading.Thread(target = key.delete, args=()) | |
1464 | thr.start() | |
1465 | client_threads.append(thr) | |
1466 | [thr.join() for thr in client_threads] | |
1467 | ||
1468 | time_diff = time.time() - start_time | |
1469 | print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1470 | ||
1471 | print('wait for 5sec for the messages...') | |
1472 | time.sleep(5) | |
1473 | ||
1474 | # check amqp receiver 1 for deletions | |
1475 | receiver1.verify_s3_events(keys, exact_match=True, deletions=True) | |
1476 | ||
1477 | print('waiting for 40sec for checking idleness') | |
1478 | time.sleep(40) | |
1479 | ||
1480 | os.system("netstat -nnp | grep 5672"); | |
1481 | ||
1482 | # do the process of uploading an object and checking for notification again | |
1483 | number_of_objects = 10 | |
1484 | client_threads = [] | |
1485 | start_time = time.time() | |
1486 | for i in range(number_of_objects): | |
1487 | key = bucket.new_key(str(i)) | |
1488 | content = str(os.urandom(1024*1024)) | |
1489 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1490 | thr.start() | |
1491 | client_threads.append(thr) | |
1492 | [thr.join() for thr in client_threads] | |
1493 | ||
1494 | time_diff = time.time() - start_time | |
1495 | print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1496 | ||
1497 | print('wait for 5sec for the messages...') | |
1498 | time.sleep(5) | |
1499 | ||
1500 | # check amqp receiver | |
1501 | keys = list(bucket.list()) | |
1502 | print('total number of objects: ' + str(len(keys))) | |
1503 | receiver1.verify_s3_events(keys, exact_match=True) | |
1504 | ||
1505 | # delete objects from the bucket | |
1506 | client_threads = [] | |
1507 | start_time = time.time() | |
1508 | for key in bucket.list(): | |
1509 | thr = threading.Thread(target = key.delete, args=()) | |
1510 | thr.start() | |
1511 | client_threads.append(thr) | |
1512 | [thr.join() for thr in client_threads] | |
1513 | ||
1514 | time_diff = time.time() - start_time | |
1515 | print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1516 | ||
1517 | print('wait for 5sec for the messages...') | |
1518 | time.sleep(5) | |
1519 | ||
1520 | # check amqp receiver 1 for deletions | |
1521 | receiver1.verify_s3_events(keys, exact_match=True, deletions=True) | |
1522 | ||
1523 | os.system("netstat -nnp | grep 5672"); | |
1524 | ||
1525 | # cleanup | |
1526 | stop_amqp_receiver(receiver1, task1) | |
1527 | s3_notification_conf.del_config() | |
1528 | topic_conf1.del_config() | |
1529 | # delete the bucket | |
1530 | conn.delete_bucket(bucket_name) | |
1531 | ||
1532 | ||
1533 | @attr('kafka_test') | |
1534 | def test_ps_s3_notification_push_kafka_on_master(): | |
1535 | """ test pushing kafka s3 notification on master """ | |
1536 | conn = connection() | |
1537 | zonegroup = 'default' | |
1538 | ||
1539 | # create bucket | |
1540 | bucket_name = gen_bucket_name() | |
1541 | bucket = conn.create_bucket(bucket_name) | |
1542 | # name is constant for manual testing | |
1543 | topic_name = bucket_name+'_topic' | |
1544 | # create consumer on the topic | |
1545 | ||
1546 | try: | |
1547 | s3_notification_conf = None | |
1548 | topic_conf1 = None | |
1549 | topic_conf2 = None | |
1550 | receiver = None | |
1551 | task, receiver = create_kafka_receiver_thread(topic_name+'_1') | |
1552 | task.start() | |
1553 | ||
1554 | # create s3 topic | |
1555 | endpoint_address = 'kafka://' + kafka_server | |
1556 | # without acks from broker | |
1557 | endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' | |
1558 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
1559 | topic_arn1 = topic_conf1.set_config() | |
1560 | endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none' | |
1561 | topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) | |
1562 | topic_arn2 = topic_conf2.set_config() | |
1563 | # create s3 notification | |
1564 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1565 | topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, | |
1566 | 'Events': [] | |
1567 | }, | |
1568 | {'Id': notification_name + '_2', 'TopicArn': topic_arn2, | |
1569 | 'Events': [] | |
1570 | }] | |
1571 | ||
1572 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1573 | response, status = s3_notification_conf.set_config() | |
1574 | assert_equal(status/100, 2) | |
1575 | ||
1576 | # create objects in the bucket (async) | |
1577 | number_of_objects = 10 | |
1578 | client_threads = [] | |
1579 | etags = [] | |
1580 | start_time = time.time() | |
1581 | for i in range(number_of_objects): | |
1582 | key = bucket.new_key(str(i)) | |
1583 | content = str(os.urandom(1024*1024)) | |
1584 | etag = hashlib.md5(content.encode()).hexdigest() | |
1585 | etags.append(etag) | |
1586 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1587 | thr.start() | |
1588 | client_threads.append(thr) | |
1589 | [thr.join() for thr in client_threads] | |
1590 | ||
1591 | time_diff = time.time() - start_time | |
1592 | print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1593 | ||
1594 | print('wait for 5sec for the messages...') | |
1595 | time.sleep(5) | |
1596 | keys = list(bucket.list()) | |
1597 | receiver.verify_s3_events(keys, exact_match=True, etags=etags) | |
1598 | ||
1599 | # delete objects from the bucket | |
1600 | client_threads = [] | |
1601 | start_time = time.time() | |
1602 | for key in bucket.list(): | |
1603 | thr = threading.Thread(target = key.delete, args=()) | |
1604 | thr.start() | |
1605 | client_threads.append(thr) | |
1606 | [thr.join() for thr in client_threads] | |
1607 | ||
1608 | time_diff = time.time() - start_time | |
1609 | print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1610 | ||
1611 | print('wait for 5sec for the messages...') | |
1612 | time.sleep(5) | |
1613 | receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) | |
1614 | except Exception as e: | |
1615 | print(e) | |
1616 | assert False | |
1617 | finally: | |
1618 | # cleanup | |
1619 | if s3_notification_conf is not None: | |
1620 | s3_notification_conf.del_config() | |
1621 | if topic_conf1 is not None: | |
1622 | topic_conf1.del_config() | |
1623 | if topic_conf2 is not None: | |
1624 | topic_conf2.del_config() | |
1625 | # delete the bucket | |
1e59de90 TL |
1626 | for key in bucket.list(): |
1627 | key.delete() | |
20effc67 TL |
1628 | conn.delete_bucket(bucket_name) |
1629 | if receiver is not None: | |
1630 | stop_kafka_receiver(receiver, task) | |
1631 | ||
1632 | ||
1633 | @attr('http_test') | |
1634 | def test_ps_s3_notification_multi_delete_on_master(): | |
1635 | """ test deletion of multiple keys on master """ | |
1636 | hostname = get_ip() | |
1637 | conn = connection() | |
1638 | zonegroup = 'default' | |
1639 | ||
1640 | # create random port for the http server | |
1641 | host = get_ip() | |
1642 | port = random.randint(10000, 20000) | |
1643 | # start an http server in a separate thread | |
1644 | number_of_objects = 10 | |
1645 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
1646 | ||
1647 | # create bucket | |
1648 | bucket_name = gen_bucket_name() | |
1649 | bucket = conn.create_bucket(bucket_name) | |
1650 | topic_name = bucket_name + TOPIC_SUFFIX | |
1651 | ||
1652 | # create s3 topic | |
1653 | endpoint_address = 'http://'+host+':'+str(port) | |
1654 | endpoint_args = 'push-endpoint='+endpoint_address | |
1655 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
1656 | topic_arn = topic_conf.set_config() | |
1657 | # create s3 notification | |
1658 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1659 | topic_conf_list = [{'Id': notification_name, | |
1660 | 'TopicArn': topic_arn, | |
1661 | 'Events': ['s3:ObjectRemoved:*'] | |
1662 | }] | |
1663 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1664 | response, status = s3_notification_conf.set_config() | |
1665 | assert_equal(status/100, 2) | |
1666 | ||
1667 | # create objects in the bucket | |
1668 | client_threads = [] | |
1669 | objects_size = {} | |
1670 | for i in range(number_of_objects): | |
1671 | content = str(os.urandom(randint(1, 1024))) | |
1672 | object_size = len(content) | |
1673 | key = bucket.new_key(str(i)) | |
1674 | objects_size[key.name] = object_size | |
1675 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1676 | thr.start() | |
1677 | client_threads.append(thr) | |
1678 | [thr.join() for thr in client_threads] | |
1679 | ||
1680 | keys = list(bucket.list()) | |
1681 | ||
1682 | start_time = time.time() | |
1683 | delete_all_objects(conn, bucket_name) | |
1684 | time_diff = time.time() - start_time | |
1685 | print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1686 | ||
1687 | print('wait for 5sec for the messages...') | |
1688 | time.sleep(5) | |
1689 | ||
1690 | # check http receiver | |
1691 | http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) | |
1692 | ||
1693 | # cleanup | |
1694 | topic_conf.del_config() | |
1695 | s3_notification_conf.del_config(notification=notification_name) | |
1696 | # delete the bucket | |
1697 | conn.delete_bucket(bucket_name) | |
1698 | http_server.close() | |
1699 | ||
1700 | ||
1701 | @attr('http_test') | |
1702 | def test_ps_s3_notification_push_http_on_master(): | |
1703 | """ test pushing http s3 notification on master """ | |
1704 | hostname = get_ip_http() | |
1705 | conn = connection() | |
1706 | zonegroup = 'default' | |
1707 | ||
1708 | # create random port for the http server | |
1709 | host = get_ip() | |
1710 | port = random.randint(10000, 20000) | |
1711 | # start an http server in a separate thread | |
1712 | number_of_objects = 10 | |
1713 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
1714 | ||
1715 | # create bucket | |
1716 | bucket_name = gen_bucket_name() | |
1717 | bucket = conn.create_bucket(bucket_name) | |
1718 | topic_name = bucket_name + TOPIC_SUFFIX | |
1719 | ||
1720 | # create s3 topic | |
1721 | endpoint_address = 'http://'+host+':'+str(port) | |
1722 | endpoint_args = 'push-endpoint='+endpoint_address | |
1723 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
1724 | topic_arn = topic_conf.set_config() | |
1725 | # create s3 notification | |
1726 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1727 | topic_conf_list = [{'Id': notification_name, | |
1728 | 'TopicArn': topic_arn, | |
1729 | 'Events': [] | |
1730 | }] | |
1731 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1732 | response, status = s3_notification_conf.set_config() | |
1733 | assert_equal(status/100, 2) | |
1734 | ||
1735 | # create objects in the bucket | |
1736 | client_threads = [] | |
1737 | objects_size = {} | |
1738 | start_time = time.time() | |
1739 | for i in range(number_of_objects): | |
1740 | content = str(os.urandom(randint(1, 1024))) | |
1741 | object_size = len(content) | |
1742 | key = bucket.new_key(str(i)) | |
1743 | objects_size[key.name] = object_size | |
1744 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1745 | thr.start() | |
1746 | client_threads.append(thr) | |
1747 | [thr.join() for thr in client_threads] | |
1748 | ||
1749 | time_diff = time.time() - start_time | |
1750 | print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1751 | ||
1752 | print('wait for 5sec for the messages...') | |
1753 | time.sleep(5) | |
1754 | ||
1755 | # check http receiver | |
1756 | keys = list(bucket.list()) | |
1757 | http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) | |
1758 | ||
1759 | # delete objects from the bucket | |
1760 | client_threads = [] | |
1761 | start_time = time.time() | |
1762 | for key in bucket.list(): | |
1763 | thr = threading.Thread(target = key.delete, args=()) | |
1764 | thr.start() | |
1765 | client_threads.append(thr) | |
1766 | [thr.join() for thr in client_threads] | |
1767 | ||
1768 | time_diff = time.time() - start_time | |
1769 | print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1770 | ||
1771 | print('wait for 5sec for the messages...') | |
1772 | time.sleep(5) | |
1773 | ||
1774 | # check http receiver | |
1775 | http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) | |
1776 | ||
1777 | # cleanup | |
1778 | topic_conf.del_config() | |
1779 | s3_notification_conf.del_config(notification=notification_name) | |
1780 | # delete the bucket | |
1781 | conn.delete_bucket(bucket_name) | |
1782 | http_server.close() | |
1783 | ||
1784 | ||
1785 | @attr('http_test') | |
1786 | def test_ps_s3_notification_push_cloudevents_on_master(): | |
1787 | """ test pushing cloudevents notification on master """ | |
1788 | hostname = get_ip_http() | |
1789 | conn = connection() | |
1790 | zonegroup = 'default' | |
1791 | ||
1792 | # create random port for the http server | |
1793 | host = get_ip() | |
1794 | port = random.randint(10000, 20000) | |
1795 | # start an http server in a separate thread | |
1796 | number_of_objects = 10 | |
1797 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True) | |
1798 | ||
1799 | # create bucket | |
1800 | bucket_name = gen_bucket_name() | |
1801 | bucket = conn.create_bucket(bucket_name) | |
1802 | topic_name = bucket_name + TOPIC_SUFFIX | |
1803 | ||
1804 | # create s3 topic | |
1805 | endpoint_address = 'http://'+host+':'+str(port) | |
1806 | endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true' | |
1807 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
1808 | topic_arn = topic_conf.set_config() | |
1809 | # create s3 notification | |
1810 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1811 | topic_conf_list = [{'Id': notification_name, | |
1812 | 'TopicArn': topic_arn, | |
1813 | 'Events': [] | |
1814 | }] | |
1815 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1816 | response, status = s3_notification_conf.set_config() | |
1817 | assert_equal(status/100, 2) | |
1818 | ||
1819 | # create objects in the bucket | |
1820 | client_threads = [] | |
1821 | objects_size = {} | |
1822 | start_time = time.time() | |
1823 | for i in range(number_of_objects): | |
1824 | content = str(os.urandom(randint(1, 1024))) | |
1825 | object_size = len(content) | |
1826 | key = bucket.new_key(str(i)) | |
1827 | objects_size[key.name] = object_size | |
1828 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1829 | thr.start() | |
1830 | client_threads.append(thr) | |
1831 | [thr.join() for thr in client_threads] | |
1832 | ||
1833 | time_diff = time.time() - start_time | |
1834 | print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1835 | ||
1836 | print('wait for 5sec for the messages...') | |
1837 | time.sleep(5) | |
1838 | ||
1839 | # check http receiver | |
1840 | keys = list(bucket.list()) | |
1841 | http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) | |
1842 | ||
1843 | # delete objects from the bucket | |
1844 | client_threads = [] | |
1845 | start_time = time.time() | |
1846 | for key in bucket.list(): | |
1847 | thr = threading.Thread(target = key.delete, args=()) | |
1848 | thr.start() | |
1849 | client_threads.append(thr) | |
1850 | [thr.join() for thr in client_threads] | |
1851 | ||
1852 | time_diff = time.time() - start_time | |
1853 | print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1854 | ||
1855 | print('wait for 5sec for the messages...') | |
1856 | time.sleep(5) | |
1857 | ||
1858 | # check http receiver | |
1859 | http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) | |
1860 | ||
1861 | # cleanup | |
1862 | topic_conf.del_config() | |
1863 | s3_notification_conf.del_config(notification=notification_name) | |
1864 | # delete the bucket | |
1865 | conn.delete_bucket(bucket_name) | |
1866 | http_server.close() | |
1867 | ||
1868 | ||
1869 | @attr('http_test') | |
1870 | def test_ps_s3_opaque_data_on_master(): | |
1871 | """ test that opaque id set in topic, is sent in notification on master """ | |
1872 | hostname = get_ip() | |
1873 | conn = connection() | |
1874 | zonegroup = 'default' | |
1875 | ||
1876 | # create random port for the http server | |
1877 | host = get_ip() | |
1878 | port = random.randint(10000, 20000) | |
1879 | # start an http server in a separate thread | |
1880 | number_of_objects = 10 | |
1881 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
1882 | ||
1883 | # create bucket | |
1884 | bucket_name = gen_bucket_name() | |
1885 | bucket = conn.create_bucket(bucket_name) | |
1886 | topic_name = bucket_name + TOPIC_SUFFIX | |
1887 | ||
1888 | # create s3 topic | |
1889 | endpoint_address = 'http://'+host+':'+str(port) | |
1890 | endpoint_args = 'push-endpoint='+endpoint_address | |
1891 | opaque_data = 'http://1.2.3.4:8888' | |
1892 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) | |
1893 | topic_arn = topic_conf.set_config() | |
1894 | # create s3 notification | |
1895 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1896 | topic_conf_list = [{'Id': notification_name, | |
1897 | 'TopicArn': topic_arn, | |
1898 | 'Events': [] | |
1899 | }] | |
1900 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1901 | response, status = s3_notification_conf.set_config() | |
1902 | assert_equal(status/100, 2) | |
1903 | ||
1904 | # create objects in the bucket | |
1905 | client_threads = [] | |
1906 | start_time = time.time() | |
1907 | content = 'bar' | |
1908 | for i in range(number_of_objects): | |
1909 | key = bucket.new_key(str(i)) | |
1910 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1911 | thr.start() | |
1912 | client_threads.append(thr) | |
1913 | [thr.join() for thr in client_threads] | |
1914 | ||
1915 | time_diff = time.time() - start_time | |
1916 | print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1917 | ||
1918 | print('wait for 5sec for the messages...') | |
1919 | time.sleep(5) | |
1920 | ||
1921 | # check http receiver | |
1922 | keys = list(bucket.list()) | |
1923 | print('total number of objects: ' + str(len(keys))) | |
1924 | events = http_server.get_and_reset_events() | |
1925 | for event in events: | |
1926 | assert_equal(event['Records'][0]['opaqueData'], opaque_data) | |
1927 | ||
1928 | # cleanup | |
1929 | for key in keys: | |
1930 | key.delete() | |
1931 | [thr.join() for thr in client_threads] | |
1932 | topic_conf.del_config() | |
1933 | s3_notification_conf.del_config(notification=notification_name) | |
1934 | # delete the bucket | |
1935 | conn.delete_bucket(bucket_name) | |
1936 | http_server.close() | |
1937 | ||
1938 | @attr('http_test') | |
1939 | def test_ps_s3_lifecycle_on_master(): | |
1940 | """ test that when object is deleted due to lifecycle policy, notification is sent on master """ | |
1941 | hostname = get_ip() | |
1942 | conn = connection() | |
1943 | zonegroup = 'default' | |
1944 | ||
1945 | # create random port for the http server | |
1946 | host = get_ip() | |
1947 | port = random.randint(10000, 20000) | |
1948 | # start an http server in a separate thread | |
1949 | number_of_objects = 10 | |
1950 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
1951 | ||
1952 | # create bucket | |
1953 | bucket_name = gen_bucket_name() | |
1954 | bucket = conn.create_bucket(bucket_name) | |
1955 | topic_name = bucket_name + TOPIC_SUFFIX | |
1956 | ||
1957 | # create s3 topic | |
1958 | endpoint_address = 'http://'+host+':'+str(port) | |
1959 | endpoint_args = 'push-endpoint='+endpoint_address | |
1960 | opaque_data = 'http://1.2.3.4:8888' | |
1961 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) | |
1962 | topic_arn = topic_conf.set_config() | |
1963 | # create s3 notification | |
1964 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
1965 | topic_conf_list = [{'Id': notification_name, | |
1966 | 'TopicArn': topic_arn, | |
1967 | 'Events': ['s3:ObjectLifecycle:Expiration:*'] | |
1968 | }] | |
1969 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1970 | response, status = s3_notification_conf.set_config() | |
1971 | assert_equal(status/100, 2) | |
1972 | ||
1973 | # create objects in the bucket | |
1974 | obj_prefix = 'ooo' | |
1975 | client_threads = [] | |
1976 | start_time = time.time() | |
1977 | content = 'bar' | |
1978 | for i in range(number_of_objects): | |
1979 | key = bucket.new_key(obj_prefix + str(i)) | |
1980 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
1981 | thr.start() | |
1982 | client_threads.append(thr) | |
1983 | [thr.join() for thr in client_threads] | |
1984 | ||
1985 | time_diff = time.time() - start_time | |
1986 | print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
1987 | ||
1988 | # create lifecycle policy | |
1989 | client = boto3.client('s3', | |
1990 | endpoint_url='http://'+conn.host+':'+str(conn.port), | |
1991 | aws_access_key_id=conn.aws_access_key_id, | |
1992 | aws_secret_access_key=conn.aws_secret_access_key) | |
1993 | yesterday = datetime.date.today() - datetime.timedelta(days=1) | |
1994 | response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, | |
1995 | LifecycleConfiguration={'Rules': [ | |
1996 | { | |
1997 | 'ID': 'rule1', | |
1998 | 'Expiration': {'Date': yesterday.isoformat()}, | |
1999 | 'Filter': {'Prefix': obj_prefix}, | |
2000 | 'Status': 'Enabled', | |
2001 | } | |
2002 | ] | |
2003 | } | |
2004 | ) | |
2005 | ||
2006 | # start lifecycle processing | |
2007 | admin(['lc', 'process']) | |
2008 | print('wait for 5sec for the messages...') | |
2009 | time.sleep(5) | |
2010 | ||
2011 | # check http receiver does not have messages | |
2012 | keys = list(bucket.list()) | |
2013 | print('total number of objects: ' + str(len(keys))) | |
2014 | event_keys = [] | |
2015 | events = http_server.get_and_reset_events() | |
2016 | for event in events: | |
2017 | assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current') | |
2018 | event_keys.append(event['Records'][0]['s3']['object']['key']) | |
2019 | for key in keys: | |
2020 | key_found = False | |
2021 | for event_key in event_keys: | |
2022 | if event_key == key: | |
2023 | key_found = True | |
2024 | break | |
2025 | if not key_found: | |
2026 | err = 'no lifecycle event found for key: ' + str(key) | |
2027 | log.error(events) | |
2028 | assert False, err | |
2029 | ||
2030 | # cleanup | |
2031 | for key in keys: | |
2032 | key.delete() | |
2033 | [thr.join() for thr in client_threads] | |
2034 | topic_conf.del_config() | |
2035 | s3_notification_conf.del_config(notification=notification_name) | |
2036 | # delete the bucket | |
2037 | conn.delete_bucket(bucket_name) | |
2038 | http_server.close() | |
2039 | ||
2040 | ||
2041 | def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): | |
2042 | """ test object creation s3 notifications in using put/copy/post on master""" | |
2043 | ||
1e59de90 | 2044 | if not external_endpoint_address: |
20effc67 TL |
2045 | hostname = 'localhost' |
2046 | proc = init_rabbitmq() | |
2047 | if proc is None: | |
2048 | return SkipTest('end2end amqp tests require rabbitmq-server installed') | |
2049 | else: | |
2050 | proc = None | |
2051 | ||
2052 | conn = connection() | |
2053 | hostname = 'localhost' | |
2054 | zonegroup = 'default' | |
2055 | ||
2056 | # create bucket | |
2057 | bucket_name = gen_bucket_name() | |
2058 | bucket = conn.create_bucket(bucket_name) | |
2059 | topic_name = bucket_name + TOPIC_SUFFIX | |
2060 | ||
2061 | # start amqp receiver | |
2062 | exchange = 'ex1' | |
2063 | task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location) | |
2064 | task.start() | |
2065 | ||
2066 | # create s3 topic | |
2067 | if external_endpoint_address: | |
2068 | endpoint_address = external_endpoint_address | |
2069 | elif ca_location: | |
2070 | endpoint_address = 'amqps://' + hostname | |
2071 | else: | |
2072 | endpoint_address = 'amqp://' + hostname | |
2073 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl | |
2074 | if ca_location: | |
2075 | endpoint_args += '&ca-location={}'.format(ca_location) | |
2076 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2077 | topic_arn = topic_conf.set_config() | |
2078 | # create s3 notification | |
2079 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2080 | topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, | |
2081 | 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload'] | |
2082 | }] | |
2083 | ||
2084 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2085 | response, status = s3_notification_conf.set_config() | |
2086 | assert_equal(status/100, 2) | |
2087 | ||
2088 | objects_size = {} | |
2089 | # create objects in the bucket using PUT | |
2090 | content = str(os.urandom(randint(1, 1024))) | |
2091 | key_name = 'put' | |
2092 | key = bucket.new_key(key_name) | |
2093 | objects_size[key_name] = len(content) | |
2094 | key.set_contents_from_string(content) | |
2095 | # create objects in the bucket using COPY | |
2096 | key_name = 'copy' | |
2097 | bucket.copy_key(key_name, bucket.name, key.name) | |
2098 | objects_size[key_name] = len(content) | |
2099 | ||
2100 | # create objects in the bucket using multi-part upload | |
2101 | fp = tempfile.NamedTemporaryFile(mode='w+b') | |
2102 | content = bytearray(os.urandom(10*1024*1024)) | |
2103 | key_name = 'multipart' | |
2104 | objects_size[key_name] = len(content) | |
2105 | fp.write(content) | |
2106 | fp.flush() | |
2107 | fp.seek(0) | |
2108 | uploader = bucket.initiate_multipart_upload(key_name) | |
2109 | uploader.upload_part_from_file(fp, 1) | |
2110 | uploader.complete_upload() | |
2111 | fp.close() | |
2112 | ||
2113 | print('wait for 5sec for the messages...') | |
2114 | time.sleep(5) | |
2115 | ||
2116 | # check amqp receiver | |
2117 | keys = list(bucket.list()) | |
2118 | receiver.verify_s3_events(keys, exact_match=True, expected_sizes=objects_size) | |
2119 | ||
2120 | # cleanup | |
2121 | stop_amqp_receiver(receiver, task) | |
2122 | s3_notification_conf.del_config() | |
2123 | topic_conf.del_config() | |
2124 | for key in bucket.list(): | |
2125 | key.delete() | |
2126 | # delete the bucket | |
2127 | conn.delete_bucket(bucket_name) | |
2128 | if proc: | |
2129 | clean_rabbitmq(proc) | |
2130 | ||
2131 | ||
2132 | @attr('amqp_test') | |
2133 | def test_ps_s3_creation_triggers_on_master(): | |
1e59de90 | 2134 | ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672") |
20effc67 TL |
2135 | |
2136 | ||
2137 | @attr('amqp_ssl_test') | |
2138 | def test_ps_s3_creation_triggers_on_master_external(): | |
20effc67 TL |
2139 | |
2140 | from distutils.util import strtobool | |
2141 | ||
2142 | if 'AMQP_EXTERNAL_ENDPOINT' in os.environ: | |
2143 | try: | |
2144 | if strtobool(os.environ['AMQP_VERIFY_SSL']): | |
2145 | verify_ssl = 'true' | |
2146 | else: | |
2147 | verify_ssl = 'false' | |
2148 | except Exception as e: | |
2149 | verify_ssl = 'true' | |
2150 | ||
2151 | ps_s3_creation_triggers_on_master( | |
2152 | external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], | |
2153 | verify_ssl=verify_ssl) | |
2154 | else: | |
2155 | return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") | |
2156 | ||
2157 | ||
1e59de90 | 2158 | def generate_private_key(tempdir): |
20effc67 TL |
2159 | |
2160 | import datetime | |
20effc67 TL |
2161 | import stat |
2162 | from cryptography import x509 | |
2163 | from cryptography.x509.oid import NameOID | |
2164 | from cryptography.hazmat.primitives import hashes | |
2165 | from cryptography.hazmat.backends import default_backend | |
2166 | from cryptography.hazmat.primitives import serialization | |
2167 | from cryptography.hazmat.primitives.asymmetric import rsa | |
1e59de90 TL |
2168 | |
2169 | # modify permissions to ensure that the broker user can access them | |
2170 | os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) | |
2171 | CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') | |
2172 | CERTFILE = os.path.join(tempdir, 'server_certificate.pem') | |
2173 | KEYFILE = os.path.join(tempdir, 'server_key.pem') | |
2174 | ||
2175 | root_key = rsa.generate_private_key( | |
2176 | public_exponent=65537, | |
2177 | key_size=2048, | |
2178 | backend=default_backend() | |
2179 | ) | |
2180 | subject = issuer = x509.Name([ | |
2181 | x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), | |
2182 | x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), | |
2183 | x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), | |
2184 | x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), | |
2185 | x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), | |
2186 | ]) | |
2187 | root_cert = x509.CertificateBuilder().subject_name( | |
2188 | subject | |
2189 | ).issuer_name( | |
2190 | issuer | |
2191 | ).public_key( | |
2192 | root_key.public_key() | |
2193 | ).serial_number( | |
2194 | x509.random_serial_number() | |
2195 | ).not_valid_before( | |
2196 | datetime.datetime.utcnow() | |
2197 | ).not_valid_after( | |
2198 | datetime.datetime.utcnow() + datetime.timedelta(days=3650) | |
2199 | ).add_extension( | |
2200 | x509.BasicConstraints(ca=True, path_length=None), critical=True | |
2201 | ).sign(root_key, hashes.SHA256(), default_backend()) | |
2202 | with open(CACERTFILE, "wb") as f: | |
2203 | f.write(root_cert.public_bytes(serialization.Encoding.PEM)) | |
2204 | ||
2205 | # Now we want to generate a cert from that root | |
2206 | cert_key = rsa.generate_private_key( | |
2207 | public_exponent=65537, | |
2208 | key_size=2048, | |
2209 | backend=default_backend(), | |
2210 | ) | |
2211 | with open(KEYFILE, "wb") as f: | |
2212 | f.write(cert_key.private_bytes( | |
2213 | encoding=serialization.Encoding.PEM, | |
2214 | format=serialization.PrivateFormat.PKCS8, | |
2215 | encryption_algorithm=serialization.NoEncryption(), | |
2216 | )) | |
2217 | new_subject = x509.Name([ | |
2218 | x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), | |
2219 | x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), | |
2220 | x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), | |
2221 | x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), | |
2222 | ]) | |
2223 | cert = x509.CertificateBuilder().subject_name( | |
2224 | new_subject | |
2225 | ).issuer_name( | |
2226 | root_cert.issuer | |
2227 | ).public_key( | |
2228 | cert_key.public_key() | |
2229 | ).serial_number( | |
2230 | x509.random_serial_number() | |
2231 | ).not_valid_before( | |
2232 | datetime.datetime.utcnow() | |
2233 | ).not_valid_after( | |
2234 | datetime.datetime.utcnow() + datetime.timedelta(days=30) | |
2235 | ).add_extension( | |
2236 | x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), | |
2237 | critical=False, | |
2238 | ).sign(root_key, hashes.SHA256(), default_backend()) | |
2239 | # Write our certificate out to disk. | |
2240 | with open(CERTFILE, "wb") as f: | |
2241 | f.write(cert.public_bytes(serialization.Encoding.PEM)) | |
2242 | ||
2243 | print("\n\n********private key generated********") | |
2244 | print(CACERTFILE, CERTFILE, KEYFILE) | |
2245 | print("\n\n") | |
2246 | return CACERTFILE, CERTFILE, KEYFILE | |
2247 | ||
2248 | ||
2249 | @attr('amqp_ssl_test') | |
2250 | def test_ps_s3_creation_triggers_on_master_ssl(): | |
2251 | ||
2252 | import textwrap | |
20effc67 TL |
2253 | from tempfile import TemporaryDirectory |
2254 | ||
2255 | with TemporaryDirectory() as tempdir: | |
1e59de90 | 2256 | CACERTFILE, CERTFILE, KEYFILE = generate_private_key(tempdir) |
20effc67 | 2257 | RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config') |
20effc67 TL |
2258 | with open(RABBITMQ_CONF_FILE, "w") as f: |
2259 | # use the old style config format to ensure it also runs on older RabbitMQ versions. | |
2260 | f.write(textwrap.dedent(f''' | |
2261 | [ | |
2262 | {{rabbit, [ | |
2263 | {{ssl_listeners, [5671]}}, | |
2264 | {{ssl_options, [{{cacertfile, "{CACERTFILE}"}}, | |
2265 | {{certfile, "{CERTFILE}"}}, | |
2266 | {{keyfile, "{KEYFILE}"}}, | |
2267 | {{verify, verify_peer}}, | |
2268 | {{fail_if_no_peer_cert, false}}]}}]}} | |
2269 | ]. | |
2270 | ''')) | |
2271 | os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0] | |
2272 | ||
2273 | ps_s3_creation_triggers_on_master(ca_location=CACERTFILE) | |
2274 | ||
2275 | del os.environ['RABBITMQ_CONFIG_FILE'] | |
2276 | ||
2277 | ||
2278 | @attr('amqp_test') | |
2279 | def test_http_post_object_upload(): | |
2280 | """ test that uploads object using HTTP POST """ | |
2281 | ||
2282 | import boto3 | |
2283 | from collections import OrderedDict | |
2284 | import requests | |
2285 | ||
2286 | hostname = get_ip() | |
2287 | zonegroup = 'default' | |
2288 | conn = connection() | |
2289 | ||
2290 | endpoint = "http://%s:%d" % (get_config_host(), get_config_port()) | |
2291 | ||
2292 | conn1 = boto3.client(service_name='s3', | |
2293 | aws_access_key_id=get_access_key(), | |
2294 | aws_secret_access_key=get_secret_key(), | |
2295 | endpoint_url=endpoint, | |
2296 | ) | |
2297 | ||
2298 | bucket_name = gen_bucket_name() | |
2299 | topic_name = bucket_name + TOPIC_SUFFIX | |
2300 | ||
2301 | key_name = 'foo.txt' | |
2302 | ||
2303 | resp = conn1.generate_presigned_post(Bucket=bucket_name, Key=key_name,) | |
2304 | ||
2305 | url = resp['url'] | |
2306 | ||
2307 | bucket = conn1.create_bucket(ACL='public-read-write', Bucket=bucket_name) | |
2308 | ||
2309 | # start amqp receivers | |
2310 | exchange = 'ex1' | |
2311 | task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') | |
2312 | task1.start() | |
2313 | ||
2314 | # create s3 topics | |
2315 | endpoint_address = 'amqp://' + hostname | |
2316 | endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' | |
2317 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
2318 | topic_arn1 = topic_conf1.set_config() | |
2319 | ||
2320 | # create s3 notifications | |
2321 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2322 | topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, | |
2323 | 'Events': ['s3:ObjectCreated:Post'] | |
2324 | }] | |
2325 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2326 | response, status = s3_notification_conf.set_config() | |
2327 | assert_equal(status/100, 2) | |
2328 | ||
2329 | payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\ | |
2330 | ("Content-Type" , "text/plain"),('file', ('bar'))]) | |
2331 | ||
2332 | # POST upload | |
2333 | r = requests.post(url, files=payload, verify=True) | |
2334 | assert_equal(r.status_code, 204) | |
2335 | ||
2336 | # check amqp receiver | |
2337 | events = receiver1.get_and_reset_events() | |
2338 | assert_equal(len(events), 1) | |
2339 | ||
2340 | # cleanup | |
2341 | stop_amqp_receiver(receiver1, task1) | |
2342 | s3_notification_conf.del_config() | |
2343 | topic_conf1.del_config() | |
2344 | conn1.delete_object(Bucket=bucket_name, Key=key_name) | |
2345 | # delete the bucket | |
2346 | conn1.delete_bucket(Bucket=bucket_name) | |
2347 | ||
2348 | ||
2349 | @attr('amqp_test') | |
2350 | def test_ps_s3_multipart_on_master(): | |
2351 | """ test multipart object upload on master""" | |
2352 | ||
2353 | hostname = get_ip() | |
2354 | conn = connection() | |
2355 | zonegroup = 'default' | |
2356 | ||
2357 | # create bucket | |
2358 | bucket_name = gen_bucket_name() | |
2359 | bucket = conn.create_bucket(bucket_name) | |
2360 | topic_name = bucket_name + TOPIC_SUFFIX | |
2361 | ||
2362 | # start amqp receivers | |
2363 | exchange = 'ex1' | |
2364 | task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') | |
2365 | task1.start() | |
2366 | task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2') | |
2367 | task2.start() | |
2368 | task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3') | |
2369 | task3.start() | |
2370 | ||
2371 | # create s3 topics | |
2372 | endpoint_address = 'amqp://' + hostname | |
2373 | endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' | |
2374 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
2375 | topic_arn1 = topic_conf1.set_config() | |
2376 | topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) | |
2377 | topic_arn2 = topic_conf2.set_config() | |
2378 | topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args) | |
2379 | topic_arn3 = topic_conf3.set_config() | |
2380 | ||
2381 | # create s3 notifications | |
2382 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2383 | topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, | |
2384 | 'Events': ['s3:ObjectCreated:*'] | |
2385 | }, | |
2386 | {'Id': notification_name+'_2', 'TopicArn': topic_arn2, | |
2387 | 'Events': ['s3:ObjectCreated:Post'] | |
2388 | }, | |
2389 | {'Id': notification_name+'_3', 'TopicArn': topic_arn3, | |
2390 | 'Events': ['s3:ObjectCreated:CompleteMultipartUpload'] | |
2391 | }] | |
2392 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2393 | response, status = s3_notification_conf.set_config() | |
2394 | assert_equal(status/100, 2) | |
2395 | ||
2396 | # create objects in the bucket using multi-part upload | |
2397 | fp = tempfile.NamedTemporaryFile(mode='w+b') | |
2398 | object_size = 1024 | |
2399 | content = bytearray(os.urandom(object_size)) | |
2400 | fp.write(content) | |
2401 | fp.flush() | |
2402 | fp.seek(0) | |
2403 | uploader = bucket.initiate_multipart_upload('multipart') | |
2404 | uploader.upload_part_from_file(fp, 1) | |
2405 | uploader.complete_upload() | |
2406 | fp.close() | |
2407 | ||
2408 | print('wait for 5sec for the messages...') | |
2409 | time.sleep(5) | |
2410 | ||
2411 | # check amqp receiver | |
2412 | events = receiver1.get_and_reset_events() | |
2413 | assert_equal(len(events), 1) | |
2414 | ||
2415 | events = receiver2.get_and_reset_events() | |
2416 | assert_equal(len(events), 0) | |
2417 | ||
2418 | events = receiver3.get_and_reset_events() | |
2419 | assert_equal(len(events), 1) | |
2420 | assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload') | |
2421 | assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') | |
1e59de90 TL |
2422 | assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size) |
2423 | assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime' | |
20effc67 TL |
2424 | |
2425 | # cleanup | |
2426 | stop_amqp_receiver(receiver1, task1) | |
2427 | stop_amqp_receiver(receiver2, task2) | |
2428 | stop_amqp_receiver(receiver3, task3) | |
2429 | s3_notification_conf.del_config() | |
2430 | topic_conf1.del_config() | |
2431 | topic_conf2.del_config() | |
2432 | topic_conf3.del_config() | |
2433 | for key in bucket.list(): | |
2434 | key.delete() | |
2435 | # delete the bucket | |
2436 | conn.delete_bucket(bucket_name) | |
2437 | ||
20effc67 | 2438 | @attr('amqp_test') |
1e59de90 | 2439 | def test_ps_s3_metadata_filter_on_master(): |
20effc67 TL |
2440 | """ test s3 notification of metadata on master """ |
2441 | ||
2442 | hostname = get_ip() | |
2443 | conn = connection() | |
2444 | zonegroup = 'default' | |
2445 | ||
2446 | # create bucket | |
2447 | bucket_name = gen_bucket_name() | |
2448 | bucket = conn.create_bucket(bucket_name) | |
1e59de90 | 2449 | topic_name = bucket_name + TOPIC_SUFFIX |
20effc67 | 2450 | |
1e59de90 | 2451 | # start amqp receivers |
20effc67 TL |
2452 | exchange = 'ex1' |
2453 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
2454 | task.start() | |
2455 | ||
2456 | # create s3 topic | |
2457 | endpoint_address = 'amqp://' + hostname | |
2458 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' | |
2459 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2460 | topic_arn = topic_conf.set_config() | |
2461 | # create s3 notification | |
2462 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2463 | meta_key = 'meta1' | |
2464 | meta_value = 'This is my metadata value' | |
1e59de90 | 2465 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, |
20effc67 TL |
2466 | 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], |
2467 | 'Filter': { | |
2468 | 'Metadata': { | |
1e59de90 | 2469 | 'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}] |
20effc67 TL |
2470 | } |
2471 | } | |
2472 | }] | |
2473 | ||
2474 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
1e59de90 | 2475 | _, status = s3_notification_conf.set_config() |
20effc67 TL |
2476 | assert_equal(status/100, 2) |
2477 | ||
2478 | expected_keys = [] | |
2479 | # create objects in the bucket | |
2480 | key_name = 'foo' | |
2481 | key = bucket.new_key(key_name) | |
2482 | key.set_metadata(meta_key, meta_value) | |
2483 | key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') | |
2484 | expected_keys.append(key_name) | |
2485 | ||
2486 | # create objects in the bucket using COPY | |
2487 | key_name = 'copy_of_foo' | |
2488 | bucket.copy_key(key_name, bucket.name, key.name) | |
2489 | expected_keys.append(key_name) | |
2490 | ||
2491 | # create another objects in the bucket using COPY | |
2492 | # but override the metadata value | |
2493 | key_name = 'another_copy_of_foo' | |
2494 | bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'}) | |
1e59de90 | 2495 | # this key is not in the expected keys due to the different meta value |
20effc67 TL |
2496 | |
2497 | # create objects in the bucket using multi-part upload | |
2498 | fp = tempfile.NamedTemporaryFile(mode='w+b') | |
2499 | chunk_size = 1024*1024*5 # 5MB | |
2500 | object_size = 10*chunk_size | |
2501 | content = bytearray(os.urandom(object_size)) | |
2502 | fp.write(content) | |
2503 | fp.flush() | |
2504 | fp.seek(0) | |
2505 | key_name = 'multipart_foo' | |
2506 | uploader = bucket.initiate_multipart_upload(key_name, | |
2507 | metadata={meta_key: meta_value}) | |
2508 | for i in range(1,5): | |
2509 | uploader.upload_part_from_file(fp, i, size=chunk_size) | |
2510 | fp.seek(i*chunk_size) | |
2511 | uploader.complete_upload() | |
2512 | fp.close() | |
2513 | expected_keys.append(key_name) | |
2514 | ||
2515 | print('wait for 5sec for the messages...') | |
2516 | time.sleep(5) | |
2517 | # check amqp receiver | |
2518 | events = receiver.get_and_reset_events() | |
1e59de90 | 2519 | assert_equal(len(events), len(expected_keys)) |
20effc67 TL |
2520 | for event in events: |
2521 | assert(event['Records'][0]['s3']['object']['key'] in expected_keys) | |
2522 | ||
2523 | # delete objects | |
2524 | for key in bucket.list(): | |
2525 | key.delete() | |
2526 | print('wait for 5sec for the messages...') | |
2527 | time.sleep(5) | |
2528 | # check amqp receiver | |
1e59de90 TL |
2529 | events = receiver.get_and_reset_events() |
2530 | assert_equal(len(events), len(expected_keys)) | |
2531 | for event in events: | |
2532 | assert(event['Records'][0]['s3']['object']['key'] in expected_keys) | |
2533 | ||
2534 | # cleanup | |
2535 | stop_amqp_receiver(receiver, task) | |
2536 | s3_notification_conf.del_config() | |
2537 | topic_conf.del_config() | |
2538 | # delete the bucket | |
2539 | conn.delete_bucket(bucket_name) | |
2540 | ||
2541 | ||
2542 | @attr('amqp_test') | |
2543 | def test_ps_s3_metadata_on_master(): | |
2544 | """ test s3 notification of metadata on master """ | |
2545 | ||
2546 | hostname = get_ip() | |
2547 | conn = connection() | |
2548 | zonegroup = 'default' | |
2549 | ||
2550 | # create bucket | |
2551 | bucket_name = gen_bucket_name() | |
2552 | bucket = conn.create_bucket(bucket_name) | |
2553 | topic_name = bucket_name + TOPIC_SUFFIX | |
2554 | ||
2555 | # start amqp receivers | |
2556 | exchange = 'ex1' | |
2557 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
2558 | task.start() | |
2559 | ||
2560 | # create s3 topic | |
2561 | endpoint_address = 'amqp://' + hostname | |
2562 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' | |
2563 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2564 | topic_arn = topic_conf.set_config() | |
2565 | # create s3 notification | |
2566 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2567 | meta_key = 'meta1' | |
2568 | meta_value = 'This is my metadata value' | |
aee94f69 | 2569 | meta_prefix = META_PREFIX |
1e59de90 TL |
2570 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, |
2571 | 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], | |
2572 | }] | |
2573 | ||
2574 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2575 | _, status = s3_notification_conf.set_config() | |
2576 | assert_equal(status/100, 2) | |
2577 | ||
2578 | # create objects in the bucket | |
2579 | key_name = 'foo' | |
2580 | key = bucket.new_key(key_name) | |
2581 | key.set_metadata(meta_key, meta_value) | |
2582 | key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') | |
2583 | # update the object | |
2584 | another_meta_key = 'meta2' | |
2585 | key.set_metadata(another_meta_key, meta_value) | |
2586 | key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') | |
2587 | ||
2588 | # create objects in the bucket using COPY | |
2589 | key_name = 'copy_of_foo' | |
2590 | bucket.copy_key(key_name, bucket.name, key.name) | |
2591 | ||
2592 | # create objects in the bucket using multi-part upload | |
2593 | fp = tempfile.NamedTemporaryFile(mode='w+b') | |
2594 | chunk_size = 1024*1024*5 # 5MB | |
2595 | object_size = 10*chunk_size | |
2596 | content = bytearray(os.urandom(object_size)) | |
2597 | fp.write(content) | |
2598 | fp.flush() | |
2599 | fp.seek(0) | |
2600 | key_name = 'multipart_foo' | |
2601 | uploader = bucket.initiate_multipart_upload(key_name, | |
2602 | metadata={meta_key: meta_value}) | |
2603 | for i in range(1,5): | |
2604 | uploader.upload_part_from_file(fp, i, size=chunk_size) | |
2605 | fp.seek(i*chunk_size) | |
2606 | uploader.complete_upload() | |
2607 | fp.close() | |
2608 | ||
2609 | print('wait for 5sec for the messages...') | |
2610 | time.sleep(5) | |
2611 | # check amqp receiver | |
2612 | events = receiver.get_and_reset_events() | |
2613 | for event in events: | |
2614 | value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key] | |
2615 | assert_equal(value[0], meta_value) | |
2616 | ||
2617 | # delete objects | |
2618 | for key in bucket.list(): | |
2619 | key.delete() | |
2620 | print('wait for 5sec for the messages...') | |
2621 | time.sleep(5) | |
2622 | # check amqp receiver | |
2623 | events = receiver.get_and_reset_events() | |
2624 | for event in events: | |
2625 | value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key] | |
2626 | assert_equal(value[0], meta_value) | |
20effc67 TL |
2627 | |
2628 | # cleanup | |
2629 | stop_amqp_receiver(receiver, task) | |
2630 | s3_notification_conf.del_config() | |
2631 | topic_conf.del_config() | |
2632 | # delete the bucket | |
2633 | conn.delete_bucket(bucket_name) | |
2634 | ||
2635 | ||
2636 | @attr('amqp_test') | |
2637 | def test_ps_s3_tags_on_master(): | |
2638 | """ test s3 notification of tags on master """ | |
2639 | ||
2640 | hostname = get_ip() | |
2641 | conn = connection() | |
2642 | zonegroup = 'default' | |
2643 | ||
2644 | # create bucket | |
2645 | bucket_name = gen_bucket_name() | |
2646 | bucket = conn.create_bucket(bucket_name) | |
2647 | topic_name = bucket_name + TOPIC_SUFFIX | |
2648 | ||
2649 | # start amqp receiver | |
2650 | exchange = 'ex1' | |
2651 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
2652 | task.start() | |
2653 | ||
2654 | # create s3 topic | |
2655 | endpoint_address = 'amqp://' + hostname | |
2656 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' | |
2657 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2658 | topic_arn = topic_conf.set_config() | |
2659 | # create s3 notification | |
2660 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2661 | topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, | |
2662 | 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], | |
2663 | 'Filter': { | |
2664 | 'Tags': { | |
2665 | 'FilterRules': [{'Name': 'hello', 'Value': 'world'}, {'Name': 'ka', 'Value': 'boom'}] | |
2666 | } | |
2667 | } | |
2668 | }] | |
2669 | ||
2670 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2671 | response, status = s3_notification_conf.set_config() | |
2672 | assert_equal(status/100, 2) | |
2673 | ||
2674 | expected_keys = [] | |
2675 | # create objects in the bucket with tags | |
2676 | # key 1 has all the tags in the filter | |
2677 | tags = 'hello=world&ka=boom&hello=helloworld' | |
2678 | key_name1 = 'key1' | |
2679 | put_object_tagging(conn, bucket_name, key_name1, tags) | |
2680 | expected_keys.append(key_name1) | |
2681 | # key 2 has an additional tag not in the filter | |
2682 | tags = 'hello=world&foo=bar&ka=boom&hello=helloworld' | |
2683 | key_name = 'key2' | |
2684 | put_object_tagging(conn, bucket_name, key_name, tags) | |
2685 | expected_keys.append(key_name) | |
2686 | # key 3 has no tags | |
2687 | key_name3 = 'key3' | |
2688 | key = bucket.new_key(key_name3) | |
2689 | key.set_contents_from_string('bar') | |
2690 | # key 4 has the wrong of the multi value tags | |
2691 | tags = 'hello=helloworld&ka=boom' | |
2692 | key_name = 'key4' | |
2693 | put_object_tagging(conn, bucket_name, key_name, tags) | |
2694 | # key 5 has the right of the multi value tags | |
2695 | tags = 'hello=world&ka=boom' | |
2696 | key_name = 'key5' | |
2697 | put_object_tagging(conn, bucket_name, key_name, tags) | |
2698 | expected_keys.append(key_name) | |
2699 | # key 6 is missing a tag | |
2700 | tags = 'hello=world' | |
2701 | key_name = 'key6' | |
2702 | put_object_tagging(conn, bucket_name, key_name, tags) | |
2703 | # create objects in the bucket using COPY | |
2704 | key_name = 'copy_of_'+key_name1 | |
2705 | bucket.copy_key(key_name, bucket.name, key_name1) | |
2706 | expected_keys.append(key_name) | |
2707 | ||
2708 | print('wait for 5sec for the messages...') | |
2709 | time.sleep(5) | |
2710 | event_count = 0 | |
2711 | expected_tags1 = [{'key': 'hello', 'val': 'world'}, {'key': 'hello', 'val': 'helloworld'}, {'key': 'ka', 'val': 'boom'}] | |
2712 | expected_tags1 = sorted(expected_tags1, key=lambda k: k['key']+k['val']) | |
2713 | for event in receiver.get_and_reset_events(): | |
2714 | key = event['Records'][0]['s3']['object']['key'] | |
2715 | if (key == key_name1): | |
2716 | obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val']) | |
2717 | assert_equal(obj_tags, expected_tags1) | |
2718 | event_count += 1 | |
2719 | assert(key in expected_keys) | |
2720 | ||
2721 | assert_equal(event_count, len(expected_keys)) | |
2722 | ||
2723 | # delete the objects | |
2724 | for key in bucket.list(): | |
2725 | key.delete() | |
2726 | print('wait for 5sec for the messages...') | |
2727 | time.sleep(5) | |
2728 | event_count = 0 | |
2729 | # check amqp receiver | |
2730 | for event in receiver.get_and_reset_events(): | |
2731 | key = event['Records'][0]['s3']['object']['key'] | |
2732 | if (key == key_name1): | |
2733 | obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val']) | |
2734 | assert_equal(obj_tags, expected_tags1) | |
2735 | event_count += 1 | |
2736 | assert(key in expected_keys) | |
2737 | ||
2738 | assert(event_count == len(expected_keys)) | |
2739 | ||
2740 | # cleanup | |
2741 | stop_amqp_receiver(receiver, task) | |
2742 | s3_notification_conf.del_config() | |
2743 | topic_conf.del_config() | |
2744 | # delete the bucket | |
2745 | conn.delete_bucket(bucket_name) | |
2746 | ||
2747 | @attr('amqp_test') | |
2748 | def test_ps_s3_versioning_on_master(): | |
2749 | """ test s3 notification of object versions """ | |
2750 | ||
2751 | hostname = get_ip() | |
2752 | conn = connection() | |
2753 | zonegroup = 'default' | |
2754 | ||
2755 | # create bucket | |
2756 | bucket_name = gen_bucket_name() | |
2757 | bucket = conn.create_bucket(bucket_name) | |
2758 | bucket.configure_versioning(True) | |
2759 | topic_name = bucket_name + TOPIC_SUFFIX | |
2760 | ||
2761 | # start amqp receiver | |
2762 | exchange = 'ex1' | |
2763 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
2764 | task.start() | |
2765 | ||
2766 | # create s3 topic | |
2767 | endpoint_address = 'amqp://' + hostname | |
2768 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' | |
2769 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2770 | topic_arn = topic_conf.set_config() | |
2771 | # create notification | |
2772 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2773 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
2774 | 'Events': [] | |
2775 | }] | |
2776 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2777 | _, status = s3_notification_conf.set_config() | |
2778 | assert_equal(status/100, 2) | |
2779 | ||
2780 | # create objects in the bucket | |
2781 | key_name = 'foo' | |
2782 | key = bucket.new_key(key_name) | |
2783 | key.set_contents_from_string('hello') | |
2784 | ver1 = key.version_id | |
2785 | key.set_contents_from_string('world') | |
2786 | ver2 = key.version_id | |
2787 | copy_of_key = bucket.copy_key('copy_of_foo', bucket.name, key_name, src_version_id=ver1) | |
2788 | ver3 = copy_of_key.version_id | |
2789 | versions = [ver1, ver2, ver3] | |
2790 | ||
2791 | print('wait for 5sec for the messages...') | |
2792 | time.sleep(5) | |
2793 | ||
2794 | # check amqp receiver | |
2795 | events = receiver.get_and_reset_events() | |
2796 | num_of_versions = 0 | |
2797 | for event_list in events: | |
2798 | for event in event_list['Records']: | |
2799 | assert event['s3']['object']['key'] in (key_name, copy_of_key.name) | |
2800 | version = event['s3']['object']['versionId'] | |
2801 | num_of_versions += 1 | |
2802 | if version not in versions: | |
2803 | print('version mismatch: '+version+' not in: '+str(versions)) | |
2804 | # TODO: copy_key() does not return the version of the copied object | |
2805 | #assert False | |
2806 | else: | |
2807 | print('version ok: '+version+' in: '+str(versions)) | |
2808 | ||
2809 | assert_equal(num_of_versions, 3) | |
2810 | ||
2811 | # cleanup | |
2812 | stop_amqp_receiver(receiver, task) | |
2813 | s3_notification_conf.del_config() | |
2814 | topic_conf.del_config() | |
2815 | # delete the bucket | |
2816 | bucket.delete_key(copy_of_key, version_id=ver3) | |
2817 | bucket.delete_key(key.name, version_id=ver2) | |
2818 | bucket.delete_key(key.name, version_id=ver1) | |
2819 | #conn.delete_bucket(bucket_name) | |
2820 | ||
2821 | ||
2822 | @attr('amqp_test') | |
2823 | def test_ps_s3_versioned_deletion_on_master(): | |
2824 | """ test s3 notification of deletion markers on master """ | |
2825 | ||
2826 | hostname = get_ip() | |
2827 | conn = connection() | |
2828 | zonegroup = 'default' | |
2829 | ||
2830 | # create bucket | |
2831 | bucket_name = gen_bucket_name() | |
2832 | bucket = conn.create_bucket(bucket_name) | |
2833 | bucket.configure_versioning(True) | |
2834 | topic_name = bucket_name + TOPIC_SUFFIX | |
2835 | ||
2836 | # start amqp receiver | |
2837 | exchange = 'ex1' | |
2838 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
2839 | task.start() | |
2840 | ||
2841 | # create s3 topic | |
2842 | endpoint_address = 'amqp://' + hostname | |
2843 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' | |
2844 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2845 | topic_arn = topic_conf.set_config() | |
2846 | # create s3 notification | |
2847 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2848 | topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, | |
2849 | 'Events': ['s3:ObjectRemoved:*'] | |
2850 | }, | |
2851 | {'Id': notification_name+'_2', 'TopicArn': topic_arn, | |
2852 | 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated'] | |
2853 | }, | |
2854 | {'Id': notification_name+'_3', 'TopicArn': topic_arn, | |
2855 | 'Events': ['s3:ObjectRemoved:Delete'] | |
2856 | }] | |
2857 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
2858 | response, status = s3_notification_conf.set_config() | |
2859 | assert_equal(status/100, 2) | |
2860 | ||
2861 | # create objects in the bucket | |
2862 | key = bucket.new_key('foo') | |
2863 | content = str(os.urandom(512)) | |
2864 | size1 = len(content) | |
2865 | key.set_contents_from_string(content) | |
2866 | ver1 = key.version_id | |
2867 | content = str(os.urandom(511)) | |
2868 | size2 = len(content) | |
2869 | key.set_contents_from_string(content) | |
2870 | ver2 = key.version_id | |
2871 | # create delete marker (non versioned deletion) | |
2872 | delete_marker_key = bucket.delete_key(key.name) | |
2873 | versions = [ver1, ver2, delete_marker_key.version_id] | |
2874 | ||
2875 | time.sleep(1) | |
2876 | ||
2877 | # versioned deletion | |
2878 | bucket.delete_key(key.name, version_id=ver2) | |
2879 | bucket.delete_key(key.name, version_id=ver1) | |
2880 | ||
2881 | print('wait for 5sec for the messages...') | |
2882 | time.sleep(5) | |
2883 | ||
2884 | # check amqp receiver | |
2885 | events = receiver.get_and_reset_events() | |
2886 | delete_events = 0 | |
2887 | delete_marker_create_events = 0 | |
2888 | for event_list in events: | |
2889 | for event in event_list['Records']: | |
2890 | version = event['s3']['object']['versionId'] | |
2891 | size = event['s3']['object']['size'] | |
2892 | if version not in versions: | |
2893 | print('version mismatch: '+version+' not in: '+str(versions)) | |
2894 | assert False | |
2895 | else: | |
2896 | print('version ok: '+version+' in: '+str(versions)) | |
2897 | if event['eventName'] == 'ObjectRemoved:Delete': | |
2898 | delete_events += 1 | |
2899 | assert size in [size1, size2] | |
2900 | assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] | |
2901 | if event['eventName'] == 'ObjectRemoved:DeleteMarkerCreated': | |
2902 | delete_marker_create_events += 1 | |
2903 | assert size == size2 | |
2904 | assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] | |
2905 | ||
2906 | # 2 key versions were deleted | |
2907 | # notified over the same topic via 2 notifications (1,3) | |
2908 | assert_equal(delete_events, 2*2) | |
2909 | # 1 deletion marker was created | |
2910 | # notified over the same topic over 2 notifications (1,2) | |
2911 | assert_equal(delete_marker_create_events, 1*2) | |
2912 | ||
2913 | # cleanup | |
2914 | delete_marker_key.delete() | |
2915 | stop_amqp_receiver(receiver, task) | |
2916 | s3_notification_conf.del_config() | |
2917 | topic_conf.del_config() | |
2918 | # delete the bucket | |
2919 | conn.delete_bucket(bucket_name) | |
2920 | ||
2921 | ||
2922 | @attr('manual_test') | |
2923 | def test_ps_s3_persistent_cleanup(): | |
2924 | """ test reservation cleanup after gateway crash """ | |
2925 | return SkipTest("only used in manual testing") | |
2926 | conn = connection() | |
2927 | zonegroup = 'default' | |
2928 | ||
2929 | # create random port for the http server | |
2930 | host = get_ip() | |
2931 | port = random.randint(10000, 20000) | |
2932 | # start an http server in a separate thread | |
2933 | number_of_objects = 200 | |
2934 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
2935 | ||
2936 | gw = conn | |
2937 | ||
2938 | # create bucket | |
2939 | bucket_name = gen_bucket_name() | |
2940 | bucket = gw.create_bucket(bucket_name) | |
2941 | topic_name = bucket_name + TOPIC_SUFFIX | |
2942 | ||
2943 | # create s3 topic | |
2944 | endpoint_address = 'http://'+host+':'+str(port) | |
2945 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
2946 | topic_conf = PSTopicS3(gw, topic_name, zonegroup, endpoint_args=endpoint_args) | |
2947 | topic_arn = topic_conf.set_config() | |
2948 | ||
2949 | # create s3 notification | |
2950 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
2951 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
2952 | 'Events': ['s3:ObjectCreated:Put'] | |
2953 | }] | |
2954 | s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list) | |
2955 | response, status = s3_notification_conf.set_config() | |
2956 | assert_equal(status/100, 2) | |
2957 | ||
2958 | client_threads = [] | |
2959 | start_time = time.time() | |
2960 | for i in range(number_of_objects): | |
2961 | key = bucket.new_key(str(i)) | |
2962 | content = str(os.urandom(1024*1024)) | |
2963 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
2964 | thr.start() | |
2965 | client_threads.append(thr) | |
2966 | # stop gateway while clients are sending | |
2967 | os.system("killall -9 radosgw"); | |
2968 | print('wait for 10 sec for before restarting the gateway') | |
2969 | time.sleep(10) | |
2970 | # TODO: start the radosgw | |
2971 | [thr.join() for thr in client_threads] | |
2972 | ||
2973 | keys = list(bucket.list()) | |
2974 | ||
2975 | # delete objects from the bucket | |
2976 | client_threads = [] | |
2977 | start_time = time.time() | |
2978 | for key in bucket.list(): | |
2979 | thr = threading.Thread(target = key.delete, args=()) | |
2980 | thr.start() | |
2981 | client_threads.append(thr) | |
2982 | [thr.join() for thr in client_threads] | |
2983 | ||
2984 | # check http receiver | |
2985 | events = http_server.get_and_reset_events() | |
2986 | ||
2987 | print(str(len(events) ) + " events found out of " + str(number_of_objects)) | |
2988 | ||
2989 | # make sure that things are working now | |
2990 | client_threads = [] | |
2991 | start_time = time.time() | |
2992 | for i in range(number_of_objects): | |
2993 | key = bucket.new_key(str(i)) | |
2994 | content = str(os.urandom(1024*1024)) | |
2995 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
2996 | thr.start() | |
2997 | client_threads.append(thr) | |
2998 | [thr.join() for thr in client_threads] | |
2999 | ||
3000 | keys = list(bucket.list()) | |
3001 | ||
3002 | # delete objects from the bucket | |
3003 | client_threads = [] | |
3004 | start_time = time.time() | |
3005 | for key in bucket.list(): | |
3006 | thr = threading.Thread(target = key.delete, args=()) | |
3007 | thr.start() | |
3008 | client_threads.append(thr) | |
3009 | [thr.join() for thr in client_threads] | |
3010 | ||
3011 | print('wait for 180 sec for reservations to be stale before queue deletion') | |
3012 | time.sleep(180) | |
3013 | ||
3014 | # check http receiver | |
3015 | events = http_server.get_and_reset_events() | |
3016 | ||
3017 | print(str(len(events)) + " events found out of " + str(number_of_objects)) | |
3018 | ||
3019 | # cleanup | |
3020 | s3_notification_conf.del_config() | |
3021 | topic_conf.del_config() | |
3022 | gw.delete_bucket(bucket_name) | |
3023 | http_server.close() | |
3024 | ||
3025 | ||
3026 | @attr('manual_test') | |
3027 | def test_ps_s3_persistent_notification_pushback(): | |
3028 | """ test pushing persistent notification pushback """ | |
3029 | return SkipTest("only used in manual testing") | |
3030 | conn = connection() | |
3031 | zonegroup = 'default' | |
3032 | ||
3033 | # create random port for the http server | |
3034 | host = get_ip() | |
3035 | port = random.randint(10000, 20000) | |
3036 | # start an http server in a separate thread | |
3037 | http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5) | |
3038 | ||
3039 | # create bucket | |
3040 | bucket_name = gen_bucket_name() | |
3041 | bucket = conn.create_bucket(bucket_name) | |
3042 | topic_name = bucket_name + TOPIC_SUFFIX | |
3043 | ||
3044 | # create s3 topic | |
3045 | endpoint_address = 'http://'+host+':'+str(port) | |
3046 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3047 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
3048 | topic_arn = topic_conf.set_config() | |
3049 | # create s3 notification | |
3050 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3051 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
3052 | 'Events': [] | |
3053 | }] | |
3054 | ||
3055 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3056 | response, status = s3_notification_conf.set_config() | |
3057 | assert_equal(status/100, 2) | |
3058 | ||
3059 | # create objects in the bucket (async) | |
3060 | for j in range(100): | |
3061 | number_of_objects = randint(500, 1000) | |
3062 | client_threads = [] | |
3063 | start_time = time.time() | |
3064 | for i in range(number_of_objects): | |
3065 | key = bucket.new_key(str(j)+'-'+str(i)) | |
3066 | content = str(os.urandom(1024*1024)) | |
3067 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3068 | thr.start() | |
3069 | client_threads.append(thr) | |
3070 | [thr.join() for thr in client_threads] | |
3071 | time_diff = time.time() - start_time | |
3072 | print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3073 | ||
3074 | keys = list(bucket.list()) | |
3075 | ||
3076 | delay = 30 | |
3077 | print('wait for '+str(delay)+'sec for the messages...') | |
3078 | time.sleep(delay) | |
3079 | ||
3080 | # delete objects from the bucket | |
3081 | client_threads = [] | |
3082 | start_time = time.time() | |
3083 | count = 0 | |
3084 | for key in bucket.list(): | |
3085 | count += 1 | |
3086 | thr = threading.Thread(target = key.delete, args=()) | |
3087 | thr.start() | |
3088 | client_threads.append(thr) | |
3089 | if count%100 == 0: | |
3090 | [thr.join() for thr in client_threads] | |
3091 | time_diff = time.time() - start_time | |
3092 | print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3093 | client_threads = [] | |
3094 | start_time = time.time() | |
3095 | ||
3096 | print('wait for '+str(delay)+'sec for the messages...') | |
3097 | time.sleep(delay) | |
3098 | ||
3099 | # cleanup | |
3100 | s3_notification_conf.del_config() | |
3101 | topic_conf.del_config() | |
3102 | # delete the bucket | |
3103 | conn.delete_bucket(bucket_name) | |
3104 | time.sleep(delay) | |
3105 | http_server.close() | |
3106 | ||
3107 | ||
1e59de90 | 3108 | @attr('kafka_test') |
20effc67 TL |
3109 | def test_ps_s3_notification_kafka_idle_behaviour(): |
3110 | """ test pushing kafka s3 notification idle behaviour check """ | |
3111 | # TODO convert this test to actual running test by changing | |
3112 | # os.system call to verify the process idleness | |
20effc67 TL |
3113 | conn = connection() |
3114 | zonegroup = 'default' | |
3115 | ||
3116 | # create bucket | |
3117 | bucket_name = gen_bucket_name() | |
3118 | bucket = conn.create_bucket(bucket_name) | |
3119 | # name is constant for manual testing | |
3120 | topic_name = bucket_name+'_topic' | |
3121 | # create consumer on the topic | |
3122 | ||
3123 | task, receiver = create_kafka_receiver_thread(topic_name+'_1') | |
3124 | task.start() | |
3125 | ||
3126 | # create s3 topic | |
3127 | endpoint_address = 'kafka://' + kafka_server | |
3128 | # with acks from broker | |
3129 | endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' | |
3130 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
3131 | topic_arn1 = topic_conf1.set_config() | |
3132 | # create s3 notification | |
3133 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3134 | topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, | |
3135 | 'Events': [] | |
3136 | }] | |
3137 | ||
3138 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3139 | response, status = s3_notification_conf.set_config() | |
3140 | assert_equal(status/100, 2) | |
3141 | ||
3142 | # create objects in the bucket (async) | |
3143 | number_of_objects = 10 | |
3144 | client_threads = [] | |
3145 | etags = [] | |
3146 | start_time = time.time() | |
3147 | for i in range(number_of_objects): | |
3148 | key = bucket.new_key(str(i)) | |
3149 | content = str(os.urandom(1024*1024)) | |
3150 | etag = hashlib.md5(content.encode()).hexdigest() | |
3151 | etags.append(etag) | |
3152 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3153 | thr.start() | |
3154 | client_threads.append(thr) | |
3155 | [thr.join() for thr in client_threads] | |
3156 | ||
3157 | time_diff = time.time() - start_time | |
3158 | print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3159 | ||
3160 | print('wait for 5sec for the messages...') | |
3161 | time.sleep(5) | |
3162 | keys = list(bucket.list()) | |
3163 | receiver.verify_s3_events(keys, exact_match=True, etags=etags) | |
3164 | ||
3165 | # delete objects from the bucket | |
3166 | client_threads = [] | |
3167 | start_time = time.time() | |
3168 | for key in bucket.list(): | |
3169 | thr = threading.Thread(target = key.delete, args=()) | |
3170 | thr.start() | |
3171 | client_threads.append(thr) | |
3172 | [thr.join() for thr in client_threads] | |
3173 | ||
3174 | time_diff = time.time() - start_time | |
3175 | print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3176 | ||
3177 | print('wait for 5sec for the messages...') | |
3178 | time.sleep(5) | |
3179 | receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) | |
3180 | ||
1e59de90 | 3181 | is_idle = False |
20effc67 | 3182 | |
1e59de90 TL |
3183 | while not is_idle: |
3184 | print('waiting for 10sec for checking idleness') | |
3185 | time.sleep(10) | |
3186 | cmd = "netstat -nnp | grep 9092 | grep radosgw" | |
3187 | proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) | |
3188 | out = proc.communicate()[0] | |
3189 | if len(out) == 0: | |
3190 | is_idle = True | |
3191 | else: | |
3192 | print("radosgw<->kafka connection is not idle") | |
3193 | print(out.decode('utf-8')) | |
20effc67 TL |
3194 | |
3195 | # do the process of uploading an object and checking for notification again | |
3196 | number_of_objects = 10 | |
3197 | client_threads = [] | |
3198 | etags = [] | |
3199 | start_time = time.time() | |
3200 | for i in range(number_of_objects): | |
3201 | key = bucket.new_key(str(i)) | |
3202 | content = str(os.urandom(1024*1024)) | |
3203 | etag = hashlib.md5(content.encode()).hexdigest() | |
3204 | etags.append(etag) | |
3205 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3206 | thr.start() | |
3207 | client_threads.append(thr) | |
3208 | [thr.join() for thr in client_threads] | |
3209 | ||
3210 | time_diff = time.time() - start_time | |
3211 | print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3212 | ||
3213 | print('wait for 5sec for the messages...') | |
3214 | time.sleep(5) | |
3215 | keys = list(bucket.list()) | |
3216 | receiver.verify_s3_events(keys, exact_match=True, etags=etags) | |
3217 | ||
3218 | # delete objects from the bucket | |
3219 | client_threads = [] | |
3220 | start_time = time.time() | |
3221 | for key in bucket.list(): | |
3222 | thr = threading.Thread(target = key.delete, args=()) | |
3223 | thr.start() | |
3224 | client_threads.append(thr) | |
3225 | [thr.join() for thr in client_threads] | |
3226 | ||
3227 | time_diff = time.time() - start_time | |
3228 | print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3229 | ||
3230 | print('wait for 5sec for the messages...') | |
3231 | time.sleep(5) | |
3232 | receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags) | |
3233 | ||
3234 | # cleanup | |
3235 | s3_notification_conf.del_config() | |
3236 | topic_conf1.del_config() | |
3237 | # delete the bucket | |
3238 | conn.delete_bucket(bucket_name) | |
3239 | stop_kafka_receiver(receiver, task) | |
3240 | ||
3241 | ||
3242 | @attr('modification_required') | |
3243 | def test_ps_s3_persistent_gateways_recovery(): | |
3244 | """ test gateway recovery of persistent notifications """ | |
3245 | return SkipTest('This test requires two gateways.') | |
3246 | ||
3247 | conn = connection() | |
3248 | zonegroup = 'default' | |
3249 | # create random port for the http server | |
3250 | host = get_ip() | |
3251 | port = random.randint(10000, 20000) | |
3252 | # start an http server in a separate thread | |
3253 | number_of_objects = 10 | |
3254 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
3255 | gw1 = conn | |
3256 | gw2 = connection2() | |
3257 | # create bucket | |
3258 | bucket_name = gen_bucket_name() | |
3259 | bucket = gw1.create_bucket(bucket_name) | |
3260 | topic_name = bucket_name + TOPIC_SUFFIX | |
3261 | # create two s3 topics | |
3262 | endpoint_address = 'http://'+host+':'+str(port) | |
3263 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3264 | topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw1') | |
3265 | topic_arn1 = topic_conf1.set_config() | |
3266 | topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw2') | |
3267 | topic_arn2 = topic_conf2.set_config() | |
3268 | # create two s3 notifications | |
3269 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' | |
3270 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, | |
3271 | 'Events': ['s3:ObjectCreated:Put'] | |
3272 | }] | |
3273 | s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list) | |
3274 | response, status = s3_notification_conf1.set_config() | |
3275 | assert_equal(status/100, 2) | |
3276 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' | |
3277 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, | |
3278 | 'Events': ['s3:ObjectRemoved:Delete'] | |
3279 | }] | |
3280 | s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list) | |
3281 | response, status = s3_notification_conf2.set_config() | |
3282 | assert_equal(status/100, 2) | |
3283 | # stop gateway 2 | |
3284 | print('stopping gateway2...') | |
3285 | client_threads = [] | |
3286 | start_time = time.time() | |
3287 | for i in range(number_of_objects): | |
3288 | key = bucket.new_key(str(i)) | |
3289 | content = str(os.urandom(1024*1024)) | |
3290 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3291 | thr.start() | |
3292 | client_threads.append(thr) | |
3293 | [thr.join() for thr in client_threads] | |
3294 | keys = list(bucket.list()) | |
3295 | # delete objects from the bucket | |
3296 | client_threads = [] | |
3297 | start_time = time.time() | |
3298 | for key in bucket.list(): | |
3299 | thr = threading.Thread(target = key.delete, args=()) | |
3300 | thr.start() | |
3301 | client_threads.append(thr) | |
3302 | [thr.join() for thr in client_threads] | |
3303 | print('wait for 60 sec for before restarting the gateway') | |
3304 | time.sleep(60) | |
3305 | # check http receiver | |
3306 | events = http_server.get_and_reset_events() | |
3307 | for key in keys: | |
3308 | creations = 0 | |
3309 | deletions = 0 | |
3310 | for event in events: | |
3311 | if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ | |
3312 | key.name == event['Records'][0]['s3']['object']['key']: | |
3313 | creations += 1 | |
3314 | elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ | |
3315 | key.name == event['Records'][0]['s3']['object']['key']: | |
3316 | deletions += 1 | |
3317 | assert_equal(creations, 1) | |
3318 | assert_equal(deletions, 1) | |
3319 | # cleanup | |
3320 | s3_notification_conf1.del_config() | |
3321 | topic_conf1.del_config() | |
3322 | gw1.delete_bucket(bucket_name) | |
3323 | time.sleep(10) | |
3324 | s3_notification_conf2.del_config() | |
3325 | topic_conf2.del_config() | |
3326 | http_server.close() | |
3327 | ||
3328 | ||
3329 | @attr('modification_required') | |
3330 | def test_ps_s3_persistent_multiple_gateways(): | |
3331 | """ test pushing persistent notification via two gateways """ | |
3332 | return SkipTest('This test requires two gateways.') | |
3333 | ||
3334 | conn = connection() | |
3335 | zonegroup = 'default' | |
3336 | # create random port for the http server | |
3337 | host = get_ip() | |
3338 | port = random.randint(10000, 20000) | |
3339 | # start an http server in a separate thread | |
3340 | number_of_objects = 10 | |
3341 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
3342 | gw1 = conn | |
3343 | gw2 = connection2() | |
3344 | # create bucket | |
3345 | bucket_name = gen_bucket_name() | |
3346 | bucket1 = gw1.create_bucket(bucket_name) | |
3347 | bucket2 = gw2.get_bucket(bucket_name) | |
3348 | topic_name = bucket_name + TOPIC_SUFFIX | |
3349 | # create two s3 topics | |
3350 | endpoint_address = 'http://'+host+':'+str(port) | |
3351 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3352 | topic1_opaque = 'fromgw1' | |
3353 | topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque) | |
3354 | topic_arn1 = topic_conf1.set_config() | |
3355 | topic2_opaque = 'fromgw2' | |
3356 | topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque) | |
3357 | topic_arn2 = topic_conf2.set_config() | |
3358 | # create two s3 notifications | |
3359 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' | |
3360 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, | |
3361 | 'Events': [] | |
3362 | }] | |
3363 | s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list) | |
3364 | response, status = s3_notification_conf1.set_config() | |
3365 | assert_equal(status/100, 2) | |
3366 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' | |
3367 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, | |
3368 | 'Events': [] | |
3369 | }] | |
3370 | s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list) | |
3371 | response, status = s3_notification_conf2.set_config() | |
3372 | assert_equal(status/100, 2) | |
3373 | client_threads = [] | |
3374 | start_time = time.time() | |
3375 | for i in range(number_of_objects): | |
3376 | key = bucket1.new_key('gw1_'+str(i)) | |
3377 | content = str(os.urandom(1024*1024)) | |
3378 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3379 | thr.start() | |
3380 | client_threads.append(thr) | |
3381 | key = bucket2.new_key('gw2_'+str(i)) | |
3382 | content = str(os.urandom(1024*1024)) | |
3383 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3384 | thr.start() | |
3385 | client_threads.append(thr) | |
3386 | [thr.join() for thr in client_threads] | |
3387 | keys = list(bucket1.list()) | |
3388 | delay = 30 | |
3389 | print('wait for '+str(delay)+'sec for the messages...') | |
3390 | time.sleep(delay) | |
3391 | events = http_server.get_and_reset_events() | |
3392 | for key in keys: | |
3393 | topic1_count = 0 | |
3394 | topic2_count = 0 | |
3395 | for event in events: | |
3396 | if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ | |
3397 | key.name == event['Records'][0]['s3']['object']['key'] and \ | |
3398 | topic1_opaque == event['Records'][0]['opaqueData']: | |
3399 | topic1_count += 1 | |
3400 | elif event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \ | |
3401 | key.name == event['Records'][0]['s3']['object']['key'] and \ | |
3402 | topic2_opaque == event['Records'][0]['opaqueData']: | |
3403 | topic2_count += 1 | |
3404 | assert_equal(topic1_count, 1) | |
3405 | assert_equal(topic2_count, 1) | |
3406 | # delete objects from the bucket | |
3407 | client_threads = [] | |
3408 | start_time = time.time() | |
3409 | for key in bucket1.list(): | |
3410 | thr = threading.Thread(target = key.delete, args=()) | |
3411 | thr.start() | |
3412 | client_threads.append(thr) | |
3413 | [thr.join() for thr in client_threads] | |
3414 | print('wait for '+str(delay)+'sec for the messages...') | |
3415 | time.sleep(delay) | |
3416 | events = http_server.get_and_reset_events() | |
3417 | for key in keys: | |
3418 | topic1_count = 0 | |
3419 | topic2_count = 0 | |
3420 | for event in events: | |
3421 | if event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ | |
3422 | key.name == event['Records'][0]['s3']['object']['key'] and \ | |
3423 | topic1_opaque == event['Records'][0]['opaqueData']: | |
3424 | topic1_count += 1 | |
3425 | elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \ | |
3426 | key.name == event['Records'][0]['s3']['object']['key'] and \ | |
3427 | topic2_opaque == event['Records'][0]['opaqueData']: | |
3428 | topic2_count += 1 | |
3429 | assert_equal(topic1_count, 1) | |
3430 | assert_equal(topic2_count, 1) | |
3431 | # cleanup | |
3432 | s3_notification_conf1.del_config() | |
3433 | topic_conf1.del_config() | |
3434 | s3_notification_conf2.del_config() | |
3435 | topic_conf2.del_config() | |
3436 | gw1.delete_bucket(bucket_name) | |
3437 | http_server.close() | |
3438 | ||
3439 | ||
3440 | @attr('http_test') | |
3441 | def test_ps_s3_persistent_multiple_endpoints(): | |
3442 | """ test pushing persistent notification when one of the endpoints has error """ | |
3443 | conn = connection() | |
3444 | zonegroup = 'default' | |
3445 | ||
3446 | # create random port for the http server | |
3447 | host = get_ip() | |
3448 | port = random.randint(10000, 20000) | |
3449 | # start an http server in a separate thread | |
3450 | number_of_objects = 10 | |
3451 | http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) | |
3452 | ||
3453 | # create bucket | |
3454 | bucket_name = gen_bucket_name() | |
3455 | bucket = conn.create_bucket(bucket_name) | |
3456 | topic_name = bucket_name + TOPIC_SUFFIX | |
3457 | ||
3458 | # create two s3 topics | |
3459 | endpoint_address = 'http://'+host+':'+str(port) | |
3460 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3461 | topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) | |
3462 | topic_arn1 = topic_conf1.set_config() | |
3463 | endpoint_address = 'http://kaboom:9999' | |
3464 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3465 | topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args) | |
3466 | topic_arn2 = topic_conf2.set_config() | |
3467 | ||
3468 | # create two s3 notifications | |
3469 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' | |
3470 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, | |
3471 | 'Events': [] | |
3472 | }] | |
3473 | s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3474 | response, status = s3_notification_conf1.set_config() | |
3475 | assert_equal(status/100, 2) | |
3476 | notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' | |
3477 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, | |
3478 | 'Events': [] | |
3479 | }] | |
3480 | s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3481 | response, status = s3_notification_conf2.set_config() | |
3482 | assert_equal(status/100, 2) | |
3483 | ||
3484 | client_threads = [] | |
3485 | start_time = time.time() | |
3486 | for i in range(number_of_objects): | |
3487 | key = bucket.new_key(str(i)) | |
3488 | content = str(os.urandom(1024*1024)) | |
3489 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3490 | thr.start() | |
3491 | client_threads.append(thr) | |
3492 | [thr.join() for thr in client_threads] | |
3493 | ||
3494 | keys = list(bucket.list()) | |
3495 | ||
3496 | delay = 30 | |
3497 | print('wait for '+str(delay)+'sec for the messages...') | |
3498 | time.sleep(delay) | |
3499 | ||
3500 | http_server.verify_s3_events(keys, exact_match=False, deletions=False) | |
3501 | ||
3502 | # delete objects from the bucket | |
3503 | client_threads = [] | |
3504 | start_time = time.time() | |
3505 | for key in bucket.list(): | |
3506 | thr = threading.Thread(target = key.delete, args=()) | |
3507 | thr.start() | |
3508 | client_threads.append(thr) | |
3509 | [thr.join() for thr in client_threads] | |
3510 | ||
3511 | print('wait for '+str(delay)+'sec for the messages...') | |
3512 | time.sleep(delay) | |
3513 | ||
3514 | http_server.verify_s3_events(keys, exact_match=False, deletions=True) | |
3515 | ||
3516 | # cleanup | |
3517 | s3_notification_conf1.del_config() | |
3518 | topic_conf1.del_config() | |
3519 | s3_notification_conf2.del_config() | |
3520 | topic_conf2.del_config() | |
3521 | conn.delete_bucket(bucket_name) | |
3522 | http_server.close() | |
3523 | ||
3524 | def persistent_notification(endpoint_type): | |
3525 | """ test pushing persistent notification """ | |
3526 | conn = connection() | |
3527 | zonegroup = 'default' | |
3528 | ||
3529 | # create bucket | |
3530 | bucket_name = gen_bucket_name() | |
3531 | bucket = conn.create_bucket(bucket_name) | |
3532 | topic_name = bucket_name + TOPIC_SUFFIX | |
3533 | ||
3534 | receiver = {} | |
3535 | host = get_ip() | |
3536 | if endpoint_type == 'http': | |
3537 | # create random port for the http server | |
3538 | host = get_ip_http() | |
3539 | port = random.randint(10000, 20000) | |
3540 | # start an http server in a separate thread | |
3541 | receiver = StreamingHTTPServer(host, port, num_workers=10) | |
3542 | endpoint_address = 'http://'+host+':'+str(port) | |
3543 | endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' | |
3544 | # the http server does not guarantee order, so duplicates are expected | |
3545 | exact_match = False | |
3546 | elif endpoint_type == 'amqp': | |
3547 | # start amqp receiver | |
3548 | exchange = 'ex1' | |
3549 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
3550 | task.start() | |
3551 | endpoint_address = 'amqp://' + host | |
3552 | endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' | |
3553 | # amqp broker guarantee ordering | |
3554 | exact_match = True | |
1e59de90 TL |
3555 | elif endpoint_type == 'kafka': |
3556 | # start amqp receiver | |
3557 | task, receiver = create_kafka_receiver_thread(topic_name) | |
3558 | task.start() | |
3559 | endpoint_address = 'kafka://' + host | |
3560 | endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true' | |
3561 | # amqp broker guarantee ordering | |
3562 | exact_match = True | |
20effc67 TL |
3563 | else: |
3564 | return SkipTest('Unknown endpoint type: ' + endpoint_type) | |
3565 | ||
3566 | ||
3567 | # create s3 topic | |
3568 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
3569 | topic_arn = topic_conf.set_config() | |
3570 | # create s3 notification | |
3571 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3572 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
3573 | 'Events': [] | |
3574 | }] | |
3575 | ||
3576 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3577 | response, status = s3_notification_conf.set_config() | |
3578 | assert_equal(status/100, 2) | |
3579 | ||
3580 | # create objects in the bucket (async) | |
3581 | number_of_objects = 100 | |
3582 | client_threads = [] | |
3583 | start_time = time.time() | |
3584 | for i in range(number_of_objects): | |
3585 | key = bucket.new_key(str(i)) | |
3586 | content = str(os.urandom(1024*1024)) | |
3587 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3588 | thr.start() | |
3589 | client_threads.append(thr) | |
3590 | [thr.join() for thr in client_threads] | |
3591 | ||
3592 | time_diff = time.time() - start_time | |
3593 | print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3594 | ||
3595 | keys = list(bucket.list()) | |
3596 | ||
3597 | delay = 40 | |
3598 | print('wait for '+str(delay)+'sec for the messages...') | |
3599 | time.sleep(delay) | |
3600 | ||
3601 | receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False) | |
3602 | ||
3603 | # delete objects from the bucket | |
3604 | client_threads = [] | |
3605 | start_time = time.time() | |
3606 | for key in bucket.list(): | |
3607 | thr = threading.Thread(target = key.delete, args=()) | |
3608 | thr.start() | |
3609 | client_threads.append(thr) | |
3610 | [thr.join() for thr in client_threads] | |
3611 | ||
3612 | time_diff = time.time() - start_time | |
3613 | print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3614 | ||
3615 | print('wait for '+str(delay)+'sec for the messages...') | |
3616 | time.sleep(delay) | |
3617 | ||
3618 | receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True) | |
3619 | ||
3620 | # cleanup | |
3621 | s3_notification_conf.del_config() | |
3622 | topic_conf.del_config() | |
3623 | # delete the bucket | |
3624 | conn.delete_bucket(bucket_name) | |
3625 | if endpoint_type == 'http': | |
3626 | receiver.close() | |
3627 | else: | |
3628 | stop_amqp_receiver(receiver, task) | |
3629 | ||
3630 | ||
3631 | @attr('http_test') | |
3632 | def test_ps_s3_persistent_notification_http(): | |
3633 | """ test pushing persistent notification http """ | |
3634 | persistent_notification('http') | |
3635 | ||
3636 | ||
3637 | @attr('amqp_test') | |
3638 | def test_ps_s3_persistent_notification_amqp(): | |
3639 | """ test pushing persistent notification amqp """ | |
20effc67 TL |
3640 | persistent_notification('amqp') |
3641 | ||
1e59de90 | 3642 | |
20effc67 TL |
3643 | @attr('kafka_test') |
3644 | def test_ps_s3_persistent_notification_kafka(): | |
1e59de90 | 3645 | """ test pushing persistent notification kafka """ |
20effc67 | 3646 | persistent_notification('kafka') |
1e59de90 | 3647 | |
20effc67 TL |
3648 | |
3649 | def random_string(length): | |
3650 | import string | |
3651 | letters = string.ascii_letters | |
3652 | return ''.join(random.choice(letters) for i in range(length)) | |
3653 | ||
3654 | ||
3655 | @attr('amqp_test') | |
3656 | def test_ps_s3_persistent_notification_large(): | |
3657 | """ test pushing persistent notification of large notifications """ | |
3658 | ||
3659 | conn = connection() | |
3660 | zonegroup = 'default' | |
3661 | ||
3662 | # create bucket | |
3663 | bucket_name = gen_bucket_name() | |
3664 | bucket = conn.create_bucket(bucket_name) | |
3665 | topic_name = bucket_name + TOPIC_SUFFIX | |
3666 | ||
3667 | receiver = {} | |
3668 | host = get_ip() | |
3669 | # start amqp receiver | |
3670 | exchange = 'ex1' | |
3671 | task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
3672 | task.start() | |
3673 | endpoint_address = 'amqp://' + host | |
3674 | opaque_data = random_string(1024*2) | |
3675 | endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' | |
3676 | # amqp broker guarantee ordering | |
3677 | exact_match = True | |
3678 | ||
3679 | # create s3 topic | |
3680 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
3681 | topic_arn = topic_conf.set_config() | |
3682 | # create s3 notification | |
3683 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3684 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
3685 | 'Events': [] | |
3686 | }] | |
3687 | ||
3688 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
3689 | response, status = s3_notification_conf.set_config() | |
3690 | assert_equal(status/100, 2) | |
3691 | ||
3692 | # create objects in the bucket (async) | |
3693 | number_of_objects = 100 | |
3694 | client_threads = [] | |
3695 | start_time = time.time() | |
3696 | for i in range(number_of_objects): | |
3697 | key_value = random_string(63) | |
3698 | key = bucket.new_key(key_value) | |
3699 | content = str(os.urandom(1024*1024)) | |
3700 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
3701 | thr.start() | |
3702 | client_threads.append(thr) | |
3703 | [thr.join() for thr in client_threads] | |
3704 | ||
3705 | time_diff = time.time() - start_time | |
3706 | print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3707 | ||
3708 | keys = list(bucket.list()) | |
3709 | ||
3710 | delay = 40 | |
3711 | print('wait for '+str(delay)+'sec for the messages...') | |
3712 | time.sleep(delay) | |
3713 | ||
3714 | receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False) | |
3715 | ||
3716 | # delete objects from the bucket | |
3717 | client_threads = [] | |
3718 | start_time = time.time() | |
3719 | for key in bucket.list(): | |
3720 | thr = threading.Thread(target = key.delete, args=()) | |
3721 | thr.start() | |
3722 | client_threads.append(thr) | |
3723 | [thr.join() for thr in client_threads] | |
3724 | ||
3725 | time_diff = time.time() - start_time | |
3726 | print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
3727 | ||
3728 | print('wait for '+str(delay)+'sec for the messages...') | |
3729 | time.sleep(delay) | |
3730 | ||
3731 | receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True) | |
3732 | ||
3733 | # cleanup | |
3734 | s3_notification_conf.del_config() | |
3735 | topic_conf.del_config() | |
3736 | # delete the bucket | |
3737 | conn.delete_bucket(bucket_name) | |
3738 | stop_amqp_receiver(receiver, task) | |
3739 | ||
3740 | ||
3741 | @attr('modification_required') | |
3742 | def test_ps_s3_topic_update(): | |
3743 | """ test updating topic associated with a notification""" | |
3744 | return SkipTest('This test is yet to be modified.') | |
3745 | ||
3746 | conn = connection() | |
3747 | ps_zone = None | |
3748 | bucket_name = gen_bucket_name() | |
3749 | topic_name = bucket_name+TOPIC_SUFFIX | |
3750 | # create amqp topic | |
3751 | hostname = get_ip() | |
3752 | exchange = 'ex1' | |
3753 | amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name) | |
3754 | amqp_task.start() | |
3755 | #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3756 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3757 | ||
3758 | topic_arn = topic_conf.set_config() | |
3759 | #result, status = topic_conf.set_config() | |
3760 | #assert_equal(status/100, 2) | |
3761 | parsed_result = json.loads(result) | |
3762 | topic_arn = parsed_result['arn'] | |
3763 | # get topic | |
3764 | result, _ = topic_conf.get_config() | |
3765 | # verify topic content | |
3766 | parsed_result = json.loads(result) | |
3767 | assert_equal(parsed_result['topic']['name'], topic_name) | |
3768 | assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) | |
3769 | # create http server | |
3770 | port = random.randint(10000, 20000) | |
3771 | # start an http server in a separate thread | |
3772 | http_server = StreamingHTTPServer(hostname, port) | |
3773 | # create bucket on the first of the rados zones | |
3774 | bucket = conn.create_bucket(bucket_name) | |
3775 | # create s3 notification | |
3776 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3777 | topic_conf_list = [{'Id': notification_name, | |
3778 | 'TopicArn': topic_arn, | |
3779 | 'Events': ['s3:ObjectCreated:*'] | |
3780 | }] | |
3781 | s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) | |
3782 | _, status = s3_notification_conf.set_config() | |
3783 | assert_equal(status/100, 2) | |
3784 | # create objects in the bucket | |
3785 | number_of_objects = 10 | |
3786 | for i in range(number_of_objects): | |
3787 | key = bucket.new_key(str(i)) | |
3788 | key.set_contents_from_string('bar') | |
3789 | # wait for sync | |
3790 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
3791 | keys = list(bucket.list()) | |
3792 | # TODO: use exact match | |
3793 | receiver.verify_s3_events(keys, exact_match=False) | |
3794 | # update the same topic with new endpoint | |
3795 | #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='http://'+ hostname + ':' + str(port)) | |
3796 | topic_conf = PSTopicS3(conn, topic_name, endpoint_args='http://'+ hostname + ':' + str(port)) | |
3797 | _, status = topic_conf.set_config() | |
3798 | assert_equal(status/100, 2) | |
3799 | # get topic | |
3800 | result, _ = topic_conf.get_config() | |
3801 | # verify topic content | |
3802 | parsed_result = json.loads(result) | |
3803 | assert_equal(parsed_result['topic']['name'], topic_name) | |
3804 | assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) | |
3805 | # delete current objects and create new objects in the bucket | |
3806 | for key in bucket.list(): | |
3807 | key.delete() | |
3808 | for i in range(number_of_objects): | |
3809 | key = bucket.new_key(str(i+100)) | |
3810 | key.set_contents_from_string('bar') | |
3811 | # wait for sync | |
3812 | #zone_meta_checkpoint(ps_zone.zone) | |
3813 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
3814 | keys = list(bucket.list()) | |
3815 | # verify that notifications are still sent to amqp | |
3816 | # TODO: use exact match | |
3817 | receiver.verify_s3_events(keys, exact_match=False) | |
3818 | # update notification to update the endpoint from the topic | |
3819 | topic_conf_list = [{'Id': notification_name, | |
3820 | 'TopicArn': topic_arn, | |
3821 | 'Events': ['s3:ObjectCreated:*'] | |
3822 | }] | |
3823 | s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) | |
3824 | _, status = s3_notification_conf.set_config() | |
3825 | assert_equal(status/100, 2) | |
3826 | # delete current objects and create new objects in the bucket | |
3827 | for key in bucket.list(): | |
3828 | key.delete() | |
3829 | for i in range(number_of_objects): | |
3830 | key = bucket.new_key(str(i+200)) | |
3831 | key.set_contents_from_string('bar') | |
3832 | # wait for sync | |
3833 | #zone_meta_checkpoint(ps_zone.zone) | |
3834 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
3835 | keys = list(bucket.list()) | |
3836 | # check that updates switched to http | |
3837 | # TODO: use exact match | |
3838 | http_server.verify_s3_events(keys, exact_match=False) | |
3839 | # cleanup | |
3840 | # delete objects from the bucket | |
3841 | stop_amqp_receiver(receiver, amqp_task) | |
3842 | for key in bucket.list(): | |
3843 | key.delete() | |
3844 | s3_notification_conf.del_config() | |
3845 | topic_conf.del_config() | |
3846 | conn.delete_bucket(bucket_name) | |
3847 | http_server.close() | |
3848 | ||
3849 | ||
3850 | @attr('modification_required') | |
3851 | def test_ps_s3_notification_update(): | |
3852 | """ test updating the topic of a notification""" | |
3853 | return SkipTest('This test is yet to be modified.') | |
3854 | ||
3855 | hostname = get_ip() | |
3856 | conn = connection() | |
3857 | ps_zone = None | |
3858 | bucket_name = gen_bucket_name() | |
3859 | topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX | |
3860 | topic_name2 = bucket_name+'http'+TOPIC_SUFFIX | |
3861 | zonegroup = 'default' | |
3862 | # create topics | |
3863 | # start amqp receiver in a separate thread | |
3864 | exchange = 'ex1' | |
3865 | amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) | |
3866 | amqp_task.start() | |
3867 | # create random port for the http server | |
3868 | http_port = random.randint(10000, 20000) | |
3869 | # start an http server in a separate thread | |
3870 | http_server = StreamingHTTPServer(hostname, http_port) | |
3871 | #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3872 | topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3873 | result, status = topic_conf1.set_config() | |
3874 | parsed_result = json.loads(result) | |
3875 | topic_arn1 = parsed_result['arn'] | |
3876 | assert_equal(status/100, 2) | |
3877 | #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port)) | |
3878 | topic_conf2 = PSTopicS3(conn, topic_name2, endpoint_args='http://'+hostname+':'+str(http_port)) | |
3879 | result, status = topic_conf2.set_config() | |
3880 | parsed_result = json.loads(result) | |
3881 | topic_arn2 = parsed_result['arn'] | |
3882 | assert_equal(status/100, 2) | |
3883 | # create bucket on the first of the rados zones | |
3884 | bucket = conn.create_bucket(bucket_name) | |
3885 | # wait for sync | |
3886 | #zone_meta_checkpoint(ps_zone.zone) | |
3887 | # create s3 notification with topic1 | |
3888 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
3889 | topic_conf_list = [{'Id': notification_name, | |
3890 | 'TopicArn': topic_arn1, | |
3891 | 'Events': ['s3:ObjectCreated:*'] | |
3892 | }] | |
3893 | s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) | |
3894 | _, status = s3_notification_conf.set_config() | |
3895 | assert_equal(status/100, 2) | |
3896 | # create objects in the bucket | |
3897 | number_of_objects = 10 | |
3898 | for i in range(number_of_objects): | |
3899 | key = bucket.new_key(str(i)) | |
3900 | key.set_contents_from_string('bar') | |
3901 | # wait for sync | |
3902 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
3903 | keys = list(bucket.list()) | |
3904 | # TODO: use exact match | |
3905 | receiver.verify_s3_events(keys, exact_match=False); | |
3906 | # update notification to use topic2 | |
3907 | topic_conf_list = [{'Id': notification_name, | |
3908 | 'TopicArn': topic_arn2, | |
3909 | 'Events': ['s3:ObjectCreated:*'] | |
3910 | }] | |
3911 | s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) | |
3912 | _, status = s3_notification_conf.set_config() | |
3913 | assert_equal(status/100, 2) | |
3914 | # delete current objects and create new objects in the bucket | |
3915 | for key in bucket.list(): | |
3916 | key.delete() | |
3917 | for i in range(number_of_objects): | |
3918 | key = bucket.new_key(str(i+100)) | |
3919 | key.set_contents_from_string('bar') | |
3920 | # wait for sync | |
3921 | #zone_meta_checkpoint(ps_zone.zone) | |
3922 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
3923 | keys = list(bucket.list()) | |
3924 | # check that updates switched to http | |
3925 | # TODO: use exact match | |
3926 | http_server.verify_s3_events(keys, exact_match=False) | |
3927 | # cleanup | |
3928 | # delete objects from the bucket | |
3929 | stop_amqp_receiver(receiver, amqp_task) | |
3930 | for key in bucket.list(): | |
3931 | key.delete() | |
3932 | s3_notification_conf.del_config() | |
3933 | topic_conf1.del_config() | |
3934 | topic_conf2.del_config() | |
3935 | conn.delete_bucket(bucket_name) | |
3936 | http_server.close() | |
3937 | ||
3938 | ||
3939 | @attr('modification_required') | |
3940 | def test_ps_s3_multiple_topics_notification(): | |
3941 | """ test notification creation with multiple topics""" | |
3942 | return SkipTest('This test is yet to be modified.') | |
3943 | ||
3944 | hostname = get_ip() | |
3945 | zonegroup = 'default' | |
3946 | conn = connection() | |
3947 | ps_zone = None | |
3948 | bucket_name = gen_bucket_name() | |
3949 | topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX | |
3950 | topic_name2 = bucket_name+'http'+TOPIC_SUFFIX | |
3951 | # create topics | |
3952 | # start amqp receiver in a separate thread | |
3953 | exchange = 'ex1' | |
3954 | amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) | |
3955 | amqp_task.start() | |
3956 | # create random port for the http server | |
3957 | http_port = random.randint(10000, 20000) | |
3958 | # start an http server in a separate thread | |
3959 | http_server = StreamingHTTPServer(hostname, http_port) | |
3960 | #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3961 | topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') | |
3962 | result, status = topic_conf1.set_config() | |
3963 | parsed_result = json.loads(result) | |
3964 | topic_arn1 = parsed_result['arn'] | |
3965 | assert_equal(status/100, 2) | |
3966 | #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port)) | |
3967 | topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='http://'+hostname+':'+str(http_port)) | |
3968 | result, status = topic_conf2.set_config() | |
3969 | parsed_result = json.loads(result) | |
3970 | topic_arn2 = parsed_result['arn'] | |
3971 | assert_equal(status/100, 2) | |
3972 | # create bucket on the first of the rados zones | |
3973 | bucket = conn.create_bucket(bucket_name) | |
3974 | # wait for sync | |
3975 | #zone_meta_checkpoint(ps_zone.zone) | |
3976 | # create s3 notification | |
3977 | notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' | |
3978 | notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' | |
3979 | topic_conf_list = [ | |
3980 | { | |
3981 | 'Id': notification_name1, | |
3982 | 'TopicArn': topic_arn1, | |
3983 | 'Events': ['s3:ObjectCreated:*'] | |
3984 | }, | |
3985 | { | |
3986 | 'Id': notification_name2, | |
3987 | 'TopicArn': topic_arn2, | |
3988 | 'Events': ['s3:ObjectCreated:*'] | |
3989 | }] | |
3990 | s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) | |
3991 | _, status = s3_notification_conf.set_config() | |
3992 | assert_equal(status/100, 2) | |
3993 | result, _ = s3_notification_conf.get_config() | |
3994 | assert_equal(len(result['TopicConfigurations']), 2) | |
3995 | assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1) | |
3996 | assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2) | |
3997 | # get auto-generated subscriptions | |
3998 | sub_conf1 = PSSubscription(ps_zone.conn, notification_name1, | |
3999 | topic_name1) | |
4000 | _, status = sub_conf1.get_config() | |
4001 | assert_equal(status/100, 2) | |
4002 | sub_conf2 = PSSubscription(ps_zone.conn, notification_name2, | |
4003 | topic_name2) | |
4004 | _, status = sub_conf2.get_config() | |
4005 | assert_equal(status/100, 2) | |
4006 | # create objects in the bucket | |
4007 | number_of_objects = 10 | |
4008 | for i in range(number_of_objects): | |
4009 | key = bucket.new_key(str(i)) | |
4010 | key.set_contents_from_string('bar') | |
4011 | # wait for sync | |
4012 | #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) | |
4013 | # get the events from both of the subscription | |
4014 | result, _ = sub_conf1.get_events() | |
4015 | records = json.loads(result) | |
4016 | for record in records['Records']: | |
4017 | log.debug(record) | |
4018 | keys = list(bucket.list()) | |
4019 | # TODO: use exact match | |
4020 | verify_s3_records_by_elements(records, keys, exact_match=False) | |
4021 | receiver.verify_s3_events(keys, exact_match=False) | |
4022 | result, _ = sub_conf2.get_events() | |
4023 | parsed_result = json.loads(result) | |
4024 | for record in parsed_result['Records']: | |
4025 | log.debug(record) | |
4026 | keys = list(bucket.list()) | |
4027 | # TODO: use exact match | |
4028 | verify_s3_records_by_elements(records, keys, exact_match=False) | |
4029 | http_server.verify_s3_events(keys, exact_match=False) | |
4030 | # cleanup | |
4031 | stop_amqp_receiver(receiver, amqp_task) | |
4032 | s3_notification_conf.del_config() | |
4033 | topic_conf1.del_config() | |
4034 | topic_conf2.del_config() | |
4035 | # delete objects from the bucket | |
4036 | for key in bucket.list(): | |
4037 | key.delete() | |
4038 | conn.delete_bucket(bucket_name) | |
4039 | http_server.close() | |
4040 | ||
4041 | ||
20effc67 | 4042 | def kafka_security(security_type): |
1e59de90 | 4043 | """ test pushing kafka s3 notification securly to master """ |
20effc67 | 4044 | conn = connection() |
20effc67 TL |
4045 | zonegroup = 'default' |
4046 | # create bucket | |
4047 | bucket_name = gen_bucket_name() | |
4048 | bucket = conn.create_bucket(bucket_name) | |
4049 | # name is constant for manual testing | |
4050 | topic_name = bucket_name+'_topic' | |
20effc67 TL |
4051 | # create s3 topic |
4052 | if security_type == 'SSL_SASL': | |
4053 | endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' | |
1e59de90 | 4054 | elif security_type == 'SSL': |
20effc67 | 4055 | endpoint_address = 'kafka://' + kafka_server + ':9093' |
20effc67 | 4056 | else: |
1e59de90 TL |
4057 | assert False, 'unknown security method '+security_type |
4058 | ||
4059 | KAFKA_DIR = os.environ['KAFKA_DIR'] | |
4060 | endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+"/y-ca.crt" | |
4061 | ||
4062 | topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) | |
4063 | ||
4064 | # create consumer on the topic | |
4065 | task, receiver = create_kafka_receiver_thread(topic_name) | |
4066 | task.start() | |
4067 | ||
20effc67 TL |
4068 | topic_arn = topic_conf.set_config() |
4069 | # create s3 notification | |
4070 | notification_name = bucket_name + NOTIFICATION_SUFFIX | |
4071 | topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, | |
4072 | 'Events': [] | |
4073 | }] | |
4074 | s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) | |
4075 | s3_notification_conf.set_config() | |
4076 | # create objects in the bucket (async) | |
4077 | number_of_objects = 10 | |
4078 | client_threads = [] | |
4079 | start_time = time.time() | |
4080 | for i in range(number_of_objects): | |
4081 | key = bucket.new_key(str(i)) | |
4082 | content = str(os.urandom(1024*1024)) | |
4083 | thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) | |
4084 | thr.start() | |
4085 | client_threads.append(thr) | |
4086 | [thr.join() for thr in client_threads] | |
4087 | time_diff = time.time() - start_time | |
4088 | print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
4089 | try: | |
4090 | print('wait for 5sec for the messages...') | |
4091 | time.sleep(5) | |
4092 | keys = list(bucket.list()) | |
4093 | receiver.verify_s3_events(keys, exact_match=True) | |
4094 | # delete objects from the bucket | |
4095 | client_threads = [] | |
4096 | start_time = time.time() | |
4097 | for key in bucket.list(): | |
4098 | thr = threading.Thread(target = key.delete, args=()) | |
4099 | thr.start() | |
4100 | client_threads.append(thr) | |
4101 | [thr.join() for thr in client_threads] | |
4102 | time_diff = time.time() - start_time | |
4103 | print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') | |
4104 | print('wait for 5sec for the messages...') | |
4105 | time.sleep(5) | |
4106 | receiver.verify_s3_events(keys, exact_match=True, deletions=True) | |
4107 | except Exception as err: | |
4108 | assert False, str(err) | |
4109 | finally: | |
4110 | # cleanup | |
4111 | s3_notification_conf.del_config() | |
4112 | topic_conf.del_config() | |
4113 | # delete the bucket | |
4114 | for key in bucket.list(): | |
4115 | key.delete() | |
4116 | conn.delete_bucket(bucket_name) | |
4117 | stop_kafka_receiver(receiver, task) | |
4118 | ||
4119 | ||
1e59de90 | 4120 | @attr('kafka_ssl_test') |
20effc67 | 4121 | def test_ps_s3_notification_push_kafka_security_ssl(): |
20effc67 TL |
4122 | kafka_security('SSL') |
4123 | ||
4124 | ||
1e59de90 | 4125 | @attr('kafka_ssl_test') |
20effc67 | 4126 | def test_ps_s3_notification_push_kafka_security_ssl_sasl(): |
20effc67 TL |
4127 | kafka_security('SSL_SASL') |
4128 |