]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #pragma once | |
19 | ||
20 | #include <memory> | |
21 | ||
22 | #include "arrow/io/interfaces.h" | |
23 | #include "arrow/result.h" | |
24 | #include "arrow/status.h" | |
25 | #include "arrow/util/checked_cast.h" | |
26 | #include "arrow/util/macros.h" | |
27 | #include "arrow/util/visibility.h" | |
28 | ||
29 | namespace arrow { | |
30 | namespace io { | |
31 | namespace internal { | |
32 | ||
/// \brief Scoped guard that marks a "shared" section on a lock-like object.
///
/// The constructor calls `lock->LockShared()` and the destructor calls
/// `lock->UnlockShared()`, so the section ends when the guard goes out of
/// scope.
///
/// The guard is move-only. Copying is disabled because a copy would release
/// the same lock a second time when it is destroyed. Moving transfers the
/// responsibility for the release to the new guard and leaves the moved-from
/// guard inert.
///
/// \tparam LockType any type providing `LockShared()` and `UnlockShared()`
template <class LockType>
class SharedLockGuard {
 public:
  /// Acquire `lock` in shared mode. `lock` must be non-null and must outlive
  /// the guard.
  explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); }

  SharedLockGuard(const SharedLockGuard&) = delete;
  SharedLockGuard& operator=(const SharedLockGuard&) = delete;

  /// Take over the pending release from `other`; `other` will no longer
  /// unlock anything.
  SharedLockGuard(SharedLockGuard&& other) noexcept : lock_(other.lock_) {
    other.lock_ = nullptr;
  }
  SharedLockGuard& operator=(SharedLockGuard&&) = delete;

  /// Release the shared lock, unless this guard has been moved from.
  ~SharedLockGuard() {
    if (lock_ != nullptr) {
      lock_->UnlockShared();
    }
  }

 protected:
  // Lock to release on destruction; null only in a moved-from guard.
  LockType* lock_;
};
43 | ||
/// \brief Scoped guard that marks an "exclusive" section on a lock-like object.
///
/// The constructor calls `lock->LockExclusive()` and the destructor calls
/// `lock->UnlockExclusive()`, so the section ends when the guard goes out of
/// scope.
///
/// The guard is move-only. Copying is disabled because a copy would release
/// the same lock a second time when it is destroyed. Moving transfers the
/// responsibility for the release to the new guard and leaves the moved-from
/// guard inert.
///
/// \tparam LockType any type providing `LockExclusive()` and
///         `UnlockExclusive()`
template <class LockType>
class ExclusiveLockGuard {
 public:
  /// Acquire `lock` in exclusive mode. `lock` must be non-null and must
  /// outlive the guard.
  explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); }

  ExclusiveLockGuard(const ExclusiveLockGuard&) = delete;
  ExclusiveLockGuard& operator=(const ExclusiveLockGuard&) = delete;

  /// Take over the pending release from `other`; `other` will no longer
  /// unlock anything.
  ExclusiveLockGuard(ExclusiveLockGuard&& other) noexcept : lock_(other.lock_) {
    other.lock_ = nullptr;
  }
  ExclusiveLockGuard& operator=(ExclusiveLockGuard&&) = delete;

  /// Release the exclusive lock, unless this guard has been moved from.
  ~ExclusiveLockGuard() {
    if (lock_ != nullptr) {
      lock_->UnlockExclusive();
    }
  }

 protected:
  // Lock to release on destruction; null only in a moved-from guard.
  LockType* lock_;
};
54 | ||
55 | // Debug concurrency checker that marks "shared" and "exclusive" code sections, | |
56 | // aborting if the concurrency rules get violated. Does nothing in release mode. | |
57 | // Note that we intentionally use the same class declaration in debug and | |
58 | // release builds in order to avoid runtime failures when e.g. loading a | |
59 | // release-built DLL with a debug-built application, or the reverse. | |
60 | ||
61 | class ARROW_EXPORT SharedExclusiveChecker { | |
62 | public: | |
63 | SharedExclusiveChecker(); | |
64 | void LockShared(); | |
65 | void UnlockShared(); | |
66 | void LockExclusive(); | |
67 | void UnlockExclusive(); | |
68 | ||
69 | SharedLockGuard<SharedExclusiveChecker> shared_guard() { | |
70 | return SharedLockGuard<SharedExclusiveChecker>(this); | |
71 | } | |
72 | ||
73 | ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() { | |
74 | return ExclusiveLockGuard<SharedExclusiveChecker>(this); | |
75 | } | |
76 | ||
77 | protected: | |
78 | struct Impl; | |
79 | std::shared_ptr<Impl> impl_; | |
80 | }; | |
81 | ||
82 | // Concurrency wrappers for IO classes that check the correctness of | |
83 | // concurrent calls to various methods. It is not necessary to wrap all | |
84 | // IO classes with these, only a few core classes that get used in tests. | |
85 | // | |
86 | // We're not using virtual inheritance here as virtual bases have poorly | |
87 | // understood semantic overhead which we'd be passing on to implementers | |
88 | // and users of these interfaces. Instead, we just duplicate the method | |
89 | // wrappers between those two classes. | |
90 | ||
91 | template <class Derived> | |
92 | class ARROW_EXPORT InputStreamConcurrencyWrapper : public InputStream { | |
93 | public: | |
94 | Status Close() final { | |
95 | auto guard = lock_.exclusive_guard(); | |
96 | return derived()->DoClose(); | |
97 | } | |
98 | ||
99 | Status Abort() final { | |
100 | auto guard = lock_.exclusive_guard(); | |
101 | return derived()->DoAbort(); | |
102 | } | |
103 | ||
104 | Result<int64_t> Tell() const final { | |
105 | auto guard = lock_.exclusive_guard(); | |
106 | return derived()->DoTell(); | |
107 | } | |
108 | ||
109 | Result<int64_t> Read(int64_t nbytes, void* out) final { | |
110 | auto guard = lock_.exclusive_guard(); | |
111 | return derived()->DoRead(nbytes, out); | |
112 | } | |
113 | ||
114 | Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { | |
115 | auto guard = lock_.exclusive_guard(); | |
116 | return derived()->DoRead(nbytes); | |
117 | } | |
118 | ||
119 | Result<util::string_view> Peek(int64_t nbytes) final { | |
120 | auto guard = lock_.exclusive_guard(); | |
121 | return derived()->DoPeek(nbytes); | |
122 | } | |
123 | ||
124 | /* | |
125 | Methods to implement in derived class: | |
126 | ||
127 | Status DoClose(); | |
128 | Result<int64_t> DoTell() const; | |
129 | Result<int64_t> DoRead(int64_t nbytes, void* out); | |
130 | Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); | |
131 | ||
132 | And optionally: | |
133 | ||
134 | Status DoAbort() override; | |
135 | Result<util::string_view> DoPeek(int64_t nbytes) override; | |
136 | ||
137 | These methods should be protected in the derived class and | |
138 | InputStreamConcurrencyWrapper declared as a friend with | |
139 | ||
140 | friend InputStreamConcurrencyWrapper<derived>; | |
141 | */ | |
142 | ||
143 | protected: | |
144 | // Default implementations. They are virtual because the derived class may | |
145 | // have derived classes itself. | |
146 | virtual Status DoAbort() { return derived()->DoClose(); } | |
147 | ||
148 | virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { | |
149 | return Status::NotImplemented("Peek not implemented"); | |
150 | } | |
151 | ||
152 | Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } | |
153 | ||
154 | const Derived* derived() const { | |
155 | return ::arrow::internal::checked_cast<const Derived*>(this); | |
156 | } | |
157 | ||
158 | mutable SharedExclusiveChecker lock_; | |
159 | }; | |
160 | ||
161 | template <class Derived> | |
162 | class ARROW_EXPORT RandomAccessFileConcurrencyWrapper : public RandomAccessFile { | |
163 | public: | |
164 | Status Close() final { | |
165 | auto guard = lock_.exclusive_guard(); | |
166 | return derived()->DoClose(); | |
167 | } | |
168 | ||
169 | Status Abort() final { | |
170 | auto guard = lock_.exclusive_guard(); | |
171 | return derived()->DoAbort(); | |
172 | } | |
173 | ||
174 | Result<int64_t> Tell() const final { | |
175 | auto guard = lock_.exclusive_guard(); | |
176 | return derived()->DoTell(); | |
177 | } | |
178 | ||
179 | Result<int64_t> Read(int64_t nbytes, void* out) final { | |
180 | auto guard = lock_.exclusive_guard(); | |
181 | return derived()->DoRead(nbytes, out); | |
182 | } | |
183 | ||
184 | Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { | |
185 | auto guard = lock_.exclusive_guard(); | |
186 | return derived()->DoRead(nbytes); | |
187 | } | |
188 | ||
189 | Result<util::string_view> Peek(int64_t nbytes) final { | |
190 | auto guard = lock_.exclusive_guard(); | |
191 | return derived()->DoPeek(nbytes); | |
192 | } | |
193 | ||
194 | Status Seek(int64_t position) final { | |
195 | auto guard = lock_.exclusive_guard(); | |
196 | return derived()->DoSeek(position); | |
197 | } | |
198 | ||
199 | Result<int64_t> GetSize() final { | |
200 | auto guard = lock_.shared_guard(); | |
201 | return derived()->DoGetSize(); | |
202 | } | |
203 | ||
204 | // NOTE: ReadAt doesn't use stream pointer, but it is allowed to update it | |
205 | // (it's the case on Windows when using ReadFileEx). | |
206 | // So any method that relies on the current position (even if it doesn't | |
207 | // update it, such as Peek) cannot run in parallel with ReadAt and has | |
208 | // to use the exclusive_guard. | |
209 | ||
210 | Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final { | |
211 | auto guard = lock_.shared_guard(); | |
212 | return derived()->DoReadAt(position, nbytes, out); | |
213 | } | |
214 | ||
215 | Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final { | |
216 | auto guard = lock_.shared_guard(); | |
217 | return derived()->DoReadAt(position, nbytes); | |
218 | } | |
219 | ||
220 | /* | |
221 | Methods to implement in derived class: | |
222 | ||
223 | Status DoClose(); | |
224 | Result<int64_t> DoTell() const; | |
225 | Result<int64_t> DoRead(int64_t nbytes, void* out); | |
226 | Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); | |
227 | Status DoSeek(int64_t position); | |
228 | Result<int64_t> DoGetSize() | |
229 | Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out); | |
230 | Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes); | |
231 | ||
232 | And optionally: | |
233 | ||
234 | Status DoAbort() override; | |
235 | Result<util::string_view> DoPeek(int64_t nbytes) override; | |
236 | ||
237 | These methods should be protected in the derived class and | |
238 | RandomAccessFileConcurrencyWrapper declared as a friend with | |
239 | ||
240 | friend RandomAccessFileConcurrencyWrapper<derived>; | |
241 | */ | |
242 | ||
243 | protected: | |
244 | // Default implementations. They are virtual because the derived class may | |
245 | // have derived classes itself. | |
246 | virtual Status DoAbort() { return derived()->DoClose(); } | |
247 | ||
248 | virtual Result<util::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { | |
249 | return Status::NotImplemented("Peek not implemented"); | |
250 | } | |
251 | ||
252 | Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } | |
253 | ||
254 | const Derived* derived() const { | |
255 | return ::arrow::internal::checked_cast<const Derived*>(this); | |
256 | } | |
257 | ||
258 | mutable SharedExclusiveChecker lock_; | |
259 | }; | |
260 | ||
261 | } // namespace internal | |
262 | } // namespace io | |
263 | } // namespace arrow |