import unittest
+from unittest import case
import time
import logging
mon_manager = None
+ # Declarative test requirements: subclasses should override these to indicate
+ # their special needs. If not met, tests will be skipped.
+ REQUIRE_MEMSTORE = False
+
def setUp(self):
self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
"Starting test {0}".format(self.id()))
+ if self.REQUIRE_MEMSTORE:
+ objectstore = self.ceph_cluster.get_config("osd_objectstore", "osd")
+ if objectstore != "memstore":
+ # You certainly *could* run this on a real OSD, but you don't want to sit
+ # here for hours waiting for the test to fill up a 1TB drive!
+ raise case.SkipTest("Require `memstore` OSD backend (test " \
+ "would take too long on full sized OSDs")
+
+
+
def tearDown(self):
self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
"Ended test {0}".format(self.id()))
- def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+ def assert_cluster_log(self, expected_pattern, invert_match=False,
+ timeout=10, watch_channel=None):
"""
Context manager. Assert that during execution, or up to 5 seconds later,
the Ceph cluster log emits a message matching the expected pattern.
- :param expected_pattern: a string that you expect to see in the log output
+ :param expected_pattern: A string that you expect to see in the log output
+ :type expected_pattern: str
+ :param watch_channel: Specifies the channel to be watched. This can be
+ 'cluster', 'audit', ...
+ :type watch_channel: str
"""
ceph_manager = self.ceph_cluster.mon_manager
return found
def __enter__(self):
- self.watcher_process = ceph_manager.run_ceph_w()
+ self.watcher_process = ceph_manager.run_ceph_w(watch_channel)
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.watcher_process.finished:
log.debug("wait_until_equal: success")
- def wait_until_true(self, condition, timeout):
- period = 5
+ @classmethod
+ def wait_until_true(cls, condition, timeout, period=5):
elapsed = 0
while True:
if condition():