# ceph/src/test/rgw/rgw_multi/tests.py
import json
import logging
import random
import string
import time

try:
    # Python 2
    from itertools import izip_longest as zip_longest
except ImportError:
    # Python 3
    from itertools import zip_longest
from itertools import combinations

import boto
import boto.s3.connection
from boto.s3.website import WebsiteConfiguration
from boto.s3.cors import CORSConfiguration

from nose.tools import eq_ as eq
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest

from .multisite import Zone

from .conn import get_gateway_connection  # note: shadowed by the local definition below

28 """ test configuration """
29 def __init__(self
, **kwargs
):
30 # by default, wait up to 5 minutes before giving up on a sync checkpoint
31 self
.checkpoint_retries
= kwargs
.get('checkpoint_retries', 60)
32 self
.checkpoint_delay
= kwargs
.get('checkpoint_delay', 5)
33 # allow some time for realm reconfiguration after changing master zone
34 self
.reconfigure_delay
= kwargs
.get('reconfigure_delay', 5)
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
# implementations of these interfaces by calling init_multi()
realm = None
user = None
config = None
def init_multi(_realm, _user, _config=None):
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    realm_meta_checkpoint(realm)

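# Illustrative usage (not part of the test suite): a driver module builds the
# realm and user objects itself and hands them to init_multi() before running
# any tests. `make_realm` and `make_user` below are hypothetical stand-ins for
# whatever the driver provides:
#
#   from rgw_multi import tests
#   tests.init_multi(make_realm(), make_user(),
#                    tests.Config(checkpoint_retries=120, checkpoint_delay=5))
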
log = logging.getLogger(__name__)

num_buckets = 0
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway """
    if gateway.connection is None:
        gateway.connection = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = gateway.host,
            port = gateway.port,
            is_secure = False,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection

def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    if isinstance(credentials, list):
        credentials = credentials[0]
    return get_gateway_connection(zone.gateways[0], credentials)

def mdlog_list(zone, period = None):
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    mdlog_json = mdlog_json.decode('utf-8')
    return json.loads(mdlog_json)

def meta_sync_status(zone):
    # note: this definition is shadowed by the retrying meta_sync_status() below
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT
        time.sleep(5)

def mdlog_autotrim(zone):
    zone.cluster.admin(['mdlog', 'autotrim'])

def parse_meta_sync_status(meta_sync_status_json):
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers

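# For orientation, parse_meta_sync_status() expects 'metadata sync status'
# JSON shaped roughly like the following (abridged sketch, not verbatim
# radosgw-admin output):
#
#   {
#     "sync_status": {
#       "info": { "status": "...", "num_shards": 64,
#                 "period": "...", "realm_epoch": 2 },
#       "markers": [
#         { "key": 0,
#           "val": { "state": 1, "marker": "...", "realm_epoch": 2 } },
#         ...
#       ]
#     }
#   }
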
def meta_sync_status(zone):
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name

def meta_master_log_status(master_zone):
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers

def compare_meta_status(zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True

def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed meta checkpoint for zone=%s' % zone.name

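# Note: the checkpoint helpers in this module poll up to
# config.checkpoint_retries times with config.checkpoint_delay seconds between
# attempts, so the default Config() waits 60 * 5s = 5 minutes before the final
# assert gives up.
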
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue
        zone_meta_checkpoint(zone, meta_master_zone, master_status)

def realm_meta_checkpoint(realm):
    log.info('meta checkpoint')

    meta_master_zone = realm.meta_master_zone()
    master_status = meta_master_log_status(meta_master_zone)

    for zonegroup in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)

def parse_data_sync_status(data_sync_status_json):
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status = sync_status['sync_status']['info']['status']
    num_shards = sync_status['sync_status']['info']['num_shards']

    sync_markers = sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)

def data_sync_status(target_zone, source_zone):
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def bucket_sync_status(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers

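# For reference, the inc_marker split above behaves like:
#   '6#00000000002.132.3'.split('#')[-1]  ->  '00000000002.132.3'
# leaving only the position that compare_bucket_status() checks against the
# source zone's bilog markers.
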
def data_source_log_status(source_zone):
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers

def bucket_source_log_status(source_zone, bucket_name):
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))

    m = {}
    markers = {}
    try:
        m = bilog_status['markers']
    except KeyError:
        pass

    for s in m:
        markers[s['key']] = s['val']

    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
    return markers

def compare_data_status(target_zone, source_zone, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True

def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True

def zone_data_checkpoint(target_zone, source_zone):
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                  (target_zone.name, source_zone.name)

def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return

        time.sleep(config.checkpoint_delay)

    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
                  (target_zone.name, source_zone.name, bucket_name)

def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
            target_conn.check_bucket_eq(source_conn, bucket_name)

def set_master_zone(zone):
    zone.modify(zone.cluster, ['--master'])
    zonegroup = zone.zonegroup
    zonegroup.period.update(zone, commit=True)
    zonegroup.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    time.sleep(config.reconfigure_delay)

def enable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def disable_bucket_sync(zone, bucket_name):
    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
    zone.cluster.admin(cmd)

def check_buckets_sync_status_obj_not_exist(zone, buckets):
    for _ in range(config.checkpoint_retries):
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break
        else:
            return
        time.sleep(config.checkpoint_delay)

    assert False

def gen_bucket_name():
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)

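# e.g. with run_prefix == 'abcdef', successive calls yield 'abcdef-1',
# 'abcdef-2', ..., keeping bucket names unique within a single test run.
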
class ZonegroupConns:
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None
        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn

def check_all_buckets_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            log.critical('zone %s does not contain bucket %s', zone_conn.zone.name, b)
            return False

    return True

def check_all_buckets_dont_exist(zone_conn, buckets):
    if not zone_conn.zone.has_buckets():
        return True

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            continue
        log.critical('zone %s contains bucket %s', zone_conn.zone.name, b)
        return False

    return True

def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        for i in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket

def create_bucket_per_zone_in_realm():
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket

def test_bucket_create():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_recreate():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, _ = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    # recreate buckets on all zones, make sure they weren't removed
    for zone in zonegroup_conns.rw_zones:
        for bucket_name in buckets:
            bucket = zone.create_bucket(bucket_name)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

def test_bucket_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_exist(zone, buckets)

    for zone, bucket_name in zone_bucket:
        zone.conn.delete_bucket(bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    for zone in zonegroup_conns.zones:
        assert check_all_buckets_dont_exist(zone, buckets)

def get_bucket(zone, bucket_name):
    return zone.conn.get_bucket(bucket_name)

def get_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.get_key(obj_name)

def new_key(zone, bucket_name, obj_name):
    b = get_bucket(zone, bucket_name)
    return b.new_key(obj_name)

def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)

def test_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_object_delete():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def get_latest_object_version(key):
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None

def test_versioned_object_incremental_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning on each bucket
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

def test_bucket_versioning():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')

def test_bucket_acl():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
        bucket.set_acl('public-read')
        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers

def test_bucket_cors():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        cors_cfg = CORSConfiguration()
        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        bucket.set_cors(cors_cfg)
        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())

def test_bucket_delete_notempty():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
        else:
            assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)

def test_multi_period_incremental_sync():
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0

def test_zonegroup_remove():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)

def test_set_bucket_website():
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html', error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())

def test_set_bucket_policy():
    policy = '''{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*"
}]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        assert(bucket.get_policy() == policy)

def test_bucket_sync_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for zone in zonegroup.zones:
        check_buckets_sync_status_obj_not_exist(zone, buckets)

def test_bucket_sync_enable_right_after_disable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_bucket_sync_disable_enable():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

def test_encrypted_object_sync():
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())