]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | import gc\r |
2 | import os\r | |
3 | import sys\r | |
4 | import signal\r | |
5 | import weakref\r | |
6 | \r | |
7 | from cStringIO import StringIO\r | |
8 | \r | |
9 | \r | |
10 | import unittest\r | |
11 | \r | |
12 | \r | |
13 | @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")\r | |
14 | @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")\r | |
15 | @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "\r | |
16 | "if threads have been used")\r | |
17 | class TestBreak(unittest.TestCase):\r | |
18 | \r | |
19 | def setUp(self):\r | |
20 | self._default_handler = signal.getsignal(signal.SIGINT)\r | |
21 | \r | |
22 | def tearDown(self):\r | |
23 | signal.signal(signal.SIGINT, self._default_handler)\r | |
24 | unittest.signals._results = weakref.WeakKeyDictionary()\r | |
25 | unittest.signals._interrupt_handler = None\r | |
26 | \r | |
27 | \r | |
28 | def testInstallHandler(self):\r | |
29 | default_handler = signal.getsignal(signal.SIGINT)\r | |
30 | unittest.installHandler()\r | |
31 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
32 | \r | |
33 | try:\r | |
34 | pid = os.getpid()\r | |
35 | os.kill(pid, signal.SIGINT)\r | |
36 | except KeyboardInterrupt:\r | |
37 | self.fail("KeyboardInterrupt not handled")\r | |
38 | \r | |
39 | self.assertTrue(unittest.signals._interrupt_handler.called)\r | |
40 | \r | |
41 | def testRegisterResult(self):\r | |
42 | result = unittest.TestResult()\r | |
43 | unittest.registerResult(result)\r | |
44 | \r | |
45 | for ref in unittest.signals._results:\r | |
46 | if ref is result:\r | |
47 | break\r | |
48 | elif ref is not result:\r | |
49 | self.fail("odd object in result set")\r | |
50 | else:\r | |
51 | self.fail("result not found")\r | |
52 | \r | |
53 | \r | |
54 | def testInterruptCaught(self):\r | |
55 | default_handler = signal.getsignal(signal.SIGINT)\r | |
56 | \r | |
57 | result = unittest.TestResult()\r | |
58 | unittest.installHandler()\r | |
59 | unittest.registerResult(result)\r | |
60 | \r | |
61 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
62 | \r | |
63 | def test(result):\r | |
64 | pid = os.getpid()\r | |
65 | os.kill(pid, signal.SIGINT)\r | |
66 | result.breakCaught = True\r | |
67 | self.assertTrue(result.shouldStop)\r | |
68 | \r | |
69 | try:\r | |
70 | test(result)\r | |
71 | except KeyboardInterrupt:\r | |
72 | self.fail("KeyboardInterrupt not handled")\r | |
73 | self.assertTrue(result.breakCaught)\r | |
74 | \r | |
75 | \r | |
76 | def testSecondInterrupt(self):\r | |
77 | result = unittest.TestResult()\r | |
78 | unittest.installHandler()\r | |
79 | unittest.registerResult(result)\r | |
80 | \r | |
81 | def test(result):\r | |
82 | pid = os.getpid()\r | |
83 | os.kill(pid, signal.SIGINT)\r | |
84 | result.breakCaught = True\r | |
85 | self.assertTrue(result.shouldStop)\r | |
86 | os.kill(pid, signal.SIGINT)\r | |
87 | self.fail("Second KeyboardInterrupt not raised")\r | |
88 | \r | |
89 | try:\r | |
90 | test(result)\r | |
91 | except KeyboardInterrupt:\r | |
92 | pass\r | |
93 | else:\r | |
94 | self.fail("Second KeyboardInterrupt not raised")\r | |
95 | self.assertTrue(result.breakCaught)\r | |
96 | \r | |
97 | \r | |
98 | def testTwoResults(self):\r | |
99 | unittest.installHandler()\r | |
100 | \r | |
101 | result = unittest.TestResult()\r | |
102 | unittest.registerResult(result)\r | |
103 | new_handler = signal.getsignal(signal.SIGINT)\r | |
104 | \r | |
105 | result2 = unittest.TestResult()\r | |
106 | unittest.registerResult(result2)\r | |
107 | self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)\r | |
108 | \r | |
109 | result3 = unittest.TestResult()\r | |
110 | \r | |
111 | def test(result):\r | |
112 | pid = os.getpid()\r | |
113 | os.kill(pid, signal.SIGINT)\r | |
114 | \r | |
115 | try:\r | |
116 | test(result)\r | |
117 | except KeyboardInterrupt:\r | |
118 | self.fail("KeyboardInterrupt not handled")\r | |
119 | \r | |
120 | self.assertTrue(result.shouldStop)\r | |
121 | self.assertTrue(result2.shouldStop)\r | |
122 | self.assertFalse(result3.shouldStop)\r | |
123 | \r | |
124 | \r | |
125 | def testHandlerReplacedButCalled(self):\r | |
126 | # If our handler has been replaced (is no longer installed) but is\r | |
127 | # called by the *new* handler, then it isn't safe to delay the\r | |
128 | # SIGINT and we should immediately delegate to the default handler\r | |
129 | unittest.installHandler()\r | |
130 | \r | |
131 | handler = signal.getsignal(signal.SIGINT)\r | |
132 | def new_handler(frame, signum):\r | |
133 | handler(frame, signum)\r | |
134 | signal.signal(signal.SIGINT, new_handler)\r | |
135 | \r | |
136 | try:\r | |
137 | pid = os.getpid()\r | |
138 | os.kill(pid, signal.SIGINT)\r | |
139 | except KeyboardInterrupt:\r | |
140 | pass\r | |
141 | else:\r | |
142 | self.fail("replaced but delegated handler doesn't raise interrupt")\r | |
143 | \r | |
144 | def testRunner(self):\r | |
145 | # Creating a TextTestRunner with the appropriate argument should\r | |
146 | # register the TextTestResult it creates\r | |
147 | runner = unittest.TextTestRunner(stream=StringIO())\r | |
148 | \r | |
149 | result = runner.run(unittest.TestSuite())\r | |
150 | self.assertIn(result, unittest.signals._results)\r | |
151 | \r | |
152 | def testWeakReferences(self):\r | |
153 | # Calling registerResult on a result should not keep it alive\r | |
154 | result = unittest.TestResult()\r | |
155 | unittest.registerResult(result)\r | |
156 | \r | |
157 | ref = weakref.ref(result)\r | |
158 | del result\r | |
159 | \r | |
160 | # For non-reference counting implementations\r | |
161 | gc.collect();gc.collect()\r | |
162 | self.assertIsNone(ref())\r | |
163 | \r | |
164 | \r | |
165 | def testRemoveResult(self):\r | |
166 | result = unittest.TestResult()\r | |
167 | unittest.registerResult(result)\r | |
168 | \r | |
169 | unittest.installHandler()\r | |
170 | self.assertTrue(unittest.removeResult(result))\r | |
171 | \r | |
172 | # Should this raise an error instead?\r | |
173 | self.assertFalse(unittest.removeResult(unittest.TestResult()))\r | |
174 | \r | |
175 | try:\r | |
176 | pid = os.getpid()\r | |
177 | os.kill(pid, signal.SIGINT)\r | |
178 | except KeyboardInterrupt:\r | |
179 | pass\r | |
180 | \r | |
181 | self.assertFalse(result.shouldStop)\r | |
182 | \r | |
183 | def testMainInstallsHandler(self):\r | |
184 | failfast = object()\r | |
185 | test = object()\r | |
186 | verbosity = object()\r | |
187 | result = object()\r | |
188 | default_handler = signal.getsignal(signal.SIGINT)\r | |
189 | \r | |
190 | class FakeRunner(object):\r | |
191 | initArgs = []\r | |
192 | runArgs = []\r | |
193 | def __init__(self, *args, **kwargs):\r | |
194 | self.initArgs.append((args, kwargs))\r | |
195 | def run(self, test):\r | |
196 | self.runArgs.append(test)\r | |
197 | return result\r | |
198 | \r | |
199 | class Program(unittest.TestProgram):\r | |
200 | def __init__(self, catchbreak):\r | |
201 | self.exit = False\r | |
202 | self.verbosity = verbosity\r | |
203 | self.failfast = failfast\r | |
204 | self.catchbreak = catchbreak\r | |
205 | self.testRunner = FakeRunner\r | |
206 | self.test = test\r | |
207 | self.result = None\r | |
208 | \r | |
209 | p = Program(False)\r | |
210 | p.runTests()\r | |
211 | \r | |
212 | self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,\r | |
213 | 'verbosity': verbosity,\r | |
214 | 'failfast': failfast})])\r | |
215 | self.assertEqual(FakeRunner.runArgs, [test])\r | |
216 | self.assertEqual(p.result, result)\r | |
217 | \r | |
218 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
219 | \r | |
220 | FakeRunner.initArgs = []\r | |
221 | FakeRunner.runArgs = []\r | |
222 | p = Program(True)\r | |
223 | p.runTests()\r | |
224 | \r | |
225 | self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,\r | |
226 | 'verbosity': verbosity,\r | |
227 | 'failfast': failfast})])\r | |
228 | self.assertEqual(FakeRunner.runArgs, [test])\r | |
229 | self.assertEqual(p.result, result)\r | |
230 | \r | |
231 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
232 | \r | |
233 | def testRemoveHandler(self):\r | |
234 | default_handler = signal.getsignal(signal.SIGINT)\r | |
235 | unittest.installHandler()\r | |
236 | unittest.removeHandler()\r | |
237 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
238 | \r | |
239 | # check that calling removeHandler multiple times has no ill-effect\r | |
240 | unittest.removeHandler()\r | |
241 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
242 | \r | |
243 | def testRemoveHandlerAsDecorator(self):\r | |
244 | default_handler = signal.getsignal(signal.SIGINT)\r | |
245 | unittest.installHandler()\r | |
246 | \r | |
247 | @unittest.removeHandler\r | |
248 | def test():\r | |
249 | self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r | |
250 | \r | |
251 | test()\r | |
252 | self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r |