]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_queue.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_queue.py
CommitLineData
4710c53d 1# Some simple queue module tests, plus some failure conditions\r
2# to ensure the Queue locks remain stable.\r
3import Queue\r
4import time\r
5import unittest\r
6from test import test_support\r
7threading = test_support.import_module('threading')\r
8\r
9QUEUE_SIZE = 5\r
10\r
11# A thread to run a function that unclogs a blocked Queue.\r
12class _TriggerThread(threading.Thread):\r
13 def __init__(self, fn, args):\r
14 self.fn = fn\r
15 self.args = args\r
16 self.startedEvent = threading.Event()\r
17 threading.Thread.__init__(self)\r
18\r
19 def run(self):\r
20 # The sleep isn't necessary, but is intended to give the blocking\r
21 # function in the main thread a chance at actually blocking before\r
22 # we unclog it. But if the sleep is longer than the timeout-based\r
23 # tests wait in their blocking functions, those tests will fail.\r
24 # So we give them much longer timeout values compared to the\r
25 # sleep here (I aimed at 10 seconds for blocking functions --\r
26 # they should never actually wait that long - they should make\r
27 # progress as soon as we call self.fn()).\r
28 time.sleep(0.1)\r
29 self.startedEvent.set()\r
30 self.fn(*self.args)\r
31\r
32\r
33# Execute a function that blocks, and in a separate thread, a function that\r
34# triggers the release. Returns the result of the blocking function. Caution:\r
35# block_func must guarantee to block until trigger_func is called, and\r
36# trigger_func must guarantee to change queue state so that block_func can make\r
37# enough progress to return. In particular, a block_func that just raises an\r
38# exception regardless of whether trigger_func is called will lead to\r
39# timing-dependent sporadic failures, and one of those went rarely seen but\r
40# undiagnosed for years. Now block_func must be unexceptional. If block_func\r
41# is supposed to raise an exception, call do_exceptional_blocking_test()\r
42# instead.\r
43\r
44class BlockingTestMixin:\r
45\r
46 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):\r
47 self.t = _TriggerThread(trigger_func, trigger_args)\r
48 self.t.start()\r
49 self.result = block_func(*block_args)\r
50 # If block_func returned before our thread made the call, we failed!\r
51 if not self.t.startedEvent.is_set():\r
52 self.fail("blocking function '%r' appeared not to block" %\r
53 block_func)\r
54 self.t.join(10) # make sure the thread terminates\r
55 if self.t.is_alive():\r
56 self.fail("trigger function '%r' appeared to not return" %\r
57 trigger_func)\r
58 return self.result\r
59\r
60 # Call this instead if block_func is supposed to raise an exception.\r
61 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,\r
62 trigger_args, expected_exception_class):\r
63 self.t = _TriggerThread(trigger_func, trigger_args)\r
64 self.t.start()\r
65 try:\r
66 try:\r
67 block_func(*block_args)\r
68 except expected_exception_class:\r
69 raise\r
70 else:\r
71 self.fail("expected exception of kind %r" %\r
72 expected_exception_class)\r
73 finally:\r
74 self.t.join(10) # make sure the thread terminates\r
75 if self.t.is_alive():\r
76 self.fail("trigger function '%r' appeared to not return" %\r
77 trigger_func)\r
78 if not self.t.startedEvent.is_set():\r
79 self.fail("trigger thread ended but event never set")\r
80\r
81\r
82class BaseQueueTest(unittest.TestCase, BlockingTestMixin):\r
83 def setUp(self):\r
84 self.cum = 0\r
85 self.cumlock = threading.Lock()\r
86\r
87 def simple_queue_test(self, q):\r
88 if not q.empty():\r
89 raise RuntimeError, "Call this function with an empty queue"\r
90 # I guess we better check things actually queue correctly a little :)\r
91 q.put(111)\r
92 q.put(333)\r
93 q.put(222)\r
94 target_order = dict(Queue = [111, 333, 222],\r
95 LifoQueue = [222, 333, 111],\r
96 PriorityQueue = [111, 222, 333])\r
97 actual_order = [q.get(), q.get(), q.get()]\r
98 self.assertEqual(actual_order, target_order[q.__class__.__name__],\r
99 "Didn't seem to queue the correct data!")\r
100 for i in range(QUEUE_SIZE-1):\r
101 q.put(i)\r
102 self.assertTrue(not q.empty(), "Queue should not be empty")\r
103 self.assertTrue(not q.full(), "Queue should not be full")\r
104 last = 2 * QUEUE_SIZE\r
105 full = 3 * 2 * QUEUE_SIZE\r
106 q.put(last)\r
107 self.assertTrue(q.full(), "Queue should be full")\r
108 try:\r
109 q.put(full, block=0)\r
110 self.fail("Didn't appear to block with a full queue")\r
111 except Queue.Full:\r
112 pass\r
113 try:\r
114 q.put(full, timeout=0.01)\r
115 self.fail("Didn't appear to time-out with a full queue")\r
116 except Queue.Full:\r
117 pass\r
118 # Test a blocking put\r
119 self.do_blocking_test(q.put, (full,), q.get, ())\r
120 self.do_blocking_test(q.put, (full, True, 10), q.get, ())\r
121 # Empty it\r
122 for i in range(QUEUE_SIZE):\r
123 q.get()\r
124 self.assertTrue(q.empty(), "Queue should be empty")\r
125 try:\r
126 q.get(block=0)\r
127 self.fail("Didn't appear to block with an empty queue")\r
128 except Queue.Empty:\r
129 pass\r
130 try:\r
131 q.get(timeout=0.01)\r
132 self.fail("Didn't appear to time-out with an empty queue")\r
133 except Queue.Empty:\r
134 pass\r
135 # Test a blocking get\r
136 self.do_blocking_test(q.get, (), q.put, ('empty',))\r
137 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))\r
138\r
139\r
140 def worker(self, q):\r
141 while True:\r
142 x = q.get()\r
143 if x is None:\r
144 q.task_done()\r
145 return\r
146 with self.cumlock:\r
147 self.cum += x\r
148 q.task_done()\r
149\r
150 def queue_join_test(self, q):\r
151 self.cum = 0\r
152 for i in (0,1):\r
153 threading.Thread(target=self.worker, args=(q,)).start()\r
154 for i in xrange(100):\r
155 q.put(i)\r
156 q.join()\r
157 self.assertEqual(self.cum, sum(range(100)),\r
158 "q.join() did not block until all tasks were done")\r
159 for i in (0,1):\r
160 q.put(None) # instruct the threads to close\r
161 q.join() # verify that you can join twice\r
162\r
163 def test_queue_task_done(self):\r
164 # Test to make sure a queue task completed successfully.\r
165 q = self.type2test()\r
166 try:\r
167 q.task_done()\r
168 except ValueError:\r
169 pass\r
170 else:\r
171 self.fail("Did not detect task count going negative")\r
172\r
173 def test_queue_join(self):\r
174 # Test that a queue join()s successfully, and before anything else\r
175 # (done twice for insurance).\r
176 q = self.type2test()\r
177 self.queue_join_test(q)\r
178 self.queue_join_test(q)\r
179 try:\r
180 q.task_done()\r
181 except ValueError:\r
182 pass\r
183 else:\r
184 self.fail("Did not detect task count going negative")\r
185\r
186 def test_simple_queue(self):\r
187 # Do it a couple of times on the same queue.\r
188 # Done twice to make sure works with same instance reused.\r
189 q = self.type2test(QUEUE_SIZE)\r
190 self.simple_queue_test(q)\r
191 self.simple_queue_test(q)\r
192\r
193\r
194class QueueTest(BaseQueueTest):\r
195 type2test = Queue.Queue\r
196\r
197class LifoQueueTest(BaseQueueTest):\r
198 type2test = Queue.LifoQueue\r
199\r
200class PriorityQueueTest(BaseQueueTest):\r
201 type2test = Queue.PriorityQueue\r
202\r
203\r
204\r
205# A Queue subclass that can provoke failure at a moment's notice :)\r
206class FailingQueueException(Exception):\r
207 pass\r
208\r
209class FailingQueue(Queue.Queue):\r
210 def __init__(self, *args):\r
211 self.fail_next_put = False\r
212 self.fail_next_get = False\r
213 Queue.Queue.__init__(self, *args)\r
214 def _put(self, item):\r
215 if self.fail_next_put:\r
216 self.fail_next_put = False\r
217 raise FailingQueueException, "You Lose"\r
218 return Queue.Queue._put(self, item)\r
219 def _get(self):\r
220 if self.fail_next_get:\r
221 self.fail_next_get = False\r
222 raise FailingQueueException, "You Lose"\r
223 return Queue.Queue._get(self)\r
224\r
225class FailingQueueTest(unittest.TestCase, BlockingTestMixin):\r
226\r
227 def failing_queue_test(self, q):\r
228 if not q.empty():\r
229 raise RuntimeError, "Call this function with an empty queue"\r
230 for i in range(QUEUE_SIZE-1):\r
231 q.put(i)\r
232 # Test a failing non-blocking put.\r
233 q.fail_next_put = True\r
234 try:\r
235 q.put("oops", block=0)\r
236 self.fail("The queue didn't fail when it should have")\r
237 except FailingQueueException:\r
238 pass\r
239 q.fail_next_put = True\r
240 try:\r
241 q.put("oops", timeout=0.1)\r
242 self.fail("The queue didn't fail when it should have")\r
243 except FailingQueueException:\r
244 pass\r
245 q.put("last")\r
246 self.assertTrue(q.full(), "Queue should be full")\r
247 # Test a failing blocking put\r
248 q.fail_next_put = True\r
249 try:\r
250 self.do_blocking_test(q.put, ("full",), q.get, ())\r
251 self.fail("The queue didn't fail when it should have")\r
252 except FailingQueueException:\r
253 pass\r
254 # Check the Queue isn't damaged.\r
255 # put failed, but get succeeded - re-add\r
256 q.put("last")\r
257 # Test a failing timeout put\r
258 q.fail_next_put = True\r
259 try:\r
260 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),\r
261 FailingQueueException)\r
262 self.fail("The queue didn't fail when it should have")\r
263 except FailingQueueException:\r
264 pass\r
265 # Check the Queue isn't damaged.\r
266 # put failed, but get succeeded - re-add\r
267 q.put("last")\r
268 self.assertTrue(q.full(), "Queue should be full")\r
269 q.get()\r
270 self.assertTrue(not q.full(), "Queue should not be full")\r
271 q.put("last")\r
272 self.assertTrue(q.full(), "Queue should be full")\r
273 # Test a blocking put\r
274 self.do_blocking_test(q.put, ("full",), q.get, ())\r
275 # Empty it\r
276 for i in range(QUEUE_SIZE):\r
277 q.get()\r
278 self.assertTrue(q.empty(), "Queue should be empty")\r
279 q.put("first")\r
280 q.fail_next_get = True\r
281 try:\r
282 q.get()\r
283 self.fail("The queue didn't fail when it should have")\r
284 except FailingQueueException:\r
285 pass\r
286 self.assertTrue(not q.empty(), "Queue should not be empty")\r
287 q.fail_next_get = True\r
288 try:\r
289 q.get(timeout=0.1)\r
290 self.fail("The queue didn't fail when it should have")\r
291 except FailingQueueException:\r
292 pass\r
293 self.assertTrue(not q.empty(), "Queue should not be empty")\r
294 q.get()\r
295 self.assertTrue(q.empty(), "Queue should be empty")\r
296 q.fail_next_get = True\r
297 try:\r
298 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),\r
299 FailingQueueException)\r
300 self.fail("The queue didn't fail when it should have")\r
301 except FailingQueueException:\r
302 pass\r
303 # put succeeded, but get failed.\r
304 self.assertTrue(not q.empty(), "Queue should not be empty")\r
305 q.get()\r
306 self.assertTrue(q.empty(), "Queue should be empty")\r
307\r
308 def test_failing_queue(self):\r
309 # Test to make sure a queue is functioning correctly.\r
310 # Done twice to the same instance.\r
311 q = FailingQueue(QUEUE_SIZE)\r
312 self.failing_queue_test(q)\r
313 self.failing_queue_test(q)\r
314\r
315\r
316def test_main():\r
317 test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,\r
318 FailingQueueTest)\r
319\r
320\r
321if __name__ == "__main__":\r
322 test_main()\r