+++ /dev/null
-import os\r
-import unittest\r
-import random\r
-from test import test_support\r
-thread = test_support.import_module('thread')\r
-import time\r
-import sys\r
-import weakref\r
-\r
-from test import lock_tests\r
-\r
-NUMTASKS = 10\r
-NUMTRIPS = 3\r
-\r
-\r
-_print_mutex = thread.allocate_lock()\r
-\r
-def verbose_print(arg):\r
- """Helper function for printing out debugging output."""\r
- if test_support.verbose:\r
- with _print_mutex:\r
- print arg\r
-\r
-\r
-class BasicThreadTest(unittest.TestCase):\r
-\r
- def setUp(self):\r
- self.done_mutex = thread.allocate_lock()\r
- self.done_mutex.acquire()\r
- self.running_mutex = thread.allocate_lock()\r
- self.random_mutex = thread.allocate_lock()\r
- self.created = 0\r
- self.running = 0\r
- self.next_ident = 0\r
-\r
-\r
-class ThreadRunningTests(BasicThreadTest):\r
-\r
- def newtask(self):\r
- with self.running_mutex:\r
- self.next_ident += 1\r
- verbose_print("creating task %s" % self.next_ident)\r
- thread.start_new_thread(self.task, (self.next_ident,))\r
- self.created += 1\r
- self.running += 1\r
-\r
- def task(self, ident):\r
- with self.random_mutex:\r
- delay = random.random() / 10000.0\r
- verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))\r
- time.sleep(delay)\r
- verbose_print("task %s done" % ident)\r
- with self.running_mutex:\r
- self.running -= 1\r
- if self.created == NUMTASKS and self.running == 0:\r
- self.done_mutex.release()\r
-\r
- def test_starting_threads(self):\r
- # Basic test for thread creation.\r
- for i in range(NUMTASKS):\r
- self.newtask()\r
- verbose_print("waiting for tasks to complete...")\r
- self.done_mutex.acquire()\r
- verbose_print("all tasks done")\r
-\r
- def test_stack_size(self):\r
- # Various stack size tests.\r
- self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")\r
-\r
- thread.stack_size(0)\r
- self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")\r
-\r
- if os.name not in ("nt", "os2", "posix"):\r
- return\r
-\r
- tss_supported = True\r
- try:\r
- thread.stack_size(4096)\r
- except ValueError:\r
- verbose_print("caught expected ValueError setting "\r
- "stack_size(4096)")\r
- except thread.error:\r
- tss_supported = False\r
- verbose_print("platform does not support changing thread stack "\r
- "size")\r
-\r
- if tss_supported:\r
- fail_msg = "stack_size(%d) failed - should succeed"\r
- for tss in (262144, 0x100000, 0):\r
- thread.stack_size(tss)\r
- self.assertEqual(thread.stack_size(), tss, fail_msg % tss)\r
- verbose_print("successfully set stack_size(%d)" % tss)\r
-\r
- for tss in (262144, 0x100000):\r
- verbose_print("trying stack_size = (%d)" % tss)\r
- self.next_ident = 0\r
- self.created = 0\r
- for i in range(NUMTASKS):\r
- self.newtask()\r
-\r
- verbose_print("waiting for all tasks to complete")\r
- self.done_mutex.acquire()\r
- verbose_print("all tasks done")\r
-\r
- thread.stack_size(0)\r
-\r
- def test__count(self):\r
- # Test the _count() function.\r
- orig = thread._count()\r
- mut = thread.allocate_lock()\r
- mut.acquire()\r
- started = []\r
- def task():\r
- started.append(None)\r
- mut.acquire()\r
- mut.release()\r
- thread.start_new_thread(task, ())\r
- while not started:\r
- time.sleep(0.01)\r
- self.assertEqual(thread._count(), orig + 1)\r
- # Allow the task to finish.\r
- mut.release()\r
- # The only reliable way to be sure that the thread ended from the\r
- # interpreter's point of view is to wait for the function object to be\r
- # destroyed.\r
- done = []\r
- wr = weakref.ref(task, lambda _: done.append(None))\r
- del task\r
- while not done:\r
- time.sleep(0.01)\r
- self.assertEqual(thread._count(), orig)\r
-\r
-\r
-class Barrier:\r
- def __init__(self, num_threads):\r
- self.num_threads = num_threads\r
- self.waiting = 0\r
- self.checkin_mutex = thread.allocate_lock()\r
- self.checkout_mutex = thread.allocate_lock()\r
- self.checkout_mutex.acquire()\r
-\r
- def enter(self):\r
- self.checkin_mutex.acquire()\r
- self.waiting = self.waiting + 1\r
- if self.waiting == self.num_threads:\r
- self.waiting = self.num_threads - 1\r
- self.checkout_mutex.release()\r
- return\r
- self.checkin_mutex.release()\r
-\r
- self.checkout_mutex.acquire()\r
- self.waiting = self.waiting - 1\r
- if self.waiting == 0:\r
- self.checkin_mutex.release()\r
- return\r
- self.checkout_mutex.release()\r
-\r
-\r
-class BarrierTest(BasicThreadTest):\r
-\r
- def test_barrier(self):\r
- self.bar = Barrier(NUMTASKS)\r
- self.running = NUMTASKS\r
- for i in range(NUMTASKS):\r
- thread.start_new_thread(self.task2, (i,))\r
- verbose_print("waiting for tasks to end")\r
- self.done_mutex.acquire()\r
- verbose_print("tasks done")\r
-\r
- def task2(self, ident):\r
- for i in range(NUMTRIPS):\r
- if ident == 0:\r
- # give it a good chance to enter the next\r
- # barrier before the others are all out\r
- # of the current one\r
- delay = 0\r
- else:\r
- with self.random_mutex:\r
- delay = random.random() / 10000.0\r
- verbose_print("task %s will run for %sus" %\r
- (ident, round(delay * 1e6)))\r
- time.sleep(delay)\r
- verbose_print("task %s entering %s" % (ident, i))\r
- self.bar.enter()\r
- verbose_print("task %s leaving barrier" % ident)\r
- with self.running_mutex:\r
- self.running -= 1\r
- # Must release mutex before releasing done, else the main thread can\r
- # exit and set mutex to None as part of global teardown; then\r
- # mutex.release() raises AttributeError.\r
- finished = self.running == 0\r
- if finished:\r
- self.done_mutex.release()\r
-\r
-\r
-class LockTests(lock_tests.LockTests):\r
- locktype = thread.allocate_lock\r
-\r
-\r
-class TestForkInThread(unittest.TestCase):\r
- def setUp(self):\r
- self.read_fd, self.write_fd = os.pipe()\r
-\r
- @unittest.skipIf(sys.platform.startswith('win'),\r
- "This test is only appropriate for POSIX-like systems.")\r
- @test_support.reap_threads\r
- def test_forkinthread(self):\r
- def thread1():\r
- try:\r
- pid = os.fork() # fork in a thread\r
- except RuntimeError:\r
- sys.exit(0) # exit the child\r
-\r
- if pid == 0: # child\r
- os.close(self.read_fd)\r
- os.write(self.write_fd, "OK")\r
- sys.exit(0)\r
- else: # parent\r
- os.close(self.write_fd)\r
-\r
- thread.start_new_thread(thread1, ())\r
- self.assertEqual(os.read(self.read_fd, 2), "OK",\r
- "Unable to fork() in thread")\r
-\r
- def tearDown(self):\r
- try:\r
- os.close(self.read_fd)\r
- except OSError:\r
- pass\r
-\r
- try:\r
- os.close(self.write_fd)\r
- except OSError:\r
- pass\r
-\r
-\r
-def test_main():\r
- test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,\r
- TestForkInThread)\r
-\r
-if __name__ == "__main__":\r
- test_main()\r