from __future__ import print_function
+from nose import SkipTest
from nose.tools import eq_ as eq, ok_ as ok, assert_raises
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
[i.remove() for i in self.ioctx.list_objects()]
+ def test_applications(self):
+ cmd = {"prefix":"osd dump", "format":"json"}
+ ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'')
+ eq(ret, 0)
+ assert len(buf) > 0
+ release = json.loads(buf.decode("utf-8")).get("require_osd_release",
+ None)
+ if not release or release[0] < 'l':
+ raise SkipTest
+
+ eq([], self.ioctx.application_list())
+
+ self.ioctx.application_enable("app1")
+ assert_raises(Error, self.ioctx.application_enable, "app2")
+ self.ioctx.application_enable("app2", True)
+
+ assert_raises(Error, self.ioctx.application_metadata_list, "dne")
+ eq([], self.ioctx.application_metadata_list("app1"))
+
+ assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key",
+ "key")
+ self.ioctx.application_metadata_set("app1", "key1", "val1")
+ self.ioctx.application_metadata_set("app1", "key2", "val2")
+ self.ioctx.application_metadata_set("app2", "key1", "val1")
+
+ eq([("key1", "val1"), ("key2", "val2")],
+ self.ioctx.application_metadata_list("app1"))
+
+ self.ioctx.application_metadata_remove("app1", "key1")
+ eq([("key2", "val2")], self.ioctx.application_metadata_list("app1"))
+
class TestObject(object):
def setUp(self):