1 """Unit tests for the io module."""
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
22 from __future__
import print_function
23 from __future__
import unicode_literals
35 from itertools
import cycle
, count
36 from collections
import deque
37 from test
import test_support
as support
40 import io
# C implementation of io
41 import _pyio
as pyio
# Python implementation of io
48 bytes
= support
.py3k_bytes
50 def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io
.open(__file__
, "r", encoding
="latin1") as f
:
56 class MockRawIOWithoutRead
:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
60 def __init__(self
, read_stack
=()):
61 self
._read
_stack
= list(read_stack
)
62 self
._write
_stack
= []
64 self
._extraneous
_reads
= 0
67 self
._write
_stack
.append(bytes(b
))
82 def seek(self
, pos
, whence
):
83 return 0 # wrong but we gotta return something
86 return 0 # same comment as above
88 def readinto(self
, buf
):
92 data
= self
._read
_stack
[0]
94 self
._extraneous
_reads
+= 1
97 del self
._read
_stack
[0]
100 if len(data
) <= max_len
:
101 del self
._read
_stack
[0]
105 buf
[:] = data
[:max_len
]
106 self
._read
_stack
[0] = data
[max_len
:]
109 def truncate(self
, pos
=None):
112 class CMockRawIOWithoutRead(MockRawIOWithoutRead
, io
.RawIOBase
):
115 class PyMockRawIOWithoutRead(MockRawIOWithoutRead
, pyio
.RawIOBase
):
119 class MockRawIO(MockRawIOWithoutRead
):
121 def read(self
, n
=None):
124 return self
._read
_stack
.pop(0)
126 self
._extraneous
_reads
+= 1
129 class CMockRawIO(MockRawIO
, io
.RawIOBase
):
132 class PyMockRawIO(MockRawIO
, pyio
.RawIOBase
):
136 class MisbehavedRawIO(MockRawIO
):
138 return MockRawIO
.write(self
, b
) * 2
def read(self, n=None):
    # Deliberately misbehave: hand back the well-behaved MockRawIO result
    # repeated twice, i.e. more than a sane raw stream would return.
    honest_result = MockRawIO.read(self, n)
    return honest_result * 2
143 def seek(self
, pos
, whence
):
149 def readinto(self
, buf
):
150 MockRawIO
.readinto(self
, buf
)
153 class CMisbehavedRawIO(MisbehavedRawIO
, io
.RawIOBase
):
156 class PyMisbehavedRawIO(MisbehavedRawIO
, pyio
.RawIOBase
):
160 class CloseFailureIO(MockRawIO
):
168 class CCloseFailureIO(CloseFailureIO
, io
.RawIOBase
):
171 class PyCloseFailureIO(CloseFailureIO
, pyio
.RawIOBase
):
177 def __init__(self
, data
):
178 self
.read_history
= []
179 super(MockFileIO
, self
).__init
__(data
)
181 def read(self
, n
=None):
182 res
= super(MockFileIO
, self
).read(n
)
183 self
.read_history
.append(None if res
is None else len(res
))
186 def readinto(self
, b
):
187 res
= super(MockFileIO
, self
).readinto(b
)
188 self
.read_history
.append(res
)
191 class CMockFileIO(MockFileIO
, io
.BytesIO
):
194 class PyMockFileIO(MockFileIO
, pyio
.BytesIO
):
198 class MockNonBlockWriterIO
:
201 self
._write
_stack
= []
202 self
._blocker
_char
= None
204 def pop_written(self
):
205 s
= b
"".join(self
._write
_stack
)
206 self
._write
_stack
[:] = []
209 def block_on(self
, char
):
210 """Block when a given char is encountered."""
211 self
._blocker
_char
= char
225 if self
._blocker
_char
:
227 n
= b
.index(self
._blocker
_char
)
231 self
._blocker
_char
= None
232 self
._write
_stack
.append(b
[:n
])
233 raise self
.BlockingIOError(0, "test blocking", n
)
234 self
._write
_stack
.append(b
)
237 class CMockNonBlockWriterIO(MockNonBlockWriterIO
, io
.RawIOBase
):
238 BlockingIOError
= io
.BlockingIOError
240 class PyMockNonBlockWriterIO(MockNonBlockWriterIO
, pyio
.RawIOBase
):
241 BlockingIOError
= pyio
.BlockingIOError
244 class IOTest(unittest
.TestCase
):
247 support
.unlink(support
.TESTFN
)
250 support
.unlink(support
.TESTFN
)
252 def write_ops(self
, f
):
253 self
.assertEqual(f
.write(b
"blah."), 5)
255 self
.assertEqual(f
.tell(), 5)
258 self
.assertEqual(f
.write(b
"blah."), 5)
259 self
.assertEqual(f
.seek(0), 0)
260 self
.assertEqual(f
.write(b
"Hello."), 6)
261 self
.assertEqual(f
.tell(), 6)
262 self
.assertEqual(f
.seek(-1, 1), 5)
263 self
.assertEqual(f
.tell(), 5)
264 self
.assertEqual(f
.write(bytearray(b
" world\n\n\n")), 9)
265 self
.assertEqual(f
.seek(0), 0)
266 self
.assertEqual(f
.write(b
"h"), 1)
267 self
.assertEqual(f
.seek(-1, 2), 13)
268 self
.assertEqual(f
.tell(), 13)
270 self
.assertEqual(f
.truncate(12), 12)
271 self
.assertEqual(f
.tell(), 13)
272 self
.assertRaises(TypeError, f
.seek
, 0.0)
274 def read_ops(self
, f
, buffered
=False):
276 self
.assertEqual(data
, b
"hello")
277 data
= bytearray(data
)
278 self
.assertEqual(f
.readinto(data
), 5)
279 self
.assertEqual(data
, b
" worl")
280 self
.assertEqual(f
.readinto(data
), 2)
281 self
.assertEqual(len(data
), 5)
282 self
.assertEqual(data
[:2], b
"d\n")
283 self
.assertEqual(f
.seek(0), 0)
284 self
.assertEqual(f
.read(20), b
"hello world\n")
285 self
.assertEqual(f
.read(1), b
"")
286 self
.assertEqual(f
.readinto(bytearray(b
"x")), 0)
287 self
.assertEqual(f
.seek(-6, 2), 6)
288 self
.assertEqual(f
.read(5), b
"world")
289 self
.assertEqual(f
.read(0), b
"")
290 self
.assertEqual(f
.readinto(bytearray()), 0)
291 self
.assertEqual(f
.seek(-6, 1), 5)
292 self
.assertEqual(f
.read(5), b
" worl")
293 self
.assertEqual(f
.tell(), 10)
294 self
.assertRaises(TypeError, f
.seek
, 0.0)
297 self
.assertEqual(f
.read(), b
"hello world\n")
299 self
.assertEqual(f
.read(), b
"world\n")
300 self
.assertEqual(f
.read(), b
"")
304 def large_file_ops(self
, f
):
307 self
.assertEqual(f
.seek(self
.LARGE
), self
.LARGE
)
308 self
.assertEqual(f
.tell(), self
.LARGE
)
309 self
.assertEqual(f
.write(b
"xxx"), 3)
310 self
.assertEqual(f
.tell(), self
.LARGE
+ 3)
311 self
.assertEqual(f
.seek(-1, 1), self
.LARGE
+ 2)
312 self
.assertEqual(f
.truncate(), self
.LARGE
+ 2)
313 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
314 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 2)
315 self
.assertEqual(f
.truncate(self
.LARGE
+ 1), self
.LARGE
+ 1)
316 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
317 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 1)
318 self
.assertEqual(f
.seek(-1, 2), self
.LARGE
)
319 self
.assertEqual(f
.read(2), b
"x")
def test_invalid_operations(self):
    # Reading from a file opened for writing, and writing to a file opened
    # for reading, must both fail with IOError.
    path = support.TESTFN
    for write_mode in ("w", "wb"):
        with self.open(path, write_mode) as writer:
            for forbidden in (writer.read, writer.readline):
                self.assertRaises(IOError, forbidden)
    read_cases = (
        ("rb", b"blah", [b"blah\n"]),
        ("r", "blah", ["blah\n"]),
    )
    for read_mode, payload, lines in read_cases:
        with self.open(path, read_mode) as reader:
            self.assertRaises(IOError, reader.write, payload)
            self.assertRaises(IOError, reader.writelines, lines)
334 def test_raw_file_io(self
):
335 with self
.open(support
.TESTFN
, "wb", buffering
=0) as f
:
336 self
.assertEqual(f
.readable(), False)
337 self
.assertEqual(f
.writable(), True)
338 self
.assertEqual(f
.seekable(), True)
340 with self
.open(support
.TESTFN
, "rb", buffering
=0) as f
:
341 self
.assertEqual(f
.readable(), True)
342 self
.assertEqual(f
.writable(), False)
343 self
.assertEqual(f
.seekable(), True)
346 def test_buffered_file_io(self
):
347 with self
.open(support
.TESTFN
, "wb") as f
:
348 self
.assertEqual(f
.readable(), False)
349 self
.assertEqual(f
.writable(), True)
350 self
.assertEqual(f
.seekable(), True)
352 with self
.open(support
.TESTFN
, "rb") as f
:
353 self
.assertEqual(f
.readable(), True)
354 self
.assertEqual(f
.writable(), False)
355 self
.assertEqual(f
.seekable(), True)
356 self
.read_ops(f
, True)
def test_readline(self):
    # Each entry is (positional args for readline, expected return value);
    # the calls are made in this order against the same open file.
    expected_reads = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(support.TESTFN, "wb") as out:
        out.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    with self.open(support.TESTFN, "rb") as binary:
        for args, expected in expected_reads:
            self.assertEqual(binary.readline(*args), expected)
        # A float limit is rejected in binary mode...
        self.assertRaises(TypeError, binary.readline, 5.3)
    with self.open(support.TESTFN, "r") as text:
        # ...and in text mode as well.
        self.assertRaises(TypeError, text.readline, 5.3)
372 def test_raw_bytes_io(self
):
376 self
.assertEqual(data
, b
"hello world\n")
377 f
= self
.BytesIO(data
)
378 self
.read_ops(f
, True)
380 def test_large_file_ops(self
):
381 # On Windows and Mac OSX this test consumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
384 if sys
.platform
[:3] == 'win' or sys
.platform
== 'darwin':
385 if not support
.is_resource_enabled("largefile"):
386 print("\nTesting large file ops skipped on %s." % sys
.platform
,
388 print("It requires %d bytes and a long time." % self
.LARGE
,
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
393 with self
.open(support
.TESTFN
, "w+b", 0) as f
:
394 self
.large_file_ops(f
)
395 with self
.open(support
.TESTFN
, "w+b") as f
:
396 self
.large_file_ops(f
)
398 def test_with_open(self
):
399 for bufsize
in (0, 1, 100):
401 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
403 self
.assertEqual(f
.closed
, True)
406 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
408 except ZeroDivisionError:
409 self
.assertEqual(f
.closed
, True)
411 self
.fail("1 // 0 didn't raise an exception")
414 def test_append_mode_tell(self
):
415 with self
.open(support
.TESTFN
, "wb") as f
:
417 with self
.open(support
.TESTFN
, "ab", buffering
=0) as f
:
418 self
.assertEqual(f
.tell(), 3)
419 with self
.open(support
.TESTFN
, "ab") as f
:
420 self
.assertEqual(f
.tell(), 3)
421 with self
.open(support
.TESTFN
, "a") as f
:
422 self
.assertTrue(f
.tell() > 0)
424 def test_destructor(self
):
426 class MyFileIO(self
.FileIO
):
430 f
= super(MyFileIO
, self
).__del
__
431 except AttributeError:
437 super(MyFileIO
, self
).close()
440 super(MyFileIO
, self
).flush()
441 f
= MyFileIO(support
.TESTFN
, "wb")
445 self
.assertEqual(record
, [1, 2, 3])
446 with self
.open(support
.TESTFN
, "rb") as f
:
447 self
.assertEqual(f
.read(), b
"xxx")
449 def _check_base_destructor(self
, base
):
453 # This exercises the availability of attributes on object
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
461 record
.append(self
.on_del
)
463 f
= super(MyIO
, self
).__del
__
464 except AttributeError:
469 record
.append(self
.on_close
)
470 super(MyIO
, self
).close()
472 record
.append(self
.on_flush
)
473 super(MyIO
, self
).flush()
477 self
.assertEqual(record
, [1, 2, 3])
def test_IOBase_destructor(self):
    # Run the shared destructor check against this implementation's IOBase.
    base = self.IOBase
    self._check_base_destructor(base)
def test_RawIOBase_destructor(self):
    # Run the shared destructor check against this implementation's
    # RawIOBase.
    base = self.RawIOBase
    self._check_base_destructor(base)
def test_BufferedIOBase_destructor(self):
    # Run the shared destructor check against this implementation's
    # BufferedIOBase.
    base = self.BufferedIOBase
    self._check_base_destructor(base)
def test_TextIOBase_destructor(self):
    # Run the shared destructor check against this implementation's
    # TextIOBase.
    base = self.TextIOBase
    self._check_base_destructor(base)
491 def test_close_flushes(self
):
492 with self
.open(support
.TESTFN
, "wb") as f
:
494 with self
.open(support
.TESTFN
, "rb") as f
:
495 self
.assertEqual(f
.read(), b
"xxx")
def test_array_writes(self):
    # write() of an array.array must report the array's length in bytes
    # (as given by tostring()), both when a third positional argument of 0
    # is passed to open() and when it is left at its default.
    payload = array.array(b'i', range(10))
    byte_count = len(payload.tostring())
    for open_args in (("wb", 0), ("wb",)):
        with self.open(support.TESTFN, *open_args) as out:
            self.assertEqual(out.write(payload), byte_count)
505 def test_closefd(self
):
506 self
.assertRaises(ValueError, self
.open, support
.TESTFN
, 'w',
509 def test_read_closed(self
):
510 with self
.open(support
.TESTFN
, "w") as f
:
512 with self
.open(support
.TESTFN
, "r") as f
:
513 file = self
.open(f
.fileno(), "r", closefd
=False)
514 self
.assertEqual(file.read(), "egg\n")
517 self
.assertRaises(ValueError, file.read
)
def test_no_closefd_with_filename(self):
    # closefd=False is only meaningful for an existing descriptor, so
    # combining it with a file name must be rejected.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
523 def test_closefd_attr(self
):
524 with self
.open(support
.TESTFN
, "wb") as f
:
526 with self
.open(support
.TESTFN
, "r") as f
:
527 self
.assertEqual(f
.buffer.raw
.closefd
, True)
528 file = self
.open(f
.fileno(), "r", closefd
=False)
529 self
.assertEqual(file.buffer.raw
.closefd
, False)
531 def test_garbage_collection(self
):
532 # FileIO objects are collected, and collecting them flushes
534 f
= self
.FileIO(support
.TESTFN
, "wb")
540 self
.assertTrue(wr() is None, wr
)
541 with self
.open(support
.TESTFN
, "rb") as f
:
542 self
.assertEqual(f
.read(), b
"abcxxx")
544 def test_unbounded_file(self
):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
547 if not os
.path
.exists(zero
):
548 self
.skipTest("{0} does not exist".format(zero
))
549 if sys
.maxsize
> 0x7FFFFFFF:
550 self
.skipTest("test can only run in a 32-bit address space")
551 if support
.real_max_memuse
< support
._2G
:
552 self
.skipTest("test requires at least 2GB of memory")
553 with self
.open(zero
, "rb", buffering
=0) as f
:
554 self
.assertRaises(OverflowError, f
.read
)
555 with self
.open(zero
, "rb") as f
:
556 self
.assertRaises(OverflowError, f
.read
)
557 with self
.open(zero
, "r") as f
:
558 self
.assertRaises(OverflowError, f
.read
)
560 def test_flush_error_on_close(self
):
561 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
565 self
.assertRaises(IOError, f
.close
) # exception not swallowed
567 def test_multi_close(self
):
568 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
572 self
.assertRaises(ValueError, f
.flush
)
def test_RawIOBase_read(self):
    # Exercise the default RawIOBase.read() implementation (which calls
    # readinto() internally) by issuing a series of read(2) calls and
    # comparing each result, in order, against the expected sequence.
    queued_chunks = (b"abc", b"d", None, b"efg", None)
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    rawio = self.MockRawIOWithoutRead(queued_chunks)
    for expected in expected_results:
        self.assertEqual(rawio.read(2), expected)
587 class CIOTest(IOTest
):
class PyIOTest(IOTest):
    # The inherited array-writing test is replaced by a skipped version;
    # the reason string below is what unittest reports for the skip.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
596 class CommonBufferedTests
:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
599 def test_detach(self
):
600 raw
= self
.MockRawIO()
602 self
.assertIs(buf
.detach(), raw
)
603 self
.assertRaises(ValueError, buf
.detach
)
605 def test_fileno(self
):
606 rawio
= self
.MockRawIO()
607 bufio
= self
.tp(rawio
)
609 self
.assertEqual(42, bufio
.fileno())
611 def test_no_fileno(self
):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
616 def test_invalid_args(self
):
617 rawio
= self
.MockRawIO()
618 bufio
= self
.tp(rawio
)
620 self
.assertRaises(ValueError, bufio
.seek
, 0, -1)
621 self
.assertRaises(ValueError, bufio
.seek
, 0, 3)
623 def test_override_destructor(self
):
626 class MyBufferedIO(tp
):
630 f
= super(MyBufferedIO
, self
).__del
__
631 except AttributeError:
637 super(MyBufferedIO
, self
).close()
640 super(MyBufferedIO
, self
).flush()
641 rawio
= self
.MockRawIO()
642 bufio
= MyBufferedIO(rawio
)
643 writable
= bufio
.writable()
647 self
.assertEqual(record
, [1, 2, 3])
649 self
.assertEqual(record
, [1, 2])
651 def test_context_manager(self
):
652 # Test usability as a context manager
653 rawio
= self
.MockRawIO()
654 bufio
= self
.tp(rawio
)
659 # bufio should now be closed, and using it a second time should raise
661 self
.assertRaises(ValueError, _with
)
663 def test_error_through_destructor(self
):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio
= self
.CloseFailureIO()
669 with support
.captured_output("stderr") as s
:
670 self
.assertRaises(AttributeError, f
)
671 s
= s
.getvalue().strip()
673 # The destructor *may* have printed an unraisable error, check it
674 self
.assertEqual(len(s
.splitlines()), 1)
675 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
676 self
.assertTrue(s
.endswith(" ignored"), s
)
679 raw
= self
.MockRawIO()
681 clsname
= "%s.%s" % (self
.tp
.__module
__, self
.tp
.__name
__)
682 self
.assertEqual(repr(b
), "<%s>" % clsname
)
684 self
.assertEqual(repr(b
), "<%s name=u'dummy'>" % clsname
)
686 self
.assertEqual(repr(b
), "<%s name='dummy'>" % clsname
)
688 def test_flush_error_on_close(self
):
689 raw
= self
.MockRawIO()
692 raw
.flush
= bad_flush
694 self
.assertRaises(IOError, b
.close
) # exception not swallowed
696 def test_multi_close(self
):
697 raw
= self
.MockRawIO()
702 self
.assertRaises(ValueError, b
.flush
)
704 def test_readonly_attributes(self
):
705 raw
= self
.MockRawIO()
708 with self
.assertRaises((AttributeError, TypeError)):
712 class BufferedReaderTest(unittest
.TestCase
, CommonBufferedTests
):
def test_constructor(self):
    # __init__ may be re-run on a live object with valid buffer sizes;
    # zero and negative sizes must raise ValueError.
    source = self.MockRawIO([b"abc"])
    reader = self.tp(source)
    reader.__init__(source)
    for good_size in (1024, 16):
        reader.__init__(source, buffer_size=good_size)
    self.assertEqual(b"abc", reader.read())
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, reader.__init__, source,
                          buffer_size=bad_size)
    # Re-initialising with a fresh raw stream makes the object readable
    # again.
    source = self.MockRawIO([b"abc"])
    reader.__init__(source)
    self.assertEqual(b"abc", reader.read())
730 for arg
in (None, 7):
731 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
732 bufio
= self
.tp(rawio
)
733 self
.assertEqual(b
"abcdefg", bufio
.read(arg
))
735 self
.assertRaises(ValueError, bufio
.read
, -2)
737 def test_read1(self
):
738 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
739 bufio
= self
.tp(rawio
)
740 self
.assertEqual(b
"a", bufio
.read(1))
741 self
.assertEqual(b
"b", bufio
.read1(1))
742 self
.assertEqual(rawio
._reads
, 1)
743 self
.assertEqual(b
"c", bufio
.read1(100))
744 self
.assertEqual(rawio
._reads
, 1)
745 self
.assertEqual(b
"d", bufio
.read1(100))
746 self
.assertEqual(rawio
._reads
, 2)
747 self
.assertEqual(b
"efg", bufio
.read1(100))
748 self
.assertEqual(rawio
._reads
, 3)
749 self
.assertEqual(b
"", bufio
.read1(100))
750 self
.assertEqual(rawio
._reads
, 4)
752 self
.assertRaises(ValueError, bufio
.read1
, -1)
754 def test_readinto(self
):
755 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
756 bufio
= self
.tp(rawio
)
758 self
.assertEqual(bufio
.readinto(b
), 2)
759 self
.assertEqual(b
, b
"ab")
760 self
.assertEqual(bufio
.readinto(b
), 2)
761 self
.assertEqual(b
, b
"cd")
762 self
.assertEqual(bufio
.readinto(b
), 2)
763 self
.assertEqual(b
, b
"ef")
764 self
.assertEqual(bufio
.readinto(b
), 1)
765 self
.assertEqual(b
, b
"gf")
766 self
.assertEqual(bufio
.readinto(b
), 0)
767 self
.assertEqual(b
, b
"gf")
769 def test_readlines(self
):
771 rawio
= self
.MockRawIO((b
"abc\n", b
"d\n", b
"ef"))
772 return self
.tp(rawio
)
773 self
.assertEqual(bufio().readlines(), [b
"abc\n", b
"d\n", b
"ef"])
774 self
.assertEqual(bufio().readlines(5), [b
"abc\n", b
"d\n"])
775 self
.assertEqual(bufio().readlines(None), [b
"abc\n", b
"d\n", b
"ef"])
777 def test_buffering(self
):
782 [ 100, [ 3, 1, 4, 8 ], [ dlen
, 0 ] ],
783 [ 100, [ 3, 3, 3], [ dlen
] ],
784 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
787 for bufsize
, buf_read_sizes
, raw_read_sizes
in tests
:
788 rawio
= self
.MockFileIO(data
)
789 bufio
= self
.tp(rawio
, buffer_size
=bufsize
)
791 for nbytes
in buf_read_sizes
:
792 self
.assertEqual(bufio
.read(nbytes
), data
[pos
:pos
+nbytes
])
794 # this is mildly implementation-dependent
795 self
.assertEqual(rawio
.read_history
, raw_read_sizes
)
797 def test_read_non_blocking(self
):
798 # Inject some None's in there to simulate EWOULDBLOCK
799 rawio
= self
.MockRawIO((b
"abc", b
"d", None, b
"efg", None, None, None))
800 bufio
= self
.tp(rawio
)
801 self
.assertEqual(b
"abcd", bufio
.read(6))
802 self
.assertEqual(b
"e", bufio
.read(1))
803 self
.assertEqual(b
"fg", bufio
.read())
804 self
.assertEqual(b
"", bufio
.peek(1))
805 self
.assertIsNone(bufio
.read())
806 self
.assertEqual(b
"", bufio
.read())
808 rawio
= self
.MockRawIO((b
"a", None, None))
809 self
.assertEqual(b
"a", rawio
.readall())
810 self
.assertIsNone(rawio
.readall())
812 def test_read_past_eof(self
):
813 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
814 bufio
= self
.tp(rawio
)
816 self
.assertEqual(b
"abcdefg", bufio
.read(9000))
818 def test_read_all(self
):
819 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
820 bufio
= self
.tp(rawio
)
822 self
.assertEqual(b
"abcdefg", bufio
.read())
824 @unittest.skipUnless(threading
, 'Threading required for this test.')
825 @support.requires_resource('cpu')
826 def test_threads(self
):
828 # Write out many bytes with exactly the same number of 0's,
829 # 1's... 255's. This will help us check that concurrent reading
830 # doesn't duplicate or forget contents.
832 l
= list(range(256)) * N
834 s
= bytes(bytearray(l
))
835 with self
.open(support
.TESTFN
, "wb") as f
:
837 with self
.open(support
.TESTFN
, self
.read_mode
, buffering
=0) as raw
:
838 bufio
= self
.tp(raw
, 8)
843 # Intra-buffer read then buffer-flushing read
844 for n
in cycle([1, 19]):
848 # list.append() is atomic
850 except Exception as e
:
853 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
856 time
.sleep(0.02) # yield
859 self
.assertFalse(errors
,
860 "the following exceptions were caught: %r" % errors
)
861 s
= b
''.join(results
)
863 c
= bytes(bytearray([i
]))
864 self
.assertEqual(s
.count(c
), N
)
866 support
.unlink(support
.TESTFN
)
def test_misbehaved_io(self):
    # seek() and tell() on top of a misbehaving raw stream must surface
    # as IOError.
    bad_raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(bad_raw)
    with self.assertRaises(IOError):
        reader.seek(0)
    with self.assertRaises(IOError):
        reader.tell()
874 def test_no_extraneous_read(self
):
875 # Issue #9550; when the raw IO object has satisfied the read request,
876 # we should not issue any additional reads, otherwise it may block
879 for n
in (2, bufsize
- 1, bufsize
, bufsize
+ 1, bufsize
* 2):
880 rawio
= self
.MockRawIO([b
"x" * n
])
881 bufio
= self
.tp(rawio
, bufsize
)
882 self
.assertEqual(bufio
.read(n
), b
"x" * n
)
883 # Simple case: one raw read is enough to satisfy the request.
884 self
.assertEqual(rawio
._extraneous
_reads
, 0,
885 "failed for {}: {} != 0".format(n
, rawio
._extraneous
_reads
))
886 # A more complex case where two raw reads are needed to satisfy
888 rawio
= self
.MockRawIO([b
"x" * (n
- 1), b
"x"])
889 bufio
= self
.tp(rawio
, bufsize
)
890 self
.assertEqual(bufio
.read(n
), b
"x" * n
)
891 self
.assertEqual(rawio
._extraneous
_reads
, 0,
892 "failed for {}: {} != 0".format(n
, rawio
._extraneous
_reads
))
895 class CBufferedReaderTest(BufferedReaderTest
):
896 tp
= io
.BufferedReader
def test_constructor(self):
    BufferedReaderTest.test_constructor(self)
    # The oversized-buffer check only runs when sys.maxsize exceeds the
    # 32-bit limit: on 32-bit builds the allocation can actually succeed,
    # e.g. with more than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    allocation_errors = (OverflowError, MemoryError, ValueError)
    raw = self.MockRawIO()
    reader = self.tp(raw)
    self.assertRaises(allocation_errors, reader.__init__, raw, sys.maxsize)
def test_initialization(self):
    # Each rejected re-initialisation (invalid buffer_size) must be
    # followed by read() raising ValueError as well.
    source = self.MockRawIO([b"abc"])
    reader = self.tp(source)
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, reader.__init__, source,
                          buffer_size=bad_size)
        self.assertRaises(ValueError, reader.read)
def test_misbehaved_io_read(self):
    # Only checked here, on the C reader: _pyio.BufferedReader seems to
    # implement reading differently, so the same check is not as easy
    # to write for it.
    bad_raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(bad_raw)
    with self.assertRaises(IOError):
        reader.read(10)
925 def test_garbage_collection(self
):
926 # C BufferedReader objects are collected.
927 # The Python version has __del__, so it ends into gc.garbage instead
928 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
934 self
.assertTrue(wr() is None, wr
)
class PyBufferedReaderTest(BufferedReaderTest):
    # Run the shared BufferedReader tests against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedReader
940 class BufferedWriterTest(unittest
.TestCase
, CommonBufferedTests
):
943 def test_constructor(self
):
944 rawio
= self
.MockRawIO()
945 bufio
= self
.tp(rawio
)
946 bufio
.__init
__(rawio
)
947 bufio
.__init
__(rawio
, buffer_size
=1024)
948 bufio
.__init
__(rawio
, buffer_size
=16)
949 self
.assertEqual(3, bufio
.write(b
"abc"))
951 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
952 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
953 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
954 bufio
.__init
__(rawio
)
955 self
.assertEqual(3, bufio
.write(b
"ghi"))
957 self
.assertEqual(b
"".join(rawio
._write
_stack
), b
"abcghi")
959 def test_detach_flush(self
):
960 raw
= self
.MockRawIO()
963 self
.assertFalse(raw
._write
_stack
)
965 self
.assertEqual(raw
._write
_stack
, [b
"howdy!"])
967 def test_write(self
):
968 # Write to the buffered IO but don't overflow the buffer.
969 writer
= self
.MockRawIO()
970 bufio
= self
.tp(writer
, 8)
972 self
.assertFalse(writer
._write
_stack
)
def test_write_overflow(self):
    # Feed 16 bytes through a writer created with a size argument of 8,
    # three bytes at a time.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    contents = b"abcdefghijklmnop"
    pieces = [contents[start:start + 3]
              for start in range(0, len(contents), 3)]
    for piece in pieces:
        buffered.write(piece)
    flushed = b"".join(sink._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    must_be_flushed = contents[:-8]
    self.assertTrue(flushed.startswith(must_be_flushed), flushed)
985 def check_writes(self
, intermediate_func
):
986 # Lots of writes, test the flushed output is as expected.
987 contents
= bytes(range(256)) * 1000
989 writer
= self
.MockRawIO()
990 bufio
= self
.tp(writer
, 13)
991 # Generator of write sizes: repeat each N 15 times then proceed to N+1
993 for size
in count(1):
997 while n
< len(contents
):
998 size
= min(next(sizes
), len(contents
) - n
)
999 self
.assertEqual(bufio
.write(contents
[n
:n
+size
]), size
)
1000 intermediate_func(bufio
)
1003 self
.assertEqual(contents
,
1004 b
"".join(writer
._write
_stack
))
def test_writes(self):
    # Plain writes: the intermediate step does nothing.
    def _do_nothing(bufio):
        return None
    self.check_writes(_do_nothing)
def test_writes_and_flushes(self):
    # Same as test_writes, but flush after every write.
    def _flush(bufio):
        return bufio.flush()
    self.check_writes(_flush)
1012 def test_writes_and_seeks(self
):
1013 def _seekabs(bufio
):
1015 bufio
.seek(pos
+ 1, 0)
1016 bufio
.seek(pos
- 1, 0)
1018 self
.check_writes(_seekabs
)
1019 def _seekrel(bufio
):
1020 pos
= bufio
.seek(0, 1)
1024 self
.check_writes(_seekrel
)
def test_writes_and_truncates(self):
    # Same as test_writes, but truncate at the current position after
    # every write.
    def _truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(_truncate_here)
1029 def test_write_non_blocking(self
):
1030 raw
= self
.MockNonBlockWriterIO()
1031 bufio
= self
.tp(raw
, 8)
1033 self
.assertEqual(bufio
.write(b
"abcd"), 4)
1034 self
.assertEqual(bufio
.write(b
"efghi"), 5)
1035 # 1 byte will be written, the rest will be buffered
1037 self
.assertEqual(bufio
.write(b
"jklmn"), 5)
1039 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1042 bufio
.write(b
"opqrwxyz0123456789")
1043 except self
.BlockingIOError
as e
:
1044 written
= e
.characters_written
1046 self
.fail("BlockingIOError should have been raised")
1047 self
.assertEqual(written
, 16)
1048 self
.assertEqual(raw
.pop_written(),
1049 b
"abcdefghijklmnopqrwxyz")
1051 self
.assertEqual(bufio
.write(b
"ABCDEFGHI"), 9)
1052 s
= raw
.pop_written()
1053 # Previously buffered bytes were flushed
1054 self
.assertTrue(s
.startswith(b
"01234567A"), s
)
1056 def test_write_and_rewind(self
):
1058 bufio
= self
.tp(raw
, 4)
1059 self
.assertEqual(bufio
.write(b
"abcdef"), 6)
1060 self
.assertEqual(bufio
.tell(), 6)
1062 self
.assertEqual(bufio
.write(b
"XY"), 2)
1064 self
.assertEqual(raw
.getvalue(), b
"XYcdef")
1065 self
.assertEqual(bufio
.write(b
"123456"), 6)
1067 self
.assertEqual(raw
.getvalue(), b
"XYcdef123456")
1069 def test_flush(self
):
1070 writer
= self
.MockRawIO()
1071 bufio
= self
.tp(writer
, 8)
1074 self
.assertEqual(b
"abc", writer
._write
_stack
[0])
1076 def test_destructor(self
):
1077 writer
= self
.MockRawIO()
1078 bufio
= self
.tp(writer
, 8)
1081 support
.gc_collect()
1082 self
.assertEqual(b
"abc", writer
._write
_stack
[0])
def test_truncate(self):
    # Truncate implicitly flushes the buffer: after truncate(3) the file
    # on disk holds the first three bytes, while tell() still reports 6.
    path = support.TESTFN
    with self.open(path, self.write_mode, buffering=0) as raw:
        buffered = self.tp(raw, 8)
        buffered.write(b"abcdef")
        new_size = buffered.truncate(3)
        self.assertEqual(new_size, 3)
        position = buffered.tell()
        self.assertEqual(position, 6)
    with self.open(path, "rb", buffering=0) as check:
        on_disk = check.read()
        self.assertEqual(on_disk, b"abc")
1094 @unittest.skipUnless(threading
, 'Threading required for this test.')
1095 @support.requires_resource('cpu')
1096 def test_threads(self
):
1098 # Write out many bytes from many threads and test they were
1101 contents
= bytes(range(256)) * N
1102 sizes
= cycle([1, 19])
1105 while n
< len(contents
):
1107 queue
.append(contents
[n
:n
+size
])
1110 # We use a real file object because it allows us to
1111 # exercise situations where the GIL is released before
1112 # writing the buffer to the raw streams. This is in addition
1113 # to concurrency issues due to switching threads in the middle
1115 with self
.open(support
.TESTFN
, self
.write_mode
, buffering
=0) as raw
:
1116 bufio
= self
.tp(raw
, 8)
1126 except Exception as e
:
1129 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
1132 time
.sleep(0.02) # yield
1135 self
.assertFalse(errors
,
1136 "the following exceptions were caught: %r" % errors
)
1138 with self
.open(support
.TESTFN
, "rb") as f
:
1140 for i
in range(256):
1141 self
.assertEqual(s
.count(bytes([i
])), N
)
1143 support
.unlink(support
.TESTFN
)
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a misbehaving raw stream must
    # each surface as IOError.
    bad_raw = self.MisbehavedRawIO()
    writer = self.tp(bad_raw, 5)
    failing_calls = (
        (writer.seek, (0,)),
        (writer.tell, ()),
        (writer.write, (b"abcdef",)),
    )
    for method, args in failing_calls:
        self.assertRaises(IOError, method, *args)
def test_max_buffer_size_deprecation(self):
    # Passing a third positional argument must emit the
    # "max_buffer_size is deprecated" DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        self.tp(self.MockRawIO(), 8, 12)
1158 class CBufferedWriterTest(BufferedWriterTest
):
1159 tp
= io
.BufferedWriter
def test_constructor(self):
    BufferedWriterTest.test_constructor(self)
    # The oversized-buffer check only runs when sys.maxsize exceeds the
    # 32-bit limit: on 32-bit builds the allocation can actually succeed,
    # e.g. with more than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    allocation_errors = (OverflowError, MemoryError, ValueError)
    raw = self.MockRawIO()
    writer = self.tp(raw)
    self.assertRaises(allocation_errors, writer.__init__, raw, sys.maxsize)
def test_initialization(self):
    raw = self.MockRawIO()
    writer = self.tp(raw)
    # Re-initializing with each invalid buffer size must raise
    # ValueError, and a write() attempted right after must raise too.
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, writer.__init__, raw,
                          buffer_size=bad_size)
        self.assertRaises(ValueError, writer.write, b"def")
1181 def test_garbage_collection(self
):
1182 # C BufferedWriter objects are collected, and collecting them flushes
1184 # The Python version has __del__, so it ends into gc.garbage instead
1185 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
1191 support
.gc_collect()
1192 self
.assertTrue(wr() is None, wr
)
1193 with self
.open(support
.TESTFN
, "rb") as f
:
1194 self
.assertEqual(f
.read(), b
"123xxx")
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against the pure-Python
    # implementation (the _pyio module, imported in this file as pyio).
    tp = pyio.BufferedWriter
1200 class BufferedRWPairTest(unittest
.TestCase
):
def test_constructor(self):
    # A freshly built pair must report itself as open.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertFalse(self.tp(reader, writer).closed)
def test_detach(self):
    # detach() on a reader/writer pair is expected to raise
    # UnsupportedOperation.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, pair.detach)
def test_constructor_max_buffer_size_deprecation(self):
    # Passing a fourth positional argument (12) to the pair constructor
    # must emit the "max_buffer_size is deprecated" DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1215 def test_constructor_with_not_readable(self
):
1216 class NotReadable(MockRawIO
):
1220 self
.assertRaises(IOError, self
.tp
, NotReadable(), self
.MockRawIO())
1222 def test_constructor_with_not_writeable(self
):
1223 class NotWriteable(MockRawIO
):
1227 self
.assertRaises(IOError, self
.tp
, self
.MockRawIO(), NotWriteable())
def test_read(self):
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    # Two sized reads, then an argument-less read for the remainder.
    steps = (((3,), b"abc"), ((1,), b"d"), ((), b"ef"))
    for read_args, expected in steps:
        self.assertEqual(pair.read(*read_args), expected)

    # An explicit None size on a fresh pair returns all of the data.
    pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    everything = pair.read(None)
    self.assertEqual(everything, b"abc")
def test_readlines(self):
    # Every assertion gets a brand new pair so that each readlines()
    # variant starts from the beginning of the same data.
    def make_pair():
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

    all_lines = [b"abc\n", b"def\n", b"h"]
    # No hint: every line is returned.
    self.assertEqual(make_pair().readlines(), all_lines)
    # hint=None must behave like no hint.  (This assertion used to be an
    # exact duplicate of the previous one; it now mirrors the None case
    # covered by TextIOWrapperTest.test_readlines.)
    self.assertEqual(make_pair().readlines(None), all_lines)
    # hint=5: reading stops once the lines collected reach 5 bytes.
    self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # only makes a single call.
    reader = self.BytesIO(b"abcdef")
    pair = self.tp(reader, self.MockRawIO())

    first_chunk = pair.read1(3)
    self.assertEqual(first_chunk, b"abc")
1251 def test_readinto(self
):
1252 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1255 self
.assertEqual(pair
.readinto(data
), 5)
1256 self
.assertEqual(data
, b
"abcde")
1258 def test_write(self
):
1259 w
= self
.MockRawIO()
1260 pair
= self
.tp(self
.MockRawIO(), w
)
1266 self
.assertEqual(w
._write
_stack
, [b
"abc", b
"def"])
def test_peek(self):
    reader = self.BytesIO(b"abcdef")
    pair = self.tp(reader, self.MockRawIO())

    # Only the prefix of the peeked data is checked; the read() that
    # follows must still return those same three bytes.
    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    consumed = pair.read(3)
    self.assertEqual(consumed, b"abc")
def test_readable(self):
    # A pair built from two MockRawIO objects must report readable().
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).readable())
def test_writeable(self):
    # A pair built from two MockRawIO objects must report writable().
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).writable())
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and
    # writers are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)
    self.assertFalse(pair.seekable())
1288 # .flush() is delegated to the underlying writer object and has been
1289 # tested in the test_write method.
1291 def test_close_and_closed(self
):
1292 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1293 self
.assertFalse(pair
.closed
)
1295 self
.assertTrue(pair
.closed
)
1297 def test_isatty(self
):
1298 class SelectableIsAtty(MockRawIO
):
1299 def __init__(self
, isatty
):
1300 MockRawIO
.__init
__(self
)
1301 self
._isatty
= isatty
1306 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1307 self
.assertFalse(pair
.isatty())
1309 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1310 self
.assertTrue(pair
.isatty())
1312 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1313 self
.assertTrue(pair
.isatty())
1315 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1316 self
.assertTrue(pair
.isatty())
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against the C
    # implementation (the io module).
    tp = io.BufferedRWPair
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against the pure-Python
    # implementation (the _pyio module, imported in this file as pyio).
    tp = pyio.BufferedRWPair
1325 class BufferedRandomTest(BufferedReaderTest
, BufferedWriterTest
):
def test_constructor(self):
    # A random-access buffer is both a reader and a writer, so the
    # constructor checks of both base test classes are run, reader first.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_constructor(self)
1333 def test_read_and_write(self
):
1334 raw
= self
.MockRawIO((b
"asdf", b
"ghjk"))
1335 rw
= self
.tp(raw
, 8)
1337 self
.assertEqual(b
"as", rw
.read(2))
1340 self
.assertFalse(raw
._write
_stack
) # Buffer writes
1341 self
.assertEqual(b
"ghjk", rw
.read())
1342 self
.assertEqual(b
"dddeee", raw
._write
_stack
[0])
1344 def test_seek_and_tell(self
):
1345 raw
= self
.BytesIO(b
"asdfghjkl")
1348 self
.assertEqual(b
"as", rw
.read(2))
1349 self
.assertEqual(2, rw
.tell())
1351 self
.assertEqual(b
"asdf", rw
.read(4))
1355 self
.assertEqual(b
"asdfasdfl", rw
.read())
1356 self
.assertEqual(9, rw
.tell())
1358 self
.assertEqual(5, rw
.tell())
1360 self
.assertEqual(7, rw
.tell())
1361 self
.assertEqual(b
"fl", rw
.read(11))
1362 self
.assertRaises(TypeError, rw
.seek
, 0.0)
1364 def check_flush_and_read(self
, read_func
):
1365 raw
= self
.BytesIO(b
"abcdefghi")
1366 bufio
= self
.tp(raw
)
1368 self
.assertEqual(b
"ab", read_func(bufio
, 2))
1370 self
.assertEqual(b
"ef", read_func(bufio
, 2))
1371 self
.assertEqual(6, bufio
.tell())
1373 self
.assertEqual(6, bufio
.tell())
1374 self
.assertEqual(b
"ghi", read_func(bufio
))
1377 # flush() resets the read buffer
1380 self
.assertEqual(b
"XYZ", read_func(bufio
, 3))
def test_flush_and_read(self):
    # Drive check_flush_and_read() with plain read() calls.
    def _read(bufio, *args):
        return bufio.read(*args)
    self.check_flush_and_read(_read)
1385 def test_flush_and_readinto(self
):
1386 def _readinto(bufio
, n
=-1):
1387 b
= bytearray(n
if n
>= 0 else 9999)
1388 n
= bufio
.readinto(b
)
1390 self
.check_flush_and_read(_readinto
)
1392 def test_flush_and_peek(self
):
1393 def _peek(bufio
, n
=-1):
1394 # This relies on the fact that the buffer can contain the whole
1395 # raw stream, otherwise peek() can return less.
1399 bufio
.seek(len(b
), 1)
1401 self
.check_flush_and_read(_peek
)
1403 def test_flush_and_write(self
):
1404 raw
= self
.BytesIO(b
"abcdefghi")
1405 bufio
= self
.tp(raw
)
1412 self
.assertEqual(b
"12345fghi", raw
.getvalue())
1413 self
.assertEqual(b
"12345fghi", bufio
.read())
def test_threads(self):
    # Run the threading test of the reader base class, then the one of
    # the writer base class, against this type.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
1419 def test_writes_and_peek(self
):
1422 self
.check_writes(_peek
)
1428 self
.check_writes(_peek
)
1430 def test_writes_and_reads(self
):
1434 self
.check_writes(_read
)
1436 def test_writes_and_read1s(self
):
1440 self
.check_writes(_read1
)
1442 def test_writes_and_readintos(self
):
1445 bufio
.readinto(bytearray(1))
1446 self
.check_writes(_read
)
1448 def test_write_after_readahead(self
):
1449 # Issue #6629: writing after the buffer was filled by readahead should
1450 # first rewind the raw stream.
1451 for overwrite_size
in [1, 5]:
1452 raw
= self
.BytesIO(b
"A" * 10)
1453 bufio
= self
.tp(raw
, 4)
1455 self
.assertEqual(bufio
.read(1), b
"A")
1456 self
.assertEqual(bufio
.tell(), 1)
1457 # Overwriting should rewind the raw stream if it needs so
1458 bufio
.write(b
"B" * overwrite_size
)
1459 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1460 # If the write size was smaller than the buffer size, flush() and
1461 # check that rewind happens.
1463 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1466 b
"A" + b
"B" * overwrite_size
+ b
"A" * (9 - overwrite_size
))
1468 def test_write_rewind_write(self
):
1469 # Various combinations of reading / writing / seeking backwards / writing again
1470 def mutate(bufio
, pos1
, pos2
):
1474 bufio
.read(pos2
- pos1
)
1475 bufio
.write(b
'\x02')
1476 # This writes earlier than the previous write, but still inside
1479 bufio
.write(b
'\x01')
1481 b
= b
"\x80\x81\x82\x83\x84"
1482 for i
in range(0, len(b
)):
1483 for j
in range(i
, len(b
)):
1484 raw
= self
.BytesIO(b
)
1485 bufio
= self
.tp(raw
, 100)
1488 expected
= bytearray(b
)
1491 self
.assertEqual(raw
.getvalue(), expected
,
1492 "failed result for i=%d, j=%d" % (i
, j
))
def test_truncate_after_read_or_write(self):
    buffered = self.tp(self.BytesIO(b"A" * 10), 100)

    # the read buffer gets filled
    head = buffered.read(2)
    self.assertEqual(head, b"AA")
    # truncate() without an argument must report position 2
    self.assertEqual(buffered.truncate(), 2)

    # the write buffer increases
    written = buffered.write(b"BB")
    self.assertEqual(written, 2)
    # ... and the following truncate() must report position 4
    self.assertEqual(buffered.truncate(), 4)
def test_misbehaved_io(self):
    # Run the misbehaved-raw-stream checks of the reader base class,
    # then those of the writer base class.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest):
    # BufferedRandom test cases bound to the C implementation (io module).
    tp = io.BufferedRandom

    def test_constructor(self):
        # Run the generic BufferedRandom constructor checks first.
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so the oversized-buffer check
        # only runs when sys.maxsize exceeds 2**31 - 1.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        random_io = self.tp(raw)
        oversize_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(oversize_errors,
                          random_io.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage collection test of the C reader tests, then
        # the one of the C writer tests.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against the pure-Python
    # implementation (the _pyio module, imported in this file as pyio).
    tp = pyio.BufferedRandom
1527 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1529 # - A single output character can correspond to many bytes of input.
1530 # - The number of input bytes to complete the character can be
1531 # undetermined until the last input byte is received.
1532 # - The number of input bytes can vary depending on previous input.
1533 # - A single input byte can correspond to many characters of output.
1534 # - The number of output characters can be undetermined until the
1535 # last input byte is received.
1536 # - The number of output characters can vary depending on previous input.
1538 class StatefulIncrementalDecoder(codecs
.IncrementalDecoder
):
1540 For testing seek/tell behavior with a stateful, buffering decoder.
1542 Input is a sequence of words. Words may be fixed-length (length set
1543 by input) or variable-length (period-terminated). In variable-length
1544 mode, extra periods are ignored. Possible words are:
1545 - 'i' followed by a number sets the input length, I (maximum 99).
1546 When I is set to 0, words are space-terminated.
1547 - 'o' followed by a number sets the output length, O (maximum 99).
1548 - Any other word is converted into a word followed by a period on
1549 the output. The output word consists of the input word truncated
1550 or padded out with hyphens to make its length equal to O. If O
1551 is 0, the word is output verbatim without truncating or padding.
1552 I and O are initially set to 1. When I changes, any buffered input is
1553 re-scanned according to the new I. EOF also terminates the last word.
1556 def __init__(self
, errors
='strict'):
1557 codecs
.IncrementalDecoder
.__init
__(self
, errors
)
1561 return '<SID %x>' % id(self
)
1566 self
.buffer = bytearray()
1569 i
, o
= self
.i ^
1, self
.o ^
1 # so that flags = 0 after reset()
1570 return bytes(self
.buffer), i
*100 + o
1572 def setstate(self
, state
):
1574 self
.buffer = bytearray(buffer)
1575 i
, o
= divmod(io
, 100)
1576 self
.i
, self
.o
= i ^
1, o ^
1
1578 def decode(self
, input, final
=False):
1581 if self
.i
== 0: # variable-length, terminated with period
1584 output
+= self
.process_word()
1586 self
.buffer.append(b
)
1587 else: # fixed-length, terminate after self.i bytes
1588 self
.buffer.append(b
)
1589 if len(self
.buffer) == self
.i
:
1590 output
+= self
.process_word()
1591 if final
and self
.buffer: # EOF terminates the last word
1592 output
+= self
.process_word()
1595 def process_word(self
):
1597 if self
.buffer[0] == ord('i'):
1598 self
.i
= min(99, int(self
.buffer[1:] or 0)) # set input length
1599 elif self
.buffer[0] == ord('o'):
1600 self
.o
= min(99, int(self
.buffer[1:] or 0)) # set output length
1602 output
= self
.buffer.decode('ascii')
1603 if len(output
) < self
.o
:
1604 output
+= '-'*self
.o
# pad out with hyphens
1606 output
= output
[:self
.o
] # truncate to output length
1608 self
.buffer = bytearray()
1611 codecEnabled
= False
def lookupTestDecoder(cls, name):
    # Codec search function (it is handed to codecs.register() below).
    # Returns a CodecInfo for 'test_decoder' only while cls.codecEnabled
    # is true; any other lookup yields None.
    if not cls.codecEnabled or name != 'test_decoder':
        return None
    # Encoding is borrowed from latin-1; only incremental decoding is
    # supplied, by cls itself.
    latin1 = codecs.lookup('latin-1')
    return codecs.CodecInfo(
        name='test_decoder',
        encode=latin1.encode,
        decode=None,
        incrementalencoder=None,
        incrementaldecoder=cls,
        streamreader=None,
        streamwriter=None,
    )
1623 # Register the previous decoder for testing.
1624 # Disabled by default, tests will enable it.
1625 codecs
.register(StatefulIncrementalDecoder
.lookupTestDecoder
)
1628 class StatefulIncrementalDecoderTest(unittest
.TestCase
):
1630 Make sure the StatefulIncrementalDecoder actually works.
1634 # I=1, O=1 (fixed-length input == fixed-length output)
1635 (b
'abcd', False, 'a.b.c.d.'),
1636 # I=0, O=0 (variable-length input, variable-length output)
1637 (b
'oiabcd', True, 'abcd.'),
1638 # I=0, O=0 (should ignore extra periods)
1639 (b
'oi...abcd...', True, 'abcd.'),
1640 # I=0, O=6 (variable-length input, fixed-length output)
1641 (b
'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1642 # I=2, O=6 (fixed-length input < fixed-length output)
1643 (b
'i.i2.o6xyz', True, 'xy----.z-----.'),
1644 # I=6, O=3 (fixed-length input > fixed-length output)
1645 (b
'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1646 # I=0, then 3; O=29, then 15 (with longer output)
1647 (b
'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1648 'a----------------------------.' +
1649 'b----------------------------.' +
1650 'cde--------------------------.' +
1651 'abcdefghijabcde.' +
1652 'a.b------------.' +
1653 '.c.------------.' +
1654 'd.e------------.' +
1655 'k--------------.' +
1656 'l--------------.' +
def test_decoder(self):
    # Try a few one-shot test cases, each with a brand new decoder.
    for data, final, expected in self.test_cases:
        decoded = StatefulIncrementalDecoder().decode(data, final)
        self.assertEqual(decoded, expected)

    # Also test an unfinished decode, followed by forcing EOF.
    decoder = StatefulIncrementalDecoder()
    pending = decoder.decode(b'oiabcd')
    self.assertEqual(pending, '')
    flushed = decoder.decode(b'', 1)
    self.assertEqual(flushed, 'abcd.')
1671 class TextIOWrapperTest(unittest
.TestCase
):
1674 self
.testdata
= b
"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1675 self
.normalized
= b
"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1676 support
.unlink(support
.TESTFN
)
1679 support
.unlink(support
.TESTFN
)
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again on the same object must apply the new
    # encoding and line_buffering settings.
    reinit_cases = (
        (dict(encoding="latin1", newline="\r\n"), "latin1", False),
        (dict(encoding="utf8", line_buffering=True), "utf8", True),
    )
    for options, encoding, line_buffering in reinit_cases:
        text.__init__(buffered, **options)
        self.assertEqual(text.encoding, encoding)
        self.assertEqual(text.line_buffering, line_buffering)

    # With the utf8 settings in effect the first line decodes to U+00E9.
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then unsupported value.
    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1695 def test_detach(self
):
1697 b
= self
.BufferedWriter(r
)
1698 t
= self
.TextIOWrapper(b
)
1699 self
.assertIs(t
.detach(), b
)
1701 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1703 self
.assertFalse(r
.getvalue())
1705 self
.assertEqual(r
.getvalue(), b
"howdy")
1706 self
.assertRaises(ValueError, t
.detach
)
1708 def test_repr(self
):
1709 raw
= self
.BytesIO("hello".encode("utf-8"))
1710 b
= self
.BufferedReader(raw
)
1711 t
= self
.TextIOWrapper(b
, encoding
="utf-8")
1712 modname
= self
.TextIOWrapper
.__module
__
1713 self
.assertEqual(repr(t
),
1714 "<%s.TextIOWrapper encoding='utf-8'>" % modname
)
1716 self
.assertEqual(repr(t
),
1717 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
)
1719 self
.assertEqual(repr(t
),
1720 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
)
1722 def test_line_buffering(self
):
1724 b
= self
.BufferedWriter(r
, 1000)
1725 t
= self
.TextIOWrapper(b
, newline
="\n", line_buffering
=True)
1727 self
.assertEqual(r
.getvalue(), b
"") # No flush happened
1729 self
.assertEqual(r
.getvalue(), b
"XY\nZ") # All got flushed
1731 self
.assertEqual(r
.getvalue(), b
"XY\nZA\rB")
1733 def test_encoding(self
):
1734 # Check the encoding attribute is always set, and valid
1736 t
= self
.TextIOWrapper(b
, encoding
="utf8")
1737 self
.assertEqual(t
.encoding
, "utf8")
1738 t
= self
.TextIOWrapper(b
)
1739 self
.assertTrue(t
.encoding
is not None)
1740 codecs
.lookup(t
.encoding
)
def test_encoding_errors_reading(self):
    def ascii_reader(**options):
        # A fresh ascii wrapper over the same bytes (which contain the
        # non-ascii byte 0xff) for every scenario.
        source = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(source, encoding="ascii", **options)

    # (1) default
    self.assertRaises(UnicodeError, ascii_reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, ascii_reader(errors="strict").read)
    # (3) ignore
    ignored = ascii_reader(errors="ignore").read()
    self.assertEqual(ignored, "abc\n\n")
    # (4) replace
    replaced = ascii_reader(errors="replace").read()
    self.assertEqual(replaced, "abc\n\ufffd\n")
1760 def test_encoding_errors_writing(self
):
1763 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1764 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1765 # (2) explicit strict
1767 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1768 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1771 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore",
1773 t
.write("abc\xffdef\n")
1775 self
.assertEqual(b
.getvalue(), b
"abcdef\n")
1778 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace",
1780 t
.write("abc\xffdef\n")
1782 self
.assertEqual(b
.getvalue(), b
"abc?def\n")
1784 def test_newlines(self
):
1785 input_lines
= [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1788 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1789 [ '', input_lines
],
1790 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1791 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1792 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1796 'utf-16', 'utf-16-le', 'utf-16-be',
1797 'utf-32', 'utf-32-le', 'utf-32-be',
1800 # Try a range of buffer sizes to test the case where \r is the last
1801 # character in TextIOWrapper._pending_line.
1802 for encoding
in encodings
:
1803 # XXX: str.encode() should return bytes
1804 data
= bytes(''.join(input_lines
).encode(encoding
))
1805 for do_reads
in (False, True):
1806 for bufsize
in range(1, 10):
1807 for newline
, exp_lines
in tests
:
1808 bufio
= self
.BufferedReader(self
.BytesIO(data
), bufsize
)
1809 textio
= self
.TextIOWrapper(bufio
, newline
=newline
,
1817 self
.assertEqual(len(c2
), 2)
1818 got_lines
.append(c2
+ textio
.readline())
1820 got_lines
= list(textio
)
1822 for got_line
, exp_line
in zip(got_lines
, exp_lines
):
1823 self
.assertEqual(got_line
, exp_line
)
1824 self
.assertEqual(len(got_lines
), len(exp_lines
))
1826 def test_newlines_input(self
):
1827 testdata
= b
"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1828 normalized
= testdata
.replace(b
"\r\n", b
"\n").replace(b
"\r", b
"\n")
1829 for newline
, expected
in [
1830 (None, normalized
.decode("ascii").splitlines(True)),
1831 ("", testdata
.decode("ascii").splitlines(True)),
1832 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1833 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1834 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1836 buf
= self
.BytesIO(testdata
)
1837 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1838 self
.assertEqual(txt
.readlines(), expected
)
1840 self
.assertEqual(txt
.read(), "".join(expected
))
1842 def test_newlines_output(self
):
1844 "": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1845 "\n": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1846 "\r": b
"AAA\rBBB\rCCC\rX\rY\r\rZ",
1847 "\r\n": b
"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1849 tests
= [(None, testdict
[os
.linesep
])] + sorted(testdict
.items())
1850 for newline
, expected
in tests
:
1851 buf
= self
.BytesIO()
1852 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1854 txt
.write("BB\nCCC\n")
1855 txt
.write("X\rY\r\nZ")
1857 self
.assertEqual(buf
.closed
, False)
1858 self
.assertEqual(buf
.getvalue(), expected
)
1860 def test_destructor(self
):
1863 class MyBytesIO(base
):
1865 l
.append(self
.getvalue())
1868 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1871 support
.gc_collect()
1872 self
.assertEqual([b
"abc"], l
)
1874 def test_override_destructor(self
):
1876 class MyTextIO(self
.TextIOWrapper
):
1880 f
= super(MyTextIO
, self
).__del
__
1881 except AttributeError:
1887 super(MyTextIO
, self
).close()
1890 super(MyTextIO
, self
).flush()
1892 t
= MyTextIO(b
, encoding
="ascii")
1894 support
.gc_collect()
1895 self
.assertEqual(record
, [1, 2, 3])
1897 def test_error_through_destructor(self
):
1898 # Test that the exception state is not modified by a destructor,
1899 # even if close() fails.
1900 rawio
= self
.CloseFailureIO()
1902 self
.TextIOWrapper(rawio
).xyzzy
1903 with support
.captured_output("stderr") as s
:
1904 self
.assertRaises(AttributeError, f
)
1905 s
= s
.getvalue().strip()
1907 # The destructor *may* have printed an unraisable error, check it
1908 self
.assertEqual(len(s
.splitlines()), 1)
1909 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
1910 self
.assertTrue(s
.endswith(" ignored"), s
)
1912 # Systematic tests of the text I/O API
1914 def test_basic_io(self
):
1915 for chunksize
in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1916 for enc
in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1917 f
= self
.open(support
.TESTFN
, "w+", encoding
=enc
)
1918 f
._CHUNK
_SIZE
= chunksize
1919 self
.assertEqual(f
.write("abc"), 3)
1921 f
= self
.open(support
.TESTFN
, "r+", encoding
=enc
)
1922 f
._CHUNK
_SIZE
= chunksize
1923 self
.assertEqual(f
.tell(), 0)
1924 self
.assertEqual(f
.read(), "abc")
1926 self
.assertEqual(f
.seek(0), 0)
1927 self
.assertEqual(f
.read(None), "abc")
1929 self
.assertEqual(f
.read(2), "ab")
1930 self
.assertEqual(f
.read(1), "c")
1931 self
.assertEqual(f
.read(1), "")
1932 self
.assertEqual(f
.read(), "")
1933 self
.assertEqual(f
.tell(), cookie
)
1934 self
.assertEqual(f
.seek(0), 0)
1935 self
.assertEqual(f
.seek(0, 2), cookie
)
1936 self
.assertEqual(f
.write("def"), 3)
1937 self
.assertEqual(f
.seek(cookie
), cookie
)
1938 self
.assertEqual(f
.read(), "def")
1939 if enc
.startswith("utf"):
1940 self
.multi_line_test(f
, enc
)
1943 def multi_line_test(self
, f
, enc
):
1946 sample
= "s\xff\u0fff\uffff"
1948 for size
in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1950 for i
in range(size
):
1951 chars
.append(sample
[i
% len(sample
)])
1952 line
= "".join(chars
) + "\n"
1953 wlines
.append((f
.tell(), line
))
1962 rlines
.append((pos
, line
))
1963 self
.assertEqual(rlines
, wlines
)
1965 def test_telling(self
):
1966 f
= self
.open(support
.TESTFN
, "w+", encoding
="utf8")
1973 self
.assertEqual(f
.tell(), p0
)
1974 self
.assertEqual(f
.readline(), "\xff\n")
1975 self
.assertEqual(f
.tell(), p1
)
1976 self
.assertEqual(f
.readline(), "\xff\n")
1977 self
.assertEqual(f
.tell(), p2
)
1980 self
.assertEqual(line
, "\xff\n")
1981 self
.assertRaises(IOError, f
.tell
)
1982 self
.assertEqual(f
.tell(), p2
)
1985 def test_seeking(self
):
1986 chunk_size
= _default_chunk_size()
1987 prefix_size
= chunk_size
- 2
1988 u_prefix
= "a" * prefix_size
1989 prefix
= bytes(u_prefix
.encode("utf-8"))
1990 self
.assertEqual(len(u_prefix
), len(prefix
))
1991 u_suffix
= "\u8888\n"
1992 suffix
= bytes(u_suffix
.encode("utf-8"))
1993 line
= prefix
+ suffix
1994 f
= self
.open(support
.TESTFN
, "wb")
1997 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1998 s
= f
.read(prefix_size
)
1999 self
.assertEqual(s
, prefix
.decode("ascii"))
2000 self
.assertEqual(f
.tell(), prefix_size
)
2001 self
.assertEqual(f
.readline(), u_suffix
)
2003 def test_seeking_too(self
):
2004 # Regression test for a specific bug
2005 data
= b
'\xe0\xbf\xbf\n'
2006 f
= self
.open(support
.TESTFN
, "wb")
2009 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
2010 f
._CHUNK
_SIZE
# Just test that it exists
2015 def test_seek_and_tell(self
):
2016 #Test seek/tell using the StatefulIncrementalDecoder.
2017 # Make test faster by doing smaller seeks
2020 def test_seek_and_tell_with_data(data
, min_pos
=0):
2021 """Tell/seek to various points within a data stream and ensure
2022 that the decoded data returned by read() is consistent."""
2023 f
= self
.open(support
.TESTFN
, 'wb')
2026 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
2027 f
._CHUNK
_SIZE
= CHUNK_SIZE
2031 for i
in range(min_pos
, len(decoded
) + 1): # seek positions
2032 for j
in [1, 5, len(decoded
) - i
]: # read lengths
2033 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
2034 self
.assertEqual(f
.read(i
), decoded
[:i
])
2036 self
.assertEqual(f
.read(j
), decoded
[i
:i
+ j
])
2038 self
.assertEqual(f
.read(), decoded
[i
:])
2041 # Enable the test decoder.
2042 StatefulIncrementalDecoder
.codecEnabled
= 1
2046 # Try each test case.
2047 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
2048 test_seek_and_tell_with_data(input)
2050 # Position each test case so that it crosses a chunk boundary.
2051 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
2052 offset
= CHUNK_SIZE
- len(input)//2
2053 prefix
= b
'.'*offset
2054 # Don't bother seeking into the prefix (takes too long).
2056 test_seek_and_tell_with_data(prefix
+ input, min_pos
)
2058 # Ensure our test decoder won't interfere with subsequent tests.
2060 StatefulIncrementalDecoder
.codecEnabled
= 0
2062 def test_encoded_writes(self
):
2070 for encoding
in tests
:
2071 buf
= self
.BytesIO()
2072 f
= self
.TextIOWrapper(buf
, encoding
=encoding
)
2073 # Check if the BOM is written only once (see issue1753).
2077 self
.assertEqual(f
.read(), data
* 2)
2079 self
.assertEqual(f
.read(), data
* 2)
2080 self
.assertEqual(buf
.getvalue(), (data
* 2).encode(encoding
))
2082 def test_unreadable(self
):
2083 class UnReadable(self
.BytesIO
):
2086 txt
= self
.TextIOWrapper(UnReadable())
2087 self
.assertRaises(IOError, txt
.read
)
2089 def test_read_one_by_one(self
):
2090 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\r\nBB"))
2097 self
.assertEqual(reads
, "AA\nBB")
2099 def test_readlines(self
):
2100 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\nBB\nCC"))
2101 self
.assertEqual(txt
.readlines(), ["AA\n", "BB\n", "CC"])
2103 self
.assertEqual(txt
.readlines(None), ["AA\n", "BB\n", "CC"])
2105 self
.assertEqual(txt
.readlines(5), ["AA\n", "BB\n"])
2107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2108 def test_read_by_chunk(self
):
2109 # make sure "\r\n" straddles 128 char boundary.
2110 txt
= self
.TextIOWrapper(self
.BytesIO(b
"A" * 127 + b
"\r\nB"))
2117 self
.assertEqual(reads
, "A"*127+"\nB")
2119 def test_issue1395_1(self
):
2120 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2122 # read one char at a time
2129 self
.assertEqual(reads
, self
.normalized
)
2131 def test_issue1395_2(self
):
2132 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2141 self
.assertEqual(reads
, self
.normalized
)
2143 def test_issue1395_3(self
):
2144 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2148 reads
+= txt
.read(4)
2149 reads
+= txt
.readline()
2150 reads
+= txt
.readline()
2151 reads
+= txt
.readline()
2152 self
.assertEqual(reads
, self
.normalized
)
2154 def test_issue1395_4(self
):
2155 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2160 self
.assertEqual(reads
, self
.normalized
)
2162 def test_issue1395_5(self
):
2163 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2170 self
.assertEqual(txt
.read(4), "BBB\n")
def test_issue2282(self):
    # The text wrapper must report the same seekable() value as the
    # binary stream it wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    raw_seekable = raw.seekable()
    self.assertEqual(raw_seekable, wrapper.seekable())
2178 def test_append_bom(self
):
2179 # The BOM is not written again when appending to a non-empty file
2180 filename
= support
.TESTFN
2181 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2182 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2185 with self
.open(filename
, 'rb') as f
:
2186 self
.assertEqual(f
.read(), 'aaa'.encode(charset
))
2188 with self
.open(filename
, 'a', encoding
=charset
) as f
:
2190 with self
.open(filename
, 'rb') as f
:
2191 self
.assertEqual(f
.read(), 'aaaxxx'.encode(charset
))
2193 def test_seek_bom(self
):
2194 # Same test, but when seeking manually
2195 filename
= support
.TESTFN
2196 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2197 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2200 with self
.open(filename
, 'r+', encoding
=charset
) as f
:
2205 with self
.open(filename
, 'rb') as f
:
2206 self
.assertEqual(f
.read(), 'bbbzzz'.encode(charset
))
def test_errors_property(self):
    # The errors attribute must be "strict" when no errors argument is
    # given, and otherwise echo the value passed to open().
    cases = (
        (dict(), "strict"),
        (dict(errors="replace"), "replace"),
    )
    for options, expected in cases:
        with self.open(support.TESTFN, "w", **options) as stream:
            self.assertEqual(stream.errors, expected)
2214 @unittest.skipUnless(threading
, 'Threading required for this test.')
2215 def test_threads_write(self
):
2216 # Issue6750: concurrent writes could duplicate data
2217 event
= threading
.Event()
2218 with self
.open(support
.TESTFN
, "w", buffering
=1) as f
:
2220 text
= "Thread%03d\n" % n
2223 threads
= [threading
.Thread(target
=lambda n
=x
: run(n
))
2231 with self
.open(support
.TESTFN
) as f
:
2234 self
.assertEqual(content
.count("Thread%03d\n" % n
), 1)
2236 def test_flush_error_on_close(self
):
2237 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2240 txt
.flush
= bad_flush
2241 self
.assertRaises(IOError, txt
.close
) # exception not swallowed
2243 def test_multi_close(self
):
2244 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2248 self
.assertRaises(ValueError, txt
.flush
)
2250 def test_readonly_attributes(self
):
2251 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2252 buf
= self
.BytesIO(self
.testdata
)
2253 with self
.assertRaises((AttributeError, TypeError)):
2256 class CTextIOWrapperTest(TextIOWrapperTest
):
def test_initialization(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)
    # Each failed re-initialization (wrong newline type, then an
    # unsupported newline value) must be followed by read() raising
    # ValueError.
    bad_newlines = ((42, TypeError), ('xyzzy', ValueError))
    for newline, init_error in bad_newlines:
        self.assertRaises(init_error, text.__init__, buffered,
                          newline=newline)
        self.assertRaises(ValueError, text.read)
2267 def test_garbage_collection(self
):
2268 # C TextIOWrapper objects are collected, and collecting them flushes
2270 # The Python version has __del__, so it ends in gc.garbage instead.
2271 rawio
= io
.FileIO(support
.TESTFN
, "wb")
2272 b
= self
.BufferedWriter(rawio
)
2273 t
= self
.TextIOWrapper(b
, encoding
="ascii")
2278 support
.gc_collect()
2279 self
.assertTrue(wr() is None, wr
)
2280 with self
.open(support
.TESTFN
, "rb") as f
:
2281 self
.assertEqual(f
.read(), b
"456def")
2283 class PyTextIOWrapperTest(TextIOWrapperTest
):
2287 class IncrementalNewlineDecoderTest(unittest
.TestCase
):
2289 def check_newline_decoding_utf8(self
, decoder
):
2290 # UTF-8 specific tests for a newline decoder
2291 def _check_decode(b
, s
, **kwargs
):
2292 # We exercise getstate() / setstate() as well as decode()
2293 state
= decoder
.getstate()
2294 self
.assertEqual(decoder
.decode(b
, **kwargs
), s
)
2295 decoder
.setstate(state
)
2296 self
.assertEqual(decoder
.decode(b
, **kwargs
), s
)
2298 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2300 _check_decode(b
'\xe8', "")
2301 _check_decode(b
'\xa2', "")
2302 _check_decode(b
'\x88', "\u8888")
2304 _check_decode(b
'\xe8', "")
2305 _check_decode(b
'\xa2', "")
2306 _check_decode(b
'\x88', "\u8888")
2308 _check_decode(b
'\xe8', "")
2309 self
.assertRaises(UnicodeDecodeError, decoder
.decode
, b
'', final
=True)
2312 _check_decode(b
'\n', "\n")
2313 _check_decode(b
'\r', "")
2314 _check_decode(b
'', "\n", final
=True)
2315 _check_decode(b
'\r', "\n", final
=True)
2317 _check_decode(b
'\r', "")
2318 _check_decode(b
'a', "\na")
2320 _check_decode(b
'\r\r\n', "\n\n")
2321 _check_decode(b
'\r', "")
2322 _check_decode(b
'\r', "\n")
2323 _check_decode(b
'\na', "\na")
2325 _check_decode(b
'\xe8\xa2\x88\r\n', "\u8888\n")
2326 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2327 _check_decode(b
'\n', "\n")
2328 _check_decode(b
'\xe8\xa2\x88\r', "\u8888")
2329 _check_decode(b
'\n', "\n")
2331 def check_newline_decoding(self
, decoder
, encoding
):
2333 if encoding
is not None:
2334 encoder
= codecs
.getincrementalencoder(encoding
)()
2335 def _decode_bytewise(s
):
2336 # Decode one byte at a time
2337 for b
in encoder
.encode(s
):
2338 result
.append(decoder
.decode(b
))
2341 def _decode_bytewise(s
):
2342 # Decode one char at a time
2344 result
.append(decoder
.decode(c
))
2345 self
.assertEqual(decoder
.newlines
, None)
2346 _decode_bytewise("abc\n\r")
2347 self
.assertEqual(decoder
.newlines
, '\n')
2348 _decode_bytewise("\nabc")
2349 self
.assertEqual(decoder
.newlines
, ('\n', '\r\n'))
2350 _decode_bytewise("abc\r")
2351 self
.assertEqual(decoder
.newlines
, ('\n', '\r\n'))
2352 _decode_bytewise("abc")
2353 self
.assertEqual(decoder
.newlines
, ('\r', '\n', '\r\n'))
2354 _decode_bytewise("abc\r")
2355 self
.assertEqual("".join(result
), "abc\n\nabcabc\nabcabc")
2358 if encoder
is not None:
2360 input = encoder
.encode(input)
2361 self
.assertEqual(decoder
.decode(input), "abc")
2362 self
.assertEqual(decoder
.newlines
, None)
2364 def test_newline_decoder(self
):
2366 # None meaning the IncrementalNewlineDecoder takes unicode input
2367 # rather than bytes input
2368 None, 'utf-8', 'latin-1',
2369 'utf-16', 'utf-16-le', 'utf-16-be',
2370 'utf-32', 'utf-32-le', 'utf-32-be',
2372 for enc
in encodings
:
2373 decoder
= enc
and codecs
.getincrementaldecoder(enc
)()
2374 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2375 self
.check_newline_decoding(decoder
, enc
)
2376 decoder
= codecs
.getincrementaldecoder("utf-8")()
2377 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2378 self
.check_newline_decoding_utf8(decoder
)
2380 def test_newline_bytes(self
):
2381 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2383 self
.assertEqual(dec
.newlines
, None)
2384 self
.assertEqual(dec
.decode("\u0D00"), "\u0D00")
2385 self
.assertEqual(dec
.newlines
, None)
2386 self
.assertEqual(dec
.decode("\u0A00"), "\u0A00")
2387 self
.assertEqual(dec
.newlines
, None)
2388 dec
= self
.IncrementalNewlineDecoder(None, translate
=False)
2390 dec
= self
.IncrementalNewlineDecoder(None, translate
=True)
2393 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2396 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2400 # XXX Tests for open()
2402 class MiscIOTest(unittest
.TestCase
):
2405 support
.unlink(support
.TESTFN
)
2407 def test___all__(self
):
2408 for name
in self
.io
.__all
__:
2409 obj
= getattr(self
.io
, name
, None)
2410 self
.assertTrue(obj
is not None, name
)
2413 elif "error" in name
.lower() or name
== "UnsupportedOperation":
2414 self
.assertTrue(issubclass(obj
, Exception), name
)
2415 elif not name
.startswith("SEEK_"):
2416 self
.assertTrue(issubclass(obj
, self
.IOBase
))
2418 def test_attributes(self
):
2419 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
2420 self
.assertEqual(f
.mode
, "wb")
2423 f
= self
.open(support
.TESTFN
, "U")
2424 self
.assertEqual(f
.name
, support
.TESTFN
)
2425 self
.assertEqual(f
.buffer.name
, support
.TESTFN
)
2426 self
.assertEqual(f
.buffer.raw
.name
, support
.TESTFN
)
2427 self
.assertEqual(f
.mode
, "U")
2428 self
.assertEqual(f
.buffer.mode
, "rb")
2429 self
.assertEqual(f
.buffer.raw
.mode
, "rb")
2432 f
= self
.open(support
.TESTFN
, "w+")
2433 self
.assertEqual(f
.mode
, "w+")
2434 self
.assertEqual(f
.buffer.mode
, "rb+") # Does it really matter?
2435 self
.assertEqual(f
.buffer.raw
.mode
, "rb+")
2437 g
= self
.open(f
.fileno(), "wb", closefd
=False)
2438 self
.assertEqual(g
.mode
, "wb")
2439 self
.assertEqual(g
.raw
.mode
, "wb")
2440 self
.assertEqual(g
.name
, f
.fileno())
2441 self
.assertEqual(g
.raw
.name
, f
.fileno())
2445 def test_io_after_close(self
):
2449 {"mode": "w", "buffering": 1},
2450 {"mode": "w", "buffering": 2},
2451 {"mode": "wb", "buffering": 0},
2454 {"mode": "r", "buffering": 1},
2455 {"mode": "r", "buffering": 2},
2456 {"mode": "rb", "buffering": 0},
2459 {"mode": "w+", "buffering": 1},
2460 {"mode": "w+", "buffering": 2},
2461 {"mode": "w+b", "buffering": 0},
2463 f
= self
.open(support
.TESTFN
, **kwargs
)
2465 self
.assertRaises(ValueError, f
.flush
)
2466 self
.assertRaises(ValueError, f
.fileno
)
2467 self
.assertRaises(ValueError, f
.isatty
)
2468 self
.assertRaises(ValueError, f
.__iter
__)
2469 if hasattr(f
, "peek"):
2470 self
.assertRaises(ValueError, f
.peek
, 1)
2471 self
.assertRaises(ValueError, f
.read
)
2472 if hasattr(f
, "read1"):
2473 self
.assertRaises(ValueError, f
.read1
, 1024)
2474 if hasattr(f
, "readall"):
2475 self
.assertRaises(ValueError, f
.readall
)
2476 if hasattr(f
, "readinto"):
2477 self
.assertRaises(ValueError, f
.readinto
, bytearray(1024))
2478 self
.assertRaises(ValueError, f
.readline
)
2479 self
.assertRaises(ValueError, f
.readlines
)
2480 self
.assertRaises(ValueError, f
.seek
, 0)
2481 self
.assertRaises(ValueError, f
.tell
)
2482 self
.assertRaises(ValueError, f
.truncate
)
2483 self
.assertRaises(ValueError, f
.write
,
2484 b
"" if "b" in kwargs
['mode'] else "")
2485 self
.assertRaises(ValueError, f
.writelines
, [])
2486 self
.assertRaises(ValueError, next
, f
)
2488 def test_blockingioerror(self
):
2489 # Various BlockingIOError issues
2490 self
.assertRaises(TypeError, self
.BlockingIOError
)
2491 self
.assertRaises(TypeError, self
.BlockingIOError
, 1)
2492 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, 2, 3, 4)
2493 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, "", None)
2494 b
= self
.BlockingIOError(1, "")
2495 self
.assertEqual(b
.characters_written
, 0)
2499 b
= self
.BlockingIOError(1, c
)
2504 support
.gc_collect()
2505 self
.assertTrue(wr() is None, wr
)
def test_abcs(self):
    # Each visible base class must have abc.ABCMeta as its metaclass,
    # i.e. be an ABC.  The attributes are looked up one at a time, in
    # the same order as the assertions are made.
    base_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    for base_name in base_names:
        self.assertIsInstance(getattr(self, base_name), abc.ABCMeta)
def _check_abc_inheritance(self, abcmodule):
    """Check which ABCs of *abcmodule* the objects returned by open() match.

    TESTFN is opened three ways ("wb" with buffering=0, "wb", and "w");
    each resulting file object is tested against IOBase, RawIOBase,
    BufferedIOBase and TextIOBase, looked up as attributes of *abcmodule*.
    """
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    # (mode, extra open() kwargs, expected isinstance result per ABC above)
    scenarios = (
        ("wb", dict(buffering=0), (True, True, False, False)),
        ("wb", dict(), (True, False, True, False)),
        ("w", dict(), (True, False, False, True)),
    )
    for mode, extra_kwargs, expected_flags in scenarios:
        with self.open(support.TESTFN, mode, **extra_kwargs) as f:
            for abc_name, should_match in zip(abc_names, expected_flags):
                abc_class = getattr(abcmodule, abc_name)
                if should_match:
                    self.assertIsInstance(f, abc_class)
                else:
                    self.assertNotIsInstance(f, abc_class)
def test_abc_inheritance(self):
    # Implementations must inherit from their respective ABCs.  The test
    # case itself is passed as the "module": the helper reads IOBase,
    # RawIOBase, etc. as attributes of whatever object it is given.
    implementation_ns = self
    self._check_abc_inheritance(implementation_ns)
def test_abc_inheritance_official(self):
    # Implementations must also inherit from the official ABCs exposed
    # by the baseline "io" module.
    official_module = io
    self._check_abc_inheritance(official_module)
2540 class CMiscIOTest(MiscIOTest
):
2543 class PyMiscIOTest(MiscIOTest
):
2547 @unittest.skipIf(os
.name
== 'nt', 'POSIX signals required for this test.')
2548 class SignalsTest(unittest
.TestCase
):
2551 self
.oldalrm
= signal
.signal(signal
.SIGALRM
, self
.alarm_interrupt
)
2554 signal
.signal(signal
.SIGALRM
, self
.oldalrm
)
2556 def alarm_interrupt(self
, sig
, frame
):
2559 @unittest.skipUnless(threading
, 'Threading required for this test.')
2560 def check_interrupted_write(self
, item
, bytes
, **fdopen_kwargs
):
2561 """Check that a partial write, when it gets interrupted, properly
2562 invokes the signal handler, and bubbles up the exception raised
2567 read_results
.append(s
)
2568 t
= threading
.Thread(target
=_read
)
2572 wio
= self
.io
.open(w
, **fdopen_kwargs
)
2575 # Fill the pipe enough that the write will be blocking.
2576 # It will be interrupted by the timer armed above. Since the
2577 # other thread has read one byte, the low-level write will
2578 # return with a successful (partial) result rather than an EINTR.
2579 # The buffered IO layer must check for pending signal
2580 # handlers, which in this case will invoke alarm_interrupt().
2581 self
.assertRaises(ZeroDivisionError,
2582 wio
.write
, item
* (1024 * 1024))
2584 # We got one byte, get another one and check that it isn't a
2585 # repeat of the first one.
2586 read_results
.append(os
.read(r
, 1))
2587 self
.assertEqual(read_results
, [bytes
[0:1], bytes
[1:2]])
2591 # This is deliberate. If we didn't close the file descriptor
2592 # before closing wio, wio would try to flush its internal
2593 # buffer, and block again.
2596 except IOError as e
:
2597 if e
.errno
!= errno
.EBADF
:
def test_interrupted_write_unbuffered(self):
    # Interrupted-write check with a binary writer opened with
    # buffering=0; the item written and the expected bytes are the same.
    payload = b"xy"
    self.check_interrupted_write(payload, payload, mode="wb", buffering=0)
def test_interrupted_write_buffered(self):
    # Interrupted-write check with a binary writer using the default
    # buffering; the item written and the expected bytes are the same.
    payload = b"xy"
    self.check_interrupted_write(payload, payload, mode="wb")
def test_interrupted_write_text(self):
    # Interrupted-write check in text mode: a text item is written with
    # the ascii encoding and compared against the equivalent bytes.
    open_kwargs = dict(mode="w", encoding="ascii")
    self.check_interrupted_write("xy", b"xy", **open_kwargs)
2609 def check_reentrant_write(self
, data
, **fdopen_kwargs
):
2610 def on_alarm(*args
):
2611 # Will be called reentrantly from the same thread
2614 signal
.signal(signal
.SIGALRM
, on_alarm
)
2616 wio
= self
.io
.open(w
, **fdopen_kwargs
)
2619 # Either the reentrant call to wio.write() fails with RuntimeError,
2620 # or the signal handler raises ZeroDivisionError.
2621 with self
.assertRaises((ZeroDivisionError, RuntimeError)) as cm
:
2623 for i
in range(100):
2626 # Make sure the buffer doesn't fill up and block further writes
2627 os
.read(r
, len(data
) * 100)
2629 if isinstance(exc
, RuntimeError):
2630 self
.assertTrue(str(exc
).startswith("reentrant call"), str(exc
))
def test_reentrant_write_buffered(self):
    # Reentrant-write check with a binary writer ("wb").
    payload = b"xy"
    self.check_reentrant_write(payload, mode="wb")
def test_reentrant_write_text(self):
    # Reentrant-write check with an ascii-encoded text writer ("w").
    open_kwargs = dict(mode="w", encoding="ascii")
    self.check_reentrant_write("xy", **open_kwargs)
2641 def check_interrupted_read_retry(self
, decode
, **fdopen_kwargs
):
2642 """Check that a buffered read, when it gets interrupted (either
2643 returning a partial result or EINTR), properly invokes the signal
2644 handler and retries if the latter returned successfully."""
2646 fdopen_kwargs
["closefd"] = False
2647 def alarm_handler(sig
, frame
):
2649 signal
.signal(signal
.SIGALRM
, alarm_handler
)
2651 rio
= self
.io
.open(r
, **fdopen_kwargs
)
2654 # Expected behaviour:
2655 # - first raw read() returns partial b"foo"
2656 # - second raw read() returns EINTR
2657 # - third raw read() returns b"bar"
2658 self
.assertEqual(decode(rio
.read(6)), "foobar")
2664 def test_interrupterd_read_retry_buffered(self
):
2665 self
.check_interrupted_read_retry(lambda x
: x
.decode('latin1'),
2668 def test_interrupterd_read_retry_text(self
):
2669 self
.check_interrupted_read_retry(lambda x
: x
,
2672 @unittest.skipUnless(threading
, 'Threading required for this test.')
2673 def check_interrupted_write_retry(self
, item
, **fdopen_kwargs
):
2674 """Check that a buffered write, when it gets interrupted (either
2675 returning a partial result or EINTR), properly invokes the signal
2676 handler and retries if the latter returned successfully."""
2677 select
= support
.import_module("select")
2678 # A quantity that exceeds the buffer size of an anonymous pipe's
2682 fdopen_kwargs
["closefd"] = False
2683 # We need a separate thread to read from the pipe and allow the
2684 # write() to finish. This thread is started after the SIGALRM is
2685 # received (forcing a first EINTR in write()).
2687 write_finished
= False
2689 while not write_finished
:
2690 while r
in select
.select([r
], [], [], 1.0)[0]:
2691 s
= os
.read(r
, 1024)
2692 read_results
.append(s
)
2693 t
= threading
.Thread(target
=_read
)
2695 def alarm1(sig
, frame
):
2696 signal
.signal(signal
.SIGALRM
, alarm2
)
2698 def alarm2(sig
, frame
):
2700 signal
.signal(signal
.SIGALRM
, alarm1
)
2702 wio
= self
.io
.open(w
, **fdopen_kwargs
)
2704 # Expected behaviour:
2705 # - first raw write() is partial (because of the limited pipe buffer
2706 # and the first alarm)
2707 # - second raw write() returns EINTR (because of the second alarm)
2708 # - subsequent write()s are successful (either partial or complete)
2709 self
.assertEqual(N
, wio
.write(item
* N
))
2711 write_finished
= True
2713 self
.assertEqual(N
, sum(len(x
) for x
in read_results
))
2715 write_finished
= True
2718 # This is deliberate. If we didn't close the file descriptor
2719 # before closing wio, wio would try to flush its internal
2720 # buffer, and could block (in case of failure).
2723 except IOError as e
:
2724 if e
.errno
!= errno
.EBADF
:
def test_interrupterd_write_retry_buffered(self):
    # Interrupted-write retry check with a binary writer ("wb"),
    # using a single-byte item.
    item = b"x"
    self.check_interrupted_write_retry(item, mode="wb")
def test_interrupterd_write_retry_text(self):
    # Interrupted-write retry check with a latin1-encoded text writer
    # ("w"), using a single-character item.
    open_kwargs = dict(mode="w", encoding="latin1")
    self.check_interrupted_write_retry("x", **open_kwargs)
2734 class CSignalsTest(SignalsTest
):
2737 class PySignalsTest(SignalsTest
):
2740 # Handling reentrancy issues would slow down _pyio even more, so the
2741 # tests are disabled.
2742 test_reentrant_write_buffered
= None
2743 test_reentrant_write_text
= None
2747 tests
= (CIOTest
, PyIOTest
,
2748 CBufferedReaderTest
, PyBufferedReaderTest
,
2749 CBufferedWriterTest
, PyBufferedWriterTest
,
2750 CBufferedRWPairTest
, PyBufferedRWPairTest
,
2751 CBufferedRandomTest
, PyBufferedRandomTest
,
2752 StatefulIncrementalDecoderTest
,
2753 CIncrementalNewlineDecoderTest
, PyIncrementalNewlineDecoderTest
,
2754 CTextIOWrapperTest
, PyTextIOWrapperTest
,
2755 CMiscIOTest
, PyMiscIOTest
,
2756 CSignalsTest
, PySignalsTest
,
2759 # Put the namespaces of the IO module we are testing and some useful mock
2760 # classes in the __dict__ of each test.
2761 mocks
= (MockRawIO
, MisbehavedRawIO
, MockFileIO
, CloseFailureIO
,
2762 MockNonBlockWriterIO
, MockRawIOWithoutRead
)
2763 all_members
= io
.__all
__ + ["IncrementalNewlineDecoder"]
2764 c_io_ns
= dict((name
, getattr(io
, name
)) for name
in all_members
)
2765 py_io_ns
= dict((name
, getattr(pyio
, name
)) for name
in all_members
)
2767 c_io_ns
.update((x
.__name
__, globs
["C" + x
.__name
__]) for x
in mocks
)
2768 py_io_ns
.update((x
.__name
__, globs
["Py" + x
.__name
__]) for x
in mocks
)
2769 # Avoid turning open into a bound method.
2770 py_io_ns
["open"] = pyio
.OpenWrapper
2772 if test
.__name
__.startswith("C"):
2773 for name
, obj
in c_io_ns
.items():
2774 setattr(test
, name
, obj
)
2775 elif test
.__name
__.startswith("Py"):
2776 for name
, obj
in py_io_ns
.items():
2777 setattr(test
, name
, obj
)
2779 support
.run_unittest(*tests
)
2781 if __name__
== "__main__":