]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/io/memory_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / memory_test.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <chrono>
19#include <cmath>
20#include <cstdint>
21#include <cstdlib>
22#include <cstring>
23#include <functional>
24#include <memory>
25#include <ostream>
26#include <random>
27#include <string>
28#include <vector>
29
30#include <gtest/gtest.h>
31
32#include "arrow/buffer.h"
33#include "arrow/io/caching.h"
34#include "arrow/io/interfaces.h"
35#include "arrow/io/memory.h"
36#include "arrow/io/slow.h"
37#include "arrow/io/transform.h"
38#include "arrow/io/util_internal.h"
39#include "arrow/status.h"
40#include "arrow/testing/future_util.h"
41#include "arrow/testing/gtest_util.h"
42#include "arrow/testing/util.h"
43#include "arrow/util/bit_util.h"
44#include "arrow/util/checked_cast.h"
45#include "arrow/util/future.h"
46#include "arrow/util/iterator.h"
47#include "arrow/util/logging.h"
48#include "arrow/util/parallel.h"
49
50namespace arrow {
51
52using internal::checked_cast;
53
54namespace io {
55
56std::ostream& operator<<(std::ostream& os, const ReadRange& range) {
57 return os << "<offset=" << range.offset << ", length=" << range.length << ">";
58}
59
60class TestBufferOutputStream : public ::testing::Test {
61 public:
62 void SetUp() {
63 ASSERT_OK_AND_ASSIGN(buffer_, AllocateResizableBuffer(0));
64 stream_.reset(new BufferOutputStream(buffer_));
65 }
66
67 protected:
68 std::shared_ptr<ResizableBuffer> buffer_;
69 std::unique_ptr<OutputStream> stream_;
70};
71
72TEST_F(TestBufferOutputStream, DtorCloses) {
73 std::string data = "data123456";
74
75 const int K = 100;
76 for (int i = 0; i < K; ++i) {
77 ARROW_EXPECT_OK(stream_->Write(data));
78 }
79
80 stream_ = nullptr;
81 ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
82}
83
84TEST_F(TestBufferOutputStream, CloseResizes) {
85 std::string data = "data123456";
86
87 const int K = 100;
88 for (int i = 0; i < K; ++i) {
89 ARROW_EXPECT_OK(stream_->Write(data));
90 }
91
92 ASSERT_OK(stream_->Close());
93 ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
94}
95
96TEST_F(TestBufferOutputStream, WriteAfterFinish) {
97 std::string data = "data123456";
98 ASSERT_OK(stream_->Write(data));
99
100 auto buffer_stream = checked_cast<BufferOutputStream*>(stream_.get());
101
102 ASSERT_OK(buffer_stream->Finish());
103
104 ASSERT_RAISES(IOError, stream_->Write(data));
105}
106
107TEST_F(TestBufferOutputStream, Reset) {
108 std::string data = "data123456";
109
110 auto stream = checked_cast<BufferOutputStream*>(stream_.get());
111
112 ASSERT_OK(stream->Write(data));
113
114 ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
115 ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size()));
116
117 ASSERT_OK(stream->Reset(2048));
118 ASSERT_OK(stream->Write(data));
119 ASSERT_OK(stream->Write(data));
120 ASSERT_OK_AND_ASSIGN(auto buffer2, stream->Finish());
121
122 ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2));
123}
124
125TEST(TestFixedSizeBufferWriter, Basics) {
126 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, AllocateBuffer(1024));
127
128 FixedSizeBufferWriter writer(buffer);
129
130 ASSERT_OK_AND_EQ(0, writer.Tell());
131
132 std::string data = "data123456";
133 auto nbytes = static_cast<int64_t>(data.size());
134 ASSERT_OK(writer.Write(data.c_str(), nbytes));
135
136 ASSERT_OK_AND_EQ(nbytes, writer.Tell());
137
138 ASSERT_OK(writer.Seek(4));
139 ASSERT_OK_AND_EQ(4, writer.Tell());
140
141 ASSERT_OK(writer.Seek(1024));
142 ASSERT_OK_AND_EQ(1024, writer.Tell());
143
144 // Write out of bounds
145 ASSERT_RAISES(IOError, writer.Write(data.c_str(), 1));
146
147 // Seek out of bounds
148 ASSERT_RAISES(IOError, writer.Seek(-1));
149 ASSERT_RAISES(IOError, writer.Seek(1025));
150
151 ASSERT_OK(writer.Close());
152}
153
154TEST(TestFixedSizeBufferWriter, InvalidWrites) {
155 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, AllocateBuffer(1024));
156
157 FixedSizeBufferWriter writer(buffer);
158 const uint8_t data[10]{};
159 ASSERT_RAISES(Invalid, writer.WriteAt(-1, data, 1));
160 ASSERT_RAISES(Invalid, writer.WriteAt(1, data, -1));
161}
162
163TEST(TestBufferReader, FromStrings) {
164 // ARROW-3291: construct BufferReader from std::string or
165 // arrow::util::string_view
166
167 std::string data = "data123456";
168 auto view = util::string_view(data);
169
170 BufferReader reader1(data);
171 BufferReader reader2(view);
172
173 std::shared_ptr<Buffer> piece;
174 ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4));
175 ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4));
176
177 ASSERT_OK(reader2.Seek(2));
178 ASSERT_OK_AND_ASSIGN(piece, reader2.Read(4));
179 ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4));
180}
181
182TEST(TestBufferReader, FromNullBuffer) {
183 std::shared_ptr<Buffer> buf;
184 BufferReader reader(buf);
185 ASSERT_OK_AND_EQ(0, reader.GetSize());
186 ASSERT_OK_AND_ASSIGN(auto piece, reader.Read(10));
187 ASSERT_EQ(0, piece->size());
188}
189
190TEST(TestBufferReader, Seeking) {
191 std::string data = "data123456";
192
193 BufferReader reader(data);
194 ASSERT_OK_AND_EQ(0, reader.Tell());
195
196 ASSERT_OK(reader.Seek(9));
197 ASSERT_OK_AND_EQ(9, reader.Tell());
198
199 ASSERT_OK(reader.Seek(10));
200 ASSERT_OK_AND_EQ(10, reader.Tell());
201
202 ASSERT_RAISES(IOError, reader.Seek(11));
203 ASSERT_OK_AND_EQ(10, reader.Tell());
204}
205
206TEST(TestBufferReader, Peek) {
207 std::string data = "data123456";
208
209 BufferReader reader(std::make_shared<Buffer>(data));
210
211 util::string_view view;
212
213 ASSERT_OK_AND_ASSIGN(view, reader.Peek(4));
214
215 ASSERT_EQ(4, view.size());
216 ASSERT_EQ(data.substr(0, 4), std::string(view));
217
218 ASSERT_OK_AND_ASSIGN(view, reader.Peek(20));
219 ASSERT_EQ(data.size(), view.size());
220 ASSERT_EQ(data, std::string(view));
221}
222
223TEST(TestBufferReader, ReadAsync) {
224 std::string data = "data123456";
225
226 BufferReader reader(std::make_shared<Buffer>(data));
227
228 auto fut1 = reader.ReadAsync({}, 2, 6);
229 auto fut2 = reader.ReadAsync({}, 1, 4);
230 ASSERT_EQ(fut1.state(), FutureState::SUCCESS);
231 ASSERT_EQ(fut2.state(), FutureState::SUCCESS);
232 ASSERT_OK_AND_ASSIGN(auto buf, fut1.result());
233 AssertBufferEqual(*buf, "ta1234");
234 ASSERT_OK_AND_ASSIGN(buf, fut2.result());
235 AssertBufferEqual(*buf, "ata1");
236}
237
238TEST(TestBufferReader, InvalidReads) {
239 std::string data = "data123456";
240 BufferReader reader(std::make_shared<Buffer>(data));
241 uint8_t buffer[10];
242
243 ASSERT_RAISES(Invalid, reader.ReadAt(-1, 1));
244 ASSERT_RAISES(Invalid, reader.ReadAt(1, -1));
245 ASSERT_RAISES(Invalid, reader.ReadAt(-1, 1, buffer));
246 ASSERT_RAISES(Invalid, reader.ReadAt(1, -1, buffer));
247
248 ASSERT_RAISES(Invalid, reader.ReadAsync({}, -1, 1).result());
249 ASSERT_RAISES(Invalid, reader.ReadAsync({}, 1, -1).result());
250}
251
252TEST(TestBufferReader, RetainParentReference) {
253 // ARROW-387
254 std::string data = "data123456";
255
256 std::shared_ptr<Buffer> slice1;
257 std::shared_ptr<Buffer> slice2;
258 {
259 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer,
260 AllocateBuffer(static_cast<int64_t>(data.size())));
261 std::memcpy(buffer->mutable_data(), data.c_str(), data.size());
262 BufferReader reader(buffer);
263 ASSERT_OK_AND_ASSIGN(slice1, reader.Read(4));
264 ASSERT_OK_AND_ASSIGN(slice2, reader.Read(6));
265 }
266
267 ASSERT_TRUE(slice1->parent() != nullptr);
268
269 ASSERT_EQ(0, std::memcmp(slice1->data(), data.c_str(), 4));
270 ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
271}
272
273TEST(TestBufferReader, WillNeed) {
274 {
275 std::string data = "data123456";
276 BufferReader reader(std::make_shared<Buffer>(data));
277
278 ASSERT_OK(reader.WillNeed({}));
279 ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}}));
280 ASSERT_OK(reader.WillNeed({{10, 0}}));
281 ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds
282 }
283 {
284 std::string data = "data123456";
285 BufferReader reader(reinterpret_cast<const uint8_t*>(data.data()),
286 static_cast<int64_t>(data.size()));
287
288 ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}}));
289 ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds
290 }
291}
292
293TEST(TestRandomAccessFile, GetStream) {
294 std::string data = "data1data2data3data4data5";
295
296 auto buf = std::make_shared<Buffer>(data);
297 auto file = std::make_shared<BufferReader>(buf);
298
299 std::shared_ptr<InputStream> stream1, stream2;
300
301 stream1 = RandomAccessFile::GetStream(file, 0, 10);
302 stream2 = RandomAccessFile::GetStream(file, 9, 16);
303
304 ASSERT_OK_AND_EQ(0, stream1->Tell());
305
306 std::shared_ptr<Buffer> buf2;
307 uint8_t buf3[20];
308
309 ASSERT_OK_AND_EQ(4, stream2->Read(4, buf3));
310 ASSERT_EQ(0, std::memcmp(buf3, "2dat", 4));
311 ASSERT_OK_AND_EQ(4, stream2->Tell());
312
313 ASSERT_OK_AND_EQ(6, stream1->Read(6, buf3));
314 ASSERT_EQ(0, std::memcmp(buf3, "data1d", 6));
315 ASSERT_OK_AND_EQ(6, stream1->Tell());
316
317 ASSERT_OK_AND_ASSIGN(buf2, stream1->Read(2));
318 ASSERT_TRUE(SliceBuffer(buf, 6, 2)->Equals(*buf2));
319
320 // Read to end of each stream
321 ASSERT_OK_AND_EQ(2, stream1->Read(4, buf3));
322 ASSERT_EQ(0, std::memcmp(buf3, "a2", 2));
323 ASSERT_OK_AND_EQ(10, stream1->Tell());
324
325 ASSERT_OK_AND_EQ(0, stream1->Read(1, buf3));
326 ASSERT_OK_AND_EQ(10, stream1->Tell());
327
328 // stream2 had its extent limited
329 ASSERT_OK_AND_ASSIGN(buf2, stream2->Read(20));
330 ASSERT_TRUE(SliceBuffer(buf, 13, 12)->Equals(*buf2));
331
332 ASSERT_OK_AND_ASSIGN(buf2, stream2->Read(1));
333 ASSERT_EQ(0, buf2->size());
334 ASSERT_OK_AND_EQ(16, stream2->Tell());
335
336 ASSERT_OK(stream1->Close());
337
338 // idempotent
339 ASSERT_OK(stream1->Close());
340 ASSERT_TRUE(stream1->closed());
341
342 // Check whether closed
343 ASSERT_RAISES(IOError, stream1->Tell());
344 ASSERT_RAISES(IOError, stream1->Read(1));
345 ASSERT_RAISES(IOError, stream1->Read(1, buf3));
346}
347
348TEST(TestMemcopy, ParallelMemcopy) {
349#if defined(ARROW_VALGRIND)
350 // Compensate for Valgrind's slowness
351 constexpr int64_t THRESHOLD = 32 * 1024;
352#else
353 constexpr int64_t THRESHOLD = 1024 * 1024;
354#endif
355
356 for (int i = 0; i < 5; ++i) {
357 // randomize size so the memcopy alignment is tested
358 int64_t total_size = 3 * THRESHOLD + std::rand() % 100;
359
360 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer1, AllocateBuffer(total_size));
361 ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer2, AllocateBuffer(total_size));
362
363 random_bytes(total_size, 0, buffer2->mutable_data());
364
365 io::FixedSizeBufferWriter writer(buffer1);
366 writer.set_memcopy_threads(4);
367 writer.set_memcopy_threshold(THRESHOLD);
368 ASSERT_OK(writer.Write(buffer2->data(), buffer2->size()));
369
370 ASSERT_EQ(0, memcmp(buffer1->data(), buffer2->data(), buffer1->size()));
371 }
372}
373
374// -----------------------------------------------------------------------
375// Test slow streams
376
377template <typename SlowStreamType>
378void TestSlowInputStream() {
379 using clock = std::chrono::high_resolution_clock;
380
381 auto stream = std::make_shared<BufferReader>(util::string_view("abcdefghijkl"));
382 const double latency = 0.6;
383 auto slow = std::make_shared<SlowStreamType>(stream, latency);
384
385 ASSERT_FALSE(slow->closed());
386 auto t1 = clock::now();
387 ASSERT_OK_AND_ASSIGN(auto buf, slow->Read(6));
388 auto t2 = clock::now();
389 AssertBufferEqual(*buf, "abcdef");
390 auto dt = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1).count();
391#ifdef ARROW_WITH_TIMING_TESTS
392 ASSERT_LT(dt, latency * 3); // likely
393 ASSERT_GT(dt, latency / 3); // likely
394#else
395 ARROW_UNUSED(dt);
396#endif
397
398 ASSERT_OK_AND_ASSIGN(util::string_view view, slow->Peek(4));
399 ASSERT_EQ(view, util::string_view("ghij"));
400
401 ASSERT_OK(slow->Close());
402 ASSERT_TRUE(slow->closed());
403 ASSERT_TRUE(stream->closed());
404 ASSERT_OK(slow->Close());
405 ASSERT_TRUE(slow->closed());
406 ASSERT_TRUE(stream->closed());
407}
408
409TEST(TestSlowInputStream, Basics) { TestSlowInputStream<SlowInputStream>(); }
410
411TEST(TestSlowRandomAccessFile, Basics) { TestSlowInputStream<SlowRandomAccessFile>(); }
412
413// -----------------------------------------------------------------------
414// Test transform streams
415
416struct DoublingTransform {
417 // A transform that duplicates every byte
418 Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) {
419 ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf->size() * 2));
420 const uint8_t* data = buf->data();
421 uint8_t* out_data = dest->mutable_data();
422 for (int64_t i = 0; i < buf->size(); ++i) {
423 out_data[i * 2] = data[i];
424 out_data[i * 2 + 1] = data[i];
425 }
426 return std::shared_ptr<Buffer>(std::move(dest));
427 }
428};
429
430struct SwappingTransform {
431 // A transform that swaps every pair of bytes
432 Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) {
433 int64_t dest_size = BitUtil::RoundDown(buf->size() + has_pending_, 2);
434 ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(dest_size));
435 const uint8_t* data = buf->data();
436 uint8_t* out_data = dest->mutable_data();
437 if (has_pending_ && dest_size > 0) {
438 *out_data++ = *data++;
439 *out_data++ = pending_byte_;
440 dest_size -= 2;
441 }
442 for (int64_t i = 0; i < dest_size; i += 2) {
443 out_data[i] = data[i + 1];
444 out_data[i + 1] = data[i];
445 }
446 has_pending_ = has_pending_ ^ (buf->size() & 1);
447 if (has_pending_) {
448 pending_byte_ = buf->data()[buf->size() - 1];
449 }
450 return std::shared_ptr<Buffer>(std::move(dest));
451 }
452
453 protected:
454 bool has_pending_ = 0;
455 uint8_t pending_byte_ = 0;
456};
457
458struct BaseShrinkingTransform {
459 // A transform that keeps one byte every N bytes
460 explicit BaseShrinkingTransform(int64_t keep_every) : keep_every_(keep_every) {}
461
462 Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) {
463 int64_t dest_size = (buf->size() - skip_bytes_ + keep_every_ - 1) / keep_every_;
464 ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(dest_size));
465 const uint8_t* data = buf->data() + skip_bytes_;
466 uint8_t* out_data = dest->mutable_data();
467 for (int64_t i = 0; i < dest_size; ++i) {
468 out_data[i] = data[i * keep_every_];
469 }
470 if (dest_size > 0) {
471 skip_bytes_ = skip_bytes_ + dest_size * keep_every_ - buf->size();
472 } else {
473 skip_bytes_ = skip_bytes_ - buf->size();
474 }
475 DCHECK_GE(skip_bytes_, 0);
476 DCHECK_LT(skip_bytes_, keep_every_);
477 return std::shared_ptr<Buffer>(std::move(dest));
478 }
479
480 protected:
481 int64_t skip_bytes_ = 0;
482 const int64_t keep_every_;
483};
484
485template <int N>
486struct ShrinkingTransform : public BaseShrinkingTransform {
487 ShrinkingTransform() : BaseShrinkingTransform(N) {}
488};
489
490template <typename T>
491class TestTransformInputStream : public ::testing::Test {
492 public:
493 TransformInputStream::TransformFunc transform() const { return T(); }
494
495 void TestEmptyStream() {
496 auto wrapped = std::make_shared<BufferReader>(util::string_view());
497 auto stream = std::make_shared<TransformInputStream>(wrapped, transform());
498
499 ASSERT_OK_AND_EQ(0, stream->Tell());
500 ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(123));
501 ASSERT_EQ(buf->size(), 0);
502 ASSERT_OK_AND_ASSIGN(buf, stream->Read(0));
503 ASSERT_EQ(buf->size(), 0);
504 ASSERT_OK_AND_EQ(0, stream->Read(5, out_data_));
505 ASSERT_OK_AND_EQ(0, stream->Tell());
506 }
507
508 void TestBasics() {
509 auto src = Buffer::FromString("1234567890abcdefghi");
510 ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src));
511
512 auto stream = std::make_shared<TransformInputStream>(
513 std::make_shared<BufferReader>(src), this->transform());
514 std::shared_ptr<Buffer> actual;
515 AccumulateReads(stream, 200, &actual);
516 AssertBufferEqual(*actual, *expected);
517 }
518
519 void TestClose() {
520 auto src = Buffer::FromString("1234567890abcdefghi");
521 auto stream = std::make_shared<TransformInputStream>(
522 std::make_shared<BufferReader>(src), this->transform());
523 ASSERT_FALSE(stream->closed());
524 ASSERT_OK(stream->Close());
525 ASSERT_TRUE(stream->closed());
526 ASSERT_RAISES(Invalid, stream->Read(1));
527 ASSERT_RAISES(Invalid, stream->Read(1, out_data_));
528 ASSERT_RAISES(Invalid, stream->Tell());
529 ASSERT_OK(stream->Close());
530 ASSERT_TRUE(stream->closed());
531 }
532
533 void TestChunked() {
534 auto src = Buffer::FromString("1234567890abcdefghi");
535 ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src));
536
537 auto stream = std::make_shared<TransformInputStream>(
538 std::make_shared<BufferReader>(src), this->transform());
539 std::shared_ptr<Buffer> actual;
540 AccumulateReads(stream, 5, &actual);
541 AssertBufferEqual(*actual, *expected);
542 }
543
544 void TestStressChunked() {
545 ASSERT_OK_AND_ASSIGN(auto unique_src, AllocateBuffer(1000));
546 auto src = std::shared_ptr<Buffer>(std::move(unique_src));
547 random_bytes(src->size(), /*seed=*/42, src->mutable_data());
548
549 ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src));
550
551 std::default_random_engine gen(42);
552 std::uniform_int_distribution<int> chunk_sizes(0, 20);
553
554 auto stream = std::make_shared<TransformInputStream>(
555 std::make_shared<BufferReader>(src), this->transform());
556 std::shared_ptr<Buffer> actual;
557 AccumulateReads(
558 stream, [&]() -> int64_t { return chunk_sizes(gen); }, &actual);
559 AssertBufferEqual(*actual, *expected);
560 }
561
562 void AccumulateReads(const std::shared_ptr<InputStream>& stream,
563 std::function<int64_t()> gen_chunk_sizes,
564 std::shared_ptr<Buffer>* out) {
565 std::vector<std::shared_ptr<Buffer>> buffers;
566 int64_t total_size = 0;
567 while (true) {
568 const int64_t chunk_size = gen_chunk_sizes();
569 ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(chunk_size));
570 const int64_t buf_size = buf->size();
571 total_size += buf_size;
572 ASSERT_OK_AND_EQ(total_size, stream->Tell());
573 if (chunk_size > 0 && buf_size == 0) {
574 // EOF
575 break;
576 }
577 buffers.push_back(std::move(buf));
578 if (buf_size < chunk_size) {
579 // Short read should imply EOF on next read
580 ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(100));
581 ASSERT_EQ(buf->size(), 0);
582 break;
583 }
584 }
585 ASSERT_OK_AND_ASSIGN(*out, ConcatenateBuffers(buffers));
586 }
587
588 void AccumulateReads(const std::shared_ptr<InputStream>& stream, int64_t chunk_size,
589 std::shared_ptr<Buffer>* out) {
590 return AccumulateReads(
591 stream, [=]() { return chunk_size; }, out);
592 }
593
594 protected:
595 uint8_t* out_data_[10];
596};
597
598using TransformTypes =
599 ::testing::Types<DoublingTransform, SwappingTransform, ShrinkingTransform<2>,
600 ShrinkingTransform<3>, ShrinkingTransform<7>>;
601
602TYPED_TEST_SUITE(TestTransformInputStream, TransformTypes);
603
604TYPED_TEST(TestTransformInputStream, EmptyStream) { this->TestEmptyStream(); }
605
606TYPED_TEST(TestTransformInputStream, Basics) { this->TestBasics(); }
607
608TYPED_TEST(TestTransformInputStream, Close) { this->TestClose(); }
609
610TYPED_TEST(TestTransformInputStream, Chunked) { this->TestChunked(); }
611
612TYPED_TEST(TestTransformInputStream, StressChunked) { this->TestStressChunked(); }
613
614static Result<std::shared_ptr<Buffer>> FailingTransform(
615 const std::shared_ptr<Buffer>& buf) {
616 return Status::UnknownError("Failed transform");
617}
618
619TEST(TestTransformInputStream, FailingTransform) {
620 auto src = Buffer::FromString("1234567890abcdefghi");
621 auto stream = std::make_shared<TransformInputStream>(
622 std::make_shared<BufferReader>(src), FailingTransform);
623 ASSERT_RAISES(UnknownError, stream->Read(5));
624}
625
626// -----------------------------------------------------------------------
627// Test various utilities
628
629TEST(TestInputStreamIterator, Basics) {
630 auto reader = std::make_shared<BufferReader>(Buffer::FromString("data123456"));
631 ASSERT_OK_AND_ASSIGN(auto it, MakeInputStreamIterator(reader, /*block_size=*/3));
632 std::shared_ptr<Buffer> buf;
633 ASSERT_OK_AND_ASSIGN(buf, it.Next());
634 AssertBufferEqual(*buf, "dat");
635 ASSERT_OK_AND_ASSIGN(buf, it.Next());
636 AssertBufferEqual(*buf, "a12");
637 ASSERT_OK_AND_ASSIGN(buf, it.Next());
638 AssertBufferEqual(*buf, "345");
639 ASSERT_OK_AND_ASSIGN(buf, it.Next());
640 AssertBufferEqual(*buf, "6");
641 ASSERT_OK_AND_ASSIGN(buf, it.Next());
642 ASSERT_EQ(buf, nullptr);
643 ASSERT_OK_AND_ASSIGN(buf, it.Next());
644 ASSERT_EQ(buf, nullptr);
645}
646
647TEST(TestInputStreamIterator, Closed) {
648 auto reader = std::make_shared<BufferReader>(Buffer::FromString("data123456"));
649 ASSERT_OK(reader->Close());
650 ASSERT_RAISES(Invalid, MakeInputStreamIterator(reader, 3));
651
652 reader = std::make_shared<BufferReader>(Buffer::FromString("data123456"));
653 ASSERT_OK_AND_ASSIGN(auto it, MakeInputStreamIterator(reader, /*block_size=*/3));
654 ASSERT_OK_AND_ASSIGN(auto buf, it.Next());
655 AssertBufferEqual(*buf, "dat");
656 // Close stream and read from iterator
657 ASSERT_OK(reader->Close());
658 ASSERT_RAISES(Invalid, it.Next().status());
659}
660
661TEST(CoalesceReadRanges, Basics) {
662 auto check = [](std::vector<ReadRange> ranges,
663 std::vector<ReadRange> expected) -> void {
664 const int64_t hole_size_limit = 9;
665 const int64_t range_size_limit = 99;
666 auto coalesced =
667 internal::CoalesceReadRanges(ranges, hole_size_limit, range_size_limit);
668 ASSERT_EQ(coalesced, expected);
669 };
670
671 check({}, {});
672 // Zero sized range that ends up in empty list
673 check({{110, 0}}, {});
674 // Combination on 1 zero sized range and 1 non-zero sized range
675 check({{110, 10}, {120, 0}}, {{110, 10}});
676 // 1 non-zero sized range
677 check({{110, 10}}, {{110, 10}});
678 // No holes + unordered ranges
679 check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}});
680 // No holes
681 check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}});
682 // Small holes only
683 check({{110, 11}, {130, 11}, {150, 11}}, {{110, 51}});
684 // Large holes
685 check({{110, 10}, {130, 10}}, {{110, 10}, {130, 10}});
686 check({{110, 11}, {130, 11}, {150, 10}, {170, 11}, {190, 11}}, {{110, 50}, {170, 31}});
687
688 // With zero-sized ranges
689 check({{110, 11}, {130, 0}, {130, 11}, {145, 0}, {150, 11}, {200, 0}}, {{110, 51}});
690
691 // No holes but large ranges
692 check({{110, 100}, {210, 100}}, {{110, 100}, {210, 100}});
693 // Small holes and large range in the middle (*)
694 check({{110, 10}, {120, 11}, {140, 100}, {240, 11}, {260, 11}},
695 {{110, 21}, {140, 100}, {240, 31}});
696 // Mid-size ranges that would turn large after coalescing
697 check({{100, 50}, {150, 50}}, {{100, 50}, {150, 50}});
698 check({{100, 30}, {130, 30}, {160, 30}, {190, 30}, {220, 30}}, {{100, 90}, {190, 60}});
699
700 // Same as (*) but unsorted
701 check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}},
702 {{110, 21}, {140, 100}, {240, 31}});
703
704 // Completely overlapping ranges should be eliminated
705 check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}});
706}
707
708class CountingBufferReader : public BufferReader {
709 public:
710 using BufferReader::BufferReader;
711 Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext& context, int64_t position,
712 int64_t nbytes) override {
713 read_count_++;
714 return BufferReader::ReadAsync(context, position, nbytes);
715 }
716 int64_t read_count() const { return read_count_; }
717
718 private:
719 int64_t read_count_ = 0;
720};
721
722TEST(RangeReadCache, Basics) {
723 std::string data = "abcdefghijklmnopqrstuvwxyz";
724
725 CacheOptions options = CacheOptions::Defaults();
726 options.hole_size_limit = 2;
727 options.range_size_limit = 10;
728
729 for (auto lazy : std::vector<bool>{false, true}) {
730 SCOPED_TRACE(lazy);
731 options.lazy = lazy;
732 auto file = std::make_shared<CountingBufferReader>(Buffer(data));
733 internal::ReadRangeCache cache(file, {}, options);
734
735 ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
736 ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}}));
737
738 ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2}));
739 AssertBufferEqual(*buf, "uv");
740 ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 2}));
741 AssertBufferEqual(*buf, "bc");
742 ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 2}));
743 AssertBufferEqual(*buf, "de");
744 ASSERT_OK_AND_ASSIGN(buf, cache.Read({8, 2}));
745 AssertBufferEqual(*buf, "ij");
746 ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 4}));
747 AssertBufferEqual(*buf, "klmn");
748 ASSERT_OK_AND_ASSIGN(buf, cache.Read({15, 4}));
749 AssertBufferEqual(*buf, "pqrs");
750 ASSERT_FINISHES_OK(cache.WaitFor({{15, 1}, {16, 3}, {25, 0}, {1, 2}}));
751 // Zero-sized
752 ASSERT_OK_AND_ASSIGN(buf, cache.Read({14, 0}));
753 AssertBufferEqual(*buf, "");
754 ASSERT_OK_AND_ASSIGN(buf, cache.Read({25, 0}));
755 AssertBufferEqual(*buf, "");
756
757 // Non-cached ranges
758 ASSERT_RAISES(Invalid, cache.Read({20, 3}));
759 ASSERT_RAISES(Invalid, cache.Read({19, 3}));
760 ASSERT_RAISES(Invalid, cache.Read({0, 3}));
761 ASSERT_RAISES(Invalid, cache.Read({25, 2}));
762 ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{25, 2}}));
763 ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{1, 2}, {25, 2}}));
764
765 ASSERT_FINISHES_OK(cache.Wait());
766 // 8 ranges should lead to less than 8 reads
767 ASSERT_LT(file->read_count(), 8);
768 }
769}
770
771TEST(RangeReadCache, Concurrency) {
772 std::string data = "abcdefghijklmnopqrstuvwxyz";
773
774 auto file = std::make_shared<BufferReader>(Buffer(data));
775 std::vector<ReadRange> ranges{{1, 2}, {3, 2}, {8, 2}, {20, 2},
776 {25, 0}, {10, 4}, {14, 0}, {15, 4}};
777
778 for (auto lazy : std::vector<bool>{false, true}) {
779 SCOPED_TRACE(lazy);
780 CacheOptions options = CacheOptions::Defaults();
781 options.hole_size_limit = 2;
782 options.range_size_limit = 10;
783 options.lazy = lazy;
784
785 {
786 internal::ReadRangeCache cache(file, {}, options);
787 ASSERT_OK(cache.Cache(ranges));
788 std::vector<Future<std::shared_ptr<Buffer>>> futures;
789 for (const auto& range : ranges) {
790 futures.push_back(
791 cache.WaitFor({range}).Then([&cache, range]() { return cache.Read(range); }));
792 }
793 for (auto fut : futures) {
794 ASSERT_FINISHES_OK(fut);
795 }
796 }
797 {
798 internal::ReadRangeCache cache(file, {}, options);
799 ASSERT_OK(cache.Cache(ranges));
800 ASSERT_OK(arrow::internal::ParallelFor(
801 static_cast<int>(ranges.size()),
802 [&](int index) { return cache.Read(ranges[index]).status(); }));
803 }
804 }
805}
806
807TEST(RangeReadCache, Lazy) {
808 std::string data = "abcdefghijklmnopqrstuvwxyz";
809
810 auto file = std::make_shared<CountingBufferReader>(Buffer(data));
811 CacheOptions options = CacheOptions::LazyDefaults();
812 options.hole_size_limit = 2;
813 options.range_size_limit = 10;
814 internal::ReadRangeCache cache(file, {}, options);
815
816 ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
817 ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}}));
818
819 // Lazy cache doesn't fetch ranges until requested
820 ASSERT_EQ(0, file->read_count());
821
822 ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2}));
823 AssertBufferEqual(*buf, "uv");
824 ASSERT_EQ(1, file->read_count());
825
826 ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4}));
827 AssertBufferEqual(*buf, "bcde");
828 ASSERT_EQ(2, file->read_count());
829
830 // Requested ranges are still cached
831 ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4}));
832 ASSERT_EQ(2, file->read_count());
833
834 // Non-cached ranges
835 ASSERT_RAISES(Invalid, cache.Read({20, 3}));
836 ASSERT_RAISES(Invalid, cache.Read({19, 3}));
837 ASSERT_RAISES(Invalid, cache.Read({0, 3}));
838 ASSERT_RAISES(Invalid, cache.Read({25, 2}));
839
840 // Can asynchronously kick off a read (though BufferReader::ReadAsync is synchronous so
841 // it will increment the read count here)
842 ASSERT_FINISHES_OK(cache.WaitFor({{10, 2}, {15, 4}}));
843 ASSERT_EQ(3, file->read_count());
844 ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 2}));
845 ASSERT_EQ(3, file->read_count());
846}
847
848TEST(CacheOptions, Basics) {
849 auto check = [](const CacheOptions actual, const double expected_hole_size_limit_MiB,
850 const double expected_range_size_limit_MiB) -> void {
851 const CacheOptions expected = {
852 static_cast<int64_t>(std::round(expected_hole_size_limit_MiB * 1024 * 1024)),
853 static_cast<int64_t>(std::round(expected_range_size_limit_MiB * 1024 * 1024)),
854 /*lazy=*/false};
855 ASSERT_EQ(actual, expected);
856 };
857
858 // Test: normal usage.
859 // TTFB = 5 ms, BW = 500 MiB/s,
860 // we expect hole_size_limit = 2.5 MiB, and range_size_limit = 22.5 MiB
861 check(CacheOptions::MakeFromNetworkMetrics(5, 500), 2.5, 22.5);
862 // Test: custom bandwidth utilization.
863 // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%,
864 // we expect a change in range_size_limit = 7.5 MiB.
865 check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75), 2.5, 7.5);
866 // Test: custom max_ideal_request_size, range_size_limit gets capped.
867 // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%, max_ideal_request_size = 5 MiB,
868 // we expect the range_size_limit to be capped at 5 MiB.
869 check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75, 5), 2.5, 5);
870}
871
872TEST(IOThreadPool, Capacity) {
873 // Simple sanity check
874 auto pool = internal::GetIOThreadPool();
875 int capacity = pool->GetCapacity();
876 ASSERT_GT(capacity, 0);
877 ASSERT_EQ(GetIOThreadPoolCapacity(), capacity);
878 ASSERT_OK(SetIOThreadPoolCapacity(capacity + 1));
879 ASSERT_EQ(GetIOThreadPoolCapacity(), capacity + 1);
880}
881
882} // namespace io
883} // namespace arrow