]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | # Adapted from test_file.py by Daniel Stutzbach\r |
2 | \r | |
3 | from __future__ import unicode_literals\r | |
4 | \r | |
5 | import sys\r | |
6 | import os\r | |
7 | import errno\r | |
8 | import unittest\r | |
9 | from array import array\r | |
10 | from weakref import proxy\r | |
11 | from functools import wraps\r | |
12 | \r | |
13 | from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd\r | |
14 | from test.test_support import py3k_bytes as bytes\r | |
15 | from test.script_helper import run_python\r | |
16 | \r | |
17 | from _io import FileIO as _FileIO\r | |
18 | \r | |
19 | class AutoFileTests(unittest.TestCase):\r | |
20 | # file tests for which a test file is automatically set up\r | |
21 | \r | |
22 | def setUp(self):\r | |
23 | self.f = _FileIO(TESTFN, 'w')\r | |
24 | \r | |
25 | def tearDown(self):\r | |
26 | if self.f:\r | |
27 | self.f.close()\r | |
28 | os.remove(TESTFN)\r | |
29 | \r | |
30 | def testWeakRefs(self):\r | |
31 | # verify weak references\r | |
32 | p = proxy(self.f)\r | |
33 | p.write(bytes(range(10)))\r | |
34 | self.assertEqual(self.f.tell(), p.tell())\r | |
35 | self.f.close()\r | |
36 | self.f = None\r | |
37 | self.assertRaises(ReferenceError, getattr, p, 'tell')\r | |
38 | \r | |
39 | def testSeekTell(self):\r | |
40 | self.f.write(bytes(range(20)))\r | |
41 | self.assertEqual(self.f.tell(), 20)\r | |
42 | self.f.seek(0)\r | |
43 | self.assertEqual(self.f.tell(), 0)\r | |
44 | self.f.seek(10)\r | |
45 | self.assertEqual(self.f.tell(), 10)\r | |
46 | self.f.seek(5, 1)\r | |
47 | self.assertEqual(self.f.tell(), 15)\r | |
48 | self.f.seek(-5, 1)\r | |
49 | self.assertEqual(self.f.tell(), 10)\r | |
50 | self.f.seek(-5, 2)\r | |
51 | self.assertEqual(self.f.tell(), 15)\r | |
52 | \r | |
53 | def testAttributes(self):\r | |
54 | # verify expected attributes exist\r | |
55 | f = self.f\r | |
56 | \r | |
57 | self.assertEqual(f.mode, "wb")\r | |
58 | self.assertEqual(f.closed, False)\r | |
59 | \r | |
60 | # verify the attributes are readonly\r | |
61 | for attr in 'mode', 'closed':\r | |
62 | self.assertRaises((AttributeError, TypeError),\r | |
63 | setattr, f, attr, 'oops')\r | |
64 | \r | |
65 | def testReadinto(self):\r | |
66 | # verify readinto\r | |
67 | self.f.write(b"\x01\x02")\r | |
68 | self.f.close()\r | |
69 | a = array(b'b', b'x'*10)\r | |
70 | self.f = _FileIO(TESTFN, 'r')\r | |
71 | n = self.f.readinto(a)\r | |
72 | self.assertEqual(array(b'b', [1, 2]), a[:n])\r | |
73 | \r | |
74 | def test_none_args(self):\r | |
75 | self.f.write(b"hi\nbye\nabc")\r | |
76 | self.f.close()\r | |
77 | self.f = _FileIO(TESTFN, 'r')\r | |
78 | self.assertEqual(self.f.read(None), b"hi\nbye\nabc")\r | |
79 | self.f.seek(0)\r | |
80 | self.assertEqual(self.f.readline(None), b"hi\n")\r | |
81 | self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])\r | |
82 | \r | |
83 | def testRepr(self):\r | |
84 | self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"\r | |
85 | % (self.f.name, self.f.mode))\r | |
86 | del self.f.name\r | |
87 | self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"\r | |
88 | % (self.f.fileno(), self.f.mode))\r | |
89 | self.f.close()\r | |
90 | self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")\r | |
91 | \r | |
92 | def testErrors(self):\r | |
93 | f = self.f\r | |
94 | self.assertTrue(not f.isatty())\r | |
95 | self.assertTrue(not f.closed)\r | |
96 | #self.assertEqual(f.name, TESTFN)\r | |
97 | self.assertRaises(ValueError, f.read, 10) # Open for reading\r | |
98 | f.close()\r | |
99 | self.assertTrue(f.closed)\r | |
100 | f = _FileIO(TESTFN, 'r')\r | |
101 | self.assertRaises(TypeError, f.readinto, "")\r | |
102 | self.assertTrue(not f.closed)\r | |
103 | f.close()\r | |
104 | self.assertTrue(f.closed)\r | |
105 | \r | |
106 | def testMethods(self):\r | |
107 | methods = ['fileno', 'isatty', 'read', 'readinto',\r | |
108 | 'seek', 'tell', 'truncate', 'write', 'seekable',\r | |
109 | 'readable', 'writable']\r | |
110 | if sys.platform.startswith('atheos'):\r | |
111 | methods.remove('truncate')\r | |
112 | \r | |
113 | self.f.close()\r | |
114 | self.assertTrue(self.f.closed)\r | |
115 | \r | |
116 | for methodname in methods:\r | |
117 | method = getattr(self.f, methodname)\r | |
118 | # should raise on closed file\r | |
119 | self.assertRaises(ValueError, method)\r | |
120 | \r | |
121 | def testOpendir(self):\r | |
122 | # Issue 3703: opening a directory should fill the errno\r | |
123 | # Windows always returns "[Errno 13]: Permission denied\r | |
124 | # Unix calls dircheck() and returns "[Errno 21]: Is a directory"\r | |
125 | try:\r | |
126 | _FileIO('.', 'r')\r | |
127 | except IOError as e:\r | |
128 | self.assertNotEqual(e.errno, 0)\r | |
129 | self.assertEqual(e.filename, ".")\r | |
130 | else:\r | |
131 | self.fail("Should have raised IOError")\r | |
132 | \r | |
133 | #A set of functions testing that we get expected behaviour if someone has\r | |
134 | #manually closed the internal file descriptor. First, a decorator:\r | |
135 | def ClosedFD(func):\r | |
136 | @wraps(func)\r | |
137 | def wrapper(self):\r | |
138 | #forcibly close the fd before invoking the problem function\r | |
139 | f = self.f\r | |
140 | os.close(f.fileno())\r | |
141 | try:\r | |
142 | func(self, f)\r | |
143 | finally:\r | |
144 | try:\r | |
145 | self.f.close()\r | |
146 | except IOError:\r | |
147 | pass\r | |
148 | return wrapper\r | |
149 | \r | |
150 | def ClosedFDRaises(func):\r | |
151 | @wraps(func)\r | |
152 | def wrapper(self):\r | |
153 | #forcibly close the fd before invoking the problem function\r | |
154 | f = self.f\r | |
155 | os.close(f.fileno())\r | |
156 | try:\r | |
157 | func(self, f)\r | |
158 | except IOError as e:\r | |
159 | self.assertEqual(e.errno, errno.EBADF)\r | |
160 | else:\r | |
161 | self.fail("Should have raised IOError")\r | |
162 | finally:\r | |
163 | try:\r | |
164 | self.f.close()\r | |
165 | except IOError:\r | |
166 | pass\r | |
167 | return wrapper\r | |
168 | \r | |
169 | @ClosedFDRaises\r | |
170 | def testErrnoOnClose(self, f):\r | |
171 | f.close()\r | |
172 | \r | |
173 | @ClosedFDRaises\r | |
174 | def testErrnoOnClosedWrite(self, f):\r | |
175 | f.write('a')\r | |
176 | \r | |
177 | @ClosedFDRaises\r | |
178 | def testErrnoOnClosedSeek(self, f):\r | |
179 | f.seek(0)\r | |
180 | \r | |
181 | @ClosedFDRaises\r | |
182 | def testErrnoOnClosedTell(self, f):\r | |
183 | f.tell()\r | |
184 | \r | |
185 | @ClosedFDRaises\r | |
186 | def testErrnoOnClosedTruncate(self, f):\r | |
187 | f.truncate(0)\r | |
188 | \r | |
189 | @ClosedFD\r | |
190 | def testErrnoOnClosedSeekable(self, f):\r | |
191 | f.seekable()\r | |
192 | \r | |
193 | @ClosedFD\r | |
194 | def testErrnoOnClosedReadable(self, f):\r | |
195 | f.readable()\r | |
196 | \r | |
197 | @ClosedFD\r | |
198 | def testErrnoOnClosedWritable(self, f):\r | |
199 | f.writable()\r | |
200 | \r | |
201 | @ClosedFD\r | |
202 | def testErrnoOnClosedFileno(self, f):\r | |
203 | f.fileno()\r | |
204 | \r | |
205 | @ClosedFD\r | |
206 | def testErrnoOnClosedIsatty(self, f):\r | |
207 | self.assertEqual(f.isatty(), False)\r | |
208 | \r | |
209 | def ReopenForRead(self):\r | |
210 | try:\r | |
211 | self.f.close()\r | |
212 | except IOError:\r | |
213 | pass\r | |
214 | self.f = _FileIO(TESTFN, 'r')\r | |
215 | os.close(self.f.fileno())\r | |
216 | return self.f\r | |
217 | \r | |
218 | @ClosedFDRaises\r | |
219 | def testErrnoOnClosedRead(self, f):\r | |
220 | f = self.ReopenForRead()\r | |
221 | f.read(1)\r | |
222 | \r | |
223 | @ClosedFDRaises\r | |
224 | def testErrnoOnClosedReadall(self, f):\r | |
225 | f = self.ReopenForRead()\r | |
226 | f.readall()\r | |
227 | \r | |
228 | @ClosedFDRaises\r | |
229 | def testErrnoOnClosedReadinto(self, f):\r | |
230 | f = self.ReopenForRead()\r | |
231 | a = array(b'b', b'x'*10)\r | |
232 | f.readinto(a)\r | |
233 | \r | |
234 | class OtherFileTests(unittest.TestCase):\r | |
235 | \r | |
236 | def testAbles(self):\r | |
237 | try:\r | |
238 | f = _FileIO(TESTFN, "w")\r | |
239 | self.assertEqual(f.readable(), False)\r | |
240 | self.assertEqual(f.writable(), True)\r | |
241 | self.assertEqual(f.seekable(), True)\r | |
242 | f.close()\r | |
243 | \r | |
244 | f = _FileIO(TESTFN, "r")\r | |
245 | self.assertEqual(f.readable(), True)\r | |
246 | self.assertEqual(f.writable(), False)\r | |
247 | self.assertEqual(f.seekable(), True)\r | |
248 | f.close()\r | |
249 | \r | |
250 | f = _FileIO(TESTFN, "a+")\r | |
251 | self.assertEqual(f.readable(), True)\r | |
252 | self.assertEqual(f.writable(), True)\r | |
253 | self.assertEqual(f.seekable(), True)\r | |
254 | self.assertEqual(f.isatty(), False)\r | |
255 | f.close()\r | |
256 | \r | |
257 | if sys.platform != "win32":\r | |
258 | try:\r | |
259 | f = _FileIO("/dev/tty", "a")\r | |
260 | except EnvironmentError:\r | |
261 | # When run in a cron job there just aren't any\r | |
262 | # ttys, so skip the test. This also handles other\r | |
263 | # OS'es that don't support /dev/tty.\r | |
264 | pass\r | |
265 | else:\r | |
266 | self.assertEqual(f.readable(), False)\r | |
267 | self.assertEqual(f.writable(), True)\r | |
268 | if sys.platform != "darwin" and \\r | |
269 | 'bsd' not in sys.platform and \\r | |
270 | not sys.platform.startswith('sunos'):\r | |
271 | # Somehow /dev/tty appears seekable on some BSDs\r | |
272 | self.assertEqual(f.seekable(), False)\r | |
273 | self.assertEqual(f.isatty(), True)\r | |
274 | f.close()\r | |
275 | finally:\r | |
276 | os.unlink(TESTFN)\r | |
277 | \r | |
278 | def testModeStrings(self):\r | |
279 | # check invalid mode strings\r | |
280 | for mode in ("", "aU", "wU+", "rw", "rt"):\r | |
281 | try:\r | |
282 | f = _FileIO(TESTFN, mode)\r | |
283 | except ValueError:\r | |
284 | pass\r | |
285 | else:\r | |
286 | f.close()\r | |
287 | self.fail('%r is an invalid file mode' % mode)\r | |
288 | \r | |
289 | def testUnicodeOpen(self):\r | |
290 | # verify repr works for unicode too\r | |
291 | f = _FileIO(str(TESTFN), "w")\r | |
292 | f.close()\r | |
293 | os.unlink(TESTFN)\r | |
294 | \r | |
295 | def testBytesOpen(self):\r | |
296 | # Opening a bytes filename\r | |
297 | try:\r | |
298 | fn = TESTFN.encode("ascii")\r | |
299 | except UnicodeEncodeError:\r | |
300 | # Skip test\r | |
301 | return\r | |
302 | f = _FileIO(fn, "w")\r | |
303 | try:\r | |
304 | f.write(b"abc")\r | |
305 | f.close()\r | |
306 | with open(TESTFN, "rb") as f:\r | |
307 | self.assertEqual(f.read(), b"abc")\r | |
308 | finally:\r | |
309 | os.unlink(TESTFN)\r | |
310 | \r | |
311 | def testInvalidFd(self):\r | |
312 | self.assertRaises(ValueError, _FileIO, -10)\r | |
313 | self.assertRaises(OSError, _FileIO, make_bad_fd())\r | |
314 | if sys.platform == 'win32':\r | |
315 | import msvcrt\r | |
316 | self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())\r | |
317 | \r | |
318 | def testBadModeArgument(self):\r | |
319 | # verify that we get a sensible error message for bad mode argument\r | |
320 | bad_mode = "qwerty"\r | |
321 | try:\r | |
322 | f = _FileIO(TESTFN, bad_mode)\r | |
323 | except ValueError as msg:\r | |
324 | if msg.args[0] != 0:\r | |
325 | s = str(msg)\r | |
326 | if TESTFN in s or bad_mode not in s:\r | |
327 | self.fail("bad error message for invalid mode: %s" % s)\r | |
328 | # if msg.args[0] == 0, we're probably on Windows where there may be\r | |
329 | # no obvious way to discover why open() failed.\r | |
330 | else:\r | |
331 | f.close()\r | |
332 | self.fail("no error for invalid mode: %s" % bad_mode)\r | |
333 | \r | |
334 | def testTruncate(self):\r | |
335 | f = _FileIO(TESTFN, 'w')\r | |
336 | f.write(bytes(bytearray(range(10))))\r | |
337 | self.assertEqual(f.tell(), 10)\r | |
338 | f.truncate(5)\r | |
339 | self.assertEqual(f.tell(), 10)\r | |
340 | self.assertEqual(f.seek(0, os.SEEK_END), 5)\r | |
341 | f.truncate(15)\r | |
342 | self.assertEqual(f.tell(), 5)\r | |
343 | self.assertEqual(f.seek(0, os.SEEK_END), 15)\r | |
344 | f.close()\r | |
345 | \r | |
346 | def testTruncateOnWindows(self):\r | |
347 | def bug801631():\r | |
348 | # SF bug <http://www.python.org/sf/801631>\r | |
349 | # "file.truncate fault on windows"\r | |
350 | f = _FileIO(TESTFN, 'w')\r | |
351 | f.write(bytes(range(11)))\r | |
352 | f.close()\r | |
353 | \r | |
354 | f = _FileIO(TESTFN,'r+')\r | |
355 | data = f.read(5)\r | |
356 | if data != bytes(range(5)):\r | |
357 | self.fail("Read on file opened for update failed %r" % data)\r | |
358 | if f.tell() != 5:\r | |
359 | self.fail("File pos after read wrong %d" % f.tell())\r | |
360 | \r | |
361 | f.truncate()\r | |
362 | if f.tell() != 5:\r | |
363 | self.fail("File pos after ftruncate wrong %d" % f.tell())\r | |
364 | \r | |
365 | f.close()\r | |
366 | size = os.path.getsize(TESTFN)\r | |
367 | if size != 5:\r | |
368 | self.fail("File size after ftruncate wrong %d" % size)\r | |
369 | \r | |
370 | try:\r | |
371 | bug801631()\r | |
372 | finally:\r | |
373 | os.unlink(TESTFN)\r | |
374 | \r | |
375 | def testAppend(self):\r | |
376 | try:\r | |
377 | f = open(TESTFN, 'wb')\r | |
378 | f.write(b'spam')\r | |
379 | f.close()\r | |
380 | f = open(TESTFN, 'ab')\r | |
381 | f.write(b'eggs')\r | |
382 | f.close()\r | |
383 | f = open(TESTFN, 'rb')\r | |
384 | d = f.read()\r | |
385 | f.close()\r | |
386 | self.assertEqual(d, b'spameggs')\r | |
387 | finally:\r | |
388 | try:\r | |
389 | os.unlink(TESTFN)\r | |
390 | except:\r | |
391 | pass\r | |
392 | \r | |
393 | def testInvalidInit(self):\r | |
394 | self.assertRaises(TypeError, _FileIO, "1", 0, 0)\r | |
395 | \r | |
396 | def testWarnings(self):\r | |
397 | with check_warnings(quiet=True) as w:\r | |
398 | self.assertEqual(w.warnings, [])\r | |
399 | self.assertRaises(TypeError, _FileIO, [])\r | |
400 | self.assertEqual(w.warnings, [])\r | |
401 | self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")\r | |
402 | self.assertEqual(w.warnings, [])\r | |
403 | \r | |
404 | def test_surrogates(self):\r | |
405 | # Issue #8438: try to open a filename containing surrogates.\r | |
406 | # It should either fail because the file doesn't exist or the filename\r | |
407 | # can't be represented using the filesystem encoding, but not because\r | |
408 | # of a LookupError for the error handler "surrogateescape".\r | |
409 | filename = u'\udc80.txt'\r | |
410 | try:\r | |
411 | with _FileIO(filename):\r | |
412 | pass\r | |
413 | except (UnicodeEncodeError, IOError):\r | |
414 | pass\r | |
415 | # Spawn a separate Python process with a different "file system\r | |
416 | # default encoding", to exercise this further.\r | |
417 | env = dict(os.environ)\r | |
418 | env[b'LC_CTYPE'] = b'C'\r | |
419 | _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)\r | |
420 | if ('UnicodeEncodeError' not in out and\r | |
421 | 'IOError: [Errno 2] No such file or directory' not in out):\r | |
422 | self.fail('Bad output: %r' % out)\r | |
423 | \r | |
424 | def test_main():\r | |
425 | # Historically, these tests have been sloppy about removing TESTFN.\r | |
426 | # So get rid of it no matter what.\r | |
427 | try:\r | |
428 | run_unittest(AutoFileTests, OtherFileTests)\r | |
429 | finally:\r | |
430 | if os.path.exists(TESTFN):\r | |
431 | os.unlink(TESTFN)\r | |
432 | \r | |
433 | if __name__ == '__main__':\r | |
434 | test_main()\r |