]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | # NOTE: this file tests the new `io` library backported from Python 3.x.\r |
2 | # Similar tests for the builtin file object can be found in test_file2k.py.\r | |
3 | \r | |
4 | from __future__ import print_function\r | |
5 | \r | |
6 | import sys\r | |
7 | import os\r | |
8 | import unittest\r | |
9 | from array import array\r | |
10 | from weakref import proxy\r | |
11 | \r | |
12 | import io\r | |
13 | import _pyio as pyio\r | |
14 | \r | |
15 | from test.test_support import TESTFN, run_unittest\r | |
16 | from UserList import UserList\r | |
17 | \r | |
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up.
    #
    # This class does not define ``open`` itself; subclasses provide it as
    # a class attribute.  setUp opens TESTFN for binary writing as
    # ``self.f`` and tearDown closes it (if still set) and removes TESTFN.

    def setUp(self):
        self.f = self.open(TESTFN, 'wb')

    def tearDown(self):
        # testWeakRefs resets self.f to None after closing it, hence the
        # guard before close().
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write(b'teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the strong reference held by the test case; the proxy is
        # then expected to raise ReferenceError on attribute access.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

    def testReadinto(self):
        # verify readinto: write two bytes, reopen for reading, and check
        # that they land at the start of a pre-filled array
        self.f.write(b'12')
        self.f.close()
        a = array('b', b'x'*10)
        self.f = self.open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual(b'12', a.tostring()[:n])

    def testReadinto_text(self):
        # verify readinto refuses text files
        a = array('b', b'x'*10)
        self.f.close()
        self.f = self.open(TESTFN, 'r')
        # Only checked when the text-mode object exposes readinto at all.
        if hasattr(self.f, "readinto"):
            self.assertRaises(TypeError, self.f.readinto, a)

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList([b'1', b'2'])
        self.f.writelines(l)
        self.f.close()
        self.f = self.open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testErrors(self):
        # basic state of a freshly opened file, then state after close()
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        if hasattr(f, "readinto"):
            self.assertRaises((IOError, TypeError), f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Each (name, args) pair below is called on the closed file and
        # must raise ValueError.
        methods = [('fileno', ()),
                   ('flush', ()),
                   ('isatty', ()),
                   ('next', ()),
                   ('read', ()),
                   ('write', (b"",)),
                   ('readline', ()),
                   ('readlines', ()),
                   ('seek', (0,)),
                   ('tell', ()),
                   ('writelines', ([],)),
                   ('__iter__', ()),
                   ]
        if not sys.platform.startswith('atheos'):
            methods.append(('truncate', ()))

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname, args in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method, *args)

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately raised error is caught, so that
            # sys.exc_info() describes a live exception for __exit__.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # self.f is opened write-only in setUp, so reading must fail
        self.assertRaises(IOError, self.f.read)
136 | \r | |
class CAutoFileTests(AutoFileTests):
    # Runs the AutoFileTests suite with io.open as the file opener.
    open = io.open
139 | \r | |
class PyAutoFileTests(AutoFileTests):
    # Runs the AutoFileTests suite with _pyio.open (imported as pyio).
    # staticmethod keeps the function from being bound to the test
    # instance when it is looked up as self.open.
    open = staticmethod(pyio.open)
142 | \r | |
143 | \r | |
class OtherFileTests(unittest.TestCase):
    # File tests that manage TESTFN themselves (no setUp/tearDown).
    #
    # This class does not define ``open`` itself; subclasses provide it as
    # a class attribute.

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = self.open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises((IOError, ValueError), sys.stdin.seek, -1)
        else:
            print((
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.'), file=sys.__stdout__)
        self.assertRaises((IOError, ValueError), sys.stdin.truncate)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = self.open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                # The message must mention the bad mode and must not
                # mention the file name.
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                # Round-trip the buffer size itself through the file.
                f = self.open(TESTFN, 'wb', s)
                f.write(str(s).encode("ascii"))
                f.close()
                f.close()
                f = self.open(TESTFN, 'rb', s)
                d = int(f.read().decode("ascii"))
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # SF bug <http://www.python.org/sf/801631>
        # "file.truncate fault on windows"

        # Start from a clean slate.  The file may or may not have been
        # left behind by an earlier test, so only remove it when present;
        # this keeps the test runnable on its own.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
        f = self.open(TESTFN, 'wb')

        try:
            f.write(b'12345678901')   # 11 bytes
            f.close()

            f = self.open(TESTFN,'rb+')
            data = f.read(5)
            if data != b'12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)
        finally:
            f.close()
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods.
        dataoffset = 16384
        filler = b"ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            b"spam, spam and eggs\n",
            b"eggs, spam, ham and spam\n",
            b"saussages, spam, spam and eggs\n",
            b"spam, ham, spam and eggs\n",
            b"spam, spam, spam, spam, spam, ham, spam\n",
            b"wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("b", b" "*100),))]

        try:
            # Prepare the testfile
            bag = self.open(TESTFN, "wb")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = self.open(TESTFN, 'rb')
                if next(f) != filler:
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                meth(*args)  # This simply shouldn't fail
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = self.open(TESTFN, 'rb')
            for i in range(nchunks):
                next(f)
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("b", b"\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the values that were actually compared.
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            # Release this handle explicitly before the name is rebound.
            f.close()

            # Reading after iteration hit EOF shouldn't hurt either
            f = self.open(TESTFN, 'rb')
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
328 | \r | |
class COtherFileTests(OtherFileTests):
    # Runs the OtherFileTests suite with io.open as the file opener.
    open = io.open
331 | \r | |
class PyOtherFileTests(OtherFileTests):
    # Runs the OtherFileTests suite with _pyio.open (imported as pyio).
    # staticmethod keeps the function from being bound to the test
    # instance when it is looked up as self.open.
    open = staticmethod(pyio.open)
334 | \r | |
335 | \r | |
def test_main():
    # These tests have a history of leaving TESTFN behind, so it is
    # removed after the run whether or not the run succeeded.
    test_classes = (CAutoFileTests, PyAutoFileTests,
                    COtherFileTests, PyOtherFileTests)
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()