// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
25 #include "arrow/io/type_fwd.h"
26 #include "arrow/type_fwd.h"
27 #include "arrow/util/cancel.h"
28 #include "arrow/util/macros.h"
29 #include "arrow/util/string_view.h"
30 #include "arrow/util/type_fwd.h"
31 #include "arrow/util/visibility.h"
40 friend bool operator==(const ReadRange
& left
, const ReadRange
& right
) {
41 return (left
.offset
== right
.offset
&& left
.length
== right
.length
);
43 friend bool operator!=(const ReadRange
& left
, const ReadRange
& right
) {
44 return !(left
== right
);
47 bool Contains(const ReadRange
& other
) const {
48 return (offset
<= other
.offset
&& offset
+ length
>= other
.offset
+ other
.length
);
52 /// EXPERIMENTAL: options provider for IO tasks
54 /// Includes an Executor (which will be used to execute asynchronous reads),
55 /// a MemoryPool (which will be used to allocate buffers when zero copy reads
56 /// are not possible), and an external id (in case the executor receives tasks from
57 /// multiple sources and must distinguish tasks associated with this IOContext).
58 struct ARROW_EXPORT IOContext
{
59 // No specified executor: will use a global IO thread pool
60 IOContext() : IOContext(default_memory_pool(), StopToken::Unstoppable()) {}
62 explicit IOContext(StopToken stop_token
)
63 : IOContext(default_memory_pool(), std::move(stop_token
)) {}
65 explicit IOContext(MemoryPool
* pool
, StopToken stop_token
= StopToken::Unstoppable());
67 explicit IOContext(MemoryPool
* pool
, ::arrow::internal::Executor
* executor
,
68 StopToken stop_token
= StopToken::Unstoppable(),
69 int64_t external_id
= -1)
72 external_id_(external_id
),
73 stop_token_(std::move(stop_token
)) {}
75 explicit IOContext(::arrow::internal::Executor
* executor
,
76 StopToken stop_token
= StopToken::Unstoppable(),
77 int64_t external_id
= -1)
78 : pool_(default_memory_pool()),
80 external_id_(external_id
),
81 stop_token_(std::move(stop_token
)) {}
83 MemoryPool
* pool() const { return pool_
; }
85 ::arrow::internal::Executor
* executor() const { return executor_
; }
87 // An application-specific ID, forwarded to executor task submissions
88 int64_t external_id() const { return external_id_
; }
90 StopToken
stop_token() const { return stop_token_
; }
94 ::arrow::internal::Executor
* executor_
;
96 StopToken stop_token_
;
99 struct ARROW_DEPRECATED("renamed to IOContext in 4.0.0") AsyncContext
: public IOContext
{
100 using IOContext::IOContext
;
103 class ARROW_EXPORT FileInterface
{
105 virtual ~FileInterface() = 0;
107 /// \brief Close the stream cleanly
109 /// For writable streams, this will attempt to flush any pending data
110 /// before releasing the underlying resource.
112 /// After Close() is called, closed() returns true and the stream is not
113 /// available for further operations.
114 virtual Status
Close() = 0;
116 /// \brief Close the stream abruptly
118 /// This method does not guarantee that any pending data is flushed.
119 /// It merely releases any underlying resource used by the stream for
122 /// After Abort() is called, closed() returns true and the stream is not
123 /// available for further operations.
124 virtual Status
Abort();
126 /// \brief Return the position in this stream
127 virtual Result
<int64_t> Tell() const = 0;
129 /// \brief Return whether the stream is closed
130 virtual bool closed() const = 0;
132 FileMode::type
mode() const { return mode_
; }
135 FileInterface() : mode_(FileMode::READ
) {}
136 FileMode::type mode_
;
137 void set_mode(FileMode::type mode
) { mode_
= mode
; }
140 ARROW_DISALLOW_COPY_AND_ASSIGN(FileInterface
);
143 class ARROW_EXPORT Seekable
{
145 virtual ~Seekable() = default;
146 virtual Status
Seek(int64_t position
) = 0;
149 class ARROW_EXPORT Writable
{
151 virtual ~Writable() = default;
153 /// \brief Write the given data to the stream
155 /// This method always processes the bytes in full. Depending on the
156 /// semantics of the stream, the data may be written out immediately,
157 /// held in a buffer, or written asynchronously. In the case where
158 /// the stream buffers the data, it will be copied. To avoid potentially
159 /// large copies, use the Write variant that takes an owned Buffer.
160 virtual Status
Write(const void* data
, int64_t nbytes
) = 0;
162 /// \brief Write the given data to the stream
164 /// Since the Buffer owns its memory, this method can avoid a copy if
165 /// buffering is required. See Write(const void*, int64_t) for details.
166 virtual Status
Write(const std::shared_ptr
<Buffer
>& data
);
168 /// \brief Flush buffered bytes, if any
169 virtual Status
Flush();
171 Status
Write(util::string_view data
);
174 class ARROW_EXPORT Readable
{
176 virtual ~Readable() = default;
178 /// \brief Read data from current file position.
180 /// Read at most `nbytes` from the current file position into `out`.
181 /// The number of bytes read is returned.
182 virtual Result
<int64_t> Read(int64_t nbytes
, void* out
) = 0;
184 /// \brief Read data from current file position.
186 /// Read at most `nbytes` from the current file position. Less bytes may
187 /// be read if EOF is reached. This method updates the current file position.
189 /// In some cases (e.g. a memory-mapped file), this method may avoid a
191 virtual Result
<std::shared_ptr
<Buffer
>> Read(int64_t nbytes
) = 0;
193 /// EXPERIMENTAL: The IOContext associated with this file.
195 /// By default, this is the same as default_io_context(), but it may be
196 /// overriden by subclasses.
197 virtual const IOContext
& io_context() const;
200 class ARROW_EXPORT OutputStream
: virtual public FileInterface
, public Writable
{
202 OutputStream() = default;
205 class ARROW_EXPORT InputStream
: virtual public FileInterface
,
206 virtual public Readable
,
207 public std::enable_shared_from_this
<InputStream
> {
209 /// \brief Advance or skip stream indicated number of bytes
210 /// \param[in] nbytes the number to move forward
212 Status
Advance(int64_t nbytes
);
214 /// \brief Return zero-copy string_view to upcoming bytes.
216 /// Do not modify the stream position. The view becomes invalid after
217 /// any operation on the stream. May trigger buffering if the requested
218 /// size is larger than the number of buffered bytes.
220 /// May return NotImplemented on streams that don't support it.
222 /// \param[in] nbytes the maximum number of bytes to see
223 virtual Result
<util::string_view
> Peek(int64_t nbytes
);
225 /// \brief Return true if InputStream is capable of zero copy Buffer reads
227 /// Zero copy reads imply the use of Buffer-returning Read() overloads.
228 virtual bool supports_zero_copy() const;
230 /// \brief Read and return stream metadata
232 /// If the stream implementation doesn't support metadata, empty metadata
233 /// is returned. Note that it is allowed to return a null pointer rather
234 /// than an allocated empty metadata.
235 virtual Result
<std::shared_ptr
<const KeyValueMetadata
>> ReadMetadata();
237 /// \brief Read stream metadata asynchronously
238 virtual Future
<std::shared_ptr
<const KeyValueMetadata
>> ReadMetadataAsync(
239 const IOContext
& io_context
);
240 Future
<std::shared_ptr
<const KeyValueMetadata
>> ReadMetadataAsync();
243 InputStream() = default;
246 class ARROW_EXPORT RandomAccessFile
: public InputStream
, public Seekable
{
248 /// Necessary because we hold a std::unique_ptr
249 ~RandomAccessFile() override
;
251 /// \brief Create an isolated InputStream that reads a segment of a
252 /// RandomAccessFile. Multiple such stream can be created and used
253 /// independently without interference
254 /// \param[in] file a file instance
255 /// \param[in] file_offset the starting position in the file
256 /// \param[in] nbytes the extent of bytes to read. The file should have
257 /// sufficient bytes available
258 static std::shared_ptr
<InputStream
> GetStream(std::shared_ptr
<RandomAccessFile
> file
,
259 int64_t file_offset
, int64_t nbytes
);
261 /// \brief Return the total file size in bytes.
263 /// This method does not read or move the current file position, so is safe
264 /// to call concurrently with e.g. ReadAt().
265 virtual Result
<int64_t> GetSize() = 0;
267 /// \brief Read data from given file position.
269 /// At most `nbytes` bytes are read. The number of bytes read is returned
270 /// (it can be less than `nbytes` if EOF is reached).
272 /// This method can be safely called from multiple threads concurrently.
273 /// It is unspecified whether this method updates the file position or not.
275 /// The default RandomAccessFile-provided implementation uses Seek() and Read(),
276 /// but subclasses may override it with a more efficient implementation
277 /// that doesn't depend on implicit file positioning.
279 /// \param[in] position Where to read bytes from
280 /// \param[in] nbytes The number of bytes to read
281 /// \param[out] out The buffer to read bytes into
282 /// \return The number of bytes read, or an error
283 virtual Result
<int64_t> ReadAt(int64_t position
, int64_t nbytes
, void* out
);
285 /// \brief Read data from given file position.
287 /// At most `nbytes` bytes are read, but it can be less if EOF is reached.
289 /// \param[in] position Where to read bytes from
290 /// \param[in] nbytes The number of bytes to read
291 /// \return A buffer containing the bytes read, or an error
292 virtual Result
<std::shared_ptr
<Buffer
>> ReadAt(int64_t position
, int64_t nbytes
);
294 /// EXPERIMENTAL: Read data asynchronously.
295 virtual Future
<std::shared_ptr
<Buffer
>> ReadAsync(const IOContext
&, int64_t position
,
298 /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
299 Future
<std::shared_ptr
<Buffer
>> ReadAsync(int64_t position
, int64_t nbytes
);
301 /// EXPERIMENTAL: Inform that the given ranges may be read soon.
303 /// Some implementations might arrange to prefetch some of the data.
304 /// However, no guarantee is made and the default implementation does nothing.
305 /// For robust prefetching, use ReadAt() or ReadAsync().
306 virtual Status
WillNeed(const std::vector
<ReadRange
>& ranges
);
312 struct ARROW_NO_EXPORT Impl
;
313 std::unique_ptr
<Impl
> interface_impl_
;
316 class ARROW_EXPORT WritableFile
: public OutputStream
, public Seekable
{
318 virtual Status
WriteAt(int64_t position
, const void* data
, int64_t nbytes
) = 0;
321 WritableFile() = default;
324 class ARROW_EXPORT ReadWriteFileInterface
: public RandomAccessFile
, public WritableFile
{
326 ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE
); }
329 /// \brief Return an iterator on an input stream
331 /// The iterator yields a fixed-size block on each Next() call, except the
332 /// last block in the stream which may be smaller.
333 /// Once the end of stream is reached, Next() returns nullptr
334 /// (unlike InputStream::Read() which returns an empty buffer).
336 Result
<Iterator
<std::shared_ptr
<Buffer
>>> MakeInputStreamIterator(
337 std::shared_ptr
<InputStream
> stream
, int64_t block_size
);