]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_io.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_io.py
1 """Unit tests for the io module."""
2
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
12
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
21
22 from __future__ import print_function
23 from __future__ import unicode_literals
24
25 import os
26 import sys
27 import time
28 import array
29 import random
30 import unittest
31 import weakref
32 import abc
33 import signal
34 import errno
35 from itertools import cycle, count
36 from collections import deque
37 from test import test_support as support
38
39 import codecs
40 import io # C implementation of io
41 import _pyio as pyio # Python implementation of io
42 try:
43 import threading
44 except ImportError:
45 threading = None
46
47 __metaclass__ = type
48 bytes = support.py3k_bytes
49
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting stream; the stream is closed before
    returning.
    """
    stream = io.open(__file__, "r", encoding="latin1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
54
55
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of canned chunks handed out by readinto():
    a bytes-like chunk is copied into the caller's buffer (split over
    several calls when it is larger than the buffer), while a ``None``
    entry is consumed and returned as ``None``.  Everything passed to
    write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # _reads counts every read call; _extraneous_reads counts those made
        # after the canned data ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # whence defaults to 0 like io.IOBase.seek(), so that a plain
        # seek(pos) on the mock does not fail with a TypeError.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next canned chunk into *buf*.

        Returns the number of bytes copied, ``None`` when the next stack
        entry is ``None``, or 0 once the stack is exhausted.
        """
        self._reads += 1
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        capacity = len(buf)
        size = len(chunk)
        if size <= capacity:
            del self._read_stack[0]
            buf[:size] = chunk
            return size
        # The chunk does not fit: fill the buffer and keep the remainder at
        # the head of the stack for the next call.
        buf[:] = chunk[:capacity]
        self._read_stack[0] = chunk[capacity:]
        return capacity

    def truncate(self, pos=None):
        return pos
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on io.RawIOBase (the C implementation)."""
114
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on _pyio.RawIOBase (the Python implementation)."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that pops canned chunks.

    Each call returns the next entry of the read stack unchanged (the *n*
    argument is ignored), or b"" once the stack is empty.
    """

    def read(self, n=None):
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack means EOF; a bare "except:" would also
            # swallow KeyboardInterrupt and genuine programming errors.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on io.RawIOBase (the C implementation)."""
131
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on _pyio.RawIOBase (the Python implementation)."""
134
135
class MisbehavedRawIO(MockRawIO):
    """A raw stream whose return values do not match what it actually did.

    write() and read() double the result of the MockRawIO methods, seek()
    and tell() return negative positions, and readinto() claims five times
    the size of the buffer it was given.
    """

    def write(self, b):
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence=0):
        # whence defaults to 0 like io.IOBase.seek(), so a one-argument
        # seek(pos) yields the bogus position instead of a TypeError.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Consume the canned data as usual, then lie about the amount read.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on io.RawIOBase (the C implementation)."""
155
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on _pyio.RawIOBase (the Python implementation)."""
158
159
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and then raises IOError.

    Subsequent close() calls return silently.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        # Flip the flag before raising so that the failure happens only once.
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on io.RawIOBase (the C implementation)."""
170
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on _pyio.RawIOBase (the Python implementation)."""
173
174
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    It must be combined with a class providing __init__(data), read() and
    readinto(); the calls are forwarded to that class through super().
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        nread = super(MockFileIO, self).readinto(b)
        self.read_history.append(nread)
        return nread
190
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO read logging on top of io.BytesIO (the C implementation)."""
193
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO read logging on top of _pyio.BytesIO (the Python implementation)."""
196
197
class MockNonBlockWriterIO:
    """Writable mock that can simulate a blocking write.

    After block_on(char), the next write() containing *char* records only
    the bytes before it and raises ``self.BlockingIOError``; that exception
    class is not defined here and has to be provided as a class attribute.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut >= 0:
                # One-shot trigger: disarm, keep the part before the
                # blocker and report how many bytes got through.
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
236
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO raising the io module's BlockingIOError."""

    BlockingIOError = io.BlockingIOError
239
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO raising the _pyio module's BlockingIOError."""

    BlockingIOError = pyio.BlockingIOError
242
243
class IOTest(unittest.TestCase):
    # General tests of opening, reading, writing, seeking and closing files.
    #
    # The objects under test are looked up on the instance (self.open,
    # self.FileIO, self.BytesIO, self.IOBase, self.RawIOBase,
    # self.BufferedIOBase, self.TextIOBase, self.MockRawIOWithoutRead); none
    # of them is defined in this class, so they have to be supplied from
    # outside before the tests run.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario; it leaves b"hello world\n"
        # behind in f.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/seek scenario over a stream holding b"hello world\n".
        # The unbounded read() checks only run when buffered is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Use assertTrue() rather than the assert statement: the latter is
        # compiled away under "python -O" and would check nothing.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # record collects 1 (__del__), 2 (close), 3 (flush) in call order.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor; named so that it
            # does not shadow the "file" builtin.
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.read(), "egg\n")
            wrapper.seek(0)
            wrapper.close()
            self.assertRaises(ValueError, wrapper.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            wrapper = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(wrapper.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle, so that only the garbage collector can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
586
class CIOTest(IOTest):
    """IOTest variant that runs every inherited test unchanged."""
589
class PyIOTest(IOTest):
    """IOTest variant in which test_array_writes is skipped."""

    # Build the skip decorator first, apply it, then drop the helper name so
    # that the class namespace ends up with test_array_writes only.
    _skip_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    test_array_writes = _skip_array_writes(IOTest.test_array_writes)
    del _skip_array_writes
594
595
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The buffered class under test is read from self.tp and the raw stream
    # mocks from self.MockRawIO / self.CloseFailureIO; none of these is
    # defined in this class.

    def test_detach(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertIs(bufio.detach(), rawio)
        # A second detach() has nothing left to hand out.
        self.assertRaises(ValueError, bufio.detach)

    def test_fileno(self):
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            self.assertRaises(ValueError, bufio.seek, 0, bad_whence)

    def test_override_destructor(self):
        tp = self.tp
        # calls collects 1 (__del__), 2 (close), 3 (flush) in call order.
        calls = []
        class MyBufferedIO(tp):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                calls.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())
        def enter_and_leave():
            with bufio:
                pass
        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as stderr:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = stderr.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception IOError: "), message)
            self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(bufio), "<%s>" % clsname)
        # Unicode name first, then a bytes name.
        rawio.name = "dummy"
        self.assertEqual(repr(bufio), "<%s name=u'dummy'>" % clsname)
        rawio.name = b"dummy"
        self.assertEqual(repr(bufio), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        rawio = self.MockRawIO()
        def bad_flush():
            raise IOError()
        rawio.flush = bad_flush
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.close) # exception not swallowed

    def test_multi_close(self):
        bufio = self.tp(self.MockRawIO())
        bufio.close()
        bufio.close()
        bufio.close()
        self.assertRaises(ValueError, bufio.flush)

    def test_readonly_attributes(self):
        bufio = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            bufio.raw = replacement
710
711
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests of the buffered reader class found in self.tp.  self.tp, the
    # self.Mock* classes and self.open are not defined in this class.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-initialising with valid sizes must keep the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        target = bytearray(2)
        self.assertEqual(bufio.readinto(target), 2)
        self.assertEqual(target, b"ab")
        self.assertEqual(bufio.readinto(target), 2)
        self.assertEqual(target, b"cd")
        self.assertEqual(bufio.readinto(target), 2)
        self.assertEqual(target, b"ef")
        # Short read: only the first byte of the target is replaced.
        self.assertEqual(bufio.readinto(target), 1)
        self.assertEqual(target, b"gf")
        self.assertEqual(bufio.readinto(target), 0)
        self.assertEqual(target, b"gf")

    def test_readlines(self):
        def fresh_bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(fresh_bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes requested from bufio, expected raw read sizes)
        scenarios = (
            (100, [3, 1, 4, 8], [dlen, 0]),
            (100, [3, 3, 3], [dlen]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        )

        for bufsize, buf_read_sizes, raw_read_sizes in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            start = 0
            for nbytes in buf_read_sizes:
                stop = start + nbytes
                self.assertEqual(bufio.read(nbytes), data[start:stop])
                start = stop
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def read_loop():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            piece = bufio.read(size)
                            if not piece:
                                break
                            # list.append() is atomic
                            results.append(piece)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                threads = [threading.Thread(target=read_loop) for _ in range(20)]
                for worker in threads:
                    worker.start()
                time.sleep(0.02) # yield
                for worker in threads:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                everything = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(everything.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            expected = b"x" * n
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
893
894
class CBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to io.BufferedReader, plus a few extra checks.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            acceptable_errors = (OverflowError, MemoryError, ValueError)
            self.assertRaises(acceptable_errors,
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After each rejected re-initialisation, read() must fail as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        bufio = self.tp(rawio)
        # Reference cycle, so that only the garbage collector can free it.
        bufio.f = bufio
        wr = weakref.ref(bufio)
        del bufio
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
935
class PyBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to the _pyio (pure Python) BufferedReader.
    tp = pyio.BufferedReader
938
939
940 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
941 write_mode = "wb"
942
943 def test_constructor(self):
944 rawio = self.MockRawIO()
945 bufio = self.tp(rawio)
946 bufio.__init__(rawio)
947 bufio.__init__(rawio, buffer_size=1024)
948 bufio.__init__(rawio, buffer_size=16)
949 self.assertEqual(3, bufio.write(b"abc"))
950 bufio.flush()
951 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
952 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
953 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
954 bufio.__init__(rawio)
955 self.assertEqual(3, bufio.write(b"ghi"))
956 bufio.flush()
957 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
958
959 def test_detach_flush(self):
960 raw = self.MockRawIO()
961 buf = self.tp(raw)
962 buf.write(b"howdy!")
963 self.assertFalse(raw._write_stack)
964 buf.detach()
965 self.assertEqual(raw._write_stack, [b"howdy!"])
966
967 def test_write(self):
968 # Write to the buffered IO but don't overflow the buffer.
969 writer = self.MockRawIO()
970 bufio = self.tp(writer, 8)
971 bufio.write(b"abc")
972 self.assertFalse(writer._write_stack)
973
974 def test_write_overflow(self):
975 writer = self.MockRawIO()
976 bufio = self.tp(writer, 8)
977 contents = b"abcdefghijklmnop"
978 for n in range(0, len(contents), 3):
979 bufio.write(contents[n:n+3])
980 flushed = b"".join(writer._write_stack)
981 # At least (total - 8) bytes were implicitly flushed, perhaps more
982 # depending on the implementation.
983 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
984
985 def check_writes(self, intermediate_func):
986 # Lots of writes, test the flushed output is as expected.
987 contents = bytes(range(256)) * 1000
988 n = 0
989 writer = self.MockRawIO()
990 bufio = self.tp(writer, 13)
991 # Generator of write sizes: repeat each N 15 times then proceed to N+1
992 def gen_sizes():
993 for size in count(1):
994 for i in range(15):
995 yield size
996 sizes = gen_sizes()
997 while n < len(contents):
998 size = min(next(sizes), len(contents) - n)
999 self.assertEqual(bufio.write(contents[n:n+size]), size)
1000 intermediate_func(bufio)
1001 n += size
1002 bufio.flush()
1003 self.assertEqual(contents,
1004 b"".join(writer._write_stack))
1005
1006 def test_writes(self):
1007 self.check_writes(lambda bufio: None)
1008
1009 def test_writes_and_flushes(self):
1010 self.check_writes(lambda bufio: bufio.flush())
1011
def test_writes_and_seeks(self):
    # Seek away and back between writes, first with absolute offsets and
    # then with relative ones.
    def _absolute(bufio):
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)

    def _relative(bufio):
        here = bufio.seek(0, 1)
        for step in (+1, -1):
            bufio.seek(step, 1)
        bufio.seek(here, 0)

    for wander in (_absolute, _relative):
        self.check_writes(wander)
1025
def test_writes_and_truncates(self):
    # Truncate at the current position after every write.
    def _truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(_truncate_here)
1028
def test_write_non_blocking(self):
    raw = self.MockNonBlockWriterIO()
    bufio = self.tp(raw, 8)

    # These short writes are reported as accepted in full.
    for piece in (b"abcd", b"efghi"):
        self.assertEqual(bufio.write(piece), len(piece))
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(bufio.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        bufio.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    drained = raw.pop_written()
    self.assertEqual(drained, b"abcdefghijklmnopqrwxyz")

    self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
    # Previously buffered bytes were flushed
    tail = raw.pop_written()
    self.assertTrue(tail.startswith(b"01234567A"), tail)
1055
def test_write_and_rewind(self):
    # Overwrite the start of already written data, then append more.
    target = io.BytesIO()
    buffered = self.tp(target, 4)
    self.assertEqual(buffered.write(b"abcdef"), 6)
    self.assertEqual(buffered.tell(), 6)
    buffered.seek(0, 0)
    self.assertEqual(buffered.write(b"XY"), 2)
    # After seeking away, the two overwritten bytes must be visible in the
    # underlying BytesIO.
    buffered.seek(6, 0)
    self.assertEqual(target.getvalue(), b"XYcdef")
    self.assertEqual(buffered.write(b"123456"), 6)
    buffered.flush()
    self.assertEqual(target.getvalue(), b"XYcdef123456")
1068
def test_flush(self):
    # An explicit flush() hands the buffered bytes to the raw stream.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_chunk = raw._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
1075
def test_destructor(self):
    # Dropping the only reference to the buffered object must flush it.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    flushed = raw._write_stack
    self.assertEqual(b"abc", flushed[0])
1083
def test_truncate(self):
    # truncate() implicitly flushes the buffer; the reported position
    # stays where the write left it.
    path = support.TESTFN
    with self.open(path, self.write_mode, buffering=0) as raw_file:
        buffered = self.tp(raw_file, 8)
        buffered.write(b"abcdef")
        new_size = buffered.truncate(3)
        self.assertEqual(new_size, 3)
        self.assertEqual(buffered.tell(), 6)
    with self.open(path, "rb", buffering=0) as reader:
        on_disk = reader.read()
    self.assertEqual(on_disk, b"abc")
1093
1094 @unittest.skipUnless(threading, 'Threading required for this test.')
1095 @support.requires_resource('cpu')
def test_threads(self):
    try:
        # Split a large payload into many small chunks, let a pool of
        # threads write them through one shared buffered object, then
        # check that every byte made it into the file.
        N = 1000
        payload = bytes(range(256)) * N
        total = len(payload)
        pending = deque()
        chunk_sizes = cycle([1, 19])
        start = 0
        while start < total:
            stop = start + next(chunk_sizes)
            pending.append(payload[start:stop])
            start = stop
        del payload
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            failures = []

            def worker():
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            # Queue drained: this thread is done.
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    # Record the error for the main thread, then let it
                    # propagate out of the worker as before.
                    failures.append(exc)
                    raise

            workers = [threading.Thread(target=worker) for _ in range(20)]
            for thread in workers:
                thread.start()
            time.sleep(0.02)  # yield
            for thread in workers:
                thread.join()
            self.assertFalse(failures,
                "the following exceptions were caught: %r" % failures)
            bufio.close()
        with self.open(support.TESTFN, "rb") as result:
            written = result.read()
        # Chunk order is arbitrary, so only the per-value counts are checked.
        for value in range(256):
            self.assertEqual(written.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1144
def test_misbehaved_io(self):
    # On top of the misbehaving raw stream, seek(), tell() and write()
    # all have to raise IOError.
    rawio = self.MisbehavedRawIO()
    buffered = self.tp(rawio, 5)
    with self.assertRaises(IOError):
        buffered.seek(0)
    with self.assertRaises(IOError):
        buffered.tell()
    with self.assertRaises(IOError):
        buffered.write(b"abcdef")
1151
def test_max_buffer_size_deprecation(self):
    # Passing a third positional argument (12 here) has to trigger the
    # "max_buffer_size is deprecated" DeprecationWarning.
    expected = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected):
        self.tp(self.MockRawIO(), 8, 12)
1156
1157
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds a few checks of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to check.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Re-initialising with a non-positive buffer size raises ValueError,
        # and so does a write attempted afterwards.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, buffered.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        writer = self.tp(rawio)
        writer.write(b"123xxx")
        # Make the object part of a reference cycle.
        writer.x = writer
        ref = weakref.ref(writer)
        del writer
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"123xxx")
1195
1196
class PyBufferedWriterTest(BufferedWriterTest):
    # Same tests, run against the pure-Python BufferedWriter from _pyio.
    tp = pyio.BufferedWriter
1199
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent BufferedRWPair tests.  The concrete
    # subclasses set ``tp`` to the class under test.

    def _mock_pair(self):
        # Pair built from two fresh mock raw streams.
        return self.tp(self.MockRawIO(), self.MockRawIO())

    def _reading_pair(self, data):
        # Pair whose reader side serves ``data``.
        return self.tp(self.BytesIO(data), self.MockRawIO())

    def test_constructor(self):
        self.assertFalse(self._mock_pair().closed)

    def test_detach(self):
        pair = self._mock_pair()
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        expected = ("max_buffer_size is deprecated", DeprecationWarning)
        with support.check_warnings(expected):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self._reading_pair(b"abcdef")

        # (argument tuple for read(), expected result), consumed in order.
        for args, expected in (((3,), b"abc"), ((1,), b"d"), ((), b"ef")):
            self.assertEqual(pair.read(*args), expected)
        pair = self._reading_pair(b"abc")
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh():
            return self._reading_pair(b"abc\ndef\nh")

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh().readlines(), all_lines)
        self.assertEqual(fresh().readlines(), all_lines)
        self.assertEqual(fresh().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self._reading_pair(b"abcdef")

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self._reading_pair(b"abcdef")

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately and recorded as its own entry.
        for piece in (b"abc", b"def"):
            pair.write(piece)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self._reading_pair(b"abcdef")

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # The read that follows still returns the peeked bytes.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        self.assertTrue(self._mock_pair().readable())

    def test_writeable(self):
        self.assertTrue(self._mock_pair().writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        self.assertFalse(self._mock_pair().seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self._mock_pair()
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty())
        table = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in table:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(pair.isatty())
1317
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1320
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the pair tests against the _pyio BufferedRWPair.
    tp = pyio.BufferedRWPair
1323
1324
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for objects that are readable and writable at once.  The reader
    # and writer suites are inherited; the methods below mostly cover how
    # reads, writes, seeks and flushes interact.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for piece in (b"ddd", b"eee"):
            rw.write(piece)
        # Writes are buffered: the raw stream has recorded nothing so far.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        # (offset, whence, position expected afterwards)
        for offset, whence, landed in ((-4, 2, 5), (2, 1, 7)):
            rw.seek(offset, whence)
            self.assertEqual(landed, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        with self.assertRaises(TypeError):
            rw.seek(0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) is the reading primitive under test; it must
        # return the bytes it consumed.
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the start of the data behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a scratch area of 9999 bytes.
            capacity = 9999 if n < 0 else n
            scratch = bytearray(capacity)
            filled = bufio.readinto(scratch)
            return bytes(scratch[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # peek() does not advance, so skip over the bytes by hand.
            bufio.seek(len(peeked), 1)
            return peeked
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        bufio = self.tp(backing)

        for piece in (b"123", b"45"):
            bufio.write(piece)
            bufio.flush()
        bufio.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_ahead(bufio):
            bufio.peek(1)

        def _peek_behind(bufio):
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)

        for between in (_peek_ahead, _peek_behind):
            self.check_writes(between)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            end_pos = overwrite_size + 1
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, first, second)
                bufio.flush()
                # Same order as the writes: when both positions coincide
                # the later \x01 wins.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)
1505
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the reader, writer and random-access suites against
    # io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to check.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1522
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the random-access suite against the _pyio BufferedRandom.
    tp = pyio.BufferedRandom
1525
1526
1527 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1528 # properties:
1529 # - A single output character can correspond to many bytes of input.
1530 # - The number of input bytes to complete the character can be
1531 # undetermined until the last input byte is received.
1532 # - The number of input bytes can vary depending on previous input.
1533 # - A single input byte can correspond to many characters of output.
1534 # - The number of output characters can be undetermined until the
1535 # last input byte is received.
1536 # - The number of output characters can vary depending on previous input.
1537
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are space-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags == I' * 100 + O'.

        I' and O' are I and O with their lowest bit flipped.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        # 'flags' rather than 'io', which would shadow the imported module.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* to the decoder; return the decoded text.

        Bytes of an unfinished word stay in self.buffer.  When *final* is
        true, a non-empty buffer is emitted as the last word.
        """
        output = ''
        period = ord('.')
        # Iterating a bytearray always yields integers, whereas iterating
        # the input directly yields 1-character strings for a Python 2 str
        # and integers for a bytearray (or Python 3 bytes).  Comparing those
        # integers with '.' never matched, so word terminators were missed.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while codecEnabled.

        For any other name, or while the codec is disabled, falls through
        and returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1622
# Install the search function of the decoder defined above.  It only
# answers lookups while StatefulIncrementalDecoder.codecEnabled is true,
# which is False by default; tests switch it on when they need the codec.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1626
1627
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
        ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
1670
1671 class TextIOWrapperTest(unittest.TestCase):
1672
def setUp(self):
    # Sample data with mixed line endings, and the same text with every
    # line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Remove the scratch file before the test starts.
    support.unlink(support.TESTFN)
1677
def tearDown(self):
    # Remove the scratch file once the test is over.
    scratch = support.TESTFN
    support.unlink(scratch)
1680
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Re-running __init__ replaces the encoding and line_buffering settings.
    text.__init__(buffered, encoding="latin1", newline="\r\n")
    self.assertEqual(text.encoding, "latin1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf8", line_buffering=True)
    self.assertEqual(text.encoding, "utf8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())
    # An invalid newline argument is rejected.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
1694
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() returns the wrapped buffer object itself.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    # Detaching writes the pending text through to the raw stream ...
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # ... and detaching a second time raises ValueError.
    with self.assertRaises(ValueError):
        text.detach()
1707
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__
    # Without a name on the raw stream only the encoding is shown.
    self.assertEqual(repr(text),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)
    # (name set on the raw stream, expected repr template)
    named = (
        ("dummy", "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>"),
        (b"dummy", "<%s.TextIOWrapper name='dummy' encoding='utf-8'>"),
    )
    for name, template in named:
        raw.name = name
        self.assertEqual(repr(text), template % modname)
1721
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw content expected right afterwards)
    steps = (
        ("X", b""),               # No flush happened
        ("Y\nZ", b"XY\nZ"),       # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, visible in steps:
        text.write(written)
        self.assertEqual(raw.getvalue(), visible)
1732
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")
    implicit = self.TextIOWrapper(raw)
    self.assertTrue(implicit.encoding is not None)
    # lookup() raises LookupError for an unknown codec name.
    codecs.lookup(implicit.encoding)
1741
def test_encoding_errors_reading(self):
    undecodable = b"abc\n\xff\n"

    def reader(**options):
        # Fresh ASCII reader over the same undecodable data.
        return self.TextIOWrapper(self.BytesIO(undecodable),
                                  encoding="ascii", **options)

    # (1) default
    self.assertRaises(UnicodeError, reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, reader(errors="strict").read)
    # (3) ignore
    self.assertEqual(reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(reader(errors="replace").read(), "abc\n\ufffd\n")
1759
def test_encoding_errors_writing(self):
    # (1) default
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii")
    with self.assertRaises(UnicodeError):
        text.write("\xff")
    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                              errors="strict")
    with self.assertRaises(UnicodeError):
        text.write("\xff")
    # (3) ignore drops the character, (4) replace writes "?" instead.
    for handler, expected in (("ignore", b"abcdef\n"),
                              ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
1783
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected back)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def lines_via_reads(textio):
        # Alternate read(2) and readline() until the stream is exhausted.
        collected = []
        while True:
            head = textio.read(2)
            if head == '':
                return collected
            self.assertEqual(len(head), 2)
            collected.append(head + textio.readline())

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = lines_via_reads(textio)
                    else:
                        got_lines = list(textio)

                    # Compare line by line first, then the line count.
                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
1825
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines readlines() has to return)
    cases = [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # A single read() from the start returns the lines joined together.
        text.seek(0)
        whole = "".join(expected)
        self.assertEqual(text.read(), whole)
1841
def test_newlines_output(self):
    # Bytes expected on the raw stream for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to produce the os.linesep entry.
    tests = [(None, testdict[os.linesep])]
    tests += sorted(testdict.items())
    fragments = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for fragment in fragments:
            text.write(fragment)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
1859
def test_destructor(self):
    # Each close() of the raw stream records its content at that moment.
    snapshots = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            snapshots.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    # Exactly one close() happened, and the written text was there by then.
    self.assertEqual([b"abc"], snapshots)
1873
def test_override_destructor(self):
    # Each overridden method logs a marker; the expected order is
    # __del__ (1), close (2), flush (3).
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super(MyTextIO, self).__del__
            except AttributeError:
                # The base class defines no __del__; nothing to chain to.
                return
            parent_del()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1896
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
1911
1912 # Systematic tests of the text I/O API
1913
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for a range of
    # decoder chunk sizes and encodings.  The files are opened with "with"
    # so they are closed even when an assertion fails; tearDown() unlinks
    # TESTFN right afterwards.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1942
def multi_line_test(self, f, enc):
    # Empty the file, write lines of many different lengths while noting
    # tell() before each one, then read them back and compare both the
    # positions and the text.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of ``size``.
        body = "".join([sample[k % period] for k in range(size)])
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    seen = []
    while True:
        where = f.tell()
        line = f.readline()
        if not line:
            break
        seen.append((where, line))
    self.assertEqual(seen, written)
1964
def test_telling(self):
    """tell() must agree between writing and readline(), and fail mid-iteration."""
    line = "\xff\n"
    f = self.open(support.TESTFN, "w+", encoding="utf8")
    # marks[k] is the position after k lines have been written.
    marks = [f.tell()]
    for _ in range(2):
        f.write(line)
        marks.append(f.tell())
    f.seek(0)
    self.assertEqual(f.tell(), marks[0])
    for expected_pos in marks[1:]:
        self.assertEqual(f.readline(), line)
        self.assertEqual(f.tell(), expected_pos)
    f.seek(0)
    for text in f:
        self.assertEqual(text, line)
        # While iterating with next(), tell() is expected to raise.
        self.assertRaises(IOError, f.tell)
    self.assertEqual(f.tell(), marks[-1])
    f.close()
1984
def test_seeking(self):
    """tell() must be correct when a multi-byte character straddles a chunk.

    The line is built so that an ASCII prefix ends two bytes before the
    default chunk size and is followed by a three-byte UTF-8 character.
    """
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # Pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use a context manager so the reader is closed even when an assertion
    # fails; previously the handle was never closed at all.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
2002
def test_seeking_too(self):
    """Regression test: tell() after readline() with a tiny chunk size.

    The data is a single three-byte UTF-8 character followed by a newline,
    read with a chunk size of 2 so the character is split across chunks.
    """
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Close the reader deterministically; it used to be left open.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
2014
def test_seek_and_tell(self):
    """Test seek()/tell() using the StatefulIncrementalDecoder."""
    # A small chunk size keeps the seeks short and the test fast.
    CHUNK_SIZE = 128

    def verify_stream(data, min_pos=0):
        """Write *data*, then check read() consistency around tell()/seek()
        for every character position from *min_pos* to the end."""
        sink = self.open(support.TESTFN, 'wb')
        sink.write(data)
        sink.close()
        source = self.open(support.TESTFN, encoding='test_decoder')
        source._CHUNK_SIZE = CHUNK_SIZE
        decoded = source.read()
        source.close()

        total = len(decoded)
        for start in range(min_pos, total + 1):        # seek positions
            for length in (1, 5, total - start):       # read lengths
                f = self.open(support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(start), decoded[:start])
                cookie = f.tell()
                self.assertEqual(f.read(length),
                                 decoded[start:start + length])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[start:])
                f.close()

    # Switch the test decoder on for the duration of the checks.
    StatefulIncrementalDecoder.codecEnabled = 1
    try:
        # First pass: every test case as-is.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            verify_stream(raw)

        # Second pass: pad each case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            padding = b'.'*offset
            # Seeking into the padding is skipped because it takes too long.
            verify_stream(padding + raw, offset*2)
    finally:
        # Make sure the test decoder cannot interfere with later tests.
        StatefulIncrementalDecoder.codecEnabled = 0
2061
2062 def test_encoded_writes(self):
2063 data = "1234567890"
2064 tests = ("utf-16",
2065 "utf-16-le",
2066 "utf-16-be",
2067 "utf-32",
2068 "utf-32-le",
2069 "utf-32-be")
2070 for encoding in tests:
2071 buf = self.BytesIO()
2072 f = self.TextIOWrapper(buf, encoding=encoding)
2073 # Check if the BOM is written only once (see issue1753).
2074 f.write(data)
2075 f.write(data)
2076 f.seek(0)
2077 self.assertEqual(f.read(), data * 2)
2078 f.seek(0)
2079 self.assertEqual(f.read(), data * 2)
2080 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2081
2082 def test_unreadable(self):
2083 class UnReadable(self.BytesIO):
2084 def readable(self):
2085 return False
2086 txt = self.TextIOWrapper(UnReadable())
2087 self.assertRaises(IOError, txt.read)
2088
2089 def test_read_one_by_one(self):
2090 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2091 reads = ""
2092 while True:
2093 c = txt.read(1)
2094 if not c:
2095 break
2096 reads += c
2097 self.assertEqual(reads, "AA\nBB")
2098
2099 def test_readlines(self):
2100 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2101 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2102 txt.seek(0)
2103 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2104 txt.seek(0)
2105 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2106
2107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2108 def test_read_by_chunk(self):
2109 # make sure "\r\n" straddles 128 char boundary.
2110 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2111 reads = ""
2112 while True:
2113 c = txt.read(128)
2114 if not c:
2115 break
2116 reads += c
2117 self.assertEqual(reads, "A"*127+"\nB")
2118
2119 def test_issue1395_1(self):
2120 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2121
2122 # read one char at a time
2123 reads = ""
2124 while True:
2125 c = txt.read(1)
2126 if not c:
2127 break
2128 reads += c
2129 self.assertEqual(reads, self.normalized)
2130
2131 def test_issue1395_2(self):
2132 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2133 txt._CHUNK_SIZE = 4
2134
2135 reads = ""
2136 while True:
2137 c = txt.read(4)
2138 if not c:
2139 break
2140 reads += c
2141 self.assertEqual(reads, self.normalized)
2142
2143 def test_issue1395_3(self):
2144 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2145 txt._CHUNK_SIZE = 4
2146
2147 reads = txt.read(4)
2148 reads += txt.read(4)
2149 reads += txt.readline()
2150 reads += txt.readline()
2151 reads += txt.readline()
2152 self.assertEqual(reads, self.normalized)
2153
2154 def test_issue1395_4(self):
2155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2156 txt._CHUNK_SIZE = 4
2157
2158 reads = txt.read(4)
2159 reads += txt.read()
2160 self.assertEqual(reads, self.normalized)
2161
2162 def test_issue1395_5(self):
2163 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2164 txt._CHUNK_SIZE = 4
2165
2166 reads = txt.read(4)
2167 pos = txt.tell()
2168 txt.seek(0)
2169 txt.seek(pos)
2170 self.assertEqual(txt.read(4), "BBB\n")
2171
2172 def test_issue2282(self):
2173 buffer = self.BytesIO(self.testdata)
2174 txt = self.TextIOWrapper(buffer, encoding="ascii")
2175
2176 self.assertEqual(buffer.seekable(), txt.seekable())
2177
def test_append_bom(self):
    """The BOM is not written again when appending to a non-empty file."""
    filename = support.TESTFN

    def raw_contents():
        # Return the file's bytes exactly as stored on disk.
        with self.open(filename, 'rb') as src:
            return src.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as out:
            out.write('aaa')
            out.tell()
        self.assertEqual(raw_contents(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as out:
            out.write('xxx')
        self.assertEqual(raw_contents(), 'aaaxxx'.encode(charset))
2192
def test_seek_bom(self):
    """Like test_append_bom, but positioning the stream manually with seek()."""
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as out:
            out.write('aaa')
            end_cookie = out.tell()
        with self.open(filename, 'r+', encoding=charset) as out:
            # First write at the old end of data, then overwrite the start.
            for target, text in ((end_cookie, 'zzz'), (0, 'bbb')):
                out.seek(target)
                out.write(text)
        with self.open(filename, 'rb') as src:
            stored = src.read()
        self.assertEqual(stored, 'bbbzzz'.encode(charset))
2207
def test_errors_property(self):
    """The errors attribute is "strict" by default and mirrors the argument."""
    cases = (
        ({}, "strict"),
        (dict(errors="replace"), "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2213
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    """Issue6750: concurrent writes must not duplicate data."""
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has been started.
            start_signal.wait()
            f.write(text)

        workers = []
        for x in range(20):
            workers.append(threading.Thread(target=run, args=(x,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            occurrences = content.count("Thread%03d\n" % n)
            self.assertEqual(occurrences, 1)
2235
2236 def test_flush_error_on_close(self):
2237 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2238 def bad_flush():
2239 raise IOError()
2240 txt.flush = bad_flush
2241 self.assertRaises(IOError, txt.close) # exception not swallowed
2242
2243 def test_multi_close(self):
2244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2245 txt.close()
2246 txt.close()
2247 txt.close()
2248 self.assertRaises(ValueError, txt.flush)
2249
2250 def test_readonly_attributes(self):
2251 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2252 buf = self.BytesIO(self.testdata)
2253 with self.assertRaises((AttributeError, TypeError)):
2254 txt.buffer = buf
2255
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests that are only run for the C implementation."""

    def test_initialization(self):
        """A failed re-initialisation leaves the wrapper unusable for read()."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Each bad newline argument must be rejected, and read() must then
        # raise ValueError.
        for init_error, bad_newline in ((TypeError, 42),
                                        (ValueError, 'xyzzy')):
            self.assertRaises(init_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        """Collecting a C TextIOWrapper flushes all of its data to disk."""
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so only the cyclic GC can free the object.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            stored = f.read()
            self.assertEqual(stored, b"456def")
2282
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the shared TextIOWrapper tests; adds no tests of its own."""
2285
2286
2287 class IncrementalNewlineDecoderTest(unittest.TestCase):
2288
2289 def check_newline_decoding_utf8(self, decoder):
2290 # UTF-8 specific tests for a newline decoder
2291 def _check_decode(b, s, **kwargs):
2292 # We exercise getstate() / setstate() as well as decode()
2293 state = decoder.getstate()
2294 self.assertEqual(decoder.decode(b, **kwargs), s)
2295 decoder.setstate(state)
2296 self.assertEqual(decoder.decode(b, **kwargs), s)
2297
2298 _check_decode(b'\xe8\xa2\x88', "\u8888")
2299
2300 _check_decode(b'\xe8', "")
2301 _check_decode(b'\xa2', "")
2302 _check_decode(b'\x88', "\u8888")
2303
2304 _check_decode(b'\xe8', "")
2305 _check_decode(b'\xa2', "")
2306 _check_decode(b'\x88', "\u8888")
2307
2308 _check_decode(b'\xe8', "")
2309 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2310
2311 decoder.reset()
2312 _check_decode(b'\n', "\n")
2313 _check_decode(b'\r', "")
2314 _check_decode(b'', "\n", final=True)
2315 _check_decode(b'\r', "\n", final=True)
2316
2317 _check_decode(b'\r', "")
2318 _check_decode(b'a', "\na")
2319
2320 _check_decode(b'\r\r\n', "\n\n")
2321 _check_decode(b'\r', "")
2322 _check_decode(b'\r', "\n")
2323 _check_decode(b'\na', "\na")
2324
2325 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2326 _check_decode(b'\xe8\xa2\x88', "\u8888")
2327 _check_decode(b'\n', "\n")
2328 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2329 _check_decode(b'\n', "\n")
2330
2331 def check_newline_decoding(self, decoder, encoding):
2332 result = []
2333 if encoding is not None:
2334 encoder = codecs.getincrementalencoder(encoding)()
2335 def _decode_bytewise(s):
2336 # Decode one byte at a time
2337 for b in encoder.encode(s):
2338 result.append(decoder.decode(b))
2339 else:
2340 encoder = None
2341 def _decode_bytewise(s):
2342 # Decode one char at a time
2343 for c in s:
2344 result.append(decoder.decode(c))
2345 self.assertEqual(decoder.newlines, None)
2346 _decode_bytewise("abc\n\r")
2347 self.assertEqual(decoder.newlines, '\n')
2348 _decode_bytewise("\nabc")
2349 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2350 _decode_bytewise("abc\r")
2351 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2352 _decode_bytewise("abc")
2353 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
2354 _decode_bytewise("abc\r")
2355 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
2356 decoder.reset()
2357 input = "abc"
2358 if encoder is not None:
2359 encoder.reset()
2360 input = encoder.encode(input)
2361 self.assertEqual(decoder.decode(input), "abc")
2362 self.assertEqual(decoder.newlines, None)
2363
2364 def test_newline_decoder(self):
2365 encodings = (
2366 # None meaning the IncrementalNewlineDecoder takes unicode input
2367 # rather than bytes input
2368 None, 'utf-8', 'latin-1',
2369 'utf-16', 'utf-16-le', 'utf-16-be',
2370 'utf-32', 'utf-32-le', 'utf-32-be',
2371 )
2372 for enc in encodings:
2373 decoder = enc and codecs.getincrementaldecoder(enc)()
2374 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2375 self.check_newline_decoding(decoder, enc)
2376 decoder = codecs.getincrementaldecoder("utf-8")()
2377 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2378 self.check_newline_decoding_utf8(decoder)
2379
2380 def test_newline_bytes(self):
2381 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2382 def _check(dec):
2383 self.assertEqual(dec.newlines, None)
2384 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2385 self.assertEqual(dec.newlines, None)
2386 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2387 self.assertEqual(dec.newlines, None)
2388 dec = self.IncrementalNewlineDecoder(None, translate=False)
2389 _check(dec)
2390 dec = self.IncrementalNewlineDecoder(None, translate=True)
2391 _check(dec)
2392
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the shared newline-decoder tests; adds no tests of its own."""
2395
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the shared newline-decoder tests; adds no tests of its own."""
2398
2399
2400 # XXX Tests for open()
2401
class MiscIOTest(unittest.TestCase):
    """Miscellaneous tests of the io module's public surface.

    The module under test is read from self.io; subclasses set it.
    """

    def tearDown(self):
        # Remove the scratch file used by the tests.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in __all__ exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            is_error = ("error" in name.lower()
                        or name == "UnsupportedOperation")
            if is_error:
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check the name and mode attributes on each layer of opened files."""
        path = support.TESTFN
        check = self.assertEqual

        f = self.open(path, "wb", buffering=0)
        check(f.mode, "wb")
        f.close()

        f = self.open(path, "U")
        for layer in (f, f.buffer, f.buffer.raw):
            check(layer.name, path)
        check(f.mode, "U")
        check(f.buffer.mode, "rb")
        check(f.buffer.raw.mode, "rb")
        f.close()

        f = self.open(path, "w+")
        check(f.mode, "w+")
        check(f.buffer.mode, "rb+")  # Does it really matter?
        check(f.buffer.raw.mode, "rb+")

        # Open a second object on the same descriptor without owning it.
        g = self.open(f.fileno(), "wb", closefd=False)
        check(g.mode, "wb")
        check(g.raw.mode, "wb")
        check(g.name, f.fileno())
        check(g.raw.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # Each entry: method name, call arguments, and whether the
            # method only exists on some layers.
            calls = (
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            )
            for method, args, optional in calls:
                if optional and not hasattr(f, method):
                    continue
                self.assertRaises(ValueError, getattr(f, method), *args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Various BlockingIOError issues."""
        bad_argument_lists = ((), (1,), (1, 2, 3, 4), (1, "", None))
        for args in bad_argument_lists:
            self.assertRaises(TypeError, self.BlockingIOError, *args)
        err = self.BlockingIOError(1, "")
        self.assertEqual(err.characters_written, 0)

        class C(unicode):
            pass

        # Build a reference cycle between the exception and its message and
        # check that it can be collected.
        msg = C("")
        err = self.BlockingIOError(1, msg)
        msg.b = err
        err.c = msg
        ref = weakref.ref(msg)
        del msg, err
        support.gc_collect()
        self.assertTrue(ref() is None, ref)

    def test_abcs(self):
        """The visible base classes are ABCs."""
        for base in (self.IOBase, self.RawIOBase,
                     self.BufferedIOBase, self.TextIOBase):
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* raw, buffered and text files match."""

        def verify(stream, is_raw, is_buffered, is_text):
            self.assertIsInstance(stream, abcmodule.IOBase)
            expectations = (
                (abcmodule.RawIOBase, is_raw),
                (abcmodule.BufferedIOBase, is_buffered),
                (abcmodule.TextIOBase, is_text),
            )
            for base, expected in expectations:
                if expected:
                    self.assertIsInstance(stream, base)
                else:
                    self.assertNotIsInstance(stream, base)

        with self.open(support.TESTFN, "wb", buffering=0) as f:
            verify(f, True, False, False)
        with self.open(support.TESTFN, "wb") as f:
            verify(f, False, True, False)
        with self.open(support.TESTFN, "w") as f:
            verify(f, False, False, True)

    def test_abc_inheritance(self):
        """Implementations inherit from their respective ABCs."""
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        """Implementations inherit from the official ABCs of the baseline
        "io" module."""
        self._check_abc_inheritance(io)
2539
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the C implementation (the `io` module)."""
    io = io
2542
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the Python implementation (`_pyio`)."""
    io = pyio
2545
2546
2547 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2548 class SignalsTest(unittest.TestCase):
2549
2550 def setUp(self):
2551 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2552
2553 def tearDown(self):
2554 signal.signal(signal.SIGALRM, self.oldalrm)
2555
2556 def alarm_interrupt(self, sig, frame):
2557 1 // 0
2558
2559 @unittest.skipUnless(threading, 'Threading required for this test.')
2560 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2561 """Check that a partial write, when it gets interrupted, properly
2562 invokes the signal handler, and bubbles up the exception raised
2563 in the latter."""
2564 read_results = []
2565 def _read():
2566 s = os.read(r, 1)
2567 read_results.append(s)
2568 t = threading.Thread(target=_read)
2569 t.daemon = True
2570 r, w = os.pipe()
2571 try:
2572 wio = self.io.open(w, **fdopen_kwargs)
2573 t.start()
2574 signal.alarm(1)
2575 # Fill the pipe enough that the write will be blocking.
2576 # It will be interrupted by the timer armed above. Since the
2577 # other thread has read one byte, the low-level write will
2578 # return with a successful (partial) result rather than an EINTR.
2579 # The buffered IO layer must check for pending signal
2580 # handlers, which in this case will invoke alarm_interrupt().
2581 self.assertRaises(ZeroDivisionError,
2582 wio.write, item * (1024 * 1024))
2583 t.join()
2584 # We got one byte, get another one and check that it isn't a
2585 # repeat of the first one.
2586 read_results.append(os.read(r, 1))
2587 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2588 finally:
2589 os.close(w)
2590 os.close(r)
2591 # This is deliberate. If we didn't close the file descriptor
2592 # before closing wio, wio would try to flush its internal
2593 # buffer, and block again.
2594 try:
2595 wio.close()
2596 except IOError as e:
2597 if e.errno != errno.EBADF:
2598 raise
2599
2600 def test_interrupted_write_unbuffered(self):
2601 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2602
2603 def test_interrupted_write_buffered(self):
2604 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2605
2606 def test_interrupted_write_text(self):
2607 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2608
2609 def check_reentrant_write(self, data, **fdopen_kwargs):
2610 def on_alarm(*args):
2611 # Will be called reentrantly from the same thread
2612 wio.write(data)
2613 1/0
2614 signal.signal(signal.SIGALRM, on_alarm)
2615 r, w = os.pipe()
2616 wio = self.io.open(w, **fdopen_kwargs)
2617 try:
2618 signal.alarm(1)
2619 # Either the reentrant call to wio.write() fails with RuntimeError,
2620 # or the signal handler raises ZeroDivisionError.
2621 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2622 while 1:
2623 for i in range(100):
2624 wio.write(data)
2625 wio.flush()
2626 # Make sure the buffer doesn't fill up and block further writes
2627 os.read(r, len(data) * 100)
2628 exc = cm.exception
2629 if isinstance(exc, RuntimeError):
2630 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2631 finally:
2632 wio.close()
2633 os.close(r)
2634
2635 def test_reentrant_write_buffered(self):
2636 self.check_reentrant_write(b"xy", mode="wb")
2637
2638 def test_reentrant_write_text(self):
2639 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2640
2641 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2642 """Check that a buffered read, when it gets interrupted (either
2643 returning a partial result or EINTR), properly invokes the signal
2644 handler and retries if the latter returned successfully."""
2645 r, w = os.pipe()
2646 fdopen_kwargs["closefd"] = False
2647 def alarm_handler(sig, frame):
2648 os.write(w, b"bar")
2649 signal.signal(signal.SIGALRM, alarm_handler)
2650 try:
2651 rio = self.io.open(r, **fdopen_kwargs)
2652 os.write(w, b"foo")
2653 signal.alarm(1)
2654 # Expected behaviour:
2655 # - first raw read() returns partial b"foo"
2656 # - second raw read() returns EINTR
2657 # - third raw read() returns b"bar"
2658 self.assertEqual(decode(rio.read(6)), "foobar")
2659 finally:
2660 rio.close()
2661 os.close(w)
2662 os.close(r)
2663
2664 def test_interrupterd_read_retry_buffered(self):
2665 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2666 mode="rb")
2667
2668 def test_interrupterd_read_retry_text(self):
2669 self.check_interrupted_read_retry(lambda x: x,
2670 mode="r")
2671
2672 @unittest.skipUnless(threading, 'Threading required for this test.')
2673 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2674 """Check that a buffered write, when it gets interrupted (either
2675 returning a partial result or EINTR), properly invokes the signal
2676 handler and retries if the latter returned successfully."""
2677 select = support.import_module("select")
2678 # A quantity that exceeds the buffer size of an anonymous pipe's
2679 # write end.
2680 N = 1024 * 1024
2681 r, w = os.pipe()
2682 fdopen_kwargs["closefd"] = False
2683 # We need a separate thread to read from the pipe and allow the
2684 # write() to finish. This thread is started after the SIGALRM is
2685 # received (forcing a first EINTR in write()).
2686 read_results = []
2687 write_finished = False
2688 def _read():
2689 while not write_finished:
2690 while r in select.select([r], [], [], 1.0)[0]:
2691 s = os.read(r, 1024)
2692 read_results.append(s)
2693 t = threading.Thread(target=_read)
2694 t.daemon = True
2695 def alarm1(sig, frame):
2696 signal.signal(signal.SIGALRM, alarm2)
2697 signal.alarm(1)
2698 def alarm2(sig, frame):
2699 t.start()
2700 signal.signal(signal.SIGALRM, alarm1)
2701 try:
2702 wio = self.io.open(w, **fdopen_kwargs)
2703 signal.alarm(1)
2704 # Expected behaviour:
2705 # - first raw write() is partial (because of the limited pipe buffer
2706 # and the first alarm)
2707 # - second raw write() returns EINTR (because of the second alarm)
2708 # - subsequent write()s are successful (either partial or complete)
2709 self.assertEqual(N, wio.write(item * N))
2710 wio.flush()
2711 write_finished = True
2712 t.join()
2713 self.assertEqual(N, sum(len(x) for x in read_results))
2714 finally:
2715 write_finished = True
2716 os.close(w)
2717 os.close(r)
2718 # This is deliberate. If we didn't close the file descriptor
2719 # before closing wio, wio would try to flush its internal
2720 # buffer, and could block (in case of failure).
2721 try:
2722 wio.close()
2723 except IOError as e:
2724 if e.errno != errno.EBADF:
2725 raise
2726
2727 def test_interrupterd_write_retry_buffered(self):
2728 self.check_interrupted_write_retry(b"x", mode="wb")
2729
2730 def test_interrupterd_write_retry_text(self):
2731 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2732
2733
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the C implementation (the `io` module)."""
    io = io
2736
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the Python implementation (`_pyio`)."""
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # reentrant-write tests are disabled by replacing them with None.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2744
2745
def test_main():
    """Attach the io namespaces and mocks to the test classes, then run them.

    Classes whose name starts with "C" get the members of `io`; classes
    whose name starts with "Py" get the members of `pyio`.  Other classes
    are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Mock classes exist in a "C"-prefixed and a "Py"-prefixed flavour.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    globs = globals()

    def build_namespace(module, prefix):
        # Public members of `module` plus the prefixed variant of every mock,
        # stored under the mock's unprefixed name.
        namespace = dict((name, getattr(module, name))
                         for name in all_members)
        for mock in mocks:
            namespace[mock.__name__] = globs[prefix + mock.__name__]
        return namespace

    c_io_ns = build_namespace(io, "C")
    py_io_ns = build_namespace(pyio, "Py")
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
2780
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()