]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/io/concurrency.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / concurrency.h
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include <memory>
21
22#include "arrow/io/interfaces.h"
23#include "arrow/result.h"
24#include "arrow/status.h"
25#include "arrow/util/checked_cast.h"
26#include "arrow/util/macros.h"
27#include "arrow/util/visibility.h"
28
29namespace arrow {
30namespace io {
31namespace internal {
32
33template <class LockType>
34class SharedLockGuard {
35 public:
36 explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); }
37
38 ~SharedLockGuard() { lock_->UnlockShared(); }
39
40 protected:
41 LockType* lock_;
42};
43
44template <class LockType>
45class ExclusiveLockGuard {
46 public:
47 explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); }
48
49 ~ExclusiveLockGuard() { lock_->UnlockExclusive(); }
50
51 protected:
52 LockType* lock_;
53};
54
55// Debug concurrency checker that marks "shared" and "exclusive" code sections,
56// aborting if the concurrency rules get violated. Does nothing in release mode.
57// Note that we intentionally use the same class declaration in debug and
58// release builds in order to avoid runtime failures when e.g. loading a
59// release-built DLL with a debug-built application, or the reverse.
60
61class ARROW_EXPORT SharedExclusiveChecker {
62 public:
63 SharedExclusiveChecker();
64 void LockShared();
65 void UnlockShared();
66 void LockExclusive();
67 void UnlockExclusive();
68
69 SharedLockGuard<SharedExclusiveChecker> shared_guard() {
70 return SharedLockGuard<SharedExclusiveChecker>(this);
71 }
72
73 ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() {
74 return ExclusiveLockGuard<SharedExclusiveChecker>(this);
75 }
76
77 protected:
78 struct Impl;
79 std::shared_ptr<Impl> impl_;
80};
81
82// Concurrency wrappers for IO classes that check the correctness of
83// concurrent calls to various methods. It is not necessary to wrap all
84// IO classes with these, only a few core classes that get used in tests.
85//
86// We're not using virtual inheritance here as virtual bases have poorly
87// understood semantic overhead which we'd be passing on to implementers
88// and users of these interfaces. Instead, we just duplicate the method
89// wrappers between those two classes.
90
91template <class Derived>
92class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream {
93 public:
94 Status Close() final {
95 auto guard = lock_.exclusive_guard();
96 return derived()->DoClose();
97 }
98
99 Status Abort() final {
100 auto guard = lock_.exclusive_guard();
101 return derived()->DoAbort();
102 }
103
104 Result<int64_t> Tell() const final {
105 auto guard = lock_.exclusive_guard();
106 return derived()->DoTell();
107 }
108
109 Result<int64_t> Read(int64_t nbytes, void* out) final {
110 auto guard = lock_.exclusive_guard();
111 return derived()->DoRead(nbytes, out);
112 }
113
114 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
115 auto guard = lock_.exclusive_guard();
116 return derived()->DoRead(nbytes);
117 }
118
119 Result<util::string_view> Peek(int64_t nbytes) final {
120 auto guard = lock_.exclusive_guard();
121 return derived()->DoPeek(nbytes);
122 }
123
124 /*
125 Methods to implement in derived class:
126
127 Status DoClose();
128 Result<int64_t> DoTell() const;
129 Result<int64_t> DoRead(int64_t nbytes, void* out);
130 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
131
132 And optionally:
133
134 Status DoAbort() override;
135 Result<util::string_view> DoPeek(int64_t nbytes) override;
136
137 These methods should be protected in the derived class and
138 InputStreamConcurrencyWrapper declared as a friend with
139
140 friend InputStreamConcurrencyWrapper<derived>;
141 */
142
143 protected:
144 // Default implementations. They are virtual because the derived class may
145 // have derived classes itself.
146 virtual Status DoAbort() { return derived()->DoClose(); }
147
148 virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
149 return Status::NotImplemented("Peek not implemented");
150 }
151
152 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
153
154 const Derived* derived() const {
155 return ::arrow::internal::checked_cast<const Derived*>(this);
156 }
157
158 mutable SharedExclusiveChecker lock_;
159};
160
161template <class Derived>
162class ARROW_EXPORT RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
163 public:
164 Status Close() final {
165 auto guard = lock_.exclusive_guard();
166 return derived()->DoClose();
167 }
168
169 Status Abort() final {
170 auto guard = lock_.exclusive_guard();
171 return derived()->DoAbort();
172 }
173
174 Result<int64_t> Tell() const final {
175 auto guard = lock_.exclusive_guard();
176 return derived()->DoTell();
177 }
178
179 Result<int64_t> Read(int64_t nbytes, void* out) final {
180 auto guard = lock_.exclusive_guard();
181 return derived()->DoRead(nbytes, out);
182 }
183
184 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
185 auto guard = lock_.exclusive_guard();
186 return derived()->DoRead(nbytes);
187 }
188
189 Result<util::string_view> Peek(int64_t nbytes) final {
190 auto guard = lock_.exclusive_guard();
191 return derived()->DoPeek(nbytes);
192 }
193
194 Status Seek(int64_t position) final {
195 auto guard = lock_.exclusive_guard();
196 return derived()->DoSeek(position);
197 }
198
199 Result<int64_t> GetSize() final {
200 auto guard = lock_.shared_guard();
201 return derived()->DoGetSize();
202 }
203
204 // NOTE: ReadAt doesn't use stream pointer, but it is allowed to update it
205 // (it's the case on Windows when using ReadFileEx).
206 // So any method that relies on the current position (even if it doesn't
207 // update it, such as Peek) cannot run in parallel with ReadAt and has
208 // to use the exclusive_guard.
209
210 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
211 auto guard = lock_.shared_guard();
212 return derived()->DoReadAt(position, nbytes, out);
213 }
214
215 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final {
216 auto guard = lock_.shared_guard();
217 return derived()->DoReadAt(position, nbytes);
218 }
219
220 /*
221 Methods to implement in derived class:
222
223 Status DoClose();
224 Result<int64_t> DoTell() const;
225 Result<int64_t> DoRead(int64_t nbytes, void* out);
226 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
227 Status DoSeek(int64_t position);
228 Result<int64_t> DoGetSize()
229 Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
230 Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
231
232 And optionally:
233
234 Status DoAbort() override;
235 Result<util::string_view> DoPeek(int64_t nbytes) override;
236
237 These methods should be protected in the derived class and
238 RandomAccessFileConcurrencyWrapper declared as a friend with
239
240 friend RandomAccessFileConcurrencyWrapper<derived>;
241 */
242
243 protected:
244 // Default implementations. They are virtual because the derived class may
245 // have derived classes itself.
246 virtual Status DoAbort() { return derived()->DoClose(); }
247
248 virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
249 return Status::NotImplemented("Peek not implemented");
250 }
251
252 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
253
254 const Derived* derived() const {
255 return ::arrow::internal::checked_cast<const Derived*>(this);
256 }
257
258 mutable SharedExclusiveChecker lock_;
259};
260
261} // namespace internal
262} // namespace io
263} // namespace arrow