]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/io/slow.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / slow.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/io/slow.h"
19
20#include <algorithm>
21#include <cstring>
22#include <mutex>
23#include <random>
24#include <thread>
25#include <utility>
26
27#include "arrow/buffer.h"
28#include "arrow/io/util_internal.h"
29#include "arrow/result.h"
30#include "arrow/status.h"
31#include "arrow/util/io_util.h"
32#include "arrow/util/logging.h"
33
34namespace arrow {
35namespace io {
36
namespace {

// Ratio of standard deviation to mean: the intended standard deviation of the
// generated latencies is the average latency multiplied by this value.
constexpr double kStandardDeviationRatio = 0.1;

}  // namespace
39
40class LatencyGeneratorImpl : public LatencyGenerator {
41 public:
42 ~LatencyGeneratorImpl() override = default;
43
44 LatencyGeneratorImpl(double average_latency, int32_t seed)
45 : gen_(static_cast<decltype(gen_)::result_type>(seed)),
46 latency_dist_(average_latency, average_latency * kStandardDeviationRatio) {}
47
48 double NextLatency() override {
49 // std::random distributions are unlikely to be thread-safe, and
50 // a RandomAccessFile may be called from multiple threads
51 std::lock_guard<std::mutex> lock(mutex_);
52 return std::max<double>(0.0, latency_dist_(gen_));
53 }
54
55 private:
56 std::default_random_engine gen_;
57 std::normal_distribution<double> latency_dist_;
58 std::mutex mutex_;
59};
60
61LatencyGenerator::~LatencyGenerator() {}
62
63void LatencyGenerator::Sleep() {
64 std::this_thread::sleep_for(std::chrono::duration<double>(NextLatency()));
65}
66
67std::shared_ptr<LatencyGenerator> LatencyGenerator::Make(double average_latency) {
68 return std::make_shared<LatencyGeneratorImpl>(
69 average_latency, static_cast<int32_t>(::arrow::internal::GetRandomSeed()));
70}
71
72std::shared_ptr<LatencyGenerator> LatencyGenerator::Make(double average_latency,
73 int32_t seed) {
74 return std::make_shared<LatencyGeneratorImpl>(average_latency, seed);
75}
76
77//////////////////////////////////////////////////////////////////////////
78// SlowInputStream implementation
79
80SlowInputStream::~SlowInputStream() { internal::CloseFromDestructor(this); }
81
82Status SlowInputStream::Close() { return stream_->Close(); }
83
84Status SlowInputStream::Abort() { return stream_->Abort(); }
85
86bool SlowInputStream::closed() const { return stream_->closed(); }
87
88Result<int64_t> SlowInputStream::Tell() const { return stream_->Tell(); }
89
90Result<int64_t> SlowInputStream::Read(int64_t nbytes, void* out) {
91 latencies_->Sleep();
92 return stream_->Read(nbytes, out);
93}
94
95Result<std::shared_ptr<Buffer>> SlowInputStream::Read(int64_t nbytes) {
96 latencies_->Sleep();
97 return stream_->Read(nbytes);
98}
99
100Result<util::string_view> SlowInputStream::Peek(int64_t nbytes) {
101 return stream_->Peek(nbytes);
102}
103
104//////////////////////////////////////////////////////////////////////////
105// SlowRandomAccessFile implementation
106
107SlowRandomAccessFile::~SlowRandomAccessFile() { internal::CloseFromDestructor(this); }
108
109Status SlowRandomAccessFile::Close() { return stream_->Close(); }
110
111Status SlowRandomAccessFile::Abort() { return stream_->Abort(); }
112
113bool SlowRandomAccessFile::closed() const { return stream_->closed(); }
114
115Result<int64_t> SlowRandomAccessFile::GetSize() { return stream_->GetSize(); }
116
117Status SlowRandomAccessFile::Seek(int64_t position) { return stream_->Seek(position); }
118
119Result<int64_t> SlowRandomAccessFile::Tell() const { return stream_->Tell(); }
120
121Result<int64_t> SlowRandomAccessFile::Read(int64_t nbytes, void* out) {
122 latencies_->Sleep();
123 return stream_->Read(nbytes, out);
124}
125
126Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::Read(int64_t nbytes) {
127 latencies_->Sleep();
128 return stream_->Read(nbytes);
129}
130
131Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
132 void* out) {
133 latencies_->Sleep();
134 return stream_->ReadAt(position, nbytes, out);
135}
136
137Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position,
138 int64_t nbytes) {
139 latencies_->Sleep();
140 return stream_->ReadAt(position, nbytes);
141}
142
143Result<util::string_view> SlowRandomAccessFile::Peek(int64_t nbytes) {
144 return stream_->Peek(nbytes);
145}
146
147} // namespace io
148} // namespace arrow