]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/io/buffered_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / buffered_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #ifndef _WIN32
19 #include <fcntl.h> // IWYU pragma: keep
20 #include <unistd.h>
21 #endif
22
23 #include <algorithm>
24 #include <cstdint>
25 #include <cstdio>
26 #include <functional>
27 #include <iterator>
28 #include <memory>
29 #include <random>
30 #include <string>
31 #include <utility>
32 #include <valarray>
33 #include <vector>
34
35 #include <gtest/gtest.h>
36
37 #include "arrow/io/buffered.h"
38 #include "arrow/io/file.h"
39 #include "arrow/io/interfaces.h"
40 #include "arrow/io/memory.h"
41 #include "arrow/io/test_common.h"
42 #include "arrow/memory_pool.h"
43 #include "arrow/status.h"
44 #include "arrow/testing/gtest_util.h"
45 #include "arrow/util/io_util.h"
46 #include "arrow/util/string_view.h"
47
48 namespace arrow {
49 namespace io {
50
51 using ::arrow::internal::TemporaryDir;
52
// Return a string holding `nbytes` pseudo-random bytes.  The engine is
// default-constructed on every call and never seeded explicitly.
static std::string GenerateRandomData(size_t nbytes) {
  // MSVC doesn't accept uint8_t for std::independent_bits_engine<>, so whole
  // words are generated and their storage is then viewed as bytes.
  using Word = unsigned long;  // NOLINT
  std::independent_bits_engine<std::default_random_engine, 8 * sizeof(Word), Word> rng;

  // One word more than nbytes / sizeof(Word), so a trailing partial word is
  // always backed by storage.
  std::vector<Word> words(nbytes / sizeof(Word) + 1);
  for (Word& word : words) {
    word = rng();
  }
  const char* raw_bytes = reinterpret_cast<const char*>(words.data());
  return std::string(raw_bytes, nbytes);
}
62
63 template <typename FileType>
64 class FileTestFixture : public ::testing::Test {
65 public:
66 void SetUp() {
67 ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("buffered-test-"));
68 path_ = temp_dir_->path()
69 .Join("arrow-test-io-buffered-stream.txt")
70 .ValueOrDie()
71 .ToString();
72 EnsureFileDeleted();
73 }
74
75 void TearDown() { EnsureFileDeleted(); }
76
77 void EnsureFileDeleted() {
78 if (FileExists(path_)) {
79 ARROW_UNUSED(std::remove(path_.c_str()));
80 }
81 }
82
83 void AssertTell(int64_t expected) { ASSERT_OK_AND_EQ(expected, buffered_->Tell()); }
84
85 protected:
86 int fd_;
87 std::shared_ptr<FileType> buffered_;
88 std::string path_;
89 std::unique_ptr<TemporaryDir> temp_dir_;
90 };
91
92 // ----------------------------------------------------------------------
93 // Buffered output tests
94
// Buffer size used by TestBufferedOutputStream::OpenBuffered() when the caller
// does not pass one explicitly.
constexpr int64_t kDefaultBufferSize = 4096;
96
97 class TestBufferedOutputStream : public FileTestFixture<BufferedOutputStream> {
98 public:
99 void OpenBuffered(int64_t buffer_size = kDefaultBufferSize, bool append = false) {
100 // So that any open file is closed
101 buffered_.reset();
102
103 ASSERT_OK_AND_ASSIGN(auto file, FileOutputStream::Open(path_, append));
104 fd_ = file->file_descriptor();
105 if (append) {
106 // Workaround for ARROW-2466 ("append" flag doesn't set file pos)
107 #if defined(_MSC_VER)
108 _lseeki64(fd_, 0, SEEK_END);
109 #else
110 lseek(fd_, 0, SEEK_END);
111 #endif
112 }
113 ASSERT_OK_AND_ASSIGN(buffered_, BufferedOutputStream::Create(
114 buffer_size, default_memory_pool(), file));
115 }
116
117 void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
118 const char* data = datastr.data();
119 const int64_t data_size = static_cast<int64_t>(datastr.size());
120 int64_t data_pos = 0;
121 auto size_it = std::begin(sizes);
122
123 // Write datastr, chunk by chunk, until exhausted
124 while (true) {
125 int64_t size = *size_it++;
126 if (size_it == std::end(sizes)) {
127 size_it = std::begin(sizes);
128 }
129 if (data_pos + size > data_size) {
130 break;
131 }
132 ASSERT_OK(buffered_->Write(data + data_pos, size));
133 data_pos += size;
134 }
135 ASSERT_OK(buffered_->Write(data + data_pos, data_size - data_pos));
136 }
137 };
138
// Releasing the fixture's reference to the buffered stream must close the
// underlying file descriptor.
TEST_F(TestBufferedOutputStream, DestructorClosesFile) {
  OpenBuffered();
  // The descriptor is open while the stream is alive
  ASSERT_FALSE(FileIsClosed(fd_));
  buffered_.reset();
  ASSERT_TRUE(FileIsClosed(fd_));
}
145
146 TEST_F(TestBufferedOutputStream, Detach) {
147 OpenBuffered();
148 const std::string datastr = "1234568790";
149
150 ASSERT_OK(buffered_->Write(datastr.data(), 10));
151
152 ASSERT_OK_AND_ASSIGN(auto detached_stream, buffered_->Detach());
153
154 // Destroying the stream does not close the file because we have detached
155 buffered_.reset();
156 ASSERT_FALSE(FileIsClosed(fd_));
157
158 ASSERT_OK(detached_stream->Close());
159 ASSERT_TRUE(FileIsClosed(fd_));
160
161 AssertFileContents(path_, datastr);
162 }
163
164 TEST_F(TestBufferedOutputStream, ExplicitCloseClosesFile) {
165 OpenBuffered();
166 ASSERT_FALSE(buffered_->closed());
167 ASSERT_FALSE(FileIsClosed(fd_));
168 ASSERT_OK(buffered_->Close());
169 ASSERT_TRUE(buffered_->closed());
170 ASSERT_TRUE(FileIsClosed(fd_));
171 // Idempotency
172 ASSERT_OK(buffered_->Close());
173 ASSERT_TRUE(buffered_->closed());
174 ASSERT_TRUE(FileIsClosed(fd_));
175 }
176
177 TEST_F(TestBufferedOutputStream, InvalidWrites) {
178 OpenBuffered();
179
180 const char* data = "";
181 ASSERT_RAISES(Invalid, buffered_->Write(data, -1));
182 }
183
184 TEST_F(TestBufferedOutputStream, TinyWrites) {
185 OpenBuffered();
186
187 const std::string datastr = "1234568790";
188 const char* data = datastr.data();
189
190 ASSERT_OK(buffered_->Write(data, 2));
191 ASSERT_OK(buffered_->Write(data + 2, 6));
192 ASSERT_OK(buffered_->Close());
193
194 AssertFileContents(path_, datastr.substr(0, 8));
195 }
196
197 TEST_F(TestBufferedOutputStream, SmallWrites) {
198 OpenBuffered();
199
200 // Data here should be larger than BufferedOutputStream's buffer size
201 const std::string data = GenerateRandomData(200000);
202 const std::valarray<int64_t> sizes = {1, 1, 2, 3, 5, 8, 13};
203
204 WriteChunkwise(data, sizes);
205 ASSERT_OK(buffered_->Close());
206
207 AssertFileContents(path_, data);
208 }
209
210 TEST_F(TestBufferedOutputStream, MixedWrites) {
211 OpenBuffered();
212
213 const std::string data = GenerateRandomData(300000);
214 const std::valarray<int64_t> sizes = {1, 1, 2, 3, 70000};
215
216 WriteChunkwise(data, sizes);
217 ASSERT_OK(buffered_->Close());
218
219 AssertFileContents(path_, data);
220 }
221
222 TEST_F(TestBufferedOutputStream, LargeWrites) {
223 OpenBuffered();
224
225 const std::string data = GenerateRandomData(800000);
226 const std::valarray<int64_t> sizes = {10000, 60000, 70000};
227
228 WriteChunkwise(data, sizes);
229 ASSERT_OK(buffered_->Close());
230
231 AssertFileContents(path_, data);
232 }
233
234 TEST_F(TestBufferedOutputStream, Flush) {
235 OpenBuffered();
236
237 const std::string datastr = "1234568790";
238 const char* data = datastr.data();
239
240 ASSERT_OK(buffered_->Write(data, datastr.size()));
241 ASSERT_OK(buffered_->Flush());
242
243 AssertFileContents(path_, datastr);
244
245 ASSERT_OK(buffered_->Close());
246 }
247
248 TEST_F(TestBufferedOutputStream, SetBufferSize) {
249 OpenBuffered(20);
250
251 ASSERT_EQ(20, buffered_->buffer_size());
252
253 const std::string datastr = "1234568790abcdefghij";
254 const char* data = datastr.data();
255
256 // Write part of the data, then shrink buffer size to make sure it gets
257 // flushed
258 ASSERT_OK(buffered_->Write(data, 10));
259 ASSERT_OK(buffered_->SetBufferSize(10));
260
261 ASSERT_EQ(10, buffered_->buffer_size());
262
263 // Shrink buffer, write some buffered bytes, then expand buffer
264 ASSERT_OK(buffered_->SetBufferSize(5));
265 ASSERT_OK(buffered_->Write(data + 10, 3));
266 ASSERT_OK(buffered_->SetBufferSize(10));
267 ASSERT_EQ(3, buffered_->bytes_buffered());
268
269 ASSERT_OK(buffered_->Write(data + 13, 7));
270 ASSERT_OK(buffered_->Flush());
271
272 AssertFileContents(path_, datastr);
273 ASSERT_OK(buffered_->Close());
274 }
275
// Tell() must account for every byte handed to Write(), and must reflect the
// starting position after the file is reopened with or without append.
TEST_F(TestBufferedOutputStream, Tell) {
  OpenBuffered();

  // Nothing written yet; Tell() is queried twice in a row
  AssertTell(0);
  AssertTell(0);
  // 100 bytes written in small pieces
  WriteChunkwise(std::string(100, 'x'), {1, 1, 2, 3, 5, 8});
  AssertTell(100);
  // 100000 more bytes, written in pieces of up to 60000 bytes
  WriteChunkwise(std::string(100000, 'x'), {60000});
  AssertTell(100100);

  ASSERT_OK(buffered_->Close());

  // Reopened in append mode: the position continues from the previous size
  OpenBuffered(kDefaultBufferSize, true /* append */);
  AssertTell(100100);
  WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
  AssertTell(100190);

  ASSERT_OK(buffered_->Close());

  // Reopened without append: the position starts from 0 again
  OpenBuffered();
  AssertTell(0);
}
298
299 TEST_F(TestBufferedOutputStream, TruncatesFile) {
300 OpenBuffered();
301
302 const std::string datastr = "1234568790";
303 ASSERT_OK(buffered_->Write(datastr.data(), datastr.size()));
304 ASSERT_OK(buffered_->Close());
305
306 AssertFileContents(path_, datastr);
307
308 OpenBuffered();
309 AssertFileContents(path_, "");
310 }
311
312 // ----------------------------------------------------------------------
313 // BufferedInputStream tests
314
// Sample text (30 characters) that TestBufferedInputStream::MakeExample1()
// writes to the test file and keeps in test_data_.
const char kExample1[] = "informaticacrobaticsimmolation";
316
317 class TestBufferedInputStream : public FileTestFixture<BufferedInputStream> {
318 public:
319 void SetUp() {
320 FileTestFixture<BufferedInputStream>::SetUp();
321 local_pool_ = MemoryPool::CreateDefault();
322 }
323
324 void MakeExample1(int64_t buffer_size, MemoryPool* pool = default_memory_pool()) {
325 test_data_ = kExample1;
326
327 ASSERT_OK_AND_ASSIGN(auto file_out, FileOutputStream::Open(path_));
328 ASSERT_OK(file_out->Write(test_data_));
329 ASSERT_OK(file_out->Close());
330
331 ASSERT_OK_AND_ASSIGN(auto file_in, ReadableFile::Open(path_));
332 raw_ = file_in;
333 ASSERT_OK_AND_ASSIGN(buffered_, BufferedInputStream::Create(buffer_size, pool, raw_));
334 }
335
336 protected:
337 std::unique_ptr<MemoryPool> local_pool_;
338 std::string test_data_;
339 std::shared_ptr<InputStream> raw_;
340 };
341
342 TEST_F(TestBufferedInputStream, InvalidReads) {
343 const int64_t kBufferSize = 10;
344 MakeExample1(kBufferSize);
345 ASSERT_EQ(kBufferSize, buffered_->buffer_size());
346 std::vector<char> buf(test_data_.size());
347 ASSERT_RAISES(Invalid, buffered_->Read(-1, buf.data()));
348 }
349
350 TEST_F(TestBufferedInputStream, BasicOperation) {
351 const int64_t kBufferSize = 10;
352 MakeExample1(kBufferSize);
353 ASSERT_EQ(kBufferSize, buffered_->buffer_size());
354
355 ASSERT_OK_AND_EQ(0, buffered_->Tell());
356
357 // Nothing in the buffer
358 ASSERT_EQ(0, buffered_->bytes_buffered());
359
360 std::vector<char> buf(test_data_.size());
361 ASSERT_OK_AND_EQ(0, buffered_->Read(0, buf.data()));
362 ASSERT_OK_AND_EQ(4, buffered_->Read(4, buf.data()));
363 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data(), 4));
364
365 // 6 bytes remaining in buffer
366 ASSERT_EQ(6, buffered_->bytes_buffered());
367
368 // This make sure Peek() works well when buffered bytes are not enough
369 ASSERT_OK_AND_ASSIGN(auto peek, buffered_->Peek(8));
370 ASSERT_EQ(8, peek.size());
371 ASSERT_EQ('r', peek.data()[0]);
372 ASSERT_EQ('m', peek.data()[1]);
373 ASSERT_EQ('a', peek.data()[2]);
374 ASSERT_EQ('t', peek.data()[3]);
375 ASSERT_EQ('i', peek.data()[4]);
376 ASSERT_EQ('c', peek.data()[5]);
377 ASSERT_EQ('a', peek.data()[6]);
378 ASSERT_EQ('c', peek.data()[7]);
379
380 // Buffered position is 4
381 ASSERT_OK_AND_EQ(4, buffered_->Tell());
382
383 // Raw position actually 12
384 ASSERT_OK_AND_EQ(12, raw_->Tell());
385
386 // Reading to end of buffered bytes does not cause any more data to be
387 // buffered
388 ASSERT_OK_AND_EQ(8, buffered_->Read(8, buf.data()));
389 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 4, 8));
390
391 ASSERT_EQ(0, buffered_->bytes_buffered());
392
393 // Read to EOF, exceeding buffer size
394 ASSERT_OK_AND_EQ(18, buffered_->Read(18, buf.data()));
395 ASSERT_EQ(0, memcmp(buf.data(), test_data_.data() + 12, 18));
396 ASSERT_EQ(0, buffered_->bytes_buffered());
397
398 // Read to EOF
399 ASSERT_OK_AND_EQ(0, buffered_->Read(1, buf.data()));
400 ASSERT_OK_AND_EQ(test_data_.size(), buffered_->Tell());
401
402 // Peek at EOF
403 ASSERT_OK_AND_ASSIGN(peek, buffered_->Peek(10));
404 ASSERT_EQ(0, peek.size());
405
406 // Calling Close closes raw_
407 ASSERT_OK(buffered_->Close());
408 ASSERT_TRUE(buffered_->raw()->closed());
409 }
410
411 TEST_F(TestBufferedInputStream, Detach) {
412 MakeExample1(10);
413 auto raw = buffered_->Detach();
414 ASSERT_OK(buffered_->Close());
415 ASSERT_FALSE(raw->closed());
416 }
417
418 TEST_F(TestBufferedInputStream, ReadBuffer) {
419 const int64_t kBufferSize = 10;
420 MakeExample1(kBufferSize);
421
422 std::shared_ptr<Buffer> buf;
423
424 // Read exceeding buffer size
425 ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(15));
426 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data(), 15));
427 ASSERT_EQ(0, buffered_->bytes_buffered());
428
429 // Buffered reads
430 ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(6));
431 ASSERT_EQ(6, buf->size());
432 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 15, 6));
433 ASSERT_EQ(4, buffered_->bytes_buffered());
434
435 ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(4));
436 ASSERT_EQ(4, buf->size());
437 ASSERT_EQ(0, memcmp(buf->data(), test_data_.data() + 21, 4));
438 ASSERT_EQ(0, buffered_->bytes_buffered());
439 }
440
// Growing the read buffer is always allowed; shrinking it is rejected when
// the requested size is too small for the data currently held.
TEST_F(TestBufferedInputStream, SetBufferSize) {
  MakeExample1(5);

  std::shared_ptr<Buffer> buf;
  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(5));
  ASSERT_EQ(5, buf->size());

  // Increase buffer size
  ASSERT_OK(buffered_->SetBufferSize(10));
  ASSERT_EQ(10, buffered_->buffer_size());
  ASSERT_OK_AND_ASSIGN(buf, buffered_->Read(6));
  ASSERT_EQ(4, buffered_->bytes_buffered());

  // Consume most of the remaining data.
  // NOTE(review): 5 + 6 + 15 = 26 of the 30 bytes in kExample1 have been
  // requested at this point, which appears to leave 4 bytes rather than 5 --
  // confirm.
  ASSERT_OK(buffered_->Read(15));

  // Read close to EOF so that the buffer is only partially filled.
  // NOTE(review): the exact buffer occupancy after this read depends on the
  // BufferedInputStream implementation and is not asserted here -- confirm.
  ASSERT_OK(buffered_->Read(2));

  // Cannot shrink buffer if it would destroy data
  ASSERT_RAISES(Invalid, buffered_->SetBufferSize(4));

  // Shrinking to 5 is accepted
  ASSERT_OK(buffered_->SetBufferSize(5));
}
466
467 class TestBufferedInputStreamBound : public ::testing::Test {
468 public:
469 void SetUp() { CreateExample(/*bounded=*/true); }
470
471 void CreateExample(bool bounded = true) {
472 // Create a buffer larger than source size, to check that the
473 // stream end is respected
474 ASSERT_OK_AND_ASSIGN(auto buf, AllocateResizableBuffer(source_size_ + 10));
475 ASSERT_LT(source_size_, buf->size());
476 for (int i = 0; i < source_size_; i++) {
477 buf->mutable_data()[i] = static_cast<uint8_t>(i);
478 }
479 source_ = std::make_shared<BufferReader>(std::move(buf));
480 ASSERT_OK(source_->Advance(stream_offset_));
481 ASSERT_OK_AND_ASSIGN(
482 stream_, BufferedInputStream::Create(chunk_size_, default_memory_pool(), source_,
483 bounded ? stream_size_ : -1));
484 }
485
486 protected:
487 int64_t source_size_ = 256;
488 int64_t stream_offset_ = 10;
489 int64_t stream_size_ = source_size_ - stream_offset_;
490 int64_t chunk_size_ = 50;
491 std::shared_ptr<InputStream> source_;
492 std::shared_ptr<BufferedInputStream> stream_;
493 };
494
495 TEST_F(TestBufferedInputStreamBound, Basics) {
496 std::shared_ptr<Buffer> buffer;
497 util::string_view view;
498
499 // source is at offset 10
500 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(10));
501 ASSERT_EQ(10, view.size());
502 for (int i = 0; i < 10; i++) {
503 ASSERT_EQ(10 + i, view[i]) << i;
504 }
505
506 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(10));
507 ASSERT_EQ(10, buffer->size());
508 for (int i = 0; i < 10; i++) {
509 ASSERT_EQ(10 + i, (*buffer)[i]) << i;
510 }
511
512 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(10));
513 ASSERT_EQ(10, buffer->size());
514 for (int i = 0; i < 10; i++) {
515 ASSERT_EQ(20 + i, (*buffer)[i]) << i;
516 }
517 ASSERT_OK(stream_->Advance(5));
518 ASSERT_OK(stream_->Advance(5));
519
520 // source is at offset 40
521 // read across buffer boundary. buffer size is 50
522 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(20));
523 ASSERT_EQ(20, buffer->size());
524 for (int i = 0; i < 20; i++) {
525 ASSERT_EQ(40 + i, (*buffer)[i]) << i;
526 }
527
528 // read more than original chunk size
529 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(60));
530 ASSERT_EQ(60, buffer->size());
531 for (int i = 0; i < 60; i++) {
532 ASSERT_EQ(60 + i, (*buffer)[i]) << i;
533 }
534
535 ASSERT_OK(stream_->Advance(120));
536
537 // source is at offset 240
538 // read outside of source boundary. source size is 256
539 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(30));
540
541 ASSERT_EQ(16, buffer->size());
542 for (int i = 0; i < 16; i++) {
543 ASSERT_EQ(240 + i, (*buffer)[i]) << i;
544 }
545 // Stream exhausted
546 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(1));
547 ASSERT_EQ(0, buffer->size());
548 }
549
550 TEST_F(TestBufferedInputStreamBound, LargeFirstPeek) {
551 // Test a first peek larger than chunk size
552 std::shared_ptr<Buffer> buffer;
553 util::string_view view;
554 int64_t n = 70;
555 ASSERT_GT(n, chunk_size_);
556
557 // source is at offset 10
558 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(n));
559 ASSERT_EQ(n, static_cast<int>(view.size()));
560 for (int i = 0; i < n; i++) {
561 ASSERT_EQ(10 + i, view[i]) << i;
562 }
563
564 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(n));
565 ASSERT_EQ(n, static_cast<int>(view.size()));
566 for (int i = 0; i < n; i++) {
567 ASSERT_EQ(10 + i, view[i]) << i;
568 }
569
570 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(n));
571 ASSERT_EQ(n, buffer->size());
572 for (int i = 0; i < n; i++) {
573 ASSERT_EQ(10 + i, (*buffer)[i]) << i;
574 }
575 // source is at offset 10 + n
576 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(20));
577 ASSERT_EQ(20, buffer->size());
578 for (int i = 0; i < 20; i++) {
579 ASSERT_EQ(10 + n + i, (*buffer)[i]) << i;
580 }
581 }
582
583 TEST_F(TestBufferedInputStreamBound, UnboundedPeek) {
584 CreateExample(/*bounded=*/false);
585
586 util::string_view view;
587 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(10));
588 ASSERT_EQ(10, view.size());
589 ASSERT_EQ(50, stream_->bytes_buffered());
590
591 ASSERT_OK(stream_->Read(10));
592
593 // Peek into buffered bytes
594 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(40));
595 ASSERT_EQ(40, view.size());
596 ASSERT_EQ(40, stream_->bytes_buffered());
597 ASSERT_EQ(50, stream_->buffer_size());
598
599 // Peek past buffered bytes
600 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(41));
601 ASSERT_EQ(41, view.size());
602 ASSERT_EQ(41, stream_->bytes_buffered());
603 ASSERT_EQ(51, stream_->buffer_size());
604
605 // Peek to the end of the buffer
606 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(246));
607 ASSERT_EQ(246, view.size());
608 ASSERT_EQ(246, stream_->bytes_buffered());
609 ASSERT_EQ(256, stream_->buffer_size());
610
611 // Larger peek returns the same, expands the buffer, but there is no
612 // more data to buffer
613 ASSERT_OK_AND_ASSIGN(view, stream_->Peek(300));
614 ASSERT_EQ(246, view.size());
615 ASSERT_EQ(246, stream_->bytes_buffered());
616 ASSERT_EQ(310, stream_->buffer_size());
617 }
618
619 TEST_F(TestBufferedInputStreamBound, OneByteReads) {
620 for (int i = 0; i < stream_size_; ++i) {
621 ASSERT_OK_AND_ASSIGN(auto buffer, stream_->Read(1));
622 ASSERT_EQ(1, buffer->size());
623 ASSERT_EQ(10 + i, (*buffer)[0]) << i;
624 }
625 // Stream exhausted
626 ASSERT_OK_AND_ASSIGN(auto buffer, stream_->Read(1));
627 ASSERT_EQ(0, buffer->size());
628 }
629
630 TEST_F(TestBufferedInputStreamBound, BufferExactlyExhausted) {
631 // Test exhausting the buffer exactly then issuing further reads (PARQUET-1571).
632 std::shared_ptr<Buffer> buffer;
633
634 // source is at offset 10
635 int64_t n = 10;
636 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(n));
637 ASSERT_EQ(n, buffer->size());
638 for (int i = 0; i < n; i++) {
639 ASSERT_EQ(10 + i, (*buffer)[i]) << i;
640 }
641 // source is at offset 20
642 // Exhaust buffer exactly
643 n = stream_->bytes_buffered();
644 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(n));
645 ASSERT_EQ(n, buffer->size());
646 for (int i = 0; i < n; i++) {
647 ASSERT_EQ(20 + i, (*buffer)[i]) << i;
648 }
649
650 // source is at offset 20 + n
651 // Read new buffer
652 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(10));
653 ASSERT_EQ(10, buffer->size());
654 for (int i = 0; i < 10; i++) {
655 ASSERT_EQ(20 + n + i, (*buffer)[i]) << i;
656 }
657
658 // source is at offset 30 + n
659 ASSERT_OK_AND_ASSIGN(buffer, stream_->Read(10));
660 ASSERT_EQ(10, buffer->size());
661 for (int i = 0; i < 10; i++) {
662 ASSERT_EQ(30 + n + i, (*buffer)[i]) << i;
663 }
664 }
665
666 } // namespace io
667 } // namespace arrow