import new upstream nautilus stable release 14.2.8
diff --git a/ceph/src/test/rgw/rgw_multi/tests.py b/ceph/src/test/rgw/rgw_multi/tests.py
index 07b2d7e3a06652b473dea1463741d0e041bb73cb..7f013e788c46efb08f9b71b41fbf20cd09ee4778 100644
@@ -4,13 +4,14 @@ import string
 import sys
 import time
 import logging
+import errno
 
 try:
     from itertools import izip_longest as zip_longest
 except ImportError:
     from itertools import zip_longest
 from itertools import combinations
-from cStringIO import StringIO
+from six import StringIO
 
 import boto
 import boto.s3.connection
@@ -21,9 +22,10 @@ from nose.tools import eq_ as eq
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 
-from .multisite import Zone
+from .multisite import Zone, ZoneGroup, Credentials
 
 from .conn import get_gateway_connection
+from .tools import assert_raises
 
 class Config:
     """ test configuration """
@@ -33,6 +35,7 @@ class Config:
         self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
         # allow some time for realm reconfiguration after changing master zone
         self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
+        self.tenant = kwargs.get('tenant', '')
 
 # rgw multisite tests, written against the interfaces provided in rgw_multi.
 # these tests must be initialized and run by another module that provides
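The comment above is truncated by the diff context, but the init_multi() signature in the following hunk shows the expected wiring. A hedged sketch of such a driver module, assuming a harness that has already built the realm and system user through the rgw_multi interfaces (the module path and function names in the sketch are illustrative, not part of this diff):

    # hypothetical driver module -- names and paths are assumptions
    from rgw_multi import tests

    def init_rgw_multi_tests(realm, user):
        # tenant is optional; when set, the admin helpers in tests.py append
        # --tenant/--uid to the radosgw-admin commands they build
        config = tests.Config(checkpoint_delay=5, reconfigure_delay=5, tenant='')
        tests.init_multi(realm, user, config)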
@@ -49,10 +52,16 @@ def init_multi(_realm, _user, _config=None):
     config = _config or Config()
     realm_meta_checkpoint(realm)
 
+def get_user():
+    return user.id if user is not None else ''
+
+def get_tenant():
+    return config.tenant if config is not None and config.tenant is not None else ''
+
 def get_realm():
     return realm
 
-log = logging.getLogger(__name__)
+log = logging.getLogger('rgw_multi.tests')
 
 num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
@@ -95,8 +104,18 @@ def meta_sync_status(zone):
 def mdlog_autotrim(zone):
     zone.cluster.admin(['mdlog', 'autotrim'])
 
+def datalog_list(zone, period = None):
+    cmd = ['datalog', 'list']
+    (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
+    datalog_json = datalog_json.decode('utf-8')
+    return json.loads(datalog_json)
+
+def datalog_autotrim(zone):
+    zone.cluster.admin(['datalog', 'autotrim'])
+
 def bilog_list(zone, bucket, args = None):
     cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     bilog, _ = zone.cluster.admin(cmd, read_only=True)
     bilog = bilog.decode('utf-8')
     return json.loads(bilog)
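The tenant-argument lines added above (and the matching ones later in this diff) rely on the conditional expression covering the whole right-hand side of the augmented assignment, so nothing is appended when no tenant is configured. A minimal standalone illustration of the same idiom, with placeholder values:

    # equivalent, more explicit spelling of the pattern used in bilog_list()
    cmd = ['bilog', 'list', '--bucket', 'some-bucket']      # placeholder bucket
    tenant = ''                                             # e.g. config.tenant
    extra = ['--tenant', tenant, '--uid', 'some-user'] if tenant else []
    cmd += extra                                            # appends nothing when tenant is empty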
@@ -254,6 +273,7 @@ def bucket_sync_status(target_zone, source_zone, bucket_name):
     cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
     cmd += ['--source-zone', source_zone.name]
     cmd += ['--bucket', bucket_name]
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     while True:
         bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
         if retcode == 0:
@@ -279,7 +299,7 @@ def bucket_sync_status(target_zone, source_zone, bucket_name):
 def data_source_log_status(source_zone):
     source_cluster = source_zone.cluster
     cmd = ['datalog', 'status'] + source_zone.zone_args()
-    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
+    datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
     datalog_status = json.loads(datalog_status_json.decode('utf-8'))
 
     markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
@@ -289,6 +309,7 @@ def data_source_log_status(source_zone):
 def bucket_source_log_status(source_zone, bucket_name):
     cmd = ['bilog', 'status'] + source_zone.zone_args()
     cmd += ['--bucket', bucket_name]
+    cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
     source_cluster = source_zone.cluster
     bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
     bilog_status = json.loads(bilog_status_json.decode('utf-8'))
@@ -344,7 +365,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn
 
     return True
 
-def zone_data_checkpoint(target_zone, source_zone_conn):
+def zone_data_checkpoint(target_zone, source_zone):
     if target_zone == source_zone:
         return
 
@@ -366,6 +387,13 @@ def zone_data_checkpoint(target_zone, source_zone_conn):
     assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
                   (target_zone.name, source_zone.name)
 
+def zonegroup_data_checkpoint(zonegroup_conns):
+    for source_conn in zonegroup_conns.rw_zones:
+        for target_conn in zonegroup_conns.zones:
+            if source_conn.zone == target_conn.zone:
+                continue
+            log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name)
+            zone_data_checkpoint(target_conn.zone, source_conn.zone)
 
 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
     if target_zone == source_zone:
@@ -390,13 +418,15 @@ def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
                   (target_zone.name, source_zone.name, bucket_name)
 
 def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
-    for source_conn in zonegroup_conns.zones:
+    for source_conn in zonegroup_conns.rw_zones:
         for target_conn in zonegroup_conns.zones:
             if source_conn.zone == target_conn.zone:
                 continue
+            log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name)
             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
     for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
-        target_conn.check_bucket_eq(source_conn, bucket_name)
+        if target_conn.zone.has_buckets():
+            target_conn.check_bucket_eq(source_conn, bucket_name)
 
 def set_master_zone(zone):
     zone.modify(zone.cluster, ['--master'])
@@ -406,6 +436,22 @@ def set_master_zone(zone):
     log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
     time.sleep(config.reconfigure_delay)
 
+def set_sync_from_all(zone, flag):
+    s = 'true' if flag else 'false'
+    zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)])
+    zonegroup = zone.zonegroup
+    zonegroup.period.update(zone, commit=True)
+    log.info('Set sync_from_all flag on zone %s to %s', zone.name, s)
+    time.sleep(config.reconfigure_delay)
+
+def set_redirect_zone(zone, redirect_zone):
+    id_str = redirect_zone.id if redirect_zone else ''
+    zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
+    zonegroup = zone.zonegroup
+    zonegroup.period.update(zone, commit=True)
+    log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
+    time.sleep(config.reconfigure_delay)
+
 def enable_bucket_sync(zone, bucket_name):
     cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
     zone.cluster.admin(cmd)
@@ -562,7 +608,8 @@ def new_key(zone, bucket_name, obj_name):
     return b.new_key(obj_name)
 
 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
-    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
+    if zone_conn2.zone.has_buckets():
+        zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
 
 def test_object_sync():
     zonegroup = realm.master_zonegroup()
@@ -656,26 +703,146 @@ def test_versioned_object_incremental_sync():
             k = new_key(zone_conn, bucket, obj)
 
             k.set_contents_from_string('version1')
-            v = get_latest_object_version(k)
-            log.debug('version1 id=%s', v.version_id)
+            log.debug('version1 id=%s', k.version_id)
             # don't delete version1 - this tests that the initial version
             # doesn't get squashed into later versions
 
             # create and delete the following object versions to test that
             # the operations don't race with each other during sync
             k.set_contents_from_string('version2')
-            v = get_latest_object_version(k)
-            log.debug('version2 id=%s', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
+            log.debug('version2 id=%s', k.version_id)
+            k.bucket.delete_key(obj, version_id=k.version_id)
 
             k.set_contents_from_string('version3')
+            log.debug('version3 id=%s', k.version_id)
+            k.bucket.delete_key(obj, version_id=k.version_id)
+
+    for _, bucket in zone_bucket:
+        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    for _, bucket in zone_bucket:
+        # overwrite the acls to test that metadata-only entries are applied
+        for zone_conn in zonegroup_conns.rw_zones:
+            obj = 'obj-' + zone_conn.name
+            k = new_key(zone_conn, bucket.name, obj)
             v = get_latest_object_version(k)
-            log.debug('version3 id=%s', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
+            v.make_public()
 
     for _, bucket in zone_bucket:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
+def test_concurrent_versioned_object_incremental_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    zone = zonegroup_conns.rw_zones[0]
+
+    # create a versioned bucket
+    bucket = zone.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    bucket.configure_versioning(True)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload a dummy object and wait for sync. this forces each zone to finish
+    # a full sync and switch to incremental
+    new_key(zone, bucket, 'dummy').set_contents_from_string('')
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # create several concurrent versions on each zone and let them race to sync
+    obj = 'obj'
+    for i in range(10):
+        for zone_conn in zonegroup_conns.rw_zones:
+            k = new_key(zone_conn, bucket, obj)
+            k.set_contents_from_string('version1')
+            log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+def test_version_suspended_incremental_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zone = zonegroup_conns.rw_zones[0]
+
+    # create a non-versioned bucket
+    bucket = zone.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload an initial object
+    key1 = new_key(zone, bucket, 'obj')
+    key1.set_contents_from_string('')
+    log.debug('created initial version id=%s', key1.version_id)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # enable versioning
+    bucket.configure_versioning(True)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # re-upload the object as a new version
+    key2 = new_key(zone, bucket, 'obj')
+    key2.set_contents_from_string('')
+    log.debug('created new version id=%s', key2.version_id)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # suspend versioning
+    bucket.configure_versioning(False)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # re-upload the object as a 'null' version
+    key3 = new_key(zone, bucket, 'obj')
+    key3.set_contents_from_string('')
+    log.debug('created null version id=%s', key3.version_id)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_delete_marker_full_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+    # enable versioning
+    for _, bucket in zone_bucket:
+        bucket.configure_versioning(True)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    for zone, bucket in zone_bucket:
+        # upload an initial object
+        key1 = new_key(zone, bucket, 'obj')
+        key1.set_contents_from_string('')
+
+        # create a delete marker
+        key2 = new_key(zone, bucket, 'obj')
+        key2.delete()
+
+    # wait for full sync
+    for _, bucket in zone_bucket:
+        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_suspended_delete_marker_full_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+    # enable/suspend versioning
+    for _, bucket in zone_bucket:
+        bucket.configure_versioning(True)
+        bucket.configure_versioning(False)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    for zone, bucket in zone_bucket:
+        # upload an initial object
+        key1 = new_key(zone, bucket, 'obj')
+        key1.set_contents_from_string('')
+
+        # create a delete marker
+        key2 = new_key(zone, bucket, 'obj')
+        key2.delete()
+
+    # wait for full sync
+    for _, bucket in zone_bucket:
+        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
 def test_bucket_versioning():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
     for _, bucket in zone_bucket:
@@ -768,7 +935,7 @@ def test_multi_period_incremental_sync():
             continue
         bucket_name = gen_bucket_name()
         log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
-        bucket = zone_conn.conn.create_bucket(bucket_name)
+        zone_conn.conn.create_bucket(bucket_name)
         buckets.append(bucket_name)
 
     # restart zone 3 gateway and wait for sync
@@ -782,7 +949,8 @@ def test_multi_period_incremental_sync():
                 if source_conn.zone == target_conn.zone:
                     continue
 
-                target_conn.check_bucket_eq(source_conn, bucket_name)
+                if target_conn.zone.has_buckets():
+                    target_conn.check_bucket_eq(source_conn, bucket_name)
 
     # verify that mdlogs are not empty and match for each period
     for period in mdlog_periods:
@@ -809,6 +977,70 @@ def test_multi_period_incremental_sync():
             mdlog = mdlog_list(zone, period)
             assert len(mdlog) == 0
 
+def test_datalog_autotrim():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+    # upload an object to each zone to generate a datalog entry
+    for zone, bucket in zone_bucket:
+        k = new_key(zone, bucket.name, 'key')
+        k.set_contents_from_string('body')
+
+    # wait for data sync to catch up
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+    # trim each datalog
+    for zone, _ in zone_bucket:
+        datalog_autotrim(zone.zone)
+        datalog = datalog_list(zone.zone)
+        assert len(datalog) == 0
+
+def test_multi_zone_redirect():
+    zonegroup = realm.master_zonegroup()
+    if len(zonegroup.rw_zones) < 2:
+        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    (zc1, zc2) = zonegroup_conns.rw_zones[0:2]
+
+    z1, z2 = (zc1.zone, zc2.zone)
+
+    set_sync_from_all(z2, False)
+
+    # create a bucket on the first zone
+    bucket_name = gen_bucket_name()
+    log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
+    bucket = zc1.conn.create_bucket(bucket_name)
+    obj = 'testredirect'
+
+    key = bucket.new_key(obj)
+    data = 'A'*512
+    key.set_contents_from_string(data)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # try to read object from second zone (should fail)
+    bucket2 = get_bucket(zc2, bucket_name)
+    assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
+
+    set_redirect_zone(z2, z1)
+
+    key2 = bucket2.get_key(obj)
+
+    eq(data, key2.get_contents_as_string())
+
+    key = bucket.new_key(obj)
+
+    for x in ['a', 'b', 'c', 'd']:
+        data = x*512
+        key.set_contents_from_string(data)
+        eq(data, key2.get_contents_as_string())
+
+    # revert config changes
+    set_sync_from_all(z2, True)
+    set_redirect_zone(z2, None)
+
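assert_raises, imported from .tools near the top of this diff and used in test_multi_zone_redirect above to expect an S3ResponseError, is assumed to follow the conventional helper pattern of returning the caught exception and failing otherwise; a minimal sketch under that assumption (the real implementation lives in rgw_multi/tools.py and may differ):

    # sketch only -- not the actual rgw_multi.tools implementation
    def assert_raises(excClass, callableObj, *args, **kwargs):
        try:
            callableObj(*args, **kwargs)
        except excClass as e:
            return e   # hand back the exception so callers can inspect it
        raise AssertionError('%s not raised' % excClass.__name__)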
 def test_zonegroup_remove():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
@@ -819,9 +1051,13 @@ def test_zonegroup_remove():
     z1, z2 = zonegroup.zones[0:2]
     c1, c2 = (z1.cluster, z2.cluster)
 
+    # get admin credentials out of existing zone
+    system_key = z1.data['system_key']
+    admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])
+
     # create a new zone in zonegroup on c2 and commit
     zone = Zone('remove', zonegroup, c2)
-    zone.create(c2)
+    zone.create(c2, admin_creds.credential_args())
     zonegroup.zones.append(zone)
     zonegroup.period.update(zone, commit=True)
 
@@ -837,6 +1073,34 @@ def test_zonegroup_remove():
     # validate the resulting period
     zonegroup.period.update(z1, commit=True)
 
+
+def test_zg_master_zone_delete():
+
+    master_zg = realm.master_zonegroup()
+    master_zone = master_zg.master_zone
+
+    assert(len(master_zg.zones) >= 1)
+    master_cluster = master_zg.zones[0].cluster
+
+    rm_zg = ZoneGroup('remove_zg')
+    rm_zg.create(master_cluster)
+
+    rm_zone = Zone('remove', rm_zg, master_cluster)
+    rm_zone.create(master_cluster)
+    master_zg.period.update(master_zone, commit=True)
+
+
+    rm_zone.delete(master_cluster)
+    # Period update: this should now fail, as the deleted zone was the master
+    # zone of that zonegroup
+    _, retcode = master_zg.period.update(master_zone, check_retcode=False)
+    assert(retcode == errno.EINVAL)
+
+    # Proceed to delete the zonegroup as well; the previous period no longer
+    # contains a dangling master_zone, so this must succeed
+    rm_zg.delete(master_cluster)
+    master_zg.period.update(master_zone, commit=True)
+
 def test_set_bucket_website():
     buckets, zone_bucket = create_bucket_per_zone_in_realm()
     for _, bucket in zone_bucket:
@@ -872,6 +1136,8 @@ def test_bucket_sync_disable():
     for zone in zonegroup.zones:
         check_buckets_sync_status_obj_not_exist(zone, buckets)
 
+    zonegroup_data_checkpoint(zonegroup_conns)
+
 def test_bucket_sync_enable_right_after_disable():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
@@ -902,6 +1168,8 @@ def test_bucket_sync_enable_right_after_disable():
     for bucket_name in buckets:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
 
+    zonegroup_data_checkpoint(zonegroup_conns)
+
 def test_bucket_sync_disable_enable():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
@@ -938,6 +1206,8 @@ def test_bucket_sync_disable_enable():
     for bucket_name in buckets:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
 
+    zonegroup_data_checkpoint(zonegroup_conns)
+
 def test_multipart_object_sync():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
@@ -963,6 +1233,9 @@ def test_encrypted_object_sync():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
 
+    if len(zonegroup.rw_zones) < 2:
+        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+
     (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
 
     # create a bucket on the first zone
@@ -1054,3 +1327,13 @@ def test_bucket_index_log_trim():
     # verify cold bucket has empty bilog
     cold_bilog = bilog_list(zone.zone, cold_bucket.name)
     assert(len(cold_bilog) == 0)
+
+def test_bucket_creation_time():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
+    for z1, z2 in combinations(zone_buckets, 2):
+        for a, b in zip(z1, z2):
+            eq(a.name, b.name)
+            eq(a.creation_date, b.creation_date)