import sys
import time
import logging
+import errno
try:
from itertools import izip_longest as zip_longest
except ImportError:
from itertools import zip_longest
from itertools import combinations
-from cStringIO import StringIO
+from six import StringIO
import boto
import boto.s3.connection
from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
-from .multisite import Zone
+from .multisite import Zone, ZoneGroup, Credentials
from .conn import get_gateway_connection
+from .tools import assert_raises
class Config:
""" test configuration """
self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
# allow some time for realm reconfiguration after changing master zone
self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
+ self.tenant = kwargs.get('tenant', '')
# rgw multisite tests, written against the interfaces provided in rgw_multi.
# these tests must be initialized and run by another module that provides
config = _config or Config()
realm_meta_checkpoint(realm)
+def get_user():
+ return user.id if user is not None else ''
+
+def get_tenant():
+ return config.tenant if config is not None and config.tenant is not None else ''
+
def get_realm():
return realm
-log = logging.getLogger(__name__)
+log = logging.getLogger('rgw_multi.tests')
num_buckets = 0
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
def mdlog_autotrim(zone):
zone.cluster.admin(['mdlog', 'autotrim'])
+def datalog_list(zone, period = None):
+ cmd = ['datalog', 'list']
+ (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
+ datalog_json = datalog_json.decode('utf-8')
+ return json.loads(datalog_json)
+
+def datalog_autotrim(zone):
+ zone.cluster.admin(['datalog', 'autotrim'])
+
+def bilog_list(zone, bucket, args = None):
+ cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
+ bilog, _ = zone.cluster.admin(cmd, read_only=True)
+ bilog = bilog.decode('utf-8')
+ return json.loads(bilog)
+
+def bilog_autotrim(zone, args = None):
+ zone.cluster.admin(['bilog', 'autotrim'] + (args or []))
+
def parse_meta_sync_status(meta_sync_status_json):
meta_sync_status_json = meta_sync_status_json.decode('utf-8')
log.debug('current meta sync status=%s', meta_sync_status_json)
if target_zone == source_zone:
return None
- cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
+ cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
cmd += ['--source-zone', source_zone.name]
cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
while True:
bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
if retcode == 0:
assert(retcode == 2) # ENOENT
bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
- log.debug('current bucket sync status=%s', bucket_sync_status_json)
+ log.debug('current bucket sync markers=%s', bucket_sync_status_json)
sync_status = json.loads(bucket_sync_status_json)
markers={}
def data_source_log_status(source_zone):
source_cluster = source_zone.cluster
cmd = ['datalog', 'status'] + source_zone.zone_args()
- datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
+ datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
datalog_status = json.loads(datalog_status_json.decode('utf-8'))
markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
def bucket_source_log_status(source_zone, bucket_name):
cmd = ['bilog', 'status'] + source_zone.zone_args()
cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
source_cluster = source_zone.cluster
bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
bilog_status = json.loads(bilog_status_json.decode('utf-8'))
return True
-def zone_data_checkpoint(target_zone, source_zone_conn):
+def zone_data_checkpoint(target_zone, source_zone):
if target_zone == source_zone:
return
assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
(target_zone.name, source_zone.name)
+def zonegroup_data_checkpoint(zonegroup_conns):
+ for source_conn in zonegroup_conns.rw_zones:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+ log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name)
+ zone_data_checkpoint(target_conn.zone, source_conn.zone)
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
if target_zone == source_zone:
time.sleep(config.checkpoint_delay)
- assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
+ assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
(target_zone.name, source_zone.name, bucket_name)
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
- for source_conn in zonegroup_conns.zones:
+ for source_conn in zonegroup_conns.rw_zones:
for target_conn in zonegroup_conns.zones:
if source_conn.zone == target_conn.zone:
continue
+ log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name)
zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
+ for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
+ if target_conn.zone.has_buckets():
target_conn.check_bucket_eq(source_conn, bucket_name)
def set_master_zone(zone):
log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
time.sleep(config.reconfigure_delay)
+def set_sync_from_all(zone, flag):
+ s = 'true' if flag else 'false'
+ zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)])
+ zonegroup = zone.zonegroup
+ zonegroup.period.update(zone, commit=True)
+ log.info('Set sync_from_all flag on zone %s to %s', zone.name, s)
+ time.sleep(config.reconfigure_delay)
+
+def set_redirect_zone(zone, redirect_zone):
+ id_str = redirect_zone.id if redirect_zone else ''
+ zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
+ zonegroup = zone.zonegroup
+ zonegroup.period.update(zone, commit=True)
+ log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
+ time.sleep(config.reconfigure_delay)
+
def enable_bucket_sync(zone, bucket_name):
cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
zone.cluster.admin(cmd)
return b.new_key(obj_name)
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
- return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
+ if zone_conn2.zone.has_buckets():
+ zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
def test_object_sync():
zonegroup = realm.master_zonegroup()
k = new_key(zone_conn, bucket, obj)
k.set_contents_from_string('version1')
- v = get_latest_object_version(k)
- log.debug('version1 id=%s', v.version_id)
+ log.debug('version1 id=%s', k.version_id)
# don't delete version1 - this tests that the initial version
# doesn't get squashed into later versions
# create and delete the following object versions to test that
# the operations don't race with each other during sync
k.set_contents_from_string('version2')
- v = get_latest_object_version(k)
- log.debug('version2 id=%s', v.version_id)
- k.bucket.delete_key(obj, version_id=v.version_id)
+ log.debug('version2 id=%s', k.version_id)
+ k.bucket.delete_key(obj, version_id=k.version_id)
k.set_contents_from_string('version3')
+ log.debug('version3 id=%s', k.version_id)
+ k.bucket.delete_key(obj, version_id=k.version_id)
+
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ for _, bucket in zone_bucket:
+ # overwrite the acls to test that metadata-only entries are applied
+ for zone_conn in zonegroup_conns.rw_zones:
+ obj = 'obj-' + zone_conn.name
+ k = new_key(zone_conn, bucket.name, obj)
v = get_latest_object_version(k)
- log.debug('version3 id=%s', v.version_id)
- k.bucket.delete_key(obj, version_id=v.version_id)
+ v.make_public()
- for source_conn, bucket in zone_bucket:
- for target_conn in zonegroup_conns.zones:
- if source_conn.zone == target_conn.zone:
- continue
- zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
- check_bucket_eq(source_conn, target_conn, bucket)
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_concurrent_versioned_object_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a versioned bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ bucket.configure_versioning(True)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload a dummy object and wait for sync. this forces each zone to finish
+ # a full sync and switch to incremental
+ new_key(zone, bucket, 'dummy').set_contents_from_string('')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # create several concurrent versions on each zone and let them race to sync
+ obj = 'obj'
+ for i in range(10):
+ for zone_conn in zonegroup_conns.rw_zones:
+ k = new_key(zone_conn, bucket, obj)
+ k.set_contents_from_string('version1')
+ log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+def test_version_suspended_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a non-versioned bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+ log.debug('created initial version id=%s', key1.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # enable versioning
+ bucket.configure_versioning(True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # re-upload the object as a new version
+ key2 = new_key(zone, bucket, 'obj')
+ key2.set_contents_from_string('')
+ log.debug('created new version id=%s', key2.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # suspend versioning
+ bucket.configure_versioning(False)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # re-upload the object as a 'null' version
+ key3 = new_key(zone, bucket, 'obj')
+ key3.set_contents_from_string('')
+ log.debug('created null version id=%s', key3.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_delete_marker_full_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # enable versioning
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone, bucket in zone_bucket:
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+
+ # create a delete marker
+ key2 = new_key(zone, bucket, 'obj')
+ key2.delete()
+
+ # wait for full sync
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_suspended_delete_marker_full_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # enable/suspend versioning
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+ bucket.configure_versioning(False)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone, bucket in zone_bucket:
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+
+ # create a delete marker
+ key2 = new_key(zone, bucket, 'obj')
+ key2.delete()
+
+ # wait for full sync
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
def test_bucket_versioning():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
continue
bucket_name = gen_bucket_name()
log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
- bucket = zone_conn.conn.create_bucket(bucket_name)
+ zone_conn.conn.create_bucket(bucket_name)
buckets.append(bucket_name)
# restart zone 3 gateway and wait for sync
if source_conn.zone == target_conn.zone:
continue
- target_conn.check_bucket_eq(source_conn, bucket_name)
+ if target_conn.zone.has_buckets():
+ target_conn.check_bucket_eq(source_conn, bucket_name)
# verify that mdlogs are not empty and match for each period
for period in mdlog_periods:
mdlog = mdlog_list(zone, period)
assert len(mdlog) == 0
+def test_datalog_autotrim():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # upload an object to each zone to generate a datalog entry
+ for zone, bucket in zone_bucket:
+ k = new_key(zone, bucket.name, 'key')
+ k.set_contents_from_string('body')
+
+ # wait for data sync to catch up
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+ # trim each datalog
+ for zone, _ in zone_bucket:
+ datalog_autotrim(zone.zone)
+ datalog = datalog_list(zone.zone)
+ assert len(datalog) == 0
+
+def test_multi_zone_redirect():
+ zonegroup = realm.master_zonegroup()
+ if len(zonegroup.rw_zones) < 2:
+ raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ (zc1, zc2) = zonegroup_conns.rw_zones[0:2]
+
+ z1, z2 = (zc1.zone, zc2.zone)
+
+ set_sync_from_all(z2, False)
+
+ # create a bucket on the first zone
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
+ bucket = zc1.conn.create_bucket(bucket_name)
+ obj = 'testredirect'
+
+ key = bucket.new_key(obj)
+ data = 'A'*512
+ key.set_contents_from_string(data)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # try to read object from second zone (should fail)
+ bucket2 = get_bucket(zc2, bucket_name)
+ assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
+
+ set_redirect_zone(z2, z1)
+
+ key2 = bucket2.get_key(obj)
+
+ eq(data, key2.get_contents_as_string())
+
+ key = bucket.new_key(obj)
+
+ for x in ['a', 'b', 'c', 'd']:
+ data = x*512
+ key.set_contents_from_string(data)
+ eq(data, key2.get_contents_as_string())
+
+ # revert config changes
+ set_sync_from_all(z2, True)
+ set_redirect_zone(z2, None)
+
def test_zonegroup_remove():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
z1, z2 = zonegroup.zones[0:2]
c1, c2 = (z1.cluster, z2.cluster)
+ # get admin credentials out of existing zone
+ system_key = z1.data['system_key']
+ admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])
+
# create a new zone in zonegroup on c2 and commit
zone = Zone('remove', zonegroup, c2)
- zone.create(c2)
+ zone.create(c2, admin_creds.credential_args())
zonegroup.zones.append(zone)
zonegroup.period.update(zone, commit=True)
# validate the resulting period
zonegroup.period.update(z1, commit=True)
+
+def test_zg_master_zone_delete():
+
+ master_zg = realm.master_zonegroup()
+ master_zone = master_zg.master_zone
+
+ assert(len(master_zg.zones) >= 1)
+ master_cluster = master_zg.zones[0].cluster
+
+ rm_zg = ZoneGroup('remove_zg')
+ rm_zg.create(master_cluster)
+
+ rm_zone = Zone('remove', rm_zg, master_cluster)
+ rm_zone.create(master_cluster)
+ master_zg.period.update(master_zone, commit=True)
+
+
+ rm_zone.delete(master_cluster)
+ # Period update: This should now fail as the zone will be the master zone
+ # in that zg
+ _, retcode = master_zg.period.update(master_zone, check_retcode=False)
+ assert(retcode == errno.EINVAL)
+
+ # Proceed to delete the zonegroup as well, previous period now does not
+ # contain a dangling master_zone, this must succeed
+ rm_zg.delete(master_cluster)
+ master_zg.period.update(master_zone, commit=True)
+
def test_set_bucket_website():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
for _, bucket in zone_bucket:
for zone in zonegroup.zones:
check_buckets_sync_status_obj_not_exist(zone, buckets)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
def test_bucket_sync_enable_right_after_disable():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
for bucket_name in buckets:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
def test_bucket_sync_disable_enable():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
for bucket_name in buckets:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
def test_multipart_object_sync():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)
+ if len(zonegroup.rw_zones) < 2:
+ raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+
(zone1, zone2) = zonegroup_conns.rw_zones[0:2]
# create a bucket on the first zone
key = bucket2.get_key('testobj-sse-kms')
eq(data, key.get_contents_as_string())
+
+def test_bucket_index_log_trim():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a test bucket, upload some objects, and wait for sync
+ def make_test_bucket():
+ name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, name)
+ bucket = zone.conn.create_bucket(name)
+ for objname in ('a', 'b', 'c', 'd'):
+ k = new_key(zone, name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, name)
+ return bucket
+
+ # create a 'cold' bucket
+ cold_bucket = make_test_bucket()
+
+ # trim with max-buckets=0 to clear counters for cold bucket. this should
+ # prevent it from being considered 'active' by the next autotrim
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '0',
+ ])
+
+ # create an 'active' bucket
+ active_bucket = make_test_bucket()
+
+ # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '1',
+ '--rgw-sync-log-trim-min-cold-buckets', '0',
+ ])
+
+ # verify active bucket has empty bilog
+ active_bilog = bilog_list(zone.zone, active_bucket.name)
+ assert(len(active_bilog) == 0)
+
+ # verify cold bucket has nonempty bilog
+ cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+ assert(len(cold_bilog) > 0)
+
+ # trim with min-cold-buckets=999 to trim all buckets
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '999',
+ '--rgw-sync-log-trim-min-cold-buckets', '999',
+ ])
+
+ # verify cold bucket has empty bilog
+ cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+ assert(len(cold_bilog) == 0)
+
+def test_bucket_creation_time():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
+ for z1, z2 in combinations(zone_buckets, 2):
+ for a, b in zip(z1, z2):
+ eq(a.name, b.name)
+ eq(a.creation_date, b.creation_date)