4 from rgw_multi
.tests
import get_realm
, \
6 zonegroup_meta_checkpoint
, \
7 zone_meta_checkpoint
, \
8 zone_bucket_checkpoint
, \
9 zone_data_checkpoint
, \
12 from rgw_multi
.zone_ps
import PSTopic
, PSNotification
, PSSubscription
13 from nose
import SkipTest
14 from nose
.tools
import assert_not_equal
, assert_equal
# module-wide logger for the pubsub tests, registered under 'rgw_multi.tests'
log = logging.getLogger('rgw_multi.tests')
19 ####################################
20 # utility functions for pubsub tests
21 ####################################
def check_ps_configured():
    """Skip the calling test unless at least one pubsub zone exists.

    Looks up the zones of tier type "pubsub" in the master zonegroup of the
    realm under test.

    Raises:
        SkipTest: when the master zonegroup has no pubsub zone.
    """
    realm = get_realm()
    zonegroup = realm.master_zonegroup()

    ps_zones = zonegroup.zones_by_type.get("pubsub")
    # .get() yields None (or an empty collection) when nothing is registered
    if not ps_zones:
        raise SkipTest("Requires at least one PS zone")
def is_ps_zone(zone_conn):
    """Tell whether a zone connection belongs to a pubsub zone.

    Args:
        zone_conn: object exposing ``zone.tier_type()``; may be None.

    Returns:
        True when the zone's tier type is "pubsub", False otherwise
        (including when no connection is given).
    """
    # a missing connection is simply "not a pubsub zone", not an error
    if zone_conn is None:
        return False
    return zone_conn.zone.tier_type() == "pubsub"
def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
    """Verify there is at least one event per element.

    Args:
        events: list of event dicts; each is read through ``['event']``,
            ``['info']['bucket']['name']`` and ``['info']['key']['name']``.
        keys: objects exposing ``name`` and ``bucket.name``.
        exact_match: when True, a difference between the number of events
            and the number of keys is a failure; otherwise it is only logged.
        deletions: look for 'OBJECT_DELETE' events instead of
            'OBJECT_CREATE' events.

    Raises:
        AssertionError: when a key has no matching event, or when
            ``exact_match`` is set and the counts differ.
    """
    wanted_event = 'OBJECT_DELETE' if deletions else 'OBJECT_CREATE'
    for key in keys:
        key_found = any(
            event['info']['bucket']['name'] == key.bucket.name and
            event['info']['key']['name'] == key.name and
            event['event'] == wanted_event
            for event in events)
        if not key_found:
            err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
            log.error(events)
            # raised explicitly so the check is not stripped under "python -O"
            raise AssertionError(err)

    if len(events) != len(keys):
        err = 'superfluous events are found'
        log.debug(err)
        if exact_match:
            log.error(events)
            raise AssertionError(err)
def init_env():
    """Initialize the environment for a pubsub test.

    Skips the test when no pubsub zone is configured, waits for metadata
    sync on the master zonegroup and on every pubsub zone, and splits the
    zonegroup connections into two groups.

    Returns:
        A ``(zones, ps_zones)`` tuple: connections to the writable
        non-pubsub zones, and connections to the pubsub zones. Both lists
        are asserted to be non-empty.
    """
    check_ps_configured()

    realm = get_realm()
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    zonegroup_meta_checkpoint(zonegroup)

    ps_zones = []
    zones = []
    for conn in zonegroup_conns.zones:
        if is_ps_zone(conn):
            zone_meta_checkpoint(conn.zone)
            ps_zones.append(conn)
        elif not conn.zone.is_read_only():
            # read-only zones cannot host the buckets the tests create
            zones.append(conn)

    assert_not_equal(len(zones), 0)
    assert_not_equal(len(ps_zones), 0)
    return zones, ps_zones
# appended to a generated bucket name to build the name of its test topic
TOPIC_SUFFIX = "_topic"
def test_ps_topic():
    """ test set/get/delete of topic

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    _, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    _, status = topic_conf.set_config()
    assert_equal(status // 100, 2)
    # get topic
    result, _ = topic_conf.get_config()
    # verify topic content
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic']['name'], topic_name)
    assert_equal(len(parsed_result['subs']), 0)
    # delete topic
    _, status = topic_conf.del_config()
    assert_equal(status // 100, 2)
    # verify topic is deleted
    result, _ = topic_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['Code'], 'NoSuchKey')
def test_ps_notification():
    """ test set/get/delete of notification

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    # delete notification
    _, status = notification_conf.del_config()
    assert_equal(status // 100, 2)
    # TODO: deletion cannot be verified via GET
    # result, _ = notification_conf.get_config()
    # parsed_result = json.loads(result)
    # assert_equal(parsed_result['Code'], 'NoSuchKey')

    # cleanup
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
def test_ps_notification_events():
    """ test set/get/delete of notification on specific events

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    events = "OBJECT_CREATE,OBJECT_DELETE"
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name,
                                       events)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # get notification
    result, _ = notification_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(len(parsed_result['topics']), 1)
    assert_equal(parsed_result['topics'][0]['topic']['name'],
                 topic_name)
    assert_not_equal(len(parsed_result['topics'][0]['events']), 0)
    # TODO add test for invalid event name

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
def test_ps_subscription():
    """ test set/get/delete of subscription

    Creates a topic, a notification and a subscription on a fresh bucket,
    checks that object creations show up as events, deletes the objects and
    finally removes the subscription.

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # get the subscription
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], topic_name)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event: objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscriptions
    result, _ = sub_conf.get_events()
    # parse the fresh response; otherwise the loop below would log the
    # creation events fetched before the deletions
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event: objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # TODO: check deletions
    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # we should see the creations as well as the deletions
    # delete subscription
    _, status = sub_conf.del_config()
    assert_equal(status // 100, 2)
    result, _ = sub_conf.get_config()
    parsed_result = json.loads(result)
    assert_equal(parsed_result['topic'], '')
    # TODO should return "no-key" instead
    # assert_equal(parsed_result['Code'], 'NoSuchKey')

    # cleanup
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
def test_ps_event_type_subscription():
    """ test subscriptions for different events

    Three topic/notification/subscription triples are attached to one
    bucket: creations only, deletions only, and all events. Events are
    checked after creating objects and again after deleting them.

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()

    # create topic for objects creation
    topic_create_name = bucket_name + TOPIC_SUFFIX + '_create'
    topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
    topic_create_conf.set_config()
    # create topic for objects deletion
    topic_delete_name = bucket_name + TOPIC_SUFFIX + '_delete'
    topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
    topic_delete_conf.set_config()
    # create topic for all events
    topic_name = bucket_name + TOPIC_SUFFIX + '_all'
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # create notifications for objects creation
    notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_create_name, "OBJECT_CREATE")
    _, status = notification_create_conf.set_config()
    assert_equal(status // 100, 2)
    # create notifications for objects deletion
    notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                              topic_delete_name, "OBJECT_DELETE")
    _, status = notification_delete_conf.set_config()
    assert_equal(status // 100, 2)
    # create notifications for all events
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name, "OBJECT_DELETE,OBJECT_CREATE")
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription for objects creation
    sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX + '_create',
                                     topic_create_name)
    _, status = sub_create_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription for objects deletion
    sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX + '_delete',
                                     topic_delete_name)
    _, status = sub_delete_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription for all events
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX + '_all',
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the events from the creation subscription
    result, _ = sub_create_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # nothing was deleted yet
    assert_equal(len(parsed_result['events']), 0)
    # get the events from the all events subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # delete objects from the bucket
    for key in bucket.list():
        key.delete()
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    log.debug("Event (OBJECT_DELETE) synced")

    # get the events from the creations subscription
    result, _ = sub_create_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # deletions should not change the creation events
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
    # get the events from the deletions subscription
    result, _ = sub_delete_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # only deletions should be listed here
    # TODO: set exact_match to true
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # get the events from the all events subscription
    # (read from sub_conf: sub_create_conf only carries creation events)
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])
    # both deletions and creations should be here
    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
    # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
    # TODO: (1) test deletions (2) test overall number of events

    # cleanup
    sub_create_conf.del_config()
    sub_delete_conf.del_config()
    sub_conf.del_config()
    notification_create_conf.del_config()
    notification_delete_conf.del_config()
    notification_conf.del_config()
    topic_create_conf.del_config()
    topic_delete_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)
def test_ps_event_fetching():
    """ test incremental fetching of events from a subscription

    Events are pulled page by page, following the 'next_marker' field of
    each response until it comes back empty, and the accumulated events
    are then checked against the objects in the bucket.

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 100
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # NOTE(review): page size and initial marker are assumptions - confirm.
    # Any page size below number_of_objects forces several round trips.
    max_events = 15
    next_marker = None
    all_events = []
    while True:
        # get the events from the subscription
        result, _ = sub_conf.get_events(max_events, next_marker)
        parsed_result = json.loads(result)
        events = parsed_result['events']
        all_events.extend(events)
        next_marker = parsed_result['next_marker']
        for event in events:
            log.debug('Event: objname: "%s" type: "%s"',
                      event['info']['key']['name'], event['event'])
        # an empty marker means there are no more pages
        if next_marker == '':
            break
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(all_events, keys, exact_match=False)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
def test_ps_event_acking():
    """ test acking of some events in a subscription

    Acknowledges the first half of the fetched events and checks that the
    subscription then returns correspondingly fewer events.

    Integer (floor) division is used throughout so that counts and HTTP
    status classes stay integers on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    number_of_objects = 10
    for i in range(number_of_objects):
        key = bucket.new_key(str(i))
        key.set_contents_from_string('bar')
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    events = parsed_result['events']
    original_number_of_events = len(events)
    for event in events:
        log.debug('Event (before ack) id: "%s"', event['id'])
    keys = list(bucket.list())
    # TODO: set exact_match to true
    verify_events_by_elements(events, keys, exact_match=False)
    # ack half of the events
    events_to_ack = number_of_objects // 2
    for event in events[:events_to_ack]:
        _, status = sub_conf.ack_events(event['id'])
        assert_equal(status // 100, 2)

    # verify that acked events are gone
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event (after ack) id: "%s"', event['id'])
    assert_equal(len(parsed_result['events']), original_number_of_events - events_to_ack)

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
def test_ps_creation_triggers():
    """ test object creation notifications in using put/copy/post

    Creates one object with a plain PUT, one with a server-side copy and
    one with a multi-part upload, then expects at least three events on
    the subscription.

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name)
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket using PUT
    key = bucket.new_key('put')
    key.set_contents_from_string('bar')
    # create objects in the bucket using COPY
    bucket.copy_key('copy', bucket.name, key.name)
    # create objects in the bucket using multi-part upload
    uploader = bucket.initiate_multipart_upload('multipart')
    # one temporary file, written and then rewound, so the uploaded part
    # holds the data and the handle is closed even when the upload fails
    with tempfile.TemporaryFile(mode='w+b') as fp:
        fp.write(b'bar')
        fp.seek(0)
        uploader.upload_part_from_file(fp, 1)
    uploader.complete_upload()
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the create events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event key: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])

    # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart'
    assert len(parsed_result['events']) >= 3
    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    for key in bucket.list():
        key.delete()
    zones[0].delete_bucket(bucket_name)
def test_ps_versioned_deletion():
    """ test notification of deletion markers

    Writes two versions of one object into a versioned bucket, deletes
    both versions explicitly and expects at least two events on a
    subscription that listens for OBJECT_DELETE only.

    HTTP status classes are compared with floor division so that any 2xx
    code passes on Python 3 as well as on Python 2.
    """
    zones, ps_zones = init_env()
    bucket_name = gen_bucket_name()
    topic_name = bucket_name + TOPIC_SUFFIX

    # create topic
    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
    topic_conf.set_config()
    # create bucket on the first of the rados zones
    bucket = zones[0].create_bucket(bucket_name)
    bucket.configure_versioning(True)
    # wait for sync
    zone_meta_checkpoint(ps_zones[0].zone)
    # create notifications
    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
                                       topic_name, "OBJECT_DELETE")
    _, status = notification_conf.set_config()
    assert_equal(status // 100, 2)
    # create subscription
    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name + SUB_SUFFIX,
                              topic_name)
    _, status = sub_conf.set_config()
    assert_equal(status // 100, 2)
    # create objects in the bucket
    key = bucket.new_key('foo')
    key.set_contents_from_string('bar')
    # remember each version id right after its upload
    v1 = key.version_id
    key.set_contents_from_string('kaboom')
    v2 = key.version_id
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
    # delete both versions, newest first
    bucket.delete_key(key.name, version_id=v2)
    bucket.delete_key(key.name, version_id=v1)
    # wait for sync
    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)

    # get the delete events from the subscription
    result, _ = sub_conf.get_events()
    parsed_result = json.loads(result)
    for event in parsed_result['events']:
        log.debug('Event key: "%s" type: "%s"',
                  event['info']['key']['name'], event['event'])

    # TODO: verify the specific events
    assert len(parsed_result['events']) >= 2

    # cleanup
    sub_conf.del_config()
    notification_conf.del_config()
    topic_conf.del_config()
    zones[0].delete_bucket(bucket_name)