git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_thread.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_thread.py
Commit Line Data
4710c53d 1import os\r
2import unittest\r
3import random\r
4from test import test_support\r
5thread = test_support.import_module('thread')\r
6import time\r
7import sys\r
8import weakref\r
9\r
10from test import lock_tests\r
11\r
# Number of worker threads each test starts.
NUMTASKS = 10
# Number of times each thread passes through the barrier in BarrierTest.
NUMTRIPS = 3


# Held by verbose_print() while it prints, so that output coming from
# different threads is serialised.
_print_mutex = thread.allocate_lock()
17\r
def verbose_print(arg):
    """Print *arg*, but only when the test run is verbose.

    Nothing happens unless ``test_support.verbose`` is true.  The
    module-level ``_print_mutex`` is held for the duration of the print so
    that only one thread prints at a time.
    """
    if not test_support.verbose:
        return
    _print_mutex.acquire()
    try:
        # The parenthesised form prints exactly the same text under the
        # Python 2 print statement.
        print(arg)
    finally:
        _print_mutex.release()
23\r
24\r
class BasicThreadTest(unittest.TestCase):
    """Shared fixture (locks and counters) for the thread tests below."""

    def setUp(self):
        # done_mutex is handed to the test already held, so the test's own
        # acquire() blocks until a worker thread releases it.
        completion_lock = thread.allocate_lock()
        completion_lock.acquire()
        self.done_mutex = completion_lock

        # running_mutex guards the counters below; random_mutex is taken
        # around calls into the random module.
        self.running_mutex = thread.allocate_lock()
        self.random_mutex = thread.allocate_lock()

        # Bookkeeping counters all start from zero.
        self.created = self.running = self.next_ident = 0
35\r
36\r
class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the low-level thread module."""

    def newtask(self):
        """Start one more thread running task() and account for it.

        The identifier and the created/running counters are updated while
        holding running_mutex, the same lock task() takes before it
        decrements the running count.
        """
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Thread body: sleep for a short random time, then check out.

        The thread that observes that all NUMTASKS tasks were created and
        none is still running releases done_mutex, which wakes the test.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for i in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        # Blocks until the last task releases done_mutex.
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

        # The remaining checks only apply on these platforms.
        if os.name not in ("nt", "os2", "posix"):
            return

        tss_supported = True
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                          "stack_size(4096)")
        except thread.error:
            tss_supported = False
            verbose_print("platform does not support changing thread stack "
                          "size")

        if not tss_supported:
            return

        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Actually apply the size being tried; the previous loop
                # left the default (0) in effect, so without this call the
                # threads below would never run with a non-default stack.
                thread.stack_size(tss)
                self.next_ident = 0
                self.created = 0
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")
        finally:
            # Restore the default even when a check above fails, so that a
            # non-default stack size does not leak into later tests.
            thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []
        def task():
            started.append(None)
            # Blocks here until the test releases mut below.
            mut.acquire()
            mut.release()
        thread.start_new_thread(task, ())
        while not started:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig + 1)
        # Allow the task to finish.
        mut.release()
        # The only reliable way to be sure that the thread ended from the
        # interpreter's point of view is to wait for the function object to be
        # destroyed.
        done = []
        wr = weakref.ref(task, lambda _: done.append(None))
        del task
        while not done:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig)
132\r
133\r
class Barrier:
    """Rendezvous point for ``num_threads`` threads, built from two locks.

    ``checkin_mutex`` admits arriving threads one at a time and
    ``checkout_mutex`` (initially held) keeps them waiting.  The last
    thread to arrive keeps ``checkin_mutex`` held and releases
    ``checkout_mutex``; the waiting threads then pass ``checkout_mutex``
    from one to the next, and the last one to leave releases
    ``checkin_mutex`` again.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        # The checkout lock starts out held so early arrivals block on it.
        checkout = thread.allocate_lock()
        checkout.acquire()
        self.checkout_mutex = checkout

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        # Check-in phase.
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last arrival: leave checkin_mutex held, record how many
            # threads still have to leave, and open the checkout phase.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # Check-out phase.
        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out re-opens check-in; checkout_mutex stays held.
            self.checkin_mutex.release()
        else:
            # Hand the checkout lock to the next waiting thread.
            self.checkout_mutex.release()
157\r
158\r
class BarrierTest(BasicThreadTest):
    """Drive NUMTASKS threads through a shared Barrier NUMTRIPS times."""

    def test_barrier(self):
        self.bar = Barrier(NUMTASKS)
        self.running = NUMTASKS
        for ident in range(NUMTASKS):
            thread.start_new_thread(self.task2, (ident,))
        verbose_print("waiting for tasks to end")
        # Released by the last task2() thread to finish.
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        """Thread body: sleep, enter the barrier, repeat; then check out."""
        for trip in range(NUMTRIPS):
            if ident != 0:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            else:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, trip))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()
194\r
195\r
class LockTests(lock_tests.LockTests):
    """Run the shared lock_tests.LockTests cases against thread locks."""
    # Lock factory for the inherited tests (presumably what they call to
    # create the lock under test -- see lock_tests).
    locktype = thread.allocate_lock
198\r
199\r
class TestForkInThread(unittest.TestCase):
    """Check that os.fork() works when called from a non-main thread."""

    def setUp(self):
        # Pipe used by the forked child to report back to the test.
        self.read_fd, self.write_fd = os.pipe()

    @unittest.skipIf(sys.platform.startswith('win'),
                     "This test is only appropriate for POSIX-like systems.")
    @test_support.reap_threads
    def test_forkinthread(self):
        def thread1():
            try:
                pid = os.fork() # fork in a thread
            except RuntimeError:
                sys.exit(0) # exit the child

            if pid == 0: # child
                try:
                    os.close(self.read_fd)
                    os.write(self.write_fd, "OK")
                finally:
                    # Terminate the child process outright.  sys.exit()
                    # would only raise SystemExit in this thread, and the
                    # child must never carry on running the parent's test
                    # suite, even if the calls above fail.
                    os._exit(0)
            else: # parent
                os.close(self.write_fd)
                # Reap the child so it is not left behind as a zombie.
                try:
                    os.waitpid(pid, 0)
                except OSError:
                    # Best effort only: the child may already have been
                    # collected elsewhere.
                    pass

        thread.start_new_thread(thread1, ())
        self.assertEqual(os.read(self.read_fd, 2), "OK",
                         "Unable to fork() in thread")

    def tearDown(self):
        # Either descriptor may already have been closed by the test
        # thread, so a failing close() is deliberately ignored.
        for fd in (self.read_fd, self.write_fd):
            try:
                os.close(fd)
            except OSError:
                pass
235\r
236\r
def test_main():
    """Run every test case class defined in this module."""
    test_cases = (ThreadRunningTests, BarrierTest, LockTests,
                  TestForkInThread)
    test_support.run_unittest(*test_cases)
240\r
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()