eq(self.object.read(3), b'bar')
eq(self.object.read(3), b'baz')
+class TestIoCtxSelfManagedSnaps(object):
+ def setUp(self):
+ self.rados = Rados(conffile='')
+ self.rados.connect()
+ self.rados.create_pool('test_pool')
+ assert self.rados.pool_exists('test_pool')
+ self.ioctx = self.rados.open_ioctx('test_pool')
+
+ def tearDown(self):
+ cmd = {"prefix":"osd unset", "key":"noup"}
+ self.rados.mon_command(json.dumps(cmd), b'')
+ self.ioctx.close()
+ self.rados.delete_pool('test_pool')
+ self.rados.shutdown()
+
+ def test(self):
+ # cannot mix-and-match pool and self-managed snapshot mode
+ self.ioctx.set_self_managed_snap_write([])
+ self.ioctx.write('abc', b'abc')
+ snap_id_1 = self.ioctx.create_self_managed_snap()
+ self.ioctx.set_self_managed_snap_write([snap_id_1])
+
+ self.ioctx.write('abc', b'def')
+ snap_id_2 = self.ioctx.create_self_managed_snap()
+ self.ioctx.set_self_managed_snap_write([snap_id_1, snap_id_2])
+
+ self.ioctx.write('abc', b'ghi')
+
+ self.ioctx.rollback_self_managed_snap('abc', snap_id_1)
+ eq(self.ioctx.read('abc'), b'abc')
+
+ self.ioctx.rollback_self_managed_snap('abc', snap_id_2)
+ eq(self.ioctx.read('abc'), b'def')
+
+ self.ioctx.remove_self_managed_snap(snap_id_1)
+ self.ioctx.remove_self_managed_snap(snap_id_2)
+
class TestCommand(object):
def setUp(self):