]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Lib/test/lock_tests.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / lock_tests.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/lock_tests.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/lock_tests.py
deleted file mode 100644 (file)
index 69272f7..0000000
+++ /dev/null
@@ -1,546 +0,0 @@
-"""\r
-Various tests for synchronization primitives.\r
-"""\r
-\r
-import sys\r
-import time\r
-from thread import start_new_thread, get_ident\r
-import threading\r
-import unittest\r
-\r
-from test import test_support as support\r
-\r
-\r
-def _wait():\r
-    # A crude wait/yield function not relying on synchronization primitives.\r
-    time.sleep(0.01)\r
-\r
-class Bunch(object):\r
-    """\r
-    A bunch of threads.\r
-    """\r
-    def __init__(self, f, n, wait_before_exit=False):\r
-        """\r
-        Construct a bunch of `n` threads running the same function `f`.\r
-        If `wait_before_exit` is True, the threads won't terminate until\r
-        do_finish() is called.\r
-        """\r
-        self.f = f\r
-        self.n = n\r
-        self.started = []\r
-        self.finished = []\r
-        self._can_exit = not wait_before_exit\r
-        def task():\r
-            tid = get_ident()\r
-            self.started.append(tid)\r
-            try:\r
-                f()\r
-            finally:\r
-                self.finished.append(tid)\r
-                while not self._can_exit:\r
-                    _wait()\r
-        for i in range(n):\r
-            start_new_thread(task, ())\r
-\r
-    def wait_for_started(self):\r
-        while len(self.started) < self.n:\r
-            _wait()\r
-\r
-    def wait_for_finished(self):\r
-        while len(self.finished) < self.n:\r
-            _wait()\r
-\r
-    def do_finish(self):\r
-        self._can_exit = True\r
-\r
-\r
-class BaseTestCase(unittest.TestCase):\r
-    def setUp(self):\r
-        self._threads = support.threading_setup()\r
-\r
-    def tearDown(self):\r
-        support.threading_cleanup(*self._threads)\r
-        support.reap_children()\r
-\r
-\r
-class BaseLockTests(BaseTestCase):\r
-    """\r
-    Tests for both recursive and non-recursive locks.\r
-    """\r
-\r
-    def test_constructor(self):\r
-        lock = self.locktype()\r
-        del lock\r
-\r
-    def test_acquire_destroy(self):\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        del lock\r
-\r
-    def test_acquire_release(self):\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        lock.release()\r
-        del lock\r
-\r
-    def test_try_acquire(self):\r
-        lock = self.locktype()\r
-        self.assertTrue(lock.acquire(False))\r
-        lock.release()\r
-\r
-    def test_try_acquire_contended(self):\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        result = []\r
-        def f():\r
-            result.append(lock.acquire(False))\r
-        Bunch(f, 1).wait_for_finished()\r
-        self.assertFalse(result[0])\r
-        lock.release()\r
-\r
-    def test_acquire_contended(self):\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        N = 5\r
-        def f():\r
-            lock.acquire()\r
-            lock.release()\r
-\r
-        b = Bunch(f, N)\r
-        b.wait_for_started()\r
-        _wait()\r
-        self.assertEqual(len(b.finished), 0)\r
-        lock.release()\r
-        b.wait_for_finished()\r
-        self.assertEqual(len(b.finished), N)\r
-\r
-    def test_with(self):\r
-        lock = self.locktype()\r
-        def f():\r
-            lock.acquire()\r
-            lock.release()\r
-        def _with(err=None):\r
-            with lock:\r
-                if err is not None:\r
-                    raise err\r
-        _with()\r
-        # Check the lock is unacquired\r
-        Bunch(f, 1).wait_for_finished()\r
-        self.assertRaises(TypeError, _with, TypeError)\r
-        # Check the lock is unacquired\r
-        Bunch(f, 1).wait_for_finished()\r
-\r
-    def test_thread_leak(self):\r
-        # The lock shouldn't leak a Thread instance when used from a foreign\r
-        # (non-threading) thread.\r
-        lock = self.locktype()\r
-        def f():\r
-            lock.acquire()\r
-            lock.release()\r
-        n = len(threading.enumerate())\r
-        # We run many threads in the hope that existing threads ids won't\r
-        # be recycled.\r
-        Bunch(f, 15).wait_for_finished()\r
-        self.assertEqual(n, len(threading.enumerate()))\r
-\r
-\r
-class LockTests(BaseLockTests):\r
-    """\r
-    Tests for non-recursive, weak locks\r
-    (which can be acquired and released from different threads).\r
-    """\r
-    def test_reacquire(self):\r
-        # Lock needs to be released before re-acquiring.\r
-        lock = self.locktype()\r
-        phase = []\r
-        def f():\r
-            lock.acquire()\r
-            phase.append(None)\r
-            lock.acquire()\r
-            phase.append(None)\r
-        start_new_thread(f, ())\r
-        while len(phase) == 0:\r
-            _wait()\r
-        _wait()\r
-        self.assertEqual(len(phase), 1)\r
-        lock.release()\r
-        while len(phase) == 1:\r
-            _wait()\r
-        self.assertEqual(len(phase), 2)\r
-\r
-    def test_different_thread(self):\r
-        # Lock can be released from a different thread.\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        def f():\r
-            lock.release()\r
-        b = Bunch(f, 1)\r
-        b.wait_for_finished()\r
-        lock.acquire()\r
-        lock.release()\r
-\r
-\r
-class RLockTests(BaseLockTests):\r
-    """\r
-    Tests for recursive locks.\r
-    """\r
-    def test_reacquire(self):\r
-        lock = self.locktype()\r
-        lock.acquire()\r
-        lock.acquire()\r
-        lock.release()\r
-        lock.acquire()\r
-        lock.release()\r
-        lock.release()\r
-\r
-    def test_release_unacquired(self):\r
-        # Cannot release an unacquired lock\r
-        lock = self.locktype()\r
-        self.assertRaises(RuntimeError, lock.release)\r
-        lock.acquire()\r
-        lock.acquire()\r
-        lock.release()\r
-        lock.acquire()\r
-        lock.release()\r
-        lock.release()\r
-        self.assertRaises(RuntimeError, lock.release)\r
-\r
-    def test_different_thread(self):\r
-        # Cannot release from a different thread\r
-        lock = self.locktype()\r
-        def f():\r
-            lock.acquire()\r
-        b = Bunch(f, 1, True)\r
-        try:\r
-            self.assertRaises(RuntimeError, lock.release)\r
-        finally:\r
-            b.do_finish()\r
-\r
-    def test__is_owned(self):\r
-        lock = self.locktype()\r
-        self.assertFalse(lock._is_owned())\r
-        lock.acquire()\r
-        self.assertTrue(lock._is_owned())\r
-        lock.acquire()\r
-        self.assertTrue(lock._is_owned())\r
-        result = []\r
-        def f():\r
-            result.append(lock._is_owned())\r
-        Bunch(f, 1).wait_for_finished()\r
-        self.assertFalse(result[0])\r
-        lock.release()\r
-        self.assertTrue(lock._is_owned())\r
-        lock.release()\r
-        self.assertFalse(lock._is_owned())\r
-\r
-\r
-class EventTests(BaseTestCase):\r
-    """\r
-    Tests for Event objects.\r
-    """\r
-\r
-    def test_is_set(self):\r
-        evt = self.eventtype()\r
-        self.assertFalse(evt.is_set())\r
-        evt.set()\r
-        self.assertTrue(evt.is_set())\r
-        evt.set()\r
-        self.assertTrue(evt.is_set())\r
-        evt.clear()\r
-        self.assertFalse(evt.is_set())\r
-        evt.clear()\r
-        self.assertFalse(evt.is_set())\r
-\r
-    def _check_notify(self, evt):\r
-        # All threads get notified\r
-        N = 5\r
-        results1 = []\r
-        results2 = []\r
-        def f():\r
-            results1.append(evt.wait())\r
-            results2.append(evt.wait())\r
-        b = Bunch(f, N)\r
-        b.wait_for_started()\r
-        _wait()\r
-        self.assertEqual(len(results1), 0)\r
-        evt.set()\r
-        b.wait_for_finished()\r
-        self.assertEqual(results1, [True] * N)\r
-        self.assertEqual(results2, [True] * N)\r
-\r
-    def test_notify(self):\r
-        evt = self.eventtype()\r
-        self._check_notify(evt)\r
-        # Another time, after an explicit clear()\r
-        evt.set()\r
-        evt.clear()\r
-        self._check_notify(evt)\r
-\r
-    def test_timeout(self):\r
-        evt = self.eventtype()\r
-        results1 = []\r
-        results2 = []\r
-        N = 5\r
-        def f():\r
-            results1.append(evt.wait(0.0))\r
-            t1 = time.time()\r
-            r = evt.wait(0.2)\r
-            t2 = time.time()\r
-            results2.append((r, t2 - t1))\r
-        Bunch(f, N).wait_for_finished()\r
-        self.assertEqual(results1, [False] * N)\r
-        for r, dt in results2:\r
-            self.assertFalse(r)\r
-            self.assertTrue(dt >= 0.2, dt)\r
-        # The event is set\r
-        results1 = []\r
-        results2 = []\r
-        evt.set()\r
-        Bunch(f, N).wait_for_finished()\r
-        self.assertEqual(results1, [True] * N)\r
-        for r, dt in results2:\r
-            self.assertTrue(r)\r
-\r
-\r
-class ConditionTests(BaseTestCase):\r
-    """\r
-    Tests for condition variables.\r
-    """\r
-\r
-    def test_acquire(self):\r
-        cond = self.condtype()\r
-        # Be default we have an RLock: the condition can be acquired multiple\r
-        # times.\r
-        cond.acquire()\r
-        cond.acquire()\r
-        cond.release()\r
-        cond.release()\r
-        lock = threading.Lock()\r
-        cond = self.condtype(lock)\r
-        cond.acquire()\r
-        self.assertFalse(lock.acquire(False))\r
-        cond.release()\r
-        self.assertTrue(lock.acquire(False))\r
-        self.assertFalse(cond.acquire(False))\r
-        lock.release()\r
-        with cond:\r
-            self.assertFalse(lock.acquire(False))\r
-\r
-    def test_unacquired_wait(self):\r
-        cond = self.condtype()\r
-        self.assertRaises(RuntimeError, cond.wait)\r
-\r
-    def test_unacquired_notify(self):\r
-        cond = self.condtype()\r
-        self.assertRaises(RuntimeError, cond.notify)\r
-\r
-    def _check_notify(self, cond):\r
-        N = 5\r
-        results1 = []\r
-        results2 = []\r
-        phase_num = 0\r
-        def f():\r
-            cond.acquire()\r
-            cond.wait()\r
-            cond.release()\r
-            results1.append(phase_num)\r
-            cond.acquire()\r
-            cond.wait()\r
-            cond.release()\r
-            results2.append(phase_num)\r
-        b = Bunch(f, N)\r
-        b.wait_for_started()\r
-        _wait()\r
-        self.assertEqual(results1, [])\r
-        # Notify 3 threads at first\r
-        cond.acquire()\r
-        cond.notify(3)\r
-        _wait()\r
-        phase_num = 1\r
-        cond.release()\r
-        while len(results1) < 3:\r
-            _wait()\r
-        self.assertEqual(results1, [1] * 3)\r
-        self.assertEqual(results2, [])\r
-        # Notify 5 threads: they might be in their first or second wait\r
-        cond.acquire()\r
-        cond.notify(5)\r
-        _wait()\r
-        phase_num = 2\r
-        cond.release()\r
-        while len(results1) + len(results2) < 8:\r
-            _wait()\r
-        self.assertEqual(results1, [1] * 3 + [2] * 2)\r
-        self.assertEqual(results2, [2] * 3)\r
-        # Notify all threads: they are all in their second wait\r
-        cond.acquire()\r
-        cond.notify_all()\r
-        _wait()\r
-        phase_num = 3\r
-        cond.release()\r
-        while len(results2) < 5:\r
-            _wait()\r
-        self.assertEqual(results1, [1] * 3 + [2] * 2)\r
-        self.assertEqual(results2, [2] * 3 + [3] * 2)\r
-        b.wait_for_finished()\r
-\r
-    def test_notify(self):\r
-        cond = self.condtype()\r
-        self._check_notify(cond)\r
-        # A second time, to check internal state is still ok.\r
-        self._check_notify(cond)\r
-\r
-    def test_timeout(self):\r
-        cond = self.condtype()\r
-        results = []\r
-        N = 5\r
-        def f():\r
-            cond.acquire()\r
-            t1 = time.time()\r
-            cond.wait(0.2)\r
-            t2 = time.time()\r
-            cond.release()\r
-            results.append(t2 - t1)\r
-        Bunch(f, N).wait_for_finished()\r
-        self.assertEqual(len(results), 5)\r
-        for dt in results:\r
-            self.assertTrue(dt >= 0.2, dt)\r
-\r
-\r
-class BaseSemaphoreTests(BaseTestCase):\r
-    """\r
-    Common tests for {bounded, unbounded} semaphore objects.\r
-    """\r
-\r
-    def test_constructor(self):\r
-        self.assertRaises(ValueError, self.semtype, value = -1)\r
-        self.assertRaises(ValueError, self.semtype, value = -sys.maxint)\r
-\r
-    def test_acquire(self):\r
-        sem = self.semtype(1)\r
-        sem.acquire()\r
-        sem.release()\r
-        sem = self.semtype(2)\r
-        sem.acquire()\r
-        sem.acquire()\r
-        sem.release()\r
-        sem.release()\r
-\r
-    def test_acquire_destroy(self):\r
-        sem = self.semtype()\r
-        sem.acquire()\r
-        del sem\r
-\r
-    def test_acquire_contended(self):\r
-        sem = self.semtype(7)\r
-        sem.acquire()\r
-        N = 10\r
-        results1 = []\r
-        results2 = []\r
-        phase_num = 0\r
-        def f():\r
-            sem.acquire()\r
-            results1.append(phase_num)\r
-            sem.acquire()\r
-            results2.append(phase_num)\r
-        b = Bunch(f, 10)\r
-        b.wait_for_started()\r
-        while len(results1) + len(results2) < 6:\r
-            _wait()\r
-        self.assertEqual(results1 + results2, [0] * 6)\r
-        phase_num = 1\r
-        for i in range(7):\r
-            sem.release()\r
-        while len(results1) + len(results2) < 13:\r
-            _wait()\r
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)\r
-        phase_num = 2\r
-        for i in range(6):\r
-            sem.release()\r
-        while len(results1) + len(results2) < 19:\r
-            _wait()\r
-        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)\r
-        # The semaphore is still locked\r
-        self.assertFalse(sem.acquire(False))\r
-        # Final release, to let the last thread finish\r
-        sem.release()\r
-        b.wait_for_finished()\r
-\r
-    def test_try_acquire(self):\r
-        sem = self.semtype(2)\r
-        self.assertTrue(sem.acquire(False))\r
-        self.assertTrue(sem.acquire(False))\r
-        self.assertFalse(sem.acquire(False))\r
-        sem.release()\r
-        self.assertTrue(sem.acquire(False))\r
-\r
-    def test_try_acquire_contended(self):\r
-        sem = self.semtype(4)\r
-        sem.acquire()\r
-        results = []\r
-        def f():\r
-            results.append(sem.acquire(False))\r
-            results.append(sem.acquire(False))\r
-        Bunch(f, 5).wait_for_finished()\r
-        # There can be a thread switch between acquiring the semaphore and\r
-        # appending the result, therefore results will not necessarily be\r
-        # ordered.\r
-        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )\r
-\r
-    def test_default_value(self):\r
-        # The default initial value is 1.\r
-        sem = self.semtype()\r
-        sem.acquire()\r
-        def f():\r
-            sem.acquire()\r
-            sem.release()\r
-        b = Bunch(f, 1)\r
-        b.wait_for_started()\r
-        _wait()\r
-        self.assertFalse(b.finished)\r
-        sem.release()\r
-        b.wait_for_finished()\r
-\r
-    def test_with(self):\r
-        sem = self.semtype(2)\r
-        def _with(err=None):\r
-            with sem:\r
-                self.assertTrue(sem.acquire(False))\r
-                sem.release()\r
-                with sem:\r
-                    self.assertFalse(sem.acquire(False))\r
-                    if err:\r
-                        raise err\r
-        _with()\r
-        self.assertTrue(sem.acquire(False))\r
-        sem.release()\r
-        self.assertRaises(TypeError, _with, TypeError)\r
-        self.assertTrue(sem.acquire(False))\r
-        sem.release()\r
-\r
-class SemaphoreTests(BaseSemaphoreTests):\r
-    """\r
-    Tests for unbounded semaphores.\r
-    """\r
-\r
-    def test_release_unacquired(self):\r
-        # Unbounded releases are allowed and increment the semaphore's value\r
-        sem = self.semtype(1)\r
-        sem.release()\r
-        sem.acquire()\r
-        sem.acquire()\r
-        sem.release()\r
-\r
-\r
-class BoundedSemaphoreTests(BaseSemaphoreTests):\r
-    """\r
-    Tests for bounded semaphores.\r
-    """\r
-\r
-    def test_release_unacquired(self):\r
-        # Cannot go past the initial value\r
-        sem = self.semtype()\r
-        self.assertRaises(ValueError, sem.release)\r
-        sem.acquire()\r
-        sem.release()\r
-        self.assertRaises(ValueError, sem.release)\r