]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/tests_ps.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / rgw / rgw_multi / tests_ps.py
1 import logging
2 import json
3 import tempfile
4 from rgw_multi.tests import get_realm, \
5 ZonegroupConns, \
6 zonegroup_meta_checkpoint, \
7 zone_meta_checkpoint, \
8 zone_bucket_checkpoint, \
9 zone_data_checkpoint, \
10 check_bucket_eq, \
11 gen_bucket_name
12 from rgw_multi.zone_ps import PSTopic, PSNotification, PSSubscription
13 from nose import SkipTest
14 from nose.tools import assert_not_equal, assert_equal
15
# module-level logger used by the pubsub tests; note that it is registered
# under the 'rgw_multi.tests' name rather than under this module's own name
log = logging.getLogger('rgw_multi.tests')
18
19 ####################################
20 # utility functions for pubsub tests
21 ####################################
22
23
def check_ps_configured():
    """Skip the calling test unless at least one pubsub zone exists.

    Looks up the zones of type "pubsub" in the master zonegroup of the
    realm.

    Raises:
        SkipTest: when the master zonegroup has no pubsub zone.
    """
    master_zonegroup = get_realm().master_zonegroup()
    pubsub_zones = master_zonegroup.zones_by_type.get("pubsub")
    if pubsub_zones:
        return
    raise SkipTest("Requires at least one PS zone")
32
33
def is_ps_zone(zone_conn):
    """Tell whether a zone connection belongs to a pubsub zone.

    Args:
        zone_conn: a zone connection object exposing ``zone.tier_type()``;
            a falsy value (e.g. ``None``) is accepted.

    Returns:
        bool: True only when ``zone_conn`` is truthy and the tier type of
        its zone is "pubsub".
    """
    # a missing connection can never be a pubsub zone
    return bool(zone_conn) and zone_conn.zone.tier_type() == "pubsub"
39
40
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """Assert that every key has at least one event of the expected type.

    Args:
        events: list of event dicts; each one is read through
            ``event['info']['bucket']['name']``,
            ``event['info']['key']['name']`` and ``event['event']``.
        keys: objects exposing ``name`` and ``bucket.name``.
        exact_match: when true, also fail if the number of events differs
            from the number of keys.
        deletions: when true look for 'OBJECT_DELETE' events, otherwise
            look for 'OBJECT_CREATE' events.

    Raises:
        AssertionError: if a key has no matching event, or if
            ``exact_match`` is set and the counts differ.
    """
    wanted_event = 'OBJECT_DELETE' if deletions else 'OBJECT_CREATE'
    wanted_kind = 'deletion' if deletions else 'creation'

    for key in keys:
        # an event matches on bucket name, object name and event type
        has_match = any(
            ev['info']['bucket']['name'] == key.bucket.name
            and ev['info']['key']['name'] == key.name
            and ev['event'] == wanted_event
            for ev in events
        )
        if has_match:
            continue
        message = 'no ' + wanted_kind + ' event found for key: ' + str(key)
        log.error(events)
        assert False, message

    if len(events) == len(keys):
        return

    # a count mismatch is only fatal when an exact match was requested
    message = 'superfluous events are found'
    log.debug(message)
    if exact_match:
        log.error(events)
        assert False, message
66
67
def init_env():
    """Prepare the environment for a pubsub test.

    Skips the test when no pubsub zone is configured, runs the zonegroup
    metadata checkpoint and splits the zone connections of the master
    zonegroup into two groups.

    Returns:
        tuple: ``(zones, ps_zones)`` where ``ps_zones`` holds the
        connections of the pubsub zones and ``zones`` holds the connections
        of the remaining zones that are not read-only.  Both lists are
        asserted to be non-empty.
    """
    check_ps_configured()

    master_zonegroup = get_realm().master_zonegroup()
    connections = ZonegroupConns(master_zonegroup)

    zonegroup_meta_checkpoint(master_zonegroup)

    writable_conns = []
    pubsub_conns = []
    for zone_conn in connections.zones:
        if is_ps_zone(zone_conn):
            # run the metadata checkpoint on each pubsub zone before using it
            zone_meta_checkpoint(zone_conn.zone)
            pubsub_conns.append(zone_conn)
            continue
        if zone_conn.zone.is_read_only():
            continue
        writable_conns.append(zone_conn)

    assert_not_equal(len(writable_conns), 0)
    assert_not_equal(len(pubsub_conns), 0)
    return writable_conns, pubsub_conns
90
91
# suffix appended to the generated bucket name to build a test's topic name
TOPIC_SUFFIX = "_topic"
# suffix appended to the generated bucket name to build the name that the
# tests pass to PSSubscription
SUB_SUFFIX = "_sub"
94
95 ##############
96 # pubsub tests
97 ##############
98
99
def test_ps_topic():
    """ test set/get/delete of topic """
    _, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    _, status = topic_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status//100, 2)
    # verify topic is deleted
    result, _ = topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')
123
124
def test_ps_notification():
    """ test set/get/delete of notification """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status//100, 2)
    # TODO: deletion cannot be verified via GET
    # result, _ = notification_conf.get_config()
    # parsed_result = json.loads(result)
    # assert_equal(parsed_result['Code'], 'NoSuchKey')

    # cleanup
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
160
161
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events"""
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications restricted to an explicit list of event types
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # the configured event list must be reported back
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
195
196
def test_ps_subscription():
    """ test set/get/delete of subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event: objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # the keys are listed before the deletion so that they can still be
    # matched against the events
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscriptions
    result, _ = sub_conf.get_events()
    # parse the fresh response; without this the loop below would log the
    # events of the previous fetch again
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event: objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # TODO: check deletions
    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status//100, 2)
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return "no-key" instead
    # assert_equal(parsed_result['Code'], 'NoSuchKey')

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
267
268
def test_ps_event_type_subscription():
    """ test subscriptions for different events

    Three topic/notification/subscription triples are created on the same
    bucket: one for object creation, one for object deletion and one for
    both event types.  The events of each subscription are checked after
    the objects are created and again after they are deleted.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()

    # create topic for objects creation
    topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
    topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
    topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name+TOPIC_SUFFIX+'_all'
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status//100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the events from the creation subscription
    result, _ = sub_create_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # the keys are listed before the deletion so that they can still be
    # matched against the events afterwards
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # nothing was deleted yet
    assert_equal(len(parsed_result['events']), 0)
    # get the events from the all events subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    result, _ = sub_create_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # deletions should not change the creation events
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # only deletions should be listed here
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # get the events from the all events subscription (sub_conf, not the
    # creation-only subscription)
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # both deletions and creations should be here
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # TODO: (1) test deletions (2) test overall number of events

    # cleanup
    sub_create_conf.del_config()
    sub_delete_conf.del_config()
    sub_conf.del_config()
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
400
401
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # fetch the events in pages of at most max_events, following the marker
    # returned by each response until it comes back empty
    max_events = 15
    next_marker = None
    all_events = []
    while True:
        # get the events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        parsed_result = json.loads(result)
        events = parsed_result['events']
        all_events.extend(events)
        next_marker = parsed_result['next_marker']
        for event in events:
            log.debug('Event: objname: "%s" type: "%s"',
                      event['info']['key']['name'], event['event'])
        if next_marker == '':
            break
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(all_events, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
459
460
def test_ps_event_acking():
    """ test acking of some events in a subscription """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    events = parsed_result['events']
    original_number_of_events = len(events)
    for event in events:
        log.debug('Event (before ack) id: "%s"', event['id'])
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events; floor division keeps the count an integer so
    # that it can be used to slice the event list
    events_to_ack = number_of_objects//2
    for event in events[:events_to_ack]:
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status//100, 2)

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (after ack) id: "%s"', event['id'])
    assert_equal(len(parsed_result['events']), original_number_of_events - events_to_ack)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
525
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    # a single temporary file is written, rewound and uploaded: the file is
    # removed as soon as it is closed, so it must stay open until the part
    # has been sent
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bar')
        fp.seek(0)
        uploader = bucket.initiate_multipart_upload('multipart')
        uploader.upload_part_from_file(fp, 1)
        uploader.complete_upload()
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event key: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(parsed_result['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
581
582
def test_ps_versioned_deletion():
    """ test notification of deletion markers """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name+TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    bucket.configure_versioning(True)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications for deletion events only
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name, "OBJECT_DELETE")
    _, status = notification_conf.set_config()
    # floor division: every 2xx status is a success, and the check must not
    # turn into a float comparison where "/" is true division (Python 3)
    assert_equal(status//100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status//100, 2)
    # write the same key twice and remember the version id of each write
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # delete both versions explicitly, newest first
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event key: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])

    # TODO: verify the specific events
    assert len(parsed_result['events']) >= 2

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)