import socket
import os
import platform
+import pytest
import time
import sys
+from assertions import (assert_equal as eq, assert_raises, assert_not_equal,
+ assert_greater_equal)
from datetime import datetime, timedelta
-from nose import with_setup, SkipTest
-from nose.plugins.attrib import attr
-from nose.tools import eq_ as eq, assert_raises, assert_not_equal
from rados import (Rados,
LIBRADOS_OP_FLAG_FADVISE_DONTNEED,
LIBRADOS_OP_FLAG_FADVISE_NOCACHE,
if image_name is not None:
RBD().remove(ioctx, image_name)
+@pytest.fixture
+def tmp_image():
+ create_image()
+ yield
+ remove_image()
+
def create_group():
global group_name
group_name = get_temp_group_name()
if group_name is not None:
RBD().group_remove(ioctx, group_name)
+@pytest.fixture
+def tmp_group():
+ create_group()
+ yield
+ remove_group()
+
def rename_group():
new_group_name = "new" + group_name
RBD().group_rename(ioctx, group_name, new_group_name)
def _require_new_format(*args, **kwargs):
global features
if features is None:
- raise SkipTest
+ pytest.skip('requires new format')
return fn(*args, **kwargs)
return functools.wraps(fn)(_require_new_format)
return wrapper
def _require_features(*args, **kwargs):
global features
if features is None:
- raise SkipTest
+ pytest.skip('requires new format')
for feature in required_features:
if feature & features != feature:
- raise SkipTest
+ pytest.skip('missing required feature')
return fn(*args, **kwargs)
return functools.wraps(fn)(_require_features)
return wrapper
def wrapper(fn):
def _require_linux(*args, **kwargs):
if platform.system() != "Linux":
- raise SkipTest
+ pytest.skip('requires linux')
return fn(*args, **kwargs)
return functools.wraps(fn)(_require_linux)
return wrapper
global features
for feature in blocklisted_features:
if features is not None and feature & features == feature:
- raise SkipTest
+ pytest.skip('blocklisted feature enabled')
return fn(*args, **kwargs)
return functools.wraps(fn)(_blocklist_features)
return wrapper
def test_list_empty():
eq([], RBD().list(ioctx))
-@with_setup(create_image, remove_image)
-def test_list():
+def test_list(tmp_image):
eq([image_name], RBD().list(ioctx))
with Image(ioctx, image_name) as image:
image_id = image.id()
eq([{'id': image_id, 'name': image_name}], list(RBD().list2(ioctx)))
-@with_setup(create_image)
def test_remove_with_progress():
+ create_image()
d = {'received_callback': False}
def progress_cb(current, total):
d['received_callback'] = True
RBD().remove(ioctx, image_name, on_progress=progress_cb)
eq(True, d['received_callback'])
-@with_setup(create_image)
-def test_remove_canceled():
+def test_remove_canceled(tmp_image):
def progress_cb(current, total):
return -ECANCELED
assert_raises(OperationCanceled, RBD().remove, ioctx, image_name,
on_progress=progress_cb)
-@with_setup(create_image, remove_image)
-def test_rename():
+def test_rename(tmp_image):
rbd = RBD()
image_name2 = get_temp_image_name()
rbd.rename(ioctx, image_name, image_name2)
class TestImage(object):
- def setUp(self):
+ def setup_method(self, method):
self.rbd = RBD()
create_image()
self.image = Image(ioctx, image_name)
- def tearDown(self):
+ def teardown_method(self, method):
self.image.close()
remove_image()
self.image = None
def test_block_name_prefix(self):
assert_not_equal(b'', self.image.block_name_prefix())
+ def test_data_pool_id(self):
+ assert_greater_equal(self.image.data_pool_id(), 0)
+
def test_create_timestamp(self):
timestamp = self.image.create_timestamp()
assert_not_equal(0, timestamp.year)
self._test_copy(features, self.image.stat()['order'],
self.image.stripe_unit(), self.image.stripe_count())
- @attr('SKIP_IF_CRIMSON')
+ @pytest.mark.skip_if_crimson
def test_deep_copy(self):
global ioctx
global features
class TestImageId(object):
- def setUp(self):
+ def setup_method(self, method):
self.rbd = RBD()
create_image()
self.image = Image(ioctx, image_name)
self.image2 = Image(ioctx, None, None, False, self.image.id())
- def tearDown(self):
+ def teardown_method(self, method):
self.image.close()
self.image2.close()
remove_image()
class TestClone(object):
@require_features([RBD_FEATURE_LAYERING])
- def setUp(self):
+ def setup_method(self, method):
global ioctx
global features
self.rbd = RBD()
features)
self.clone = Image(ioctx, self.clone_name)
- def tearDown(self):
+ def teardown_method(self, method):
global ioctx
self.clone.close()
self.rbd.remove(ioctx, self.clone_name)
# can't remove a snapshot that has dependent clones
assert_raises(ImageBusy, self.image.remove_snap, 'snap1')
- # validate parent info of clone created by TestClone.setUp
+ # validate parent info of clone created by TestClone.setup_method
(pool, image, snap) = self.clone.parent_info()
eq(pool, pool_name)
eq(image, image_name)
class TestExclusiveLock(object):
@require_features([RBD_FEATURE_EXCLUSIVE_LOCK])
- def setUp(self):
+ def setup_method(self, method):
global rados2
rados2 = Rados(conffile='')
rados2.connect()
ioctx2 = rados2.open_ioctx(pool_name)
create_image()
- def tearDown(self):
+ def teardown_method(self, method):
remove_image()
global ioctx2
ioctx2.close()
image.lock_acquire(RBD_LOCK_MODE_EXCLUSIVE)
image.lock_release()
- @attr('SKIP_IF_CRIMSON')
+ @pytest.mark.skip_if_crimson
def test_break_lock(self):
blocklist_rados = Rados(conffile='')
blocklist_rados.connect()
if primary is not None:
eq(primary, info['primary'])
- def setUp(self):
+ def setup_method(self, method):
self.rbd = RBD()
self.initial_mirror_mode = self.rbd.mirror_mode_get(ioctx)
self.rbd.mirror_mode_set(ioctx, RBD_MIRROR_MODE_POOL)
create_image()
self.image = Image(ioctx, image_name)
- def tearDown(self):
+ def teardown_method(self, method):
self.image.close()
remove_image()
self.rbd.mirror_mode_set(ioctx, self.initial_mirror_mode)
class TestTrash(object):
- def setUp(self):
+ def setup_method(self, method):
global rados2
rados2 = Rados(conffile='')
rados2.connect()
global ioctx2
ioctx2 = rados2.open_ioctx(pool_name)
- def tearDown(self):
+ def teardown_method(self, method):
global ioctx2
ioctx2.close()
global rados2
def test_list_groups_empty():
eq([], RBD().group_list(ioctx))
-@with_setup(create_group, remove_group)
-def test_list_groups():
+def test_list_groups(tmp_group):
eq([group_name], RBD().group_list(ioctx))
-@with_setup(create_group)
def test_list_groups_after_removed():
+ create_group()
remove_group()
eq([], RBD().group_list(ioctx))
class TestGroups(object):
- def setUp(self):
+ def setup_method(self, method):
global snap_name
self.rbd = RBD()
create_image()
snap_name = get_temp_snap_name()
self.group = Group(ioctx, group_name)
- def tearDown(self):
+ def teardown_method(self, method):
remove_group()
self.image = None
for name in self.image_names:
self.group.remove_snap(snap_name)
eq([], list(self.group.list_snaps()))
-@with_setup(create_image, remove_image)
-def test_rename():
- rbd = RBD()
- image_name2 = get_temp_image_name()
-
class TestMigration(object):
def test_migration(self):