]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_thread.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py
deleted file mode 100644 (file)
index 82b8fbe..0000000
+++ /dev/null
@@ -1,242 +0,0 @@
-import os\r
-import unittest\r
-import random\r
-from test import test_support\r
-thread = test_support.import_module('thread')\r
-import time\r
-import sys\r
-import weakref\r
-\r
-from test import lock_tests\r
-\r
-NUMTASKS = 10\r
-NUMTRIPS = 3\r
-\r
-\r
-_print_mutex = thread.allocate_lock()\r
-\r
-def verbose_print(arg):\r
-    """Helper function for printing out debugging output."""\r
-    if test_support.verbose:\r
-        with _print_mutex:\r
-            print arg\r
-\r
-\r
-class BasicThreadTest(unittest.TestCase):\r
-\r
-    def setUp(self):\r
-        self.done_mutex = thread.allocate_lock()\r
-        self.done_mutex.acquire()\r
-        self.running_mutex = thread.allocate_lock()\r
-        self.random_mutex = thread.allocate_lock()\r
-        self.created = 0\r
-        self.running = 0\r
-        self.next_ident = 0\r
-\r
-\r
-class ThreadRunningTests(BasicThreadTest):\r
-\r
-    def newtask(self):\r
-        with self.running_mutex:\r
-            self.next_ident += 1\r
-            verbose_print("creating task %s" % self.next_ident)\r
-            thread.start_new_thread(self.task, (self.next_ident,))\r
-            self.created += 1\r
-            self.running += 1\r
-\r
-    def task(self, ident):\r
-        with self.random_mutex:\r
-            delay = random.random() / 10000.0\r
-        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))\r
-        time.sleep(delay)\r
-        verbose_print("task %s done" % ident)\r
-        with self.running_mutex:\r
-            self.running -= 1\r
-            if self.created == NUMTASKS and self.running == 0:\r
-                self.done_mutex.release()\r
-\r
-    def test_starting_threads(self):\r
-        # Basic test for thread creation.\r
-        for i in range(NUMTASKS):\r
-            self.newtask()\r
-        verbose_print("waiting for tasks to complete...")\r
-        self.done_mutex.acquire()\r
-        verbose_print("all tasks done")\r
-\r
-    def test_stack_size(self):\r
-        # Various stack size tests.\r
-        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")\r
-\r
-        thread.stack_size(0)\r
-        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")\r
-\r
-        if os.name not in ("nt", "os2", "posix"):\r
-            return\r
-\r
-        tss_supported = True\r
-        try:\r
-            thread.stack_size(4096)\r
-        except ValueError:\r
-            verbose_print("caught expected ValueError setting "\r
-                            "stack_size(4096)")\r
-        except thread.error:\r
-            tss_supported = False\r
-            verbose_print("platform does not support changing thread stack "\r
-                            "size")\r
-\r
-        if tss_supported:\r
-            fail_msg = "stack_size(%d) failed - should succeed"\r
-            for tss in (262144, 0x100000, 0):\r
-                thread.stack_size(tss)\r
-                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)\r
-                verbose_print("successfully set stack_size(%d)" % tss)\r
-\r
-            for tss in (262144, 0x100000):\r
-                verbose_print("trying stack_size = (%d)" % tss)\r
-                self.next_ident = 0\r
-                self.created = 0\r
-                for i in range(NUMTASKS):\r
-                    self.newtask()\r
-\r
-                verbose_print("waiting for all tasks to complete")\r
-                self.done_mutex.acquire()\r
-                verbose_print("all tasks done")\r
-\r
-            thread.stack_size(0)\r
-\r
-    def test__count(self):\r
-        # Test the _count() function.\r
-        orig = thread._count()\r
-        mut = thread.allocate_lock()\r
-        mut.acquire()\r
-        started = []\r
-        def task():\r
-            started.append(None)\r
-            mut.acquire()\r
-            mut.release()\r
-        thread.start_new_thread(task, ())\r
-        while not started:\r
-            time.sleep(0.01)\r
-        self.assertEqual(thread._count(), orig + 1)\r
-        # Allow the task to finish.\r
-        mut.release()\r
-        # The only reliable way to be sure that the thread ended from the\r
-        # interpreter's point of view is to wait for the function object to be\r
-        # destroyed.\r
-        done = []\r
-        wr = weakref.ref(task, lambda _: done.append(None))\r
-        del task\r
-        while not done:\r
-            time.sleep(0.01)\r
-        self.assertEqual(thread._count(), orig)\r
-\r
-\r
-class Barrier:\r
-    def __init__(self, num_threads):\r
-        self.num_threads = num_threads\r
-        self.waiting = 0\r
-        self.checkin_mutex  = thread.allocate_lock()\r
-        self.checkout_mutex = thread.allocate_lock()\r
-        self.checkout_mutex.acquire()\r
-\r
-    def enter(self):\r
-        self.checkin_mutex.acquire()\r
-        self.waiting = self.waiting + 1\r
-        if self.waiting == self.num_threads:\r
-            self.waiting = self.num_threads - 1\r
-            self.checkout_mutex.release()\r
-            return\r
-        self.checkin_mutex.release()\r
-\r
-        self.checkout_mutex.acquire()\r
-        self.waiting = self.waiting - 1\r
-        if self.waiting == 0:\r
-            self.checkin_mutex.release()\r
-            return\r
-        self.checkout_mutex.release()\r
-\r
-\r
-class BarrierTest(BasicThreadTest):\r
-\r
-    def test_barrier(self):\r
-        self.bar = Barrier(NUMTASKS)\r
-        self.running = NUMTASKS\r
-        for i in range(NUMTASKS):\r
-            thread.start_new_thread(self.task2, (i,))\r
-        verbose_print("waiting for tasks to end")\r
-        self.done_mutex.acquire()\r
-        verbose_print("tasks done")\r
-\r
-    def task2(self, ident):\r
-        for i in range(NUMTRIPS):\r
-            if ident == 0:\r
-                # give it a good chance to enter the next\r
-                # barrier before the others are all out\r
-                # of the current one\r
-                delay = 0\r
-            else:\r
-                with self.random_mutex:\r
-                    delay = random.random() / 10000.0\r
-            verbose_print("task %s will run for %sus" %\r
-                          (ident, round(delay * 1e6)))\r
-            time.sleep(delay)\r
-            verbose_print("task %s entering %s" % (ident, i))\r
-            self.bar.enter()\r
-            verbose_print("task %s leaving barrier" % ident)\r
-        with self.running_mutex:\r
-            self.running -= 1\r
-            # Must release mutex before releasing done, else the main thread can\r
-            # exit and set mutex to None as part of global teardown; then\r
-            # mutex.release() raises AttributeError.\r
-            finished = self.running == 0\r
-        if finished:\r
-            self.done_mutex.release()\r
-\r
-\r
-class LockTests(lock_tests.LockTests):\r
-    locktype = thread.allocate_lock\r
-\r
-\r
-class TestForkInThread(unittest.TestCase):\r
-    def setUp(self):\r
-        self.read_fd, self.write_fd = os.pipe()\r
-\r
-    @unittest.skipIf(sys.platform.startswith('win'),\r
-                     "This test is only appropriate for POSIX-like systems.")\r
-    @test_support.reap_threads\r
-    def test_forkinthread(self):\r
-        def thread1():\r
-            try:\r
-                pid = os.fork() # fork in a thread\r
-            except RuntimeError:\r
-                sys.exit(0) # exit the child\r
-\r
-            if pid == 0: # child\r
-                os.close(self.read_fd)\r
-                os.write(self.write_fd, "OK")\r
-                sys.exit(0)\r
-            else: # parent\r
-                os.close(self.write_fd)\r
-\r
-        thread.start_new_thread(thread1, ())\r
-        self.assertEqual(os.read(self.read_fd, 2), "OK",\r
-                         "Unable to fork() in thread")\r
-\r
-    def tearDown(self):\r
-        try:\r
-            os.close(self.read_fd)\r
-        except OSError:\r
-            pass\r
-\r
-        try:\r
-            os.close(self.write_fd)\r
-        except OSError:\r
-            pass\r
-\r
-\r
-def test_main():\r
-    test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,\r
-                              TestForkInThread)\r
-\r
-if __name__ == "__main__":\r
-    test_main()\r