]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "arrow/io/slow.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <cstring> | |
22 | #include <mutex> | |
23 | #include <random> | |
24 | #include <thread> | |
25 | #include <utility> | |
26 | ||
27 | #include "arrow/buffer.h" | |
28 | #include "arrow/io/util_internal.h" | |
29 | #include "arrow/result.h" | |
30 | #include "arrow/status.h" | |
31 | #include "arrow/util/io_util.h" | |
32 | #include "arrow/util/logging.h" | |
33 | ||
34 | namespace arrow { | |
35 | namespace io { | |
36 | ||
// Ratio of standard deviation to average latency: the intended standard
// deviation is the average multiplied by this value.
constexpr double kStandardDeviationRatio{0.1};
39 | ||
40 | class LatencyGeneratorImpl : public LatencyGenerator { | |
41 | public: | |
42 | ~LatencyGeneratorImpl() override = default; | |
43 | ||
44 | LatencyGeneratorImpl(double average_latency, int32_t seed) | |
45 | : gen_(static_cast<decltype(gen_)::result_type>(seed)), | |
46 | latency_dist_(average_latency, average_latency * kStandardDeviationRatio) {} | |
47 | ||
48 | double NextLatency() override { | |
49 | // std::random distributions are unlikely to be thread-safe, and | |
50 | // a RandomAccessFile may be called from multiple threads | |
51 | std::lock_guard<std::mutex> lock(mutex_); | |
52 | return std::max<double>(0.0, latency_dist_(gen_)); | |
53 | } | |
54 | ||
55 | private: | |
56 | std::default_random_engine gen_; | |
57 | std::normal_distribution<double> latency_dist_; | |
58 | std::mutex mutex_; | |
59 | }; | |
60 | ||
61 | LatencyGenerator::~LatencyGenerator() {} | |
62 | ||
63 | void LatencyGenerator::Sleep() { | |
64 | std::this_thread::sleep_for(std::chrono::duration<double>(NextLatency())); | |
65 | } | |
66 | ||
67 | std::shared_ptr<LatencyGenerator> LatencyGenerator::Make(double average_latency) { | |
68 | return std::make_shared<LatencyGeneratorImpl>( | |
69 | average_latency, static_cast<int32_t>(::arrow::internal::GetRandomSeed())); | |
70 | } | |
71 | ||
72 | std::shared_ptr<LatencyGenerator> LatencyGenerator::Make(double average_latency, | |
73 | int32_t seed) { | |
74 | return std::make_shared<LatencyGeneratorImpl>(average_latency, seed); | |
75 | } | |
76 | ||
77 | ////////////////////////////////////////////////////////////////////////// | |
78 | // SlowInputStream implementation | |
79 | ||
80 | SlowInputStream::~SlowInputStream() { internal::CloseFromDestructor(this); } | |
81 | ||
82 | Status SlowInputStream::Close() { return stream_->Close(); } | |
83 | ||
84 | Status SlowInputStream::Abort() { return stream_->Abort(); } | |
85 | ||
86 | bool SlowInputStream::closed() const { return stream_->closed(); } | |
87 | ||
88 | Result<int64_t> SlowInputStream::Tell() const { return stream_->Tell(); } | |
89 | ||
90 | Result<int64_t> SlowInputStream::Read(int64_t nbytes, void* out) { | |
91 | latencies_->Sleep(); | |
92 | return stream_->Read(nbytes, out); | |
93 | } | |
94 | ||
95 | Result<std::shared_ptr<Buffer>> SlowInputStream::Read(int64_t nbytes) { | |
96 | latencies_->Sleep(); | |
97 | return stream_->Read(nbytes); | |
98 | } | |
99 | ||
100 | Result<util::string_view> SlowInputStream::Peek(int64_t nbytes) { | |
101 | return stream_->Peek(nbytes); | |
102 | } | |
103 | ||
104 | ////////////////////////////////////////////////////////////////////////// | |
105 | // SlowRandomAccessFile implementation | |
106 | ||
107 | SlowRandomAccessFile::~SlowRandomAccessFile() { internal::CloseFromDestructor(this); } | |
108 | ||
109 | Status SlowRandomAccessFile::Close() { return stream_->Close(); } | |
110 | ||
111 | Status SlowRandomAccessFile::Abort() { return stream_->Abort(); } | |
112 | ||
113 | bool SlowRandomAccessFile::closed() const { return stream_->closed(); } | |
114 | ||
115 | Result<int64_t> SlowRandomAccessFile::GetSize() { return stream_->GetSize(); } | |
116 | ||
117 | Status SlowRandomAccessFile::Seek(int64_t position) { return stream_->Seek(position); } | |
118 | ||
119 | Result<int64_t> SlowRandomAccessFile::Tell() const { return stream_->Tell(); } | |
120 | ||
121 | Result<int64_t> SlowRandomAccessFile::Read(int64_t nbytes, void* out) { | |
122 | latencies_->Sleep(); | |
123 | return stream_->Read(nbytes, out); | |
124 | } | |
125 | ||
126 | Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::Read(int64_t nbytes) { | |
127 | latencies_->Sleep(); | |
128 | return stream_->Read(nbytes); | |
129 | } | |
130 | ||
131 | Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes, | |
132 | void* out) { | |
133 | latencies_->Sleep(); | |
134 | return stream_->ReadAt(position, nbytes, out); | |
135 | } | |
136 | ||
137 | Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position, | |
138 | int64_t nbytes) { | |
139 | latencies_->Sleep(); | |
140 | return stream_->ReadAt(position, nbytes); | |
141 | } | |
142 | ||
143 | Result<util::string_view> SlowRandomAccessFile::Peek(int64_t nbytes) { | |
144 | return stream_->Peek(nbytes); | |
145 | } | |
146 | ||
147 | } // namespace io | |
148 | } // namespace arrow |