]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include <chrono> | |
19 | #include <cmath> | |
20 | #include <cstdint> | |
21 | #include <cstdlib> | |
22 | #include <cstring> | |
23 | #include <functional> | |
24 | #include <memory> | |
25 | #include <ostream> | |
26 | #include <random> | |
27 | #include <string> | |
28 | #include <vector> | |
29 | ||
30 | #include <gtest/gtest.h> | |
31 | ||
32 | #include "arrow/buffer.h" | |
33 | #include "arrow/io/caching.h" | |
34 | #include "arrow/io/interfaces.h" | |
35 | #include "arrow/io/memory.h" | |
36 | #include "arrow/io/slow.h" | |
37 | #include "arrow/io/transform.h" | |
38 | #include "arrow/io/util_internal.h" | |
39 | #include "arrow/status.h" | |
40 | #include "arrow/testing/future_util.h" | |
41 | #include "arrow/testing/gtest_util.h" | |
42 | #include "arrow/testing/util.h" | |
43 | #include "arrow/util/bit_util.h" | |
44 | #include "arrow/util/checked_cast.h" | |
45 | #include "arrow/util/future.h" | |
46 | #include "arrow/util/iterator.h" | |
47 | #include "arrow/util/logging.h" | |
48 | #include "arrow/util/parallel.h" | |
49 | ||
50 | namespace arrow { | |
51 | ||
52 | using internal::checked_cast; | |
53 | ||
54 | namespace io { | |
55 | ||
56 | std::ostream& operator<<(std::ostream& os, const ReadRange& range) { | |
57 | return os << "<offset=" << range.offset << ", length=" << range.length << ">"; | |
58 | } | |
59 | ||
60 | class TestBufferOutputStream : public ::testing::Test { | |
61 | public: | |
62 | void SetUp() { | |
63 | ASSERT_OK_AND_ASSIGN(buffer_, AllocateResizableBuffer(0)); | |
64 | stream_.reset(new BufferOutputStream(buffer_)); | |
65 | } | |
66 | ||
67 | protected: | |
68 | std::shared_ptr<ResizableBuffer> buffer_; | |
69 | std::unique_ptr<OutputStream> stream_; | |
70 | }; | |
71 | ||
72 | TEST_F(TestBufferOutputStream, DtorCloses) { | |
73 | std::string data = "data123456"; | |
74 | ||
75 | const int K = 100; | |
76 | for (int i = 0; i < K; ++i) { | |
77 | ARROW_EXPECT_OK(stream_->Write(data)); | |
78 | } | |
79 | ||
80 | stream_ = nullptr; | |
81 | ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size()); | |
82 | } | |
83 | ||
84 | TEST_F(TestBufferOutputStream, CloseResizes) { | |
85 | std::string data = "data123456"; | |
86 | ||
87 | const int K = 100; | |
88 | for (int i = 0; i < K; ++i) { | |
89 | ARROW_EXPECT_OK(stream_->Write(data)); | |
90 | } | |
91 | ||
92 | ASSERT_OK(stream_->Close()); | |
93 | ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size()); | |
94 | } | |
95 | ||
96 | TEST_F(TestBufferOutputStream, WriteAfterFinish) { | |
97 | std::string data = "data123456"; | |
98 | ASSERT_OK(stream_->Write(data)); | |
99 | ||
100 | auto buffer_stream = checked_cast<BufferOutputStream*>(stream_.get()); | |
101 | ||
102 | ASSERT_OK(buffer_stream->Finish()); | |
103 | ||
104 | ASSERT_RAISES(IOError, stream_->Write(data)); | |
105 | } | |
106 | ||
107 | TEST_F(TestBufferOutputStream, Reset) { | |
108 | std::string data = "data123456"; | |
109 | ||
110 | auto stream = checked_cast<BufferOutputStream*>(stream_.get()); | |
111 | ||
112 | ASSERT_OK(stream->Write(data)); | |
113 | ||
114 | ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); | |
115 | ASSERT_EQ(buffer->size(), static_cast<int64_t>(data.size())); | |
116 | ||
117 | ASSERT_OK(stream->Reset(2048)); | |
118 | ASSERT_OK(stream->Write(data)); | |
119 | ASSERT_OK(stream->Write(data)); | |
120 | ASSERT_OK_AND_ASSIGN(auto buffer2, stream->Finish()); | |
121 | ||
122 | ASSERT_EQ(buffer2->size(), static_cast<int64_t>(data.size() * 2)); | |
123 | } | |
124 | ||
125 | TEST(TestFixedSizeBufferWriter, Basics) { | |
126 | ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, AllocateBuffer(1024)); | |
127 | ||
128 | FixedSizeBufferWriter writer(buffer); | |
129 | ||
130 | ASSERT_OK_AND_EQ(0, writer.Tell()); | |
131 | ||
132 | std::string data = "data123456"; | |
133 | auto nbytes = static_cast<int64_t>(data.size()); | |
134 | ASSERT_OK(writer.Write(data.c_str(), nbytes)); | |
135 | ||
136 | ASSERT_OK_AND_EQ(nbytes, writer.Tell()); | |
137 | ||
138 | ASSERT_OK(writer.Seek(4)); | |
139 | ASSERT_OK_AND_EQ(4, writer.Tell()); | |
140 | ||
141 | ASSERT_OK(writer.Seek(1024)); | |
142 | ASSERT_OK_AND_EQ(1024, writer.Tell()); | |
143 | ||
144 | // Write out of bounds | |
145 | ASSERT_RAISES(IOError, writer.Write(data.c_str(), 1)); | |
146 | ||
147 | // Seek out of bounds | |
148 | ASSERT_RAISES(IOError, writer.Seek(-1)); | |
149 | ASSERT_RAISES(IOError, writer.Seek(1025)); | |
150 | ||
151 | ASSERT_OK(writer.Close()); | |
152 | } | |
153 | ||
154 | TEST(TestFixedSizeBufferWriter, InvalidWrites) { | |
155 | ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, AllocateBuffer(1024)); | |
156 | ||
157 | FixedSizeBufferWriter writer(buffer); | |
158 | const uint8_t data[10]{}; | |
159 | ASSERT_RAISES(Invalid, writer.WriteAt(-1, data, 1)); | |
160 | ASSERT_RAISES(Invalid, writer.WriteAt(1, data, -1)); | |
161 | } | |
162 | ||
163 | TEST(TestBufferReader, FromStrings) { | |
164 | // ARROW-3291: construct BufferReader from std::string or | |
165 | // arrow::util::string_view | |
166 | ||
167 | std::string data = "data123456"; | |
168 | auto view = util::string_view(data); | |
169 | ||
170 | BufferReader reader1(data); | |
171 | BufferReader reader2(view); | |
172 | ||
173 | std::shared_ptr<Buffer> piece; | |
174 | ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4)); | |
175 | ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4)); | |
176 | ||
177 | ASSERT_OK(reader2.Seek(2)); | |
178 | ASSERT_OK_AND_ASSIGN(piece, reader2.Read(4)); | |
179 | ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4)); | |
180 | } | |
181 | ||
182 | TEST(TestBufferReader, FromNullBuffer) { | |
183 | std::shared_ptr<Buffer> buf; | |
184 | BufferReader reader(buf); | |
185 | ASSERT_OK_AND_EQ(0, reader.GetSize()); | |
186 | ASSERT_OK_AND_ASSIGN(auto piece, reader.Read(10)); | |
187 | ASSERT_EQ(0, piece->size()); | |
188 | } | |
189 | ||
190 | TEST(TestBufferReader, Seeking) { | |
191 | std::string data = "data123456"; | |
192 | ||
193 | BufferReader reader(data); | |
194 | ASSERT_OK_AND_EQ(0, reader.Tell()); | |
195 | ||
196 | ASSERT_OK(reader.Seek(9)); | |
197 | ASSERT_OK_AND_EQ(9, reader.Tell()); | |
198 | ||
199 | ASSERT_OK(reader.Seek(10)); | |
200 | ASSERT_OK_AND_EQ(10, reader.Tell()); | |
201 | ||
202 | ASSERT_RAISES(IOError, reader.Seek(11)); | |
203 | ASSERT_OK_AND_EQ(10, reader.Tell()); | |
204 | } | |
205 | ||
206 | TEST(TestBufferReader, Peek) { | |
207 | std::string data = "data123456"; | |
208 | ||
209 | BufferReader reader(std::make_shared<Buffer>(data)); | |
210 | ||
211 | util::string_view view; | |
212 | ||
213 | ASSERT_OK_AND_ASSIGN(view, reader.Peek(4)); | |
214 | ||
215 | ASSERT_EQ(4, view.size()); | |
216 | ASSERT_EQ(data.substr(0, 4), std::string(view)); | |
217 | ||
218 | ASSERT_OK_AND_ASSIGN(view, reader.Peek(20)); | |
219 | ASSERT_EQ(data.size(), view.size()); | |
220 | ASSERT_EQ(data, std::string(view)); | |
221 | } | |
222 | ||
223 | TEST(TestBufferReader, ReadAsync) { | |
224 | std::string data = "data123456"; | |
225 | ||
226 | BufferReader reader(std::make_shared<Buffer>(data)); | |
227 | ||
228 | auto fut1 = reader.ReadAsync({}, 2, 6); | |
229 | auto fut2 = reader.ReadAsync({}, 1, 4); | |
230 | ASSERT_EQ(fut1.state(), FutureState::SUCCESS); | |
231 | ASSERT_EQ(fut2.state(), FutureState::SUCCESS); | |
232 | ASSERT_OK_AND_ASSIGN(auto buf, fut1.result()); | |
233 | AssertBufferEqual(*buf, "ta1234"); | |
234 | ASSERT_OK_AND_ASSIGN(buf, fut2.result()); | |
235 | AssertBufferEqual(*buf, "ata1"); | |
236 | } | |
237 | ||
238 | TEST(TestBufferReader, InvalidReads) { | |
239 | std::string data = "data123456"; | |
240 | BufferReader reader(std::make_shared<Buffer>(data)); | |
241 | uint8_t buffer[10]; | |
242 | ||
243 | ASSERT_RAISES(Invalid, reader.ReadAt(-1, 1)); | |
244 | ASSERT_RAISES(Invalid, reader.ReadAt(1, -1)); | |
245 | ASSERT_RAISES(Invalid, reader.ReadAt(-1, 1, buffer)); | |
246 | ASSERT_RAISES(Invalid, reader.ReadAt(1, -1, buffer)); | |
247 | ||
248 | ASSERT_RAISES(Invalid, reader.ReadAsync({}, -1, 1).result()); | |
249 | ASSERT_RAISES(Invalid, reader.ReadAsync({}, 1, -1).result()); | |
250 | } | |
251 | ||
252 | TEST(TestBufferReader, RetainParentReference) { | |
253 | // ARROW-387 | |
254 | std::string data = "data123456"; | |
255 | ||
256 | std::shared_ptr<Buffer> slice1; | |
257 | std::shared_ptr<Buffer> slice2; | |
258 | { | |
259 | ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, | |
260 | AllocateBuffer(static_cast<int64_t>(data.size()))); | |
261 | std::memcpy(buffer->mutable_data(), data.c_str(), data.size()); | |
262 | BufferReader reader(buffer); | |
263 | ASSERT_OK_AND_ASSIGN(slice1, reader.Read(4)); | |
264 | ASSERT_OK_AND_ASSIGN(slice2, reader.Read(6)); | |
265 | } | |
266 | ||
267 | ASSERT_TRUE(slice1->parent() != nullptr); | |
268 | ||
269 | ASSERT_EQ(0, std::memcmp(slice1->data(), data.c_str(), 4)); | |
270 | ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6)); | |
271 | } | |
272 | ||
273 | TEST(TestBufferReader, WillNeed) { | |
274 | { | |
275 | std::string data = "data123456"; | |
276 | BufferReader reader(std::make_shared<Buffer>(data)); | |
277 | ||
278 | ASSERT_OK(reader.WillNeed({})); | |
279 | ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}})); | |
280 | ASSERT_OK(reader.WillNeed({{10, 0}})); | |
281 | ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds | |
282 | } | |
283 | { | |
284 | std::string data = "data123456"; | |
285 | BufferReader reader(reinterpret_cast<const uint8_t*>(data.data()), | |
286 | static_cast<int64_t>(data.size())); | |
287 | ||
288 | ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}})); | |
289 | ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds | |
290 | } | |
291 | } | |
292 | ||
293 | TEST(TestRandomAccessFile, GetStream) { | |
294 | std::string data = "data1data2data3data4data5"; | |
295 | ||
296 | auto buf = std::make_shared<Buffer>(data); | |
297 | auto file = std::make_shared<BufferReader>(buf); | |
298 | ||
299 | std::shared_ptr<InputStream> stream1, stream2; | |
300 | ||
301 | stream1 = RandomAccessFile::GetStream(file, 0, 10); | |
302 | stream2 = RandomAccessFile::GetStream(file, 9, 16); | |
303 | ||
304 | ASSERT_OK_AND_EQ(0, stream1->Tell()); | |
305 | ||
306 | std::shared_ptr<Buffer> buf2; | |
307 | uint8_t buf3[20]; | |
308 | ||
309 | ASSERT_OK_AND_EQ(4, stream2->Read(4, buf3)); | |
310 | ASSERT_EQ(0, std::memcmp(buf3, "2dat", 4)); | |
311 | ASSERT_OK_AND_EQ(4, stream2->Tell()); | |
312 | ||
313 | ASSERT_OK_AND_EQ(6, stream1->Read(6, buf3)); | |
314 | ASSERT_EQ(0, std::memcmp(buf3, "data1d", 6)); | |
315 | ASSERT_OK_AND_EQ(6, stream1->Tell()); | |
316 | ||
317 | ASSERT_OK_AND_ASSIGN(buf2, stream1->Read(2)); | |
318 | ASSERT_TRUE(SliceBuffer(buf, 6, 2)->Equals(*buf2)); | |
319 | ||
320 | // Read to end of each stream | |
321 | ASSERT_OK_AND_EQ(2, stream1->Read(4, buf3)); | |
322 | ASSERT_EQ(0, std::memcmp(buf3, "a2", 2)); | |
323 | ASSERT_OK_AND_EQ(10, stream1->Tell()); | |
324 | ||
325 | ASSERT_OK_AND_EQ(0, stream1->Read(1, buf3)); | |
326 | ASSERT_OK_AND_EQ(10, stream1->Tell()); | |
327 | ||
328 | // stream2 had its extent limited | |
329 | ASSERT_OK_AND_ASSIGN(buf2, stream2->Read(20)); | |
330 | ASSERT_TRUE(SliceBuffer(buf, 13, 12)->Equals(*buf2)); | |
331 | ||
332 | ASSERT_OK_AND_ASSIGN(buf2, stream2->Read(1)); | |
333 | ASSERT_EQ(0, buf2->size()); | |
334 | ASSERT_OK_AND_EQ(16, stream2->Tell()); | |
335 | ||
336 | ASSERT_OK(stream1->Close()); | |
337 | ||
338 | // idempotent | |
339 | ASSERT_OK(stream1->Close()); | |
340 | ASSERT_TRUE(stream1->closed()); | |
341 | ||
342 | // Check whether closed | |
343 | ASSERT_RAISES(IOError, stream1->Tell()); | |
344 | ASSERT_RAISES(IOError, stream1->Read(1)); | |
345 | ASSERT_RAISES(IOError, stream1->Read(1, buf3)); | |
346 | } | |
347 | ||
348 | TEST(TestMemcopy, ParallelMemcopy) { | |
349 | #if defined(ARROW_VALGRIND) | |
350 | // Compensate for Valgrind's slowness | |
351 | constexpr int64_t THRESHOLD = 32 * 1024; | |
352 | #else | |
353 | constexpr int64_t THRESHOLD = 1024 * 1024; | |
354 | #endif | |
355 | ||
356 | for (int i = 0; i < 5; ++i) { | |
357 | // randomize size so the memcopy alignment is tested | |
358 | int64_t total_size = 3 * THRESHOLD + std::rand() % 100; | |
359 | ||
360 | ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer1, AllocateBuffer(total_size)); | |
361 | ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer2, AllocateBuffer(total_size)); | |
362 | ||
363 | random_bytes(total_size, 0, buffer2->mutable_data()); | |
364 | ||
365 | io::FixedSizeBufferWriter writer(buffer1); | |
366 | writer.set_memcopy_threads(4); | |
367 | writer.set_memcopy_threshold(THRESHOLD); | |
368 | ASSERT_OK(writer.Write(buffer2->data(), buffer2->size())); | |
369 | ||
370 | ASSERT_EQ(0, memcmp(buffer1->data(), buffer2->data(), buffer1->size())); | |
371 | } | |
372 | } | |
373 | ||
374 | // ----------------------------------------------------------------------- | |
375 | // Test slow streams | |
376 | ||
377 | template <typename SlowStreamType> | |
378 | void TestSlowInputStream() { | |
379 | using clock = std::chrono::high_resolution_clock; | |
380 | ||
381 | auto stream = std::make_shared<BufferReader>(util::string_view("abcdefghijkl")); | |
382 | const double latency = 0.6; | |
383 | auto slow = std::make_shared<SlowStreamType>(stream, latency); | |
384 | ||
385 | ASSERT_FALSE(slow->closed()); | |
386 | auto t1 = clock::now(); | |
387 | ASSERT_OK_AND_ASSIGN(auto buf, slow->Read(6)); | |
388 | auto t2 = clock::now(); | |
389 | AssertBufferEqual(*buf, "abcdef"); | |
390 | auto dt = std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1).count(); | |
391 | #ifdef ARROW_WITH_TIMING_TESTS | |
392 | ASSERT_LT(dt, latency * 3); // likely | |
393 | ASSERT_GT(dt, latency / 3); // likely | |
394 | #else | |
395 | ARROW_UNUSED(dt); | |
396 | #endif | |
397 | ||
398 | ASSERT_OK_AND_ASSIGN(util::string_view view, slow->Peek(4)); | |
399 | ASSERT_EQ(view, util::string_view("ghij")); | |
400 | ||
401 | ASSERT_OK(slow->Close()); | |
402 | ASSERT_TRUE(slow->closed()); | |
403 | ASSERT_TRUE(stream->closed()); | |
404 | ASSERT_OK(slow->Close()); | |
405 | ASSERT_TRUE(slow->closed()); | |
406 | ASSERT_TRUE(stream->closed()); | |
407 | } | |
408 | ||
409 | TEST(TestSlowInputStream, Basics) { TestSlowInputStream<SlowInputStream>(); } | |
410 | ||
411 | TEST(TestSlowRandomAccessFile, Basics) { TestSlowInputStream<SlowRandomAccessFile>(); } | |
412 | ||
413 | // ----------------------------------------------------------------------- | |
414 | // Test transform streams | |
415 | ||
416 | struct DoublingTransform { | |
417 | // A transform that duplicates every byte | |
418 | Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) { | |
419 | ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(buf->size() * 2)); | |
420 | const uint8_t* data = buf->data(); | |
421 | uint8_t* out_data = dest->mutable_data(); | |
422 | for (int64_t i = 0; i < buf->size(); ++i) { | |
423 | out_data[i * 2] = data[i]; | |
424 | out_data[i * 2 + 1] = data[i]; | |
425 | } | |
426 | return std::shared_ptr<Buffer>(std::move(dest)); | |
427 | } | |
428 | }; | |
429 | ||
430 | struct SwappingTransform { | |
431 | // A transform that swaps every pair of bytes | |
432 | Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) { | |
433 | int64_t dest_size = BitUtil::RoundDown(buf->size() + has_pending_, 2); | |
434 | ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(dest_size)); | |
435 | const uint8_t* data = buf->data(); | |
436 | uint8_t* out_data = dest->mutable_data(); | |
437 | if (has_pending_ && dest_size > 0) { | |
438 | *out_data++ = *data++; | |
439 | *out_data++ = pending_byte_; | |
440 | dest_size -= 2; | |
441 | } | |
442 | for (int64_t i = 0; i < dest_size; i += 2) { | |
443 | out_data[i] = data[i + 1]; | |
444 | out_data[i + 1] = data[i]; | |
445 | } | |
446 | has_pending_ = has_pending_ ^ (buf->size() & 1); | |
447 | if (has_pending_) { | |
448 | pending_byte_ = buf->data()[buf->size() - 1]; | |
449 | } | |
450 | return std::shared_ptr<Buffer>(std::move(dest)); | |
451 | } | |
452 | ||
453 | protected: | |
454 | bool has_pending_ = 0; | |
455 | uint8_t pending_byte_ = 0; | |
456 | }; | |
457 | ||
458 | struct BaseShrinkingTransform { | |
459 | // A transform that keeps one byte every N bytes | |
460 | explicit BaseShrinkingTransform(int64_t keep_every) : keep_every_(keep_every) {} | |
461 | ||
462 | Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& buf) { | |
463 | int64_t dest_size = (buf->size() - skip_bytes_ + keep_every_ - 1) / keep_every_; | |
464 | ARROW_ASSIGN_OR_RAISE(auto dest, AllocateBuffer(dest_size)); | |
465 | const uint8_t* data = buf->data() + skip_bytes_; | |
466 | uint8_t* out_data = dest->mutable_data(); | |
467 | for (int64_t i = 0; i < dest_size; ++i) { | |
468 | out_data[i] = data[i * keep_every_]; | |
469 | } | |
470 | if (dest_size > 0) { | |
471 | skip_bytes_ = skip_bytes_ + dest_size * keep_every_ - buf->size(); | |
472 | } else { | |
473 | skip_bytes_ = skip_bytes_ - buf->size(); | |
474 | } | |
475 | DCHECK_GE(skip_bytes_, 0); | |
476 | DCHECK_LT(skip_bytes_, keep_every_); | |
477 | return std::shared_ptr<Buffer>(std::move(dest)); | |
478 | } | |
479 | ||
480 | protected: | |
481 | int64_t skip_bytes_ = 0; | |
482 | const int64_t keep_every_; | |
483 | }; | |
484 | ||
485 | template <int N> | |
486 | struct ShrinkingTransform : public BaseShrinkingTransform { | |
487 | ShrinkingTransform() : BaseShrinkingTransform(N) {} | |
488 | }; | |
489 | ||
490 | template <typename T> | |
491 | class TestTransformInputStream : public ::testing::Test { | |
492 | public: | |
493 | TransformInputStream::TransformFunc transform() const { return T(); } | |
494 | ||
495 | void TestEmptyStream() { | |
496 | auto wrapped = std::make_shared<BufferReader>(util::string_view()); | |
497 | auto stream = std::make_shared<TransformInputStream>(wrapped, transform()); | |
498 | ||
499 | ASSERT_OK_AND_EQ(0, stream->Tell()); | |
500 | ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(123)); | |
501 | ASSERT_EQ(buf->size(), 0); | |
502 | ASSERT_OK_AND_ASSIGN(buf, stream->Read(0)); | |
503 | ASSERT_EQ(buf->size(), 0); | |
504 | ASSERT_OK_AND_EQ(0, stream->Read(5, out_data_)); | |
505 | ASSERT_OK_AND_EQ(0, stream->Tell()); | |
506 | } | |
507 | ||
508 | void TestBasics() { | |
509 | auto src = Buffer::FromString("1234567890abcdefghi"); | |
510 | ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src)); | |
511 | ||
512 | auto stream = std::make_shared<TransformInputStream>( | |
513 | std::make_shared<BufferReader>(src), this->transform()); | |
514 | std::shared_ptr<Buffer> actual; | |
515 | AccumulateReads(stream, 200, &actual); | |
516 | AssertBufferEqual(*actual, *expected); | |
517 | } | |
518 | ||
519 | void TestClose() { | |
520 | auto src = Buffer::FromString("1234567890abcdefghi"); | |
521 | auto stream = std::make_shared<TransformInputStream>( | |
522 | std::make_shared<BufferReader>(src), this->transform()); | |
523 | ASSERT_FALSE(stream->closed()); | |
524 | ASSERT_OK(stream->Close()); | |
525 | ASSERT_TRUE(stream->closed()); | |
526 | ASSERT_RAISES(Invalid, stream->Read(1)); | |
527 | ASSERT_RAISES(Invalid, stream->Read(1, out_data_)); | |
528 | ASSERT_RAISES(Invalid, stream->Tell()); | |
529 | ASSERT_OK(stream->Close()); | |
530 | ASSERT_TRUE(stream->closed()); | |
531 | } | |
532 | ||
533 | void TestChunked() { | |
534 | auto src = Buffer::FromString("1234567890abcdefghi"); | |
535 | ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src)); | |
536 | ||
537 | auto stream = std::make_shared<TransformInputStream>( | |
538 | std::make_shared<BufferReader>(src), this->transform()); | |
539 | std::shared_ptr<Buffer> actual; | |
540 | AccumulateReads(stream, 5, &actual); | |
541 | AssertBufferEqual(*actual, *expected); | |
542 | } | |
543 | ||
544 | void TestStressChunked() { | |
545 | ASSERT_OK_AND_ASSIGN(auto unique_src, AllocateBuffer(1000)); | |
546 | auto src = std::shared_ptr<Buffer>(std::move(unique_src)); | |
547 | random_bytes(src->size(), /*seed=*/42, src->mutable_data()); | |
548 | ||
549 | ASSERT_OK_AND_ASSIGN(auto expected, this->transform()(src)); | |
550 | ||
551 | std::default_random_engine gen(42); | |
552 | std::uniform_int_distribution<int> chunk_sizes(0, 20); | |
553 | ||
554 | auto stream = std::make_shared<TransformInputStream>( | |
555 | std::make_shared<BufferReader>(src), this->transform()); | |
556 | std::shared_ptr<Buffer> actual; | |
557 | AccumulateReads( | |
558 | stream, [&]() -> int64_t { return chunk_sizes(gen); }, &actual); | |
559 | AssertBufferEqual(*actual, *expected); | |
560 | } | |
561 | ||
562 | void AccumulateReads(const std::shared_ptr<InputStream>& stream, | |
563 | std::function<int64_t()> gen_chunk_sizes, | |
564 | std::shared_ptr<Buffer>* out) { | |
565 | std::vector<std::shared_ptr<Buffer>> buffers; | |
566 | int64_t total_size = 0; | |
567 | while (true) { | |
568 | const int64_t chunk_size = gen_chunk_sizes(); | |
569 | ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(chunk_size)); | |
570 | const int64_t buf_size = buf->size(); | |
571 | total_size += buf_size; | |
572 | ASSERT_OK_AND_EQ(total_size, stream->Tell()); | |
573 | if (chunk_size > 0 && buf_size == 0) { | |
574 | // EOF | |
575 | break; | |
576 | } | |
577 | buffers.push_back(std::move(buf)); | |
578 | if (buf_size < chunk_size) { | |
579 | // Short read should imply EOF on next read | |
580 | ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(100)); | |
581 | ASSERT_EQ(buf->size(), 0); | |
582 | break; | |
583 | } | |
584 | } | |
585 | ASSERT_OK_AND_ASSIGN(*out, ConcatenateBuffers(buffers)); | |
586 | } | |
587 | ||
588 | void AccumulateReads(const std::shared_ptr<InputStream>& stream, int64_t chunk_size, | |
589 | std::shared_ptr<Buffer>* out) { | |
590 | return AccumulateReads( | |
591 | stream, [=]() { return chunk_size; }, out); | |
592 | } | |
593 | ||
594 | protected: | |
595 | uint8_t* out_data_[10]; | |
596 | }; | |
597 | ||
598 | using TransformTypes = | |
599 | ::testing::Types<DoublingTransform, SwappingTransform, ShrinkingTransform<2>, | |
600 | ShrinkingTransform<3>, ShrinkingTransform<7>>; | |
601 | ||
602 | TYPED_TEST_SUITE(TestTransformInputStream, TransformTypes); | |
603 | ||
604 | TYPED_TEST(TestTransformInputStream, EmptyStream) { this->TestEmptyStream(); } | |
605 | ||
606 | TYPED_TEST(TestTransformInputStream, Basics) { this->TestBasics(); } | |
607 | ||
608 | TYPED_TEST(TestTransformInputStream, Close) { this->TestClose(); } | |
609 | ||
610 | TYPED_TEST(TestTransformInputStream, Chunked) { this->TestChunked(); } | |
611 | ||
612 | TYPED_TEST(TestTransformInputStream, StressChunked) { this->TestStressChunked(); } | |
613 | ||
614 | static Result<std::shared_ptr<Buffer>> FailingTransform( | |
615 | const std::shared_ptr<Buffer>& buf) { | |
616 | return Status::UnknownError("Failed transform"); | |
617 | } | |
618 | ||
619 | TEST(TestTransformInputStream, FailingTransform) { | |
620 | auto src = Buffer::FromString("1234567890abcdefghi"); | |
621 | auto stream = std::make_shared<TransformInputStream>( | |
622 | std::make_shared<BufferReader>(src), FailingTransform); | |
623 | ASSERT_RAISES(UnknownError, stream->Read(5)); | |
624 | } | |
625 | ||
626 | // ----------------------------------------------------------------------- | |
627 | // Test various utilities | |
628 | ||
629 | TEST(TestInputStreamIterator, Basics) { | |
630 | auto reader = std::make_shared<BufferReader>(Buffer::FromString("data123456")); | |
631 | ASSERT_OK_AND_ASSIGN(auto it, MakeInputStreamIterator(reader, /*block_size=*/3)); | |
632 | std::shared_ptr<Buffer> buf; | |
633 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
634 | AssertBufferEqual(*buf, "dat"); | |
635 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
636 | AssertBufferEqual(*buf, "a12"); | |
637 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
638 | AssertBufferEqual(*buf, "345"); | |
639 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
640 | AssertBufferEqual(*buf, "6"); | |
641 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
642 | ASSERT_EQ(buf, nullptr); | |
643 | ASSERT_OK_AND_ASSIGN(buf, it.Next()); | |
644 | ASSERT_EQ(buf, nullptr); | |
645 | } | |
646 | ||
647 | TEST(TestInputStreamIterator, Closed) { | |
648 | auto reader = std::make_shared<BufferReader>(Buffer::FromString("data123456")); | |
649 | ASSERT_OK(reader->Close()); | |
650 | ASSERT_RAISES(Invalid, MakeInputStreamIterator(reader, 3)); | |
651 | ||
652 | reader = std::make_shared<BufferReader>(Buffer::FromString("data123456")); | |
653 | ASSERT_OK_AND_ASSIGN(auto it, MakeInputStreamIterator(reader, /*block_size=*/3)); | |
654 | ASSERT_OK_AND_ASSIGN(auto buf, it.Next()); | |
655 | AssertBufferEqual(*buf, "dat"); | |
656 | // Close stream and read from iterator | |
657 | ASSERT_OK(reader->Close()); | |
658 | ASSERT_RAISES(Invalid, it.Next().status()); | |
659 | } | |
660 | ||
661 | TEST(CoalesceReadRanges, Basics) { | |
662 | auto check = [](std::vector<ReadRange> ranges, | |
663 | std::vector<ReadRange> expected) -> void { | |
664 | const int64_t hole_size_limit = 9; | |
665 | const int64_t range_size_limit = 99; | |
666 | auto coalesced = | |
667 | internal::CoalesceReadRanges(ranges, hole_size_limit, range_size_limit); | |
668 | ASSERT_EQ(coalesced, expected); | |
669 | }; | |
670 | ||
671 | check({}, {}); | |
672 | // Zero sized range that ends up in empty list | |
673 | check({{110, 0}}, {}); | |
674 | // Combination on 1 zero sized range and 1 non-zero sized range | |
675 | check({{110, 10}, {120, 0}}, {{110, 10}}); | |
676 | // 1 non-zero sized range | |
677 | check({{110, 10}}, {{110, 10}}); | |
678 | // No holes + unordered ranges | |
679 | check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}}); | |
680 | // No holes | |
681 | check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}}); | |
682 | // Small holes only | |
683 | check({{110, 11}, {130, 11}, {150, 11}}, {{110, 51}}); | |
684 | // Large holes | |
685 | check({{110, 10}, {130, 10}}, {{110, 10}, {130, 10}}); | |
686 | check({{110, 11}, {130, 11}, {150, 10}, {170, 11}, {190, 11}}, {{110, 50}, {170, 31}}); | |
687 | ||
688 | // With zero-sized ranges | |
689 | check({{110, 11}, {130, 0}, {130, 11}, {145, 0}, {150, 11}, {200, 0}}, {{110, 51}}); | |
690 | ||
691 | // No holes but large ranges | |
692 | check({{110, 100}, {210, 100}}, {{110, 100}, {210, 100}}); | |
693 | // Small holes and large range in the middle (*) | |
694 | check({{110, 10}, {120, 11}, {140, 100}, {240, 11}, {260, 11}}, | |
695 | {{110, 21}, {140, 100}, {240, 31}}); | |
696 | // Mid-size ranges that would turn large after coalescing | |
697 | check({{100, 50}, {150, 50}}, {{100, 50}, {150, 50}}); | |
698 | check({{100, 30}, {130, 30}, {160, 30}, {190, 30}, {220, 30}}, {{100, 90}, {190, 60}}); | |
699 | ||
700 | // Same as (*) but unsorted | |
701 | check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}}, | |
702 | {{110, 21}, {140, 100}, {240, 31}}); | |
703 | ||
704 | // Completely overlapping ranges should be eliminated | |
705 | check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}}); | |
706 | } | |
707 | ||
708 | class CountingBufferReader : public BufferReader { | |
709 | public: | |
710 | using BufferReader::BufferReader; | |
711 | Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext& context, int64_t position, | |
712 | int64_t nbytes) override { | |
713 | read_count_++; | |
714 | return BufferReader::ReadAsync(context, position, nbytes); | |
715 | } | |
716 | int64_t read_count() const { return read_count_; } | |
717 | ||
718 | private: | |
719 | int64_t read_count_ = 0; | |
720 | }; | |
721 | ||
722 | TEST(RangeReadCache, Basics) { | |
723 | std::string data = "abcdefghijklmnopqrstuvwxyz"; | |
724 | ||
725 | CacheOptions options = CacheOptions::Defaults(); | |
726 | options.hole_size_limit = 2; | |
727 | options.range_size_limit = 10; | |
728 | ||
729 | for (auto lazy : std::vector<bool>{false, true}) { | |
730 | SCOPED_TRACE(lazy); | |
731 | options.lazy = lazy; | |
732 | auto file = std::make_shared<CountingBufferReader>(Buffer(data)); | |
733 | internal::ReadRangeCache cache(file, {}, options); | |
734 | ||
735 | ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}})); | |
736 | ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}})); | |
737 | ||
738 | ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2})); | |
739 | AssertBufferEqual(*buf, "uv"); | |
740 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 2})); | |
741 | AssertBufferEqual(*buf, "bc"); | |
742 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 2})); | |
743 | AssertBufferEqual(*buf, "de"); | |
744 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({8, 2})); | |
745 | AssertBufferEqual(*buf, "ij"); | |
746 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 4})); | |
747 | AssertBufferEqual(*buf, "klmn"); | |
748 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({15, 4})); | |
749 | AssertBufferEqual(*buf, "pqrs"); | |
750 | ASSERT_FINISHES_OK(cache.WaitFor({{15, 1}, {16, 3}, {25, 0}, {1, 2}})); | |
751 | // Zero-sized | |
752 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({14, 0})); | |
753 | AssertBufferEqual(*buf, ""); | |
754 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({25, 0})); | |
755 | AssertBufferEqual(*buf, ""); | |
756 | ||
757 | // Non-cached ranges | |
758 | ASSERT_RAISES(Invalid, cache.Read({20, 3})); | |
759 | ASSERT_RAISES(Invalid, cache.Read({19, 3})); | |
760 | ASSERT_RAISES(Invalid, cache.Read({0, 3})); | |
761 | ASSERT_RAISES(Invalid, cache.Read({25, 2})); | |
762 | ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{25, 2}})); | |
763 | ASSERT_FINISHES_AND_RAISES(Invalid, cache.WaitFor({{1, 2}, {25, 2}})); | |
764 | ||
765 | ASSERT_FINISHES_OK(cache.Wait()); | |
766 | // 8 ranges should lead to less than 8 reads | |
767 | ASSERT_LT(file->read_count(), 8); | |
768 | } | |
769 | } | |
770 | ||
771 | TEST(RangeReadCache, Concurrency) { | |
772 | std::string data = "abcdefghijklmnopqrstuvwxyz"; | |
773 | ||
774 | auto file = std::make_shared<BufferReader>(Buffer(data)); | |
775 | std::vector<ReadRange> ranges{{1, 2}, {3, 2}, {8, 2}, {20, 2}, | |
776 | {25, 0}, {10, 4}, {14, 0}, {15, 4}}; | |
777 | ||
778 | for (auto lazy : std::vector<bool>{false, true}) { | |
779 | SCOPED_TRACE(lazy); | |
780 | CacheOptions options = CacheOptions::Defaults(); | |
781 | options.hole_size_limit = 2; | |
782 | options.range_size_limit = 10; | |
783 | options.lazy = lazy; | |
784 | ||
785 | { | |
786 | internal::ReadRangeCache cache(file, {}, options); | |
787 | ASSERT_OK(cache.Cache(ranges)); | |
788 | std::vector<Future<std::shared_ptr<Buffer>>> futures; | |
789 | for (const auto& range : ranges) { | |
790 | futures.push_back( | |
791 | cache.WaitFor({range}).Then([&cache, range]() { return cache.Read(range); })); | |
792 | } | |
793 | for (auto fut : futures) { | |
794 | ASSERT_FINISHES_OK(fut); | |
795 | } | |
796 | } | |
797 | { | |
798 | internal::ReadRangeCache cache(file, {}, options); | |
799 | ASSERT_OK(cache.Cache(ranges)); | |
800 | ASSERT_OK(arrow::internal::ParallelFor( | |
801 | static_cast<int>(ranges.size()), | |
802 | [&](int index) { return cache.Read(ranges[index]).status(); })); | |
803 | } | |
804 | } | |
805 | } | |
806 | ||
807 | TEST(RangeReadCache, Lazy) { | |
808 | std::string data = "abcdefghijklmnopqrstuvwxyz"; | |
809 | ||
810 | auto file = std::make_shared<CountingBufferReader>(Buffer(data)); | |
811 | CacheOptions options = CacheOptions::LazyDefaults(); | |
812 | options.hole_size_limit = 2; | |
813 | options.range_size_limit = 10; | |
814 | internal::ReadRangeCache cache(file, {}, options); | |
815 | ||
816 | ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}})); | |
817 | ASSERT_OK(cache.Cache({{10, 4}, {14, 0}, {15, 4}})); | |
818 | ||
819 | // Lazy cache doesn't fetch ranges until requested | |
820 | ASSERT_EQ(0, file->read_count()); | |
821 | ||
822 | ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({20, 2})); | |
823 | AssertBufferEqual(*buf, "uv"); | |
824 | ASSERT_EQ(1, file->read_count()); | |
825 | ||
826 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4})); | |
827 | AssertBufferEqual(*buf, "bcde"); | |
828 | ASSERT_EQ(2, file->read_count()); | |
829 | ||
830 | // Requested ranges are still cached | |
831 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 4})); | |
832 | ASSERT_EQ(2, file->read_count()); | |
833 | ||
834 | // Non-cached ranges | |
835 | ASSERT_RAISES(Invalid, cache.Read({20, 3})); | |
836 | ASSERT_RAISES(Invalid, cache.Read({19, 3})); | |
837 | ASSERT_RAISES(Invalid, cache.Read({0, 3})); | |
838 | ASSERT_RAISES(Invalid, cache.Read({25, 2})); | |
839 | ||
840 | // Can asynchronously kick off a read (though BufferReader::ReadAsync is synchronous so | |
841 | // it will increment the read count here) | |
842 | ASSERT_FINISHES_OK(cache.WaitFor({{10, 2}, {15, 4}})); | |
843 | ASSERT_EQ(3, file->read_count()); | |
844 | ASSERT_OK_AND_ASSIGN(buf, cache.Read({10, 2})); | |
845 | ASSERT_EQ(3, file->read_count()); | |
846 | } | |
847 | ||
848 | TEST(CacheOptions, Basics) { | |
849 | auto check = [](const CacheOptions actual, const double expected_hole_size_limit_MiB, | |
850 | const double expected_range_size_limit_MiB) -> void { | |
851 | const CacheOptions expected = { | |
852 | static_cast<int64_t>(std::round(expected_hole_size_limit_MiB * 1024 * 1024)), | |
853 | static_cast<int64_t>(std::round(expected_range_size_limit_MiB * 1024 * 1024)), | |
854 | /*lazy=*/false}; | |
855 | ASSERT_EQ(actual, expected); | |
856 | }; | |
857 | ||
858 | // Test: normal usage. | |
859 | // TTFB = 5 ms, BW = 500 MiB/s, | |
860 | // we expect hole_size_limit = 2.5 MiB, and range_size_limit = 22.5 MiB | |
861 | check(CacheOptions::MakeFromNetworkMetrics(5, 500), 2.5, 22.5); | |
862 | // Test: custom bandwidth utilization. | |
863 | // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%, | |
864 | // we expect a change in range_size_limit = 7.5 MiB. | |
865 | check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75), 2.5, 7.5); | |
866 | // Test: custom max_ideal_request_size, range_size_limit gets capped. | |
867 | // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%, max_ideal_request_size = 5 MiB, | |
868 | // we expect the range_size_limit to be capped at 5 MiB. | |
869 | check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75, 5), 2.5, 5); | |
870 | } | |
871 | ||
872 | TEST(IOThreadPool, Capacity) { | |
873 | // Simple sanity check | |
874 | auto pool = internal::GetIOThreadPool(); | |
875 | int capacity = pool->GetCapacity(); | |
876 | ASSERT_GT(capacity, 0); | |
877 | ASSERT_EQ(GetIOThreadPoolCapacity(), capacity); | |
878 | ASSERT_OK(SetIOThreadPoolCapacity(capacity + 1)); | |
879 | ASSERT_EQ(GetIOThreadPoolCapacity(), capacity + 1); | |
880 | } | |
881 | ||
882 | } // namespace io | |
883 | } // namespace arrow |