# One for looking at the global filesystem, one for being
# the VolumeClient, two for mounting the created shares
CLIENTS_REQUIRED = 4
+ py_version = 'python'
+
+ def setUp(self):
+ CephFSTestCase.setUp(self)
+ self.py_version = self.ctx.config.get('overrides', {}).get('python', 'python')
+ log.info("using python version: %s".format(self.py_version))
def _volume_client_python(self, client, script, vol_prefix=None, ns_prefix=None):
# Can't dedent this *and* the script we pass in, because they might have different
if ns_prefix:
ns_prefix = "\"" + ns_prefix + "\""
return client.run_python("""
+from __future__ import print_function
from ceph_volume_client import CephFSVolumeClient, VolumePath
import logging
log = logging.getLogger("ceph_volume_client")
vc.connect()
{payload}
vc.disconnect()
- """.format(payload=script, conf_path=client.config_path, vol_prefix=vol_prefix, ns_prefix=ns_prefix))
+ """.format(payload=script, conf_path=client.config_path,
+ vol_prefix=vol_prefix, ns_prefix=ns_prefix),
+ self.py_version)
def _sudo_write_file(self, remote, path, data):
"""
vp = VolumePath("{group_id}", "{volume_id}")
auth_result = vc.authorize(vp, "{guest_entity}", readonly={readonly},
tenant_id="{tenant_id}")
- print auth_result['auth_key']
+ print(auth_result['auth_key'])
""".format(
group_id=group_id,
volume_id=volume_id,
mount_path = self._volume_client_python(self.mount_b, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
create_result = vc.create_volume(vp, 1024*1024*{volume_size})
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_id,
self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
create_result = vc.create_volume(vp, 10 * 1024 * 1024)
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_ids[i]
mount_path = self._volume_client_python(self.mount_b, dedent("""
vp = VolumePath("{group_id}", u"{volume_id}")
create_result = vc.create_volume(vp, 10)
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_id
mount_path = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
create_result = vc.create_volume(vp, 1024*1024*10)
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity_1 = "guest1"
guest_entity_2 = "guest2"
- log.info("print group ID: {0}".format(group_id))
+ log.info("print(group ID: {0})".format(group_id))
# Create a volume.
auths = self._volume_client_python(volumeclient_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
vc.create_volume(vp, 1024*1024*10)
auths = vc.get_authorized_ids(vp)
- print auths
+ print(auths)
""".format(
group_id=group_id,
volume_id=volume_id,
vc.authorize(vp, "{guest_entity_1}", readonly=False)
vc.authorize(vp, "{guest_entity_2}", readonly=True)
auths = vc.get_authorized_ids(vp)
- print auths
+ print(auths)
""".format(
group_id=group_id,
volume_id=volume_id,
guest_entity_2=guest_entity_2,
)))
# Check the list of authorized IDs and their access levels.
- expected_result = [(u'guest1', u'rw'), (u'guest2', u'r')]
+ if self.py_version == 'python3':
+ expected_result = [('guest1', 'rw'), ('guest2', 'r')]
+ else:
+ expected_result = [(u'guest1', u'rw'), (u'guest2', u'r')]
+
self.assertItemsEqual(str(expected_result), auths)
# Disallow both the auth IDs' access to the volume.
vc.deauthorize(vp, "{guest_entity_1}")
vc.deauthorize(vp, "{guest_entity_2}")
auths = vc.get_authorized_ids(vp)
- print auths
+ print(auths)
""".format(
group_id=group_id,
volume_id=volume_id,
"version": 2,
"compat_version": 1,
"dirty": False,
- "tenant_id": u"tenant1",
+ "tenant_id": "tenant1",
"volumes": {
"groupid/volumeid": {
"dirty": False,
- "access_level": u"rw",
+ "access_level": "rw"
}
}
}
"auths": {
"guest": {
"dirty": False,
- "access_level": u"rw"
+ "access_level": "rw"
}
}
}
obj_data = obj_data
)))
+ def test_put_object_versioned(self):
+ vc_mount = self.mounts[1]
+ vc_mount.umount_wait()
+ self._configure_vc_auth(vc_mount, "manila")
+
+ obj_data = 'test_data'
+ obj_name = 'test_vc_ob_2'
+ pool_name = self.fs.get_data_pool_names()[0]
+ self.fs.rados(['put', obj_name, '-'], pool=pool_name, stdin_data=obj_data)
+
+ # Test if put_object_versioned() crosschecks the version of the
+ # given object. Being a negative test, an exception is expected.
+ with self.assertRaises(CommandFailedError):
+ self._volume_client_python(vc_mount, dedent("""
+ data, version = vc.get_object_and_version("{pool_name}", "{obj_name}")
+ data += 'm1'
+ vc.put_object("{pool_name}", "{obj_name}", data)
+ data += 'm2'
+ vc.put_object_versioned("{pool_name}", "{obj_name}", data, version)
+ """).format(pool_name=pool_name, obj_name=obj_name))
+
def test_delete_object(self):
vc_mount = self.mounts[1]
vc_mount.umount_wait()
mount_path = self._volume_client_python(vc_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
create_result = vc.create_volume(vp, 1024*1024*10)
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_id
mount_path = self._volume_client_python(vc_mount, dedent("""
vp = VolumePath("{group_id}", "{volume_id}")
create_result = vc.create_volume(vp, 1024*1024*10, namespace_isolated=False)
- print create_result['mount_path']
+ print(create_result['mount_path'])
""".format(
group_id=group_id,
volume_id=volume_id