1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
19 #include <fcntl.h> // IWYU pragma: keep
35 #include <gtest/gtest.h>
37 #include "arrow/io/buffered.h"
38 #include "arrow/io/file.h"
39 #include "arrow/io/interfaces.h"
40 #include "arrow/io/memory.h"
41 #include "arrow/io/test_common.h"
42 #include "arrow/memory_pool.h"
43 #include "arrow/status.h"
44 #include "arrow/testing/gtest_util.h"
45 #include "arrow/util/io_util.h"
46 #include "arrow/util/string_view.h"
51 using ::arrow::internal::TemporaryDir
;
// Builds a string holding exactly `nbytes` pseudo-random bytes.
//
// The engine is default-constructed (never explicitly seeded) on every call.
static std::string GenerateRandomData(size_t nbytes) {
  // MSVC doesn't accept uint8_t for std::independent_bits_engine<>, so whole
  // words are generated and then reinterpreted as raw bytes.
  using Word = unsigned long;  // NOLINT
  std::independent_bits_engine<std::default_random_engine, 8 * sizeof(Word), Word> rng;

  // One extra word guarantees the storage spans at least `nbytes` bytes.
  const size_t word_count = nbytes / sizeof(Word) + 1;
  std::vector<Word> words(word_count);
  for (Word& word : words) {
    word = rng();
  }

  char* raw_bytes = reinterpret_cast<char*>(words.data());
  return std::string(raw_bytes, nbytes);
}
// Test fixture template shared by the buffered stream tests; FileType is the
// stream type held in buffered_.
//
// The visible code creates a TemporaryDir with the "buffered-test-" prefix,
// derives path_ from it, and removes the file at path_ (when it exists) in
// TearDown().
// NOTE(review): several lines of this class (access specifiers, closing
// braces, some member declarations) are not visible in this view -- confirm
// against the full file.
63 template <typename FileType
>
64 class FileTestFixture
: public ::testing::Test
{
67 ASSERT_OK_AND_ASSIGN(temp_dir_
, TemporaryDir::Make("buffered-test-"));
68 path_
= temp_dir_
->path()
69 .Join("arrow-test-io-buffered-stream.txt")
// Removes the test file once a test has finished.
75 void TearDown() { EnsureFileDeleted(); }
// Deletes the file at path_ if it exists; the result of std::remove() is
// deliberately ignored.
77 void EnsureFileDeleted() {
78 if (FileExists(path_
)) {
79 ARROW_UNUSED(std::remove(path_
.c_str()));
// Asserts that buffered_->Tell() succeeds and equals `expected`.
83 void AssertTell(int64_t expected
) { ASSERT_OK_AND_EQ(expected
, buffered_
->Tell()); }
// Stream under test.
87 std::shared_ptr
<FileType
> buffered_
;
// Temporary directory created in the setup code above.
89 std::unique_ptr
<TemporaryDir
> temp_dir_
;
92 // ----------------------------------------------------------------------
93 // Buffered output tests
// Default value of the buffer_size parameter of
// TestBufferedOutputStream::OpenBuffered().
95 constexpr int64_t kDefaultBufferSize
= 4096;
// Fixture for the BufferedOutputStream tests. It opens a FileOutputStream at
// path_, records its file descriptor in fd_, and wraps the file in a
// BufferedOutputStream stored in buffered_.
// NOTE(review): parts of this class (access specifiers, the chunk loop
// construct, preprocessor #else/#endif, closing braces) are not visible in
// this view -- confirm against the full file.
97 class TestBufferedOutputStream
: public FileTestFixture
<BufferedOutputStream
> {
// Opens the file at path_ (passing `append` through to
// FileOutputStream::Open) and creates buffered_ with the given buffer_size
// on the default memory pool.
99 void OpenBuffered(int64_t buffer_size
= kDefaultBufferSize
, bool append
= false) {
100 // So that any open file is closed
103 ASSERT_OK_AND_ASSIGN(auto file
, FileOutputStream::Open(path_
, append
));
104 fd_
= file
->file_descriptor();
106 // Workaround for ARROW-2466 ("append" flag doesn't set file pos)
// Seek the descriptor to the end of the file, using the MSVC-specific call
// when _MSC_VER is defined.
107 #if defined(_MSC_VER)
108 _lseeki64(fd_
, 0, SEEK_END
);
110 lseek(fd_
, 0, SEEK_END
);
113 ASSERT_OK_AND_ASSIGN(buffered_
, BufferedOutputStream::Create(
114 buffer_size
, default_memory_pool(), file
));
// Writes `datastr` through buffered_ in chunks whose sizes are taken from
// `sizes` (wrapping around to the first size when the list is exhausted),
// then writes whatever bytes remain in a final call.
117 void WriteChunkwise(const std::string
& datastr
, const std::valarray
<int64_t>& sizes
) {
118 const char* data
= datastr
.data();
119 const int64_t data_size
= static_cast<int64_t>(datastr
.size());
120 int64_t data_pos
= 0;
121 auto size_it
= std::begin(sizes
);
123 // Write datastr, chunk by chunk, until exhausted
125 int64_t size
= *size_it
++;
// Cycle back to the first chunk size once the list has been used up.
126 if (size_it
== std::end(sizes
)) {
127 size_it
= std::begin(sizes
);
// The next chunk would run past the end of the data.
129 if (data_pos
+ size
> data_size
) {
132 ASSERT_OK(buffered_
->Write(data
+ data_pos
, size
));
// Write the trailing bytes that did not fill a whole chunk.
135 ASSERT_OK(buffered_
->Write(data
+ data_pos
, data_size
- data_pos
));
// Checks that the file descriptor is first reported open and later closed.
// NOTE(review): the steps before and between the two assertions are not
// visible in this view; the test name suggests the stream is destroyed in
// between -- confirm against the full file.
139 TEST_F(TestBufferedOutputStream
, DestructorClosesFile
) {
141 ASSERT_FALSE(FileIsClosed(fd_
));
143 ASSERT_TRUE(FileIsClosed(fd_
));
// Writes 10 bytes, detaches the underlying stream, and checks that the file
// stays open until the detached stream itself is closed; the file must then
// contain the written data.
// NOTE(review): some steps (opening the stream, destroying it after the
// detach) are not visible in this view.
146 TEST_F(TestBufferedOutputStream
, Detach
) {
148 const std::string datastr
= "1234568790";
150 ASSERT_OK(buffered_
->Write(datastr
.data(), 10));
152 ASSERT_OK_AND_ASSIGN(auto detached_stream
, buffered_
->Detach());
154 // Destroying the stream does not close the file because we have detached
156 ASSERT_FALSE(FileIsClosed(fd_
));
158 ASSERT_OK(detached_stream
->Close());
159 ASSERT_TRUE(FileIsClosed(fd_
));
161 AssertFileContents(path_
, datastr
);
// Checks that Close() marks the buffered stream closed and closes the file
// descriptor, and that a second Close() also succeeds and leaves both closed.
// NOTE(review): the step that opens the stream is not visible in this view.
164 TEST_F(TestBufferedOutputStream
, ExplicitCloseClosesFile
) {
166 ASSERT_FALSE(buffered_
->closed());
167 ASSERT_FALSE(FileIsClosed(fd_
));
168 ASSERT_OK(buffered_
->Close());
169 ASSERT_TRUE(buffered_
->closed());
170 ASSERT_TRUE(FileIsClosed(fd_
));
// Second Close() call on the already-closed stream.
172 ASSERT_OK(buffered_
->Close());
173 ASSERT_TRUE(buffered_
->closed());
174 ASSERT_TRUE(FileIsClosed(fd_
));
// Checks that Write() with a negative byte count raises Invalid.
// NOTE(review): the step that opens the stream is not visible in this view.
177 TEST_F(TestBufferedOutputStream
, InvalidWrites
) {
180 const char* data
= "";
181 ASSERT_RAISES(Invalid
, buffered_
->Write(data
, -1));
// Writes 2 bytes and then 6 bytes of a 10-character string, closes the
// stream, and checks that the file holds exactly the first 8 characters.
// NOTE(review): the step that opens the stream is not visible in this view.
184 TEST_F(TestBufferedOutputStream
, TinyWrites
) {
187 const std::string datastr
= "1234568790";
188 const char* data
= datastr
.data();
190 ASSERT_OK(buffered_
->Write(data
, 2));
191 ASSERT_OK(buffered_
->Write(data
+ 2, 6));
192 ASSERT_OK(buffered_
->Close());
194 AssertFileContents(path_
, datastr
.substr(0, 8));
// Writes 200000 random bytes in chunks of 1 to 13 bytes, closes the stream,
// and checks that the file contents equal the generated data.
// NOTE(review): the step that opens the stream is not visible in this view.
197 TEST_F(TestBufferedOutputStream
, SmallWrites
) {
200 // Data here should be larger than BufferedOutputStream's buffer size
201 const std::string data
= GenerateRandomData(200000);
202 const std::valarray
<int64_t> sizes
= {1, 1, 2, 3, 5, 8, 13};
204 WriteChunkwise(data
, sizes
);
205 ASSERT_OK(buffered_
->Close());
207 AssertFileContents(path_
, data
);
// Writes 300000 random bytes using a mix of very small chunks and a
// 70000-byte chunk, closes the stream, and checks the file contents.
// NOTE(review): the step that opens the stream is not visible in this view.
210 TEST_F(TestBufferedOutputStream
, MixedWrites
) {
213 const std::string data
= GenerateRandomData(300000);
214 const std::valarray
<int64_t> sizes
= {1, 1, 2, 3, 70000};
216 WriteChunkwise(data
, sizes
);
217 ASSERT_OK(buffered_
->Close());
219 AssertFileContents(path_
, data
);
// Writes 800000 random bytes in chunks of 10000, 60000 and 70000 bytes,
// closes the stream, and checks the file contents.
// NOTE(review): the step that opens the stream is not visible in this view.
222 TEST_F(TestBufferedOutputStream
, LargeWrites
) {
225 const std::string data
= GenerateRandomData(800000);
226 const std::valarray
<int64_t> sizes
= {10000, 60000, 70000};
228 WriteChunkwise(data
, sizes
);
229 ASSERT_OK(buffered_
->Close());
231 AssertFileContents(path_
, data
);
// Writes a short string, calls Flush(), and checks that the file already
// contains the string before the stream is closed.
// NOTE(review): the step that opens the stream is not visible in this view.
234 TEST_F(TestBufferedOutputStream
, Flush
) {
237 const std::string datastr
= "1234568790";
238 const char* data
= datastr
.data();
240 ASSERT_OK(buffered_
->Write(data
, datastr
.size()));
241 ASSERT_OK(buffered_
->Flush());
243 AssertFileContents(path_
, datastr
);
245 ASSERT_OK(buffered_
->Close());
// Exercises SetBufferSize() on the output stream: checks the reported buffer
// size after shrinking, the number of buffered bytes after growing again, and
// that the file contains the full string after the final Flush().
// NOTE(review): the step that opens the stream (expected to report a buffer
// size of 20) is not visible in this view.
248 TEST_F(TestBufferedOutputStream
, SetBufferSize
) {
251 ASSERT_EQ(20, buffered_
->buffer_size());
253 const std::string datastr
= "1234568790abcdefghij";
254 const char* data
= datastr
.data();
256 // Write part of the data, then shrink buffer size to make sure it gets
258 ASSERT_OK(buffered_
->Write(data
, 10));
259 ASSERT_OK(buffered_
->SetBufferSize(10));
261 ASSERT_EQ(10, buffered_
->buffer_size());
263 // Shrink buffer, write some buffered bytes, then expand buffer
264 ASSERT_OK(buffered_
->SetBufferSize(5));
265 ASSERT_OK(buffered_
->Write(data
+ 10, 3));
266 ASSERT_OK(buffered_
->SetBufferSize(10));
267 ASSERT_EQ(3, buffered_
->bytes_buffered());
// Write the last 7 bytes and flush so the file can be compared.
269 ASSERT_OK(buffered_
->Write(data
+ 13, 7));
270 ASSERT_OK(buffered_
->Flush());
272 AssertFileContents(path_
, datastr
);
273 ASSERT_OK(buffered_
->Close());
// Performs chunked writes of 100 and 100000 bytes, closes the stream, then
// reopens it in append mode and writes 90 more bytes before closing again.
// NOTE(review): the position assertions and the initial open are not visible
// in this view -- confirm against the full file.
276 TEST_F(TestBufferedOutputStream
, Tell
) {
281 WriteChunkwise(std::string(100, 'x'), {1, 1, 2, 3, 5, 8});
283 WriteChunkwise(std::string(100000, 'x'), {60000});
286 ASSERT_OK(buffered_
->Close());
288 OpenBuffered(kDefaultBufferSize
, true /* append */);
290 WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
293 ASSERT_OK(buffered_
->Close());
// Writes a short string, closes the stream and checks the file contents,
// then checks that the file is empty.
// NOTE(review): the steps that open the stream, and whatever happens before
// the final empty-file check, are not visible in this view.
299 TEST_F(TestBufferedOutputStream
, TruncatesFile
) {
302 const std::string datastr
= "1234568790";
303 ASSERT_OK(buffered_
->Write(datastr
.data(), datastr
.size()));
304 ASSERT_OK(buffered_
->Close());
306 AssertFileContents(path_
, datastr
);
309 AssertFileContents(path_
, "");
312 // ----------------------------------------------------------------------
313 // BufferedInputStream tests
// 30-character sample payload that TestBufferedInputStream::MakeExample1()
// stores in test_data_ and writes to the test file.
315 const char kExample1
[] = "informaticacrobaticsimmolation";
// Fixture for the BufferedInputStream tests. MakeExample1() writes kExample1
// to the file at path_, reopens it for reading, and creates buffered_ on top
// of raw_.
// NOTE(review): parts of this class (the SetUp() header, the assignment of
// raw_, access specifiers, closing braces) are not visible in this view.
317 class TestBufferedInputStream
: public FileTestFixture
<BufferedInputStream
> {
// Run the base fixture setup, then create a separate default memory pool.
320 FileTestFixture
<BufferedInputStream
>::SetUp();
321 local_pool_
= MemoryPool::CreateDefault();
// Writes kExample1 to path_ and opens a BufferedInputStream over it with the
// given buffer_size, allocating from `pool`.
324 void MakeExample1(int64_t buffer_size
, MemoryPool
* pool
= default_memory_pool()) {
325 test_data_
= kExample1
;
327 ASSERT_OK_AND_ASSIGN(auto file_out
, FileOutputStream::Open(path_
));
328 ASSERT_OK(file_out
->Write(test_data_
));
329 ASSERT_OK(file_out
->Close());
331 ASSERT_OK_AND_ASSIGN(auto file_in
, ReadableFile::Open(path_
));
333 ASSERT_OK_AND_ASSIGN(buffered_
, BufferedInputStream::Create(buffer_size
, pool
, raw_
));
// Memory pool created by the setup code above.
337 std::unique_ptr
<MemoryPool
> local_pool_
;
// Copy of the bytes written to the test file.
338 std::string test_data_
;
// Underlying input stream handed to BufferedInputStream::Create().
339 std::shared_ptr
<InputStream
> raw_
;
342 TEST_F(TestBufferedInputStream
, InvalidReads
) {
343 const int64_t kBufferSize
= 10;
344 MakeExample1(kBufferSize
);
345 ASSERT_EQ(kBufferSize
, buffered_
->buffer_size());
346 std::vector
<char> buf(test_data_
.size());
347 ASSERT_RAISES(Invalid
, buffered_
->Read(-1, buf
.data()));
350 TEST_F(TestBufferedInputStream
, BasicOperation
) {
351 const int64_t kBufferSize
= 10;
352 MakeExample1(kBufferSize
);
353 ASSERT_EQ(kBufferSize
, buffered_
->buffer_size());
355 ASSERT_OK_AND_EQ(0, buffered_
->Tell());
357 // Nothing in the buffer
358 ASSERT_EQ(0, buffered_
->bytes_buffered());
360 std::vector
<char> buf(test_data_
.size());
361 ASSERT_OK_AND_EQ(0, buffered_
->Read(0, buf
.data()));
362 ASSERT_OK_AND_EQ(4, buffered_
->Read(4, buf
.data()));
363 ASSERT_EQ(0, memcmp(buf
.data(), test_data_
.data(), 4));
365 // 6 bytes remaining in buffer
366 ASSERT_EQ(6, buffered_
->bytes_buffered());
368 // This make sure Peek() works well when buffered bytes are not enough
369 ASSERT_OK_AND_ASSIGN(auto peek
, buffered_
->Peek(8));
370 ASSERT_EQ(8, peek
.size());
371 ASSERT_EQ('r', peek
.data()[0]);
372 ASSERT_EQ('m', peek
.data()[1]);
373 ASSERT_EQ('a', peek
.data()[2]);
374 ASSERT_EQ('t', peek
.data()[3]);
375 ASSERT_EQ('i', peek
.data()[4]);
376 ASSERT_EQ('c', peek
.data()[5]);
377 ASSERT_EQ('a', peek
.data()[6]);
378 ASSERT_EQ('c', peek
.data()[7]);
380 // Buffered position is 4
381 ASSERT_OK_AND_EQ(4, buffered_
->Tell());
383 // Raw position actually 12
384 ASSERT_OK_AND_EQ(12, raw_
->Tell());
386 // Reading to end of buffered bytes does not cause any more data to be
388 ASSERT_OK_AND_EQ(8, buffered_
->Read(8, buf
.data()));
389 ASSERT_EQ(0, memcmp(buf
.data(), test_data_
.data() + 4, 8));
391 ASSERT_EQ(0, buffered_
->bytes_buffered());
393 // Read to EOF, exceeding buffer size
394 ASSERT_OK_AND_EQ(18, buffered_
->Read(18, buf
.data()));
395 ASSERT_EQ(0, memcmp(buf
.data(), test_data_
.data() + 12, 18));
396 ASSERT_EQ(0, buffered_
->bytes_buffered());
399 ASSERT_OK_AND_EQ(0, buffered_
->Read(1, buf
.data()));
400 ASSERT_OK_AND_EQ(test_data_
.size(), buffered_
->Tell());
403 ASSERT_OK_AND_ASSIGN(peek
, buffered_
->Peek(10));
404 ASSERT_EQ(0, peek
.size());
406 // Calling Close closes raw_
407 ASSERT_OK(buffered_
->Close());
408 ASSERT_TRUE(buffered_
->raw()->closed());
// Detaches the underlying stream and checks that closing the buffered stream
// afterwards does not close the detached one.
// NOTE(review): the step that creates the stream is not visible in this view.
411 TEST_F(TestBufferedInputStream
, Detach
) {
413 auto raw
= buffered_
->Detach();
414 ASSERT_OK(buffered_
->Close());
415 ASSERT_FALSE(raw
->closed());
418 TEST_F(TestBufferedInputStream
, ReadBuffer
) {
419 const int64_t kBufferSize
= 10;
420 MakeExample1(kBufferSize
);
422 std::shared_ptr
<Buffer
> buf
;
424 // Read exceeding buffer size
425 ASSERT_OK_AND_ASSIGN(buf
, buffered_
->Read(15));
426 ASSERT_EQ(0, memcmp(buf
->data(), test_data_
.data(), 15));
427 ASSERT_EQ(0, buffered_
->bytes_buffered());
430 ASSERT_OK_AND_ASSIGN(buf
, buffered_
->Read(6));
431 ASSERT_EQ(6, buf
->size());
432 ASSERT_EQ(0, memcmp(buf
->data(), test_data_
.data() + 15, 6));
433 ASSERT_EQ(4, buffered_
->bytes_buffered());
435 ASSERT_OK_AND_ASSIGN(buf
, buffered_
->Read(4));
436 ASSERT_EQ(4, buf
->size());
437 ASSERT_EQ(0, memcmp(buf
->data(), test_data_
.data() + 21, 4));
438 ASSERT_EQ(0, buffered_
->bytes_buffered());
// Exercises SetBufferSize() on the input stream: growing the buffer,
// rejecting a shrink below the number of buffered bytes, and accepting a
// shrink to exactly that number.
// NOTE(review): the step that creates the stream is not visible in this view.
441 TEST_F(TestBufferedInputStream
, SetBufferSize
) {
444 std::shared_ptr
<Buffer
> buf
;
445 ASSERT_OK_AND_ASSIGN(buf
, buffered_
->Read(5));
446 ASSERT_EQ(5, buf
->size());
448 // Increase buffer size
449 ASSERT_OK(buffered_
->SetBufferSize(10));
450 ASSERT_EQ(10, buffered_
->buffer_size());
451 ASSERT_OK_AND_ASSIGN(buf
, buffered_
->Read(6));
452 ASSERT_EQ(4, buffered_
->bytes_buffered());
454 // Consume until 5 byte left
455 ASSERT_OK(buffered_
->Read(15));
457 // Read at EOF so there will be only 5 bytes in the buffer
458 ASSERT_OK(buffered_
->Read(2));
460 // Cannot shrink buffer if it would destroy data
461 ASSERT_RAISES(Invalid
, buffered_
->SetBufferSize(4));
463 // Shrinking to exactly number of buffered bytes is ok
464 ASSERT_OK(buffered_
->SetBufferSize(5));
// Fixture for BufferedInputStream tests over an in-memory BufferReader.
// CreateExample() fills the first source_size_ bytes of a slightly larger
// buffer with the values 0, 1, 2, ..., advances the reader by stream_offset_,
// and wraps it in a BufferedInputStream, passing stream_size_ as the last
// Create() argument when `bounded` is true and -1 otherwise.
// NOTE(review): access specifiers and closing braces are not visible in this
// view -- confirm against the full file.
467 class TestBufferedInputStreamBound
: public ::testing::Test
{
469 void SetUp() { CreateExample(/*bounded=*/true); }
471 void CreateExample(bool bounded
= true) {
472 // Create a buffer larger than source size, to check that the
473 // stream end is respected
474 ASSERT_OK_AND_ASSIGN(auto buf
, AllocateResizableBuffer(source_size_
+ 10));
475 ASSERT_LT(source_size_
, buf
->size());
// Byte i of the source holds the value i (truncated to uint8_t).
476 for (int i
= 0; i
< source_size_
; i
++) {
477 buf
->mutable_data()[i
] = static_cast<uint8_t>(i
);
479 source_
= std::make_shared
<BufferReader
>(std::move(buf
));
480 ASSERT_OK(source_
->Advance(stream_offset_
));
481 ASSERT_OK_AND_ASSIGN(
482 stream_
, BufferedInputStream::Create(chunk_size_
, default_memory_pool(), source_
,
483 bounded
? stream_size_
: -1));
// Number of source bytes that are filled with known values.
487 int64_t source_size_
= 256;
// Number of bytes the source is advanced before the stream is created.
488 int64_t stream_offset_
= 10;
// Source bytes remaining after the initial advance.
489 int64_t stream_size_
= source_size_
- stream_offset_
;
// First argument passed to BufferedInputStream::Create().
490 int64_t chunk_size_
= 50;
491 std::shared_ptr
<InputStream
> source_
;
492 std::shared_ptr
<BufferedInputStream
> stream_
;
495 TEST_F(TestBufferedInputStreamBound
, Basics
) {
496 std::shared_ptr
<Buffer
> buffer
;
497 util::string_view view
;
499 // source is at offset 10
500 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(10));
501 ASSERT_EQ(10, view
.size());
502 for (int i
= 0; i
< 10; i
++) {
503 ASSERT_EQ(10 + i
, view
[i
]) << i
;
506 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(10));
507 ASSERT_EQ(10, buffer
->size());
508 for (int i
= 0; i
< 10; i
++) {
509 ASSERT_EQ(10 + i
, (*buffer
)[i
]) << i
;
512 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(10));
513 ASSERT_EQ(10, buffer
->size());
514 for (int i
= 0; i
< 10; i
++) {
515 ASSERT_EQ(20 + i
, (*buffer
)[i
]) << i
;
517 ASSERT_OK(stream_
->Advance(5));
518 ASSERT_OK(stream_
->Advance(5));
520 // source is at offset 40
521 // read across buffer boundary. buffer size is 50
522 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(20));
523 ASSERT_EQ(20, buffer
->size());
524 for (int i
= 0; i
< 20; i
++) {
525 ASSERT_EQ(40 + i
, (*buffer
)[i
]) << i
;
528 // read more than original chunk size
529 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(60));
530 ASSERT_EQ(60, buffer
->size());
531 for (int i
= 0; i
< 60; i
++) {
532 ASSERT_EQ(60 + i
, (*buffer
)[i
]) << i
;
535 ASSERT_OK(stream_
->Advance(120));
537 // source is at offset 240
538 // read outside of source boundary. source size is 256
539 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(30));
541 ASSERT_EQ(16, buffer
->size());
542 for (int i
= 0; i
< 16; i
++) {
543 ASSERT_EQ(240 + i
, (*buffer
)[i
]) << i
;
546 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(1));
547 ASSERT_EQ(0, buffer
->size());
// Peeks n bytes twice and then reads n bytes, with n asserted to be greater
// than chunk_size_; each time byte i must equal 10 + i. A following 20-byte
// read must continue at value 10 + n.
// NOTE(review): the declaration of n is not visible in this view -- confirm
// against the full file.
550 TEST_F(TestBufferedInputStreamBound
, LargeFirstPeek
) {
551 // Test a first peek larger than chunk size
552 std::shared_ptr
<Buffer
> buffer
;
553 util::string_view view
;
555 ASSERT_GT(n
, chunk_size_
);
557 // source is at offset 10
558 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(n
));
559 ASSERT_EQ(n
, static_cast<int>(view
.size()));
560 for (int i
= 0; i
< n
; i
++) {
561 ASSERT_EQ(10 + i
, view
[i
]) << i
;
// A second identical peek must return the same bytes.
564 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(n
));
565 ASSERT_EQ(n
, static_cast<int>(view
.size()));
566 for (int i
= 0; i
< n
; i
++) {
567 ASSERT_EQ(10 + i
, view
[i
]) << i
;
// Reading returns the bytes that were peeked.
570 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(n
));
571 ASSERT_EQ(n
, buffer
->size());
572 for (int i
= 0; i
< n
; i
++) {
573 ASSERT_EQ(10 + i
, (*buffer
)[i
]) << i
;
575 // source is at offset 10 + n
576 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(20));
577 ASSERT_EQ(20, buffer
->size());
578 for (int i
= 0; i
< 20; i
++) {
579 ASSERT_EQ(10 + n
+ i
, (*buffer
)[i
]) << i
;
583 TEST_F(TestBufferedInputStreamBound
, UnboundedPeek
) {
584 CreateExample(/*bounded=*/false);
586 util::string_view view
;
587 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(10));
588 ASSERT_EQ(10, view
.size());
589 ASSERT_EQ(50, stream_
->bytes_buffered());
591 ASSERT_OK(stream_
->Read(10));
593 // Peek into buffered bytes
594 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(40));
595 ASSERT_EQ(40, view
.size());
596 ASSERT_EQ(40, stream_
->bytes_buffered());
597 ASSERT_EQ(50, stream_
->buffer_size());
599 // Peek past buffered bytes
600 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(41));
601 ASSERT_EQ(41, view
.size());
602 ASSERT_EQ(41, stream_
->bytes_buffered());
603 ASSERT_EQ(51, stream_
->buffer_size());
605 // Peek to the end of the buffer
606 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(246));
607 ASSERT_EQ(246, view
.size());
608 ASSERT_EQ(246, stream_
->bytes_buffered());
609 ASSERT_EQ(256, stream_
->buffer_size());
611 // Larger peek returns the same, expands the buffer, but there is no
612 // more data to buffer
613 ASSERT_OK_AND_ASSIGN(view
, stream_
->Peek(300));
614 ASSERT_EQ(246, view
.size());
615 ASSERT_EQ(246, stream_
->bytes_buffered());
616 ASSERT_EQ(310, stream_
->buffer_size());
619 TEST_F(TestBufferedInputStreamBound
, OneByteReads
) {
620 for (int i
= 0; i
< stream_size_
; ++i
) {
621 ASSERT_OK_AND_ASSIGN(auto buffer
, stream_
->Read(1));
622 ASSERT_EQ(1, buffer
->size());
623 ASSERT_EQ(10 + i
, (*buffer
)[0]) << i
;
626 ASSERT_OK_AND_ASSIGN(auto buffer
, stream_
->Read(1));
627 ASSERT_EQ(0, buffer
->size());
// Reads n bytes, then exactly as many bytes as bytes_buffered() reports, then
// performs two further 10-byte reads, checking each time that the returned
// values continue sequentially from the previous read.
// NOTE(review): the declaration and initial value of n are not visible in
// this view; the "offset 20" comment below suggests 10 -- confirm against the
// full file.
630 TEST_F(TestBufferedInputStreamBound
, BufferExactlyExhausted
) {
631 // Test exhausting the buffer exactly then issuing further reads (PARQUET-1571).
632 std::shared_ptr
<Buffer
> buffer
;
634 // source is at offset 10
636 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(n
));
637 ASSERT_EQ(n
, buffer
->size());
638 for (int i
= 0; i
< n
; i
++) {
639 ASSERT_EQ(10 + i
, (*buffer
)[i
]) << i
;
641 // source is at offset 20
642 // Exhaust buffer exactly
643 n
= stream_
->bytes_buffered();
644 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(n
));
645 ASSERT_EQ(n
, buffer
->size());
646 for (int i
= 0; i
< n
; i
++) {
647 ASSERT_EQ(20 + i
, (*buffer
)[i
]) << i
;
650 // source is at offset 20 + n
652 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(10));
653 ASSERT_EQ(10, buffer
->size());
654 for (int i
= 0; i
< 10; i
++) {
655 ASSERT_EQ(20 + n
+ i
, (*buffer
)[i
]) << i
;
658 // source is at offset 30 + n
659 ASSERT_OK_AND_ASSIGN(buffer
, stream_
->Read(10));
660 ASSERT_EQ(10, buffer
->size());
661 for (int i
= 0; i
< 10; i
++) {
662 ASSERT_EQ(30 + n
+ i
, (*buffer
)[i
]) << i
;