]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | """Unit tests for contextlib.py, and other context managers."""\r |
2 | \r | |
3 | import sys\r | |
4 | import tempfile\r | |
5 | import unittest\r | |
6 | from contextlib import * # Tests __all__\r | |
7 | from test import test_support\r | |
8 | try:\r | |
9 | import threading\r | |
10 | except ImportError:\r | |
11 | threading = None\r | |
12 | \r | |
13 | \r | |
14 | class ContextManagerTestCase(unittest.TestCase):\r | |
15 | \r | |
16 | def test_contextmanager_plain(self):\r | |
17 | state = []\r | |
18 | @contextmanager\r | |
19 | def woohoo():\r | |
20 | state.append(1)\r | |
21 | yield 42\r | |
22 | state.append(999)\r | |
23 | with woohoo() as x:\r | |
24 | self.assertEqual(state, [1])\r | |
25 | self.assertEqual(x, 42)\r | |
26 | state.append(x)\r | |
27 | self.assertEqual(state, [1, 42, 999])\r | |
28 | \r | |
29 | def test_contextmanager_finally(self):\r | |
30 | state = []\r | |
31 | @contextmanager\r | |
32 | def woohoo():\r | |
33 | state.append(1)\r | |
34 | try:\r | |
35 | yield 42\r | |
36 | finally:\r | |
37 | state.append(999)\r | |
38 | with self.assertRaises(ZeroDivisionError):\r | |
39 | with woohoo() as x:\r | |
40 | self.assertEqual(state, [1])\r | |
41 | self.assertEqual(x, 42)\r | |
42 | state.append(x)\r | |
43 | raise ZeroDivisionError()\r | |
44 | self.assertEqual(state, [1, 42, 999])\r | |
45 | \r | |
46 | def test_contextmanager_no_reraise(self):\r | |
47 | @contextmanager\r | |
48 | def whee():\r | |
49 | yield\r | |
50 | ctx = whee()\r | |
51 | ctx.__enter__()\r | |
52 | # Calling __exit__ should not result in an exception\r | |
53 | self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))\r | |
54 | \r | |
55 | def test_contextmanager_trap_yield_after_throw(self):\r | |
56 | @contextmanager\r | |
57 | def whoo():\r | |
58 | try:\r | |
59 | yield\r | |
60 | except:\r | |
61 | yield\r | |
62 | ctx = whoo()\r | |
63 | ctx.__enter__()\r | |
64 | self.assertRaises(\r | |
65 | RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None\r | |
66 | )\r | |
67 | \r | |
68 | def test_contextmanager_except(self):\r | |
69 | state = []\r | |
70 | @contextmanager\r | |
71 | def woohoo():\r | |
72 | state.append(1)\r | |
73 | try:\r | |
74 | yield 42\r | |
75 | except ZeroDivisionError, e:\r | |
76 | state.append(e.args[0])\r | |
77 | self.assertEqual(state, [1, 42, 999])\r | |
78 | with woohoo() as x:\r | |
79 | self.assertEqual(state, [1])\r | |
80 | self.assertEqual(x, 42)\r | |
81 | state.append(x)\r | |
82 | raise ZeroDivisionError(999)\r | |
83 | self.assertEqual(state, [1, 42, 999])\r | |
84 | \r | |
85 | def _create_contextmanager_attribs(self):\r | |
86 | def attribs(**kw):\r | |
87 | def decorate(func):\r | |
88 | for k,v in kw.items():\r | |
89 | setattr(func,k,v)\r | |
90 | return func\r | |
91 | return decorate\r | |
92 | @contextmanager\r | |
93 | @attribs(foo='bar')\r | |
94 | def baz(spam):\r | |
95 | """Whee!"""\r | |
96 | return baz\r | |
97 | \r | |
98 | def test_contextmanager_attribs(self):\r | |
99 | baz = self._create_contextmanager_attribs()\r | |
100 | self.assertEqual(baz.__name__,'baz')\r | |
101 | self.assertEqual(baz.foo, 'bar')\r | |
102 | \r | |
103 | @unittest.skipIf(sys.flags.optimize >= 2,\r | |
104 | "Docstrings are omitted with -O2 and above")\r | |
105 | def test_contextmanager_doc_attrib(self):\r | |
106 | baz = self._create_contextmanager_attribs()\r | |
107 | self.assertEqual(baz.__doc__, "Whee!")\r | |
108 | \r | |
109 | class NestedTestCase(unittest.TestCase):\r | |
110 | \r | |
111 | # XXX This needs more work\r | |
112 | \r | |
113 | def test_nested(self):\r | |
114 | @contextmanager\r | |
115 | def a():\r | |
116 | yield 1\r | |
117 | @contextmanager\r | |
118 | def b():\r | |
119 | yield 2\r | |
120 | @contextmanager\r | |
121 | def c():\r | |
122 | yield 3\r | |
123 | with nested(a(), b(), c()) as (x, y, z):\r | |
124 | self.assertEqual(x, 1)\r | |
125 | self.assertEqual(y, 2)\r | |
126 | self.assertEqual(z, 3)\r | |
127 | \r | |
128 | def test_nested_cleanup(self):\r | |
129 | state = []\r | |
130 | @contextmanager\r | |
131 | def a():\r | |
132 | state.append(1)\r | |
133 | try:\r | |
134 | yield 2\r | |
135 | finally:\r | |
136 | state.append(3)\r | |
137 | @contextmanager\r | |
138 | def b():\r | |
139 | state.append(4)\r | |
140 | try:\r | |
141 | yield 5\r | |
142 | finally:\r | |
143 | state.append(6)\r | |
144 | with self.assertRaises(ZeroDivisionError):\r | |
145 | with nested(a(), b()) as (x, y):\r | |
146 | state.append(x)\r | |
147 | state.append(y)\r | |
148 | 1 // 0\r | |
149 | self.assertEqual(state, [1, 4, 2, 5, 6, 3])\r | |
150 | \r | |
151 | def test_nested_right_exception(self):\r | |
152 | @contextmanager\r | |
153 | def a():\r | |
154 | yield 1\r | |
155 | class b(object):\r | |
156 | def __enter__(self):\r | |
157 | return 2\r | |
158 | def __exit__(self, *exc_info):\r | |
159 | try:\r | |
160 | raise Exception()\r | |
161 | except:\r | |
162 | pass\r | |
163 | with self.assertRaises(ZeroDivisionError):\r | |
164 | with nested(a(), b()) as (x, y):\r | |
165 | 1 // 0\r | |
166 | self.assertEqual((x, y), (1, 2))\r | |
167 | \r | |
168 | def test_nested_b_swallows(self):\r | |
169 | @contextmanager\r | |
170 | def a():\r | |
171 | yield\r | |
172 | @contextmanager\r | |
173 | def b():\r | |
174 | try:\r | |
175 | yield\r | |
176 | except:\r | |
177 | # Swallow the exception\r | |
178 | pass\r | |
179 | try:\r | |
180 | with nested(a(), b()):\r | |
181 | 1 // 0\r | |
182 | except ZeroDivisionError:\r | |
183 | self.fail("Didn't swallow ZeroDivisionError")\r | |
184 | \r | |
185 | def test_nested_break(self):\r | |
186 | @contextmanager\r | |
187 | def a():\r | |
188 | yield\r | |
189 | state = 0\r | |
190 | while True:\r | |
191 | state += 1\r | |
192 | with nested(a(), a()):\r | |
193 | break\r | |
194 | state += 10\r | |
195 | self.assertEqual(state, 1)\r | |
196 | \r | |
197 | def test_nested_continue(self):\r | |
198 | @contextmanager\r | |
199 | def a():\r | |
200 | yield\r | |
201 | state = 0\r | |
202 | while state < 3:\r | |
203 | state += 1\r | |
204 | with nested(a(), a()):\r | |
205 | continue\r | |
206 | state += 10\r | |
207 | self.assertEqual(state, 3)\r | |
208 | \r | |
209 | def test_nested_return(self):\r | |
210 | @contextmanager\r | |
211 | def a():\r | |
212 | try:\r | |
213 | yield\r | |
214 | except:\r | |
215 | pass\r | |
216 | def foo():\r | |
217 | with nested(a(), a()):\r | |
218 | return 1\r | |
219 | return 10\r | |
220 | self.assertEqual(foo(), 1)\r | |
221 | \r | |
222 | class ClosingTestCase(unittest.TestCase):\r | |
223 | \r | |
224 | # XXX This needs more work\r | |
225 | \r | |
226 | def test_closing(self):\r | |
227 | state = []\r | |
228 | class C:\r | |
229 | def close(self):\r | |
230 | state.append(1)\r | |
231 | x = C()\r | |
232 | self.assertEqual(state, [])\r | |
233 | with closing(x) as y:\r | |
234 | self.assertEqual(x, y)\r | |
235 | self.assertEqual(state, [1])\r | |
236 | \r | |
237 | def test_closing_error(self):\r | |
238 | state = []\r | |
239 | class C:\r | |
240 | def close(self):\r | |
241 | state.append(1)\r | |
242 | x = C()\r | |
243 | self.assertEqual(state, [])\r | |
244 | with self.assertRaises(ZeroDivisionError):\r | |
245 | with closing(x) as y:\r | |
246 | self.assertEqual(x, y)\r | |
247 | 1 // 0\r | |
248 | self.assertEqual(state, [1])\r | |
249 | \r | |
250 | class FileContextTestCase(unittest.TestCase):\r | |
251 | \r | |
252 | def testWithOpen(self):\r | |
253 | tfn = tempfile.mktemp()\r | |
254 | try:\r | |
255 | f = None\r | |
256 | with open(tfn, "w") as f:\r | |
257 | self.assertFalse(f.closed)\r | |
258 | f.write("Booh\n")\r | |
259 | self.assertTrue(f.closed)\r | |
260 | f = None\r | |
261 | with self.assertRaises(ZeroDivisionError):\r | |
262 | with open(tfn, "r") as f:\r | |
263 | self.assertFalse(f.closed)\r | |
264 | self.assertEqual(f.read(), "Booh\n")\r | |
265 | 1 // 0\r | |
266 | self.assertTrue(f.closed)\r | |
267 | finally:\r | |
268 | test_support.unlink(tfn)\r | |
269 | \r | |
270 | @unittest.skipUnless(threading, 'Threading required for this test.')\r | |
271 | class LockContextTestCase(unittest.TestCase):\r | |
272 | \r | |
273 | def boilerPlate(self, lock, locked):\r | |
274 | self.assertFalse(locked())\r | |
275 | with lock:\r | |
276 | self.assertTrue(locked())\r | |
277 | self.assertFalse(locked())\r | |
278 | with self.assertRaises(ZeroDivisionError):\r | |
279 | with lock:\r | |
280 | self.assertTrue(locked())\r | |
281 | 1 // 0\r | |
282 | self.assertFalse(locked())\r | |
283 | \r | |
284 | def testWithLock(self):\r | |
285 | lock = threading.Lock()\r | |
286 | self.boilerPlate(lock, lock.locked)\r | |
287 | \r | |
288 | def testWithRLock(self):\r | |
289 | lock = threading.RLock()\r | |
290 | self.boilerPlate(lock, lock._is_owned)\r | |
291 | \r | |
292 | def testWithCondition(self):\r | |
293 | lock = threading.Condition()\r | |
294 | def locked():\r | |
295 | return lock._is_owned()\r | |
296 | self.boilerPlate(lock, locked)\r | |
297 | \r | |
298 | def testWithSemaphore(self):\r | |
299 | lock = threading.Semaphore()\r | |
300 | def locked():\r | |
301 | if lock.acquire(False):\r | |
302 | lock.release()\r | |
303 | return False\r | |
304 | else:\r | |
305 | return True\r | |
306 | self.boilerPlate(lock, locked)\r | |
307 | \r | |
308 | def testWithBoundedSemaphore(self):\r | |
309 | lock = threading.BoundedSemaphore()\r | |
310 | def locked():\r | |
311 | if lock.acquire(False):\r | |
312 | lock.release()\r | |
313 | return False\r | |
314 | else:\r | |
315 | return True\r | |
316 | self.boilerPlate(lock, locked)\r | |
317 | \r | |
318 | # This is needed to make the test actually run under regrtest.py!\r | |
319 | def test_main():\r | |
320 | with test_support.check_warnings(("With-statements now directly support "\r | |
321 | "multiple context managers",\r | |
322 | DeprecationWarning)):\r | |
323 | test_support.run_unittest(__name__)\r | |
324 | \r | |
325 | if __name__ == "__main__":\r | |
326 | test_main()\r |