+++ /dev/null
-"""\r
-Various tests for synchronization primitives.\r
-"""\r
-\r
-import sys\r
-import time\r
-from thread import start_new_thread, get_ident\r
-import threading\r
-import unittest\r
-\r
-from test import test_support as support\r
-\r
-\r
-def _wait():\r
- # A crude wait/yield function not relying on synchronization primitives.\r
- time.sleep(0.01)\r
-\r
-class Bunch(object):\r
- """\r
- A bunch of threads.\r
- """\r
- def __init__(self, f, n, wait_before_exit=False):\r
- """\r
- Construct a bunch of `n` threads running the same function `f`.\r
- If `wait_before_exit` is True, the threads won't terminate until\r
- do_finish() is called.\r
- """\r
- self.f = f\r
- self.n = n\r
- self.started = []\r
- self.finished = []\r
- self._can_exit = not wait_before_exit\r
- def task():\r
- tid = get_ident()\r
- self.started.append(tid)\r
- try:\r
- f()\r
- finally:\r
- self.finished.append(tid)\r
- while not self._can_exit:\r
- _wait()\r
- for i in range(n):\r
- start_new_thread(task, ())\r
-\r
- def wait_for_started(self):\r
- while len(self.started) < self.n:\r
- _wait()\r
-\r
- def wait_for_finished(self):\r
- while len(self.finished) < self.n:\r
- _wait()\r
-\r
- def do_finish(self):\r
- self._can_exit = True\r
-\r
-\r
-class BaseTestCase(unittest.TestCase):\r
- def setUp(self):\r
- self._threads = support.threading_setup()\r
-\r
- def tearDown(self):\r
- support.threading_cleanup(*self._threads)\r
- support.reap_children()\r
-\r
-\r
-class BaseLockTests(BaseTestCase):\r
- """\r
- Tests for both recursive and non-recursive locks.\r
- """\r
-\r
- def test_constructor(self):\r
- lock = self.locktype()\r
- del lock\r
-\r
- def test_acquire_destroy(self):\r
- lock = self.locktype()\r
- lock.acquire()\r
- del lock\r
-\r
- def test_acquire_release(self):\r
- lock = self.locktype()\r
- lock.acquire()\r
- lock.release()\r
- del lock\r
-\r
- def test_try_acquire(self):\r
- lock = self.locktype()\r
- self.assertTrue(lock.acquire(False))\r
- lock.release()\r
-\r
- def test_try_acquire_contended(self):\r
- lock = self.locktype()\r
- lock.acquire()\r
- result = []\r
- def f():\r
- result.append(lock.acquire(False))\r
- Bunch(f, 1).wait_for_finished()\r
- self.assertFalse(result[0])\r
- lock.release()\r
-\r
- def test_acquire_contended(self):\r
- lock = self.locktype()\r
- lock.acquire()\r
- N = 5\r
- def f():\r
- lock.acquire()\r
- lock.release()\r
-\r
- b = Bunch(f, N)\r
- b.wait_for_started()\r
- _wait()\r
- self.assertEqual(len(b.finished), 0)\r
- lock.release()\r
- b.wait_for_finished()\r
- self.assertEqual(len(b.finished), N)\r
-\r
- def test_with(self):\r
- lock = self.locktype()\r
- def f():\r
- lock.acquire()\r
- lock.release()\r
- def _with(err=None):\r
- with lock:\r
- if err is not None:\r
- raise err\r
- _with()\r
- # Check the lock is unacquired\r
- Bunch(f, 1).wait_for_finished()\r
- self.assertRaises(TypeError, _with, TypeError)\r
- # Check the lock is unacquired\r
- Bunch(f, 1).wait_for_finished()\r
-\r
- def test_thread_leak(self):\r
- # The lock shouldn't leak a Thread instance when used from a foreign\r
- # (non-threading) thread.\r
- lock = self.locktype()\r
- def f():\r
- lock.acquire()\r
- lock.release()\r
- n = len(threading.enumerate())\r
- # We run many threads in the hope that existing threads ids won't\r
- # be recycled.\r
- Bunch(f, 15).wait_for_finished()\r
- self.assertEqual(n, len(threading.enumerate()))\r
-\r
-\r
-class LockTests(BaseLockTests):\r
- """\r
- Tests for non-recursive, weak locks\r
- (which can be acquired and released from different threads).\r
- """\r
- def test_reacquire(self):\r
- # Lock needs to be released before re-acquiring.\r
- lock = self.locktype()\r
- phase = []\r
- def f():\r
- lock.acquire()\r
- phase.append(None)\r
- lock.acquire()\r
- phase.append(None)\r
- start_new_thread(f, ())\r
- while len(phase) == 0:\r
- _wait()\r
- _wait()\r
- self.assertEqual(len(phase), 1)\r
- lock.release()\r
- while len(phase) == 1:\r
- _wait()\r
- self.assertEqual(len(phase), 2)\r
-\r
- def test_different_thread(self):\r
- # Lock can be released from a different thread.\r
- lock = self.locktype()\r
- lock.acquire()\r
- def f():\r
- lock.release()\r
- b = Bunch(f, 1)\r
- b.wait_for_finished()\r
- lock.acquire()\r
- lock.release()\r
-\r
-\r
-class RLockTests(BaseLockTests):\r
- """\r
- Tests for recursive locks.\r
- """\r
- def test_reacquire(self):\r
- lock = self.locktype()\r
- lock.acquire()\r
- lock.acquire()\r
- lock.release()\r
- lock.acquire()\r
- lock.release()\r
- lock.release()\r
-\r
- def test_release_unacquired(self):\r
- # Cannot release an unacquired lock\r
- lock = self.locktype()\r
- self.assertRaises(RuntimeError, lock.release)\r
- lock.acquire()\r
- lock.acquire()\r
- lock.release()\r
- lock.acquire()\r
- lock.release()\r
- lock.release()\r
- self.assertRaises(RuntimeError, lock.release)\r
-\r
- def test_different_thread(self):\r
- # Cannot release from a different thread\r
- lock = self.locktype()\r
- def f():\r
- lock.acquire()\r
- b = Bunch(f, 1, True)\r
- try:\r
- self.assertRaises(RuntimeError, lock.release)\r
- finally:\r
- b.do_finish()\r
-\r
- def test__is_owned(self):\r
- lock = self.locktype()\r
- self.assertFalse(lock._is_owned())\r
- lock.acquire()\r
- self.assertTrue(lock._is_owned())\r
- lock.acquire()\r
- self.assertTrue(lock._is_owned())\r
- result = []\r
- def f():\r
- result.append(lock._is_owned())\r
- Bunch(f, 1).wait_for_finished()\r
- self.assertFalse(result[0])\r
- lock.release()\r
- self.assertTrue(lock._is_owned())\r
- lock.release()\r
- self.assertFalse(lock._is_owned())\r
-\r
-\r
-class EventTests(BaseTestCase):\r
- """\r
- Tests for Event objects.\r
- """\r
-\r
- def test_is_set(self):\r
- evt = self.eventtype()\r
- self.assertFalse(evt.is_set())\r
- evt.set()\r
- self.assertTrue(evt.is_set())\r
- evt.set()\r
- self.assertTrue(evt.is_set())\r
- evt.clear()\r
- self.assertFalse(evt.is_set())\r
- evt.clear()\r
- self.assertFalse(evt.is_set())\r
-\r
- def _check_notify(self, evt):\r
- # All threads get notified\r
- N = 5\r
- results1 = []\r
- results2 = []\r
- def f():\r
- results1.append(evt.wait())\r
- results2.append(evt.wait())\r
- b = Bunch(f, N)\r
- b.wait_for_started()\r
- _wait()\r
- self.assertEqual(len(results1), 0)\r
- evt.set()\r
- b.wait_for_finished()\r
- self.assertEqual(results1, [True] * N)\r
- self.assertEqual(results2, [True] * N)\r
-\r
- def test_notify(self):\r
- evt = self.eventtype()\r
- self._check_notify(evt)\r
- # Another time, after an explicit clear()\r
- evt.set()\r
- evt.clear()\r
- self._check_notify(evt)\r
-\r
- def test_timeout(self):\r
- evt = self.eventtype()\r
- results1 = []\r
- results2 = []\r
- N = 5\r
- def f():\r
- results1.append(evt.wait(0.0))\r
- t1 = time.time()\r
- r = evt.wait(0.2)\r
- t2 = time.time()\r
- results2.append((r, t2 - t1))\r
- Bunch(f, N).wait_for_finished()\r
- self.assertEqual(results1, [False] * N)\r
- for r, dt in results2:\r
- self.assertFalse(r)\r
- self.assertTrue(dt >= 0.2, dt)\r
- # The event is set\r
- results1 = []\r
- results2 = []\r
- evt.set()\r
- Bunch(f, N).wait_for_finished()\r
- self.assertEqual(results1, [True] * N)\r
- for r, dt in results2:\r
- self.assertTrue(r)\r
-\r
-\r
-class ConditionTests(BaseTestCase):\r
- """\r
- Tests for condition variables.\r
- """\r
-\r
- def test_acquire(self):\r
- cond = self.condtype()\r
- # Be default we have an RLock: the condition can be acquired multiple\r
- # times.\r
- cond.acquire()\r
- cond.acquire()\r
- cond.release()\r
- cond.release()\r
- lock = threading.Lock()\r
- cond = self.condtype(lock)\r
- cond.acquire()\r
- self.assertFalse(lock.acquire(False))\r
- cond.release()\r
- self.assertTrue(lock.acquire(False))\r
- self.assertFalse(cond.acquire(False))\r
- lock.release()\r
- with cond:\r
- self.assertFalse(lock.acquire(False))\r
-\r
- def test_unacquired_wait(self):\r
- cond = self.condtype()\r
- self.assertRaises(RuntimeError, cond.wait)\r
-\r
- def test_unacquired_notify(self):\r
- cond = self.condtype()\r
- self.assertRaises(RuntimeError, cond.notify)\r
-\r
- def _check_notify(self, cond):\r
- N = 5\r
- results1 = []\r
- results2 = []\r
- phase_num = 0\r
- def f():\r
- cond.acquire()\r
- cond.wait()\r
- cond.release()\r
- results1.append(phase_num)\r
- cond.acquire()\r
- cond.wait()\r
- cond.release()\r
- results2.append(phase_num)\r
- b = Bunch(f, N)\r
- b.wait_for_started()\r
- _wait()\r
- self.assertEqual(results1, [])\r
- # Notify 3 threads at first\r
- cond.acquire()\r
- cond.notify(3)\r
- _wait()\r
- phase_num = 1\r
- cond.release()\r
- while len(results1) < 3:\r
- _wait()\r
- self.assertEqual(results1, [1] * 3)\r
- self.assertEqual(results2, [])\r
- # Notify 5 threads: they might be in their first or second wait\r
- cond.acquire()\r
- cond.notify(5)\r
- _wait()\r
- phase_num = 2\r
- cond.release()\r
- while len(results1) + len(results2) < 8:\r
- _wait()\r
- self.assertEqual(results1, [1] * 3 + [2] * 2)\r
- self.assertEqual(results2, [2] * 3)\r
- # Notify all threads: they are all in their second wait\r
- cond.acquire()\r
- cond.notify_all()\r
- _wait()\r
- phase_num = 3\r
- cond.release()\r
- while len(results2) < 5:\r
- _wait()\r
- self.assertEqual(results1, [1] * 3 + [2] * 2)\r
- self.assertEqual(results2, [2] * 3 + [3] * 2)\r
- b.wait_for_finished()\r
-\r
- def test_notify(self):\r
- cond = self.condtype()\r
- self._check_notify(cond)\r
- # A second time, to check internal state is still ok.\r
- self._check_notify(cond)\r
-\r
- def test_timeout(self):\r
- cond = self.condtype()\r
- results = []\r
- N = 5\r
- def f():\r
- cond.acquire()\r
- t1 = time.time()\r
- cond.wait(0.2)\r
- t2 = time.time()\r
- cond.release()\r
- results.append(t2 - t1)\r
- Bunch(f, N).wait_for_finished()\r
- self.assertEqual(len(results), 5)\r
- for dt in results:\r
- self.assertTrue(dt >= 0.2, dt)\r
-\r
-\r
-class BaseSemaphoreTests(BaseTestCase):\r
- """\r
- Common tests for {bounded, unbounded} semaphore objects.\r
- """\r
-\r
- def test_constructor(self):\r
- self.assertRaises(ValueError, self.semtype, value = -1)\r
- self.assertRaises(ValueError, self.semtype, value = -sys.maxint)\r
-\r
- def test_acquire(self):\r
- sem = self.semtype(1)\r
- sem.acquire()\r
- sem.release()\r
- sem = self.semtype(2)\r
- sem.acquire()\r
- sem.acquire()\r
- sem.release()\r
- sem.release()\r
-\r
- def test_acquire_destroy(self):\r
- sem = self.semtype()\r
- sem.acquire()\r
- del sem\r
-\r
- def test_acquire_contended(self):\r
- sem = self.semtype(7)\r
- sem.acquire()\r
- N = 10\r
- results1 = []\r
- results2 = []\r
- phase_num = 0\r
- def f():\r
- sem.acquire()\r
- results1.append(phase_num)\r
- sem.acquire()\r
- results2.append(phase_num)\r
- b = Bunch(f, 10)\r
- b.wait_for_started()\r
- while len(results1) + len(results2) < 6:\r
- _wait()\r
- self.assertEqual(results1 + results2, [0] * 6)\r
- phase_num = 1\r
- for i in range(7):\r
- sem.release()\r
- while len(results1) + len(results2) < 13:\r
- _wait()\r
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)\r
- phase_num = 2\r
- for i in range(6):\r
- sem.release()\r
- while len(results1) + len(results2) < 19:\r
- _wait()\r
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)\r
- # The semaphore is still locked\r
- self.assertFalse(sem.acquire(False))\r
- # Final release, to let the last thread finish\r
- sem.release()\r
- b.wait_for_finished()\r
-\r
- def test_try_acquire(self):\r
- sem = self.semtype(2)\r
- self.assertTrue(sem.acquire(False))\r
- self.assertTrue(sem.acquire(False))\r
- self.assertFalse(sem.acquire(False))\r
- sem.release()\r
- self.assertTrue(sem.acquire(False))\r
-\r
- def test_try_acquire_contended(self):\r
- sem = self.semtype(4)\r
- sem.acquire()\r
- results = []\r
- def f():\r
- results.append(sem.acquire(False))\r
- results.append(sem.acquire(False))\r
- Bunch(f, 5).wait_for_finished()\r
- # There can be a thread switch between acquiring the semaphore and\r
- # appending the result, therefore results will not necessarily be\r
- # ordered.\r
- self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )\r
-\r
- def test_default_value(self):\r
- # The default initial value is 1.\r
- sem = self.semtype()\r
- sem.acquire()\r
- def f():\r
- sem.acquire()\r
- sem.release()\r
- b = Bunch(f, 1)\r
- b.wait_for_started()\r
- _wait()\r
- self.assertFalse(b.finished)\r
- sem.release()\r
- b.wait_for_finished()\r
-\r
- def test_with(self):\r
- sem = self.semtype(2)\r
- def _with(err=None):\r
- with sem:\r
- self.assertTrue(sem.acquire(False))\r
- sem.release()\r
- with sem:\r
- self.assertFalse(sem.acquire(False))\r
- if err:\r
- raise err\r
- _with()\r
- self.assertTrue(sem.acquire(False))\r
- sem.release()\r
- self.assertRaises(TypeError, _with, TypeError)\r
- self.assertTrue(sem.acquire(False))\r
- sem.release()\r
-\r
-class SemaphoreTests(BaseSemaphoreTests):\r
- """\r
- Tests for unbounded semaphores.\r
- """\r
-\r
- def test_release_unacquired(self):\r
- # Unbounded releases are allowed and increment the semaphore's value\r
- sem = self.semtype(1)\r
- sem.release()\r
- sem.acquire()\r
- sem.acquire()\r
- sem.release()\r
-\r
-\r
-class BoundedSemaphoreTests(BaseSemaphoreTests):\r
- """\r
- Tests for bounded semaphores.\r
- """\r
-\r
- def test_release_unacquired(self):\r
- # Cannot go past the initial value\r
- sem = self.semtype()\r
- self.assertRaises(ValueError, sem.release)\r
- sem.acquire()\r
- sem.release()\r
- self.assertRaises(ValueError, sem.release)\r