from __future__ import print_function
-from nose import SkipTest
-from nose.plugins.attrib import attr
-from nose.tools import eq_ as eq, ok_ as ok, assert_raises
+from assertions import assert_equal as eq, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, NotConnected,
LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, LIBRADOS_CREATE_EXCLUSIVE,
import json
import errno
import os
+import pytest
import re
import sys
class TestRados(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH')
self.rados.conf_parse_env()
# Assume any pre-existing pools are the cluster's defaults
self.default_pools = self.rados.list_pools()
- def tearDown(self):
+ def teardown_method(self, method):
self.rados.shutdown()
def test_ping_monitor(self):
break
def test_annotations(self):
- with assert_raises(TypeError):
+ with pytest.raises(TypeError):
self.rados.create_pool(0xf00)
def test_create(self):
eq(set(['a' * 500]), self.list_non_default_pools())
self.rados.delete_pool('a' * 500)
- @attr('tier')
+ @pytest.mark.tier
def test_get_pool_base_tier(self):
self.rados.create_pool('foo')
try:
def test_blocklist_add(self):
self.rados.blocklist_add("1.2.3.4/123", 1)
- @attr('stats')
+ @pytest.mark.stats
def test_get_cluster_stats(self):
stats = self.rados.get_cluster_stats()
assert stats['kb'] > 0
class TestIoctx(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.rados.create_pool('test_pool')
assert self.rados.pool_exists('test_pool')
self.ioctx = self.rados.open_ioctx('test_pool')
- def tearDown(self):
+ def teardown_method(self, method):
cmd = {"prefix":"osd unset", "key":"noup"}
self.rados.mon_command(json.dumps(cmd), b'')
self.ioctx.close()
self.ioctx.remove_snap('foo')
eq(list(self.ioctx.list_snaps()), [])
- @attr('rollback')
+ @pytest.mark.rollback
def test_snap_rollback(self):
self.ioctx.write("insnap", b"contents1")
self.ioctx.create_snap("snap1")
self.ioctx.remove_snap("snap1")
self.ioctx.remove_object("insnap")
- @attr('rollback')
+ @pytest.mark.rollback
def test_snap_rollback_removed(self):
self.ioctx.write("insnap", b"contents1")
self.ioctx.create_snap("snap1")
self.ioctx.remove_object("inhead")
def test_set_omap(self):
- keys = ("1", "2", "3", "4")
- values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
+ keys = ("1", "2", "3", "4", b"\xff")
+ values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04", b"5")
with WriteOpCtx() as write_op:
self.ioctx.set_omap(write_op, keys, values)
write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
self.ioctx.operate_write_op(write_op, "hw")
with ReadOpCtx() as read_op:
- iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
+ iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 5, omap_key_type=bytes)
eq(ret, 0)
self.ioctx.operate_read_op(read_op, "hw")
next(iter)
- eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
+ eq(list(iter), [(b"2", b"bbb"), (b"3", b"ccc"), (b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
with ReadOpCtx() as read_op:
- iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4)
+ iter, ret = self.ioctx.get_omap_vals(read_op, b"2", "", 4, omap_key_type=bytes)
eq(ret, 0)
self.ioctx.operate_read_op(read_op, "hw")
- eq(("3", b"ccc"), next(iter))
- eq(list(iter), [("4", b"\x04\x04\x04\x04")])
+ eq((b"3", b"ccc"), next(iter))
+ eq(list(iter), [(b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
with ReadOpCtx() as read_op:
- iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
+ iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4, omap_key_type=bytes)
eq(ret, 0)
read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
self.ioctx.operate_read_op(read_op, "hw")
- eq(list(iter), [("2", b"bbb")])
+ eq(list(iter), [(b"2", b"bbb")])
def test_set_omap_aio(self):
lock = threading.Condition()
write_op.remove()
self.ioctx.operate_write_op(write_op, "write_ops")
- with assert_raises(ObjectNotFound):
+ with pytest.raises(ObjectNotFound):
self.ioctx.read('write_ops')
def test_execute_op(self):
eq(self.ioctx.read('abc'), b'rzxrzxrzx')
def test_get_omap_vals_by_keys(self):
- keys = ("1", "2", "3", "4")
- values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
+ keys = ("1", "2", "3", "4", b"\xff")
+ values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04", b"5")
with WriteOpCtx() as write_op:
self.ioctx.set_omap(write_op, keys, values)
self.ioctx.operate_write_op(write_op, "hw")
with ReadOpCtx() as read_op:
- iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
+ iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",b"\xff"), omap_key_type=bytes)
eq(ret, 0)
self.ioctx.operate_read_op(read_op, "hw")
- eq(list(iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])
+ eq(list(iter), [(b"3", b"ccc"), (b"4", b"\x04\x04\x04\x04"), (b"\xff", b"5")])
with ReadOpCtx() as read_op:
- iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",))
+ iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",), omap_key_type=bytes)
eq(ret, 0)
- with assert_raises(ObjectNotFound):
+ with pytest.raises(ObjectNotFound):
self.ioctx.operate_read_op(read_op, "no_such")
def test_get_omap_keys(self):
with ReadOpCtx() as read_op:
iter, ret = self.ioctx.get_omap_keys(read_op,"",2)
eq(ret, 0)
- with assert_raises(ObjectNotFound):
+ with pytest.raises(ObjectNotFound):
self.ioctx.operate_read_op(read_op, "no_such")
def test_clear_omap(self):
while count[0] < 1:
lock.wait()
eq(comp.get_return_value(), 0)
- with assert_raises(NoData):
+ with pytest.raises(NoData):
self.ioctx.get_xattr("xyz", "key")
def test_aio_write_no_comp_ref(self):
r, _, _ = self.rados.mon_command(json.dumps(cmd), b'')
eq(r, 0)
- @attr('wait')
+ @pytest.mark.wait
def test_aio_read_wait_for_complete(self):
# use wait_for_complete() and wait for cb by
# watching retval[0]
eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)
- @attr('wait')
+ @pytest.mark.wait
def test_aio_read_wait_for_complete_and_cb(self):
# use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control
eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)
- @attr('wait')
+ @pytest.mark.wait
def test_aio_read_wait_for_complete_and_cb_error(self):
# error case, use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control
release = json.loads(buf.decode("utf-8")).get("require_osd_release",
None)
if not release or release[0] < 'l':
- raise SkipTest
+ pytest.skip('required_osd_release >= l')
eq([], self.ioctx.application_list())
eq(self.ioctx.alignment(), None)
-@attr('ec')
+@pytest.mark.ec
class TestIoctxEc(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.pool = 'test-ec'
cmd = {"prefix": "osd erasure-code-profile set",
"name": self.profile, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]}
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
- eq(ret, 0, msg=out)
+ assert ret == 0, out
# create ec pool with profile created above
cmd = {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8,
'pool': self.pool, 'pool_type': 'erasure',
'erasure_code_profile': self.profile}
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30)
- eq(ret, 0, msg=out)
+ assert ret == 0, out
assert self.rados.pool_exists(self.pool)
self.ioctx = self.rados.open_ioctx(self.pool)
- def tearDown(self):
+ def teardown_method(self, method):
cmd = {"prefix": "osd unset", "key": "noup"}
self.rados.mon_command(json.dumps(cmd), b'')
self.ioctx.close()
class TestIoctx2(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.rados.create_pool('test_pool')
assert pool_id > 0
self.ioctx2 = self.rados.open_ioctx2(pool_id)
- def tearDown(self):
+ def teardown_method(self, method):
cmd = {"prefix": "osd unset", "key": "noup"}
self.rados.mon_command(json.dumps(cmd), b'')
self.ioctx2.close()
class TestObject(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.rados.create_pool('test_pool')
self.ioctx.write('foo', b'bar')
self.object = Object(self.ioctx, 'foo')
- def tearDown(self):
+ def teardown_method(self, method):
self.ioctx.close()
self.ioctx = None
self.rados.delete_pool('test_pool')
eq(self.object.read(3), b'baz')
class TestIoCtxSelfManagedSnaps(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.rados.create_pool('test_pool')
assert self.rados.pool_exists('test_pool')
self.ioctx = self.rados.open_ioctx('test_pool')
- def tearDown(self):
+ def teardown_method(self, method):
cmd = {"prefix":"osd unset", "key":"noup"}
self.rados.mon_command(json.dumps(cmd), b'')
self.ioctx.close()
self.rados.delete_pool('test_pool')
self.rados.shutdown()
- @attr('rollback')
+ @pytest.mark.rollback
def test(self):
# cannot mix-and-match pool and self-managed snapshot mode
self.ioctx.set_self_managed_snap_write([])
class TestCommand(object):
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
- def tearDown(self):
+ def teardown_method(self, method):
self.rados.shutdown()
def test_monmap_dump(self):
e = json.loads(buf.decode("utf-8"))
assert('release' in e)
- @attr('bench')
+ @pytest.mark.bench
def test_osd_bench(self):
cmd = dict(prefix='bench', size=4096, count=8192)
ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'',
eq(u"pool '\u9ec5' created", out)
-@attr('watch')
+@pytest.mark.watch
class TestWatchNotify(object):
OID = "test_watch_notify"
- def setUp(self):
+ def setup_method(self, method):
self.rados = Rados(conffile='')
self.rados.connect()
self.rados.create_pool('test_pool')
self.ack_data = {}
self.instance_id = self.rados.get_instance_id()
- def tearDown(self):
+ def teardown_method(self, method):
self.ioctx.close()
self.rados.delete_pool('test_pool')
self.rados.shutdown()
with self.ioctx.watch(self.OID, self.make_callback_reply(),
self.make_error_callback()) as watch1:
watch_id1 = watch1.get_id()
- ok(watch_id1 > 0)
+ assert watch_id1 > 0
with self.rados.open_ioctx('test_pool') as ioctx:
watch2 = ioctx.watch(self.OID, self.make_callback_reply(),
self.make_error_callback())
watch_id2 = watch2.get_id()
- ok(watch_id2 > 0)
+ assert watch_id2 > 0
comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='test')
comp.wait_for_complete_and_cb()
with self.lock:
- ok(self.instance_id in self.ack_cnt)
+ assert self.instance_id in self.ack_cnt
eq(self.ack_cnt[self.instance_id], 2)
eq(self.ack_data[self.instance_id], b'test')
- ok(watch1.check() >= timedelta())
- ok(watch2.check() >= timedelta())
+ assert watch1.check() >= timedelta()
+ assert watch2.check() >= timedelta()
comp = self.ioctx.aio_notify(self.OID, self.notify_callback, msg='best')
comp.wait_for_complete_and_cb()