]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | # Some simple queue module tests, plus some failure conditions\r |
2 | # to ensure the Queue locks remain stable.\r | |
3 | import Queue\r | |
4 | import time\r | |
5 | import unittest\r | |
6 | from test import test_support\r | |
7 | threading = test_support.import_module('threading')\r | |
8 | \r | |
# Capacity given to the bounded queues created by the tests in this file;
# the fill and drain loops are sized from it.
QUEUE_SIZE = 5
10 | \r | |
11 | # A thread to run a function that unclogs a blocked Queue.\r | |
12 | class _TriggerThread(threading.Thread):\r | |
13 | def __init__(self, fn, args):\r | |
14 | self.fn = fn\r | |
15 | self.args = args\r | |
16 | self.startedEvent = threading.Event()\r | |
17 | threading.Thread.__init__(self)\r | |
18 | \r | |
19 | def run(self):\r | |
20 | # The sleep isn't necessary, but is intended to give the blocking\r | |
21 | # function in the main thread a chance at actually blocking before\r | |
22 | # we unclog it. But if the sleep is longer than the timeout-based\r | |
23 | # tests wait in their blocking functions, those tests will fail.\r | |
24 | # So we give them much longer timeout values compared to the\r | |
25 | # sleep here (I aimed at 10 seconds for blocking functions --\r | |
26 | # they should never actually wait that long - they should make\r | |
27 | # progress as soon as we call self.fn()).\r | |
28 | time.sleep(0.1)\r | |
29 | self.startedEvent.set()\r | |
30 | self.fn(*self.args)\r | |
31 | \r | |
32 | \r | |
33 | # Execute a function that blocks, and in a separate thread, a function that\r | |
34 | # triggers the release. Returns the result of the blocking function. Caution:\r | |
35 | # block_func must guarantee to block until trigger_func is called, and\r | |
36 | # trigger_func must guarantee to change queue state so that block_func can make\r | |
37 | # enough progress to return. In particular, a block_func that just raises an\r | |
38 | # exception regardless of whether trigger_func is called will lead to\r | |
39 | # timing-dependent sporadic failures, and one of those went rarely seen but\r | |
40 | # undiagnosed for years. Now block_func must be unexceptional. If block_func\r | |
41 | # is supposed to raise an exception, call do_exceptional_blocking_test()\r | |
42 | # instead.\r | |
43 | \r | |
class BlockingTestMixin:
    """Helpers for testing calls that block until another thread acts."""

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Call block_func while a helper thread fires trigger_func.

        Returns the result of ``block_func(*block_args)``.  The test fails
        if block_func returned before the trigger thread got as far as
        making its call, or if the trigger thread is still alive 10 seconds
        after block_func returned.  block_func is not expected to raise.
        """
        trigger = self.t = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        self.result = block_func(*block_args)
        # Returning before the trigger thread made its call means that
        # block_func never really blocked.
        if not trigger.startedEvent.is_set():
            self.fail("blocking function '%r' appeared not to block" %
                      block_func)
        trigger.join(10)  # make sure the thread terminates
        if trigger.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Like do_blocking_test(), for a block_func that must raise.

        The expected exception is re-raised to the caller once the trigger
        thread has been joined; the test fails if block_func returns
        normally instead.
        """
        trigger = self.t = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        try:
            block_func(*block_args)
        except expected_exception_class:
            raise
        else:
            self.fail("expected exception of kind %r" %
                      expected_exception_class)
        finally:
            trigger.join(10)  # make sure the thread terminates
            if trigger.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
            if not trigger.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
80 | \r | |
81 | \r | |
class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
    """Queue tests shared by every queue flavour.

    Subclasses set ``type2test`` to the queue class to exercise.
    """

    def setUp(self):
        # Running total built up by the worker threads; guarded by cumlock.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Check ordering, full/empty reporting and blocking put/get on q.

        q must be empty on entry (RuntimeError is raised otherwise) and is
        filled up to QUEUE_SIZE items.  It is empty again when the method
        returns normally, so the same instance can be tested repeatedly.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        # I guess we better check things actually queue correctly a little :)
        for item in (111, 333, 222):
            q.put(item)
        # Expected retrieval order, keyed by the queue's class name.
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
            self.assertFalse(q.empty(), "Queue should not be empty")
        self.assertFalse(q.full(), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(q.full(), "Queue should be full")
        try:
            q.put(full, block=0)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put(full, timeout=0.01)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        try:
            q.get(block=0)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add items taken from q to self.cum until a None item arrives."""
        while True:
            x = q.get()
            if x is None:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until workers have handled every item."""
        self.cum = 0
        for i in (0, 1):
            threading.Thread(target=self.worker, args=(q,)).start()
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0, 1):
            q.put(None)         # instruct the threads to close
        q.join()                # verify that you can join twice

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)
192 | \r | |
193 | \r | |
class QueueTest(BaseQueueTest):
    # Runs the tests inherited from BaseQueueTest against Queue.Queue.
    type2test = Queue.Queue
196 | \r | |
class LifoQueueTest(BaseQueueTest):
    # Runs the tests inherited from BaseQueueTest against Queue.LifoQueue.
    type2test = Queue.LifoQueue
199 | \r | |
class PriorityQueueTest(BaseQueueTest):
    # Runs the tests inherited from BaseQueueTest against Queue.PriorityQueue.
    type2test = Queue.PriorityQueue
202 | \r | |
203 | \r | |
204 | \r | |
205 | # A Queue subclass that can provoke failure at a moment's notice :)\r | |
class FailingQueueException(Exception):
    """Raised by FailingQueue when it has been told to fail an operation."""
208 | \r | |
class FailingQueue(Queue.Queue):
    """Queue whose next _put() or _get() can be made to raise on demand.

    Set ``fail_next_put`` or ``fail_next_get`` to True and the next
    corresponding operation raises FailingQueueException.  The flag is
    cleared before raising, so only that single operation fails.
    """

    def __init__(self, *args):
        # The flags are set before the base initializer runs.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._get(self)
224 | \r | |
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
    """Check that a queue stays usable after its _put()/_get() raise."""

    def _assert_queue_fails(self, func, *args, **kwargs):
        # Call func and require it to raise FailingQueueException.
        try:
            func(*args, **kwargs)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")

    def failing_queue_test(self, q):
        """Provoke put and get failures on q and verify it is not damaged.

        q must be empty on entry (RuntimeError is raised otherwise) and is
        filled up to QUEUE_SIZE items.  It is empty again when the method
        returns normally.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        self._assert_queue_fails(q.put, "oops", block=0)
        q.fail_next_put = True
        self._assert_queue_fails(q.put, "oops", timeout=0.1)
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so the
        # exceptional variant of the blocking helper is used; it also joins
        # the trigger thread before the exception reaches us.
        q.fail_next_put = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.put, ("full",), q.get, (),
                                 FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.put, ("full", True, 10), q.get, (),
                                 FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        q.get()
        self.assertFalse(q.full(), "Queue should not be full")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        self._assert_queue_fails(q.get)
        self.assertFalse(q.empty(), "Queue should not be empty")
        q.fail_next_get = True
        self._assert_queue_fails(q.get, timeout=0.1)
        self.assertFalse(q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.fail_next_get = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.get, (), q.put, ('empty',),
                                 FailingQueueException)
        # put succeeded, but get failed.
        self.assertFalse(q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
314 | \r | |
315 | \r | |
def test_main():
    """Run all queue test cases through test_support.run_unittest()."""
    cases = (QueueTest, LifoQueueTest, PriorityQueueTest, FailingQueueTest)
    test_support.run_unittest(*cases)
319 | \r | |
320 | \r | |
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()