1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
// under the License.
18 // Public API for different memory sharing / IO mechanisms
26 #include "arrow/io/concurrency.h"
27 #include "arrow/io/interfaces.h"
28 #include "arrow/type_fwd.h"
29 #include "arrow/util/string_view.h"
30 #include "arrow/util/visibility.h"
38 /// \brief An output stream that writes to a resizable buffer
///
/// The stream holds a shared ResizableBuffer (buffer_). Close() keeps that
/// buffer, and Finish() closes the stream and returns it to the caller.
39 class ARROW_EXPORT BufferOutputStream
: public OutputStream
{
/// \brief Construct a stream from a caller-supplied resizable buffer
/// \param[in] buffer the resizable buffer this stream is constructed from
41 explicit BufferOutputStream(const std::shared_ptr
<ResizableBuffer
>& buffer
);
43 /// \brief Create in-memory output stream with indicated capacity using a
/// memory pool
45 /// \param[in] initial_capacity the initial allocated internal capacity of
/// the stream (defaults to 4096)
47 /// \param[in,out] pool a MemoryPool to use for allocations
/// (defaults to default_memory_pool())
48 /// \return the created stream
49 static Result
<std::shared_ptr
<BufferOutputStream
>> Create(
50 int64_t initial_capacity
= 4096, MemoryPool
* pool
= default_memory_pool());
/// Destructor; only declared here, the definition is not part of this header
/// section.
52 ~BufferOutputStream() override
;
54 // Implement the OutputStream interface
56 /// Close the stream, preserving the buffer (retrieve it with Finish()).
57 Status
Close() override
;
/// \return whether the stream is closed
58 bool closed() const override
;
/// \return an int64_t position, or an error
/// (presumably the current write offset -- TODO confirm in the implementation)
59 Result
<int64_t> Tell() const override
;
/// \brief Write to the stream
/// \param[in] data pointer to the source bytes
/// \param[in] nbytes number of bytes to write from \p data
/// \return Status
60 Status
Write(const void* data
, int64_t nbytes
) override
;
/// Re-expose the Write names declared in OutputStream; declaring Write()
/// in this class would otherwise hide them.
63 using OutputStream::Write
;
66 /// Close the stream and return the buffer
67 Result
<std::shared_ptr
<Buffer
>> Finish();
69 /// \brief Initialize state of OutputStream with newly allocated memory
71 /// \param[in] initial_capacity the starting allocated capacity
/// (defaults to 1024; note that Create() defaults to 4096)
72 /// \param[in,out] pool the memory pool to use for allocations
/// (defaults to default_memory_pool())
/// \return Status
74 Status
Reset(int64_t initial_capacity
= 1024, MemoryPool
* pool
= default_memory_pool());
/// \return the current value of capacity_
76 int64_t capacity() const { return capacity_
; }
81 // Ensures there is sufficient space available to write nbytes
82 Status
Reserve(int64_t nbytes
);
// The resizable buffer this stream writes to.
84 std::shared_ptr
<ResizableBuffer
> buffer_
;
// Writable byte pointer; presumably points into buffer_'s storage -- TODO confirm.
88 uint8_t* mutable_data_
;
91 /// \brief A helper class to track the size of allocations
93 /// Writes to this stream do not copy or retain any data, they just bump
94 /// a size counter that can be later used to know exactly which data size
95 /// needs to be allocated for actual writing.
96 class ARROW_EXPORT MockOutputStream
: public OutputStream
{
/// Construct a stream with a zero byte counter that is marked as open.
98 MockOutputStream() : extent_bytes_written_(0), is_open_(true) {}
100 // Implement the OutputStream interface
/// Close the stream.
101 Status
Close() override
;
/// \return whether the stream is closed
102 bool closed() const override
;
/// \return an int64_t position, or an error
/// (presumably the current write offset -- TODO confirm in the implementation)
103 Result
<int64_t> Tell() const override
;
/// \brief Account for a write without retaining the data (see class notes)
/// \param[in] data pointer to the bytes that would be written
/// \param[in] nbytes number of bytes in the write
/// \return Status
104 Status
Write(const void* data
, int64_t nbytes
) override
;
/// Re-expose the Write names declared in Writable; declaring Write()
/// in this class would otherwise hide them.
106 using Writable::Write
;
/// \return the current value of the extent_bytes_written_ counter
109 int64_t GetExtentBytesWritten() const { return extent_bytes_written_
; }
// Size counter reported by GetExtentBytesWritten(); starts at 0.
112 int64_t extent_bytes_written_
;
116 /// \brief An output stream that writes into a fixed-size mutable buffer
///
/// Derives from WritableFile, so in addition to sequential Write() it
/// declares Seek() and positional WriteAt().
117 class ARROW_EXPORT FixedSizeBufferWriter
: public WritableFile
{
119 /// Input buffer must be mutable, will abort if not
/// \param[in] buffer the buffer to write into
120 explicit FixedSizeBufferWriter(const std::shared_ptr
<Buffer
>& buffer
);
/// Destructor; only declared here. impl_ below is a std::unique_ptr to a
/// type that is only forward-declared in this class.
121 ~FixedSizeBufferWriter() override
;
/// Close the writer.
123 Status
Close() override
;
/// \return whether the writer is closed
124 bool closed() const override
;
/// \brief Move the write position
/// \param[in] position the position to seek to
/// \return Status
125 Status
Seek(int64_t position
) override
;
/// \return an int64_t position, or an error
/// (presumably the current write offset -- TODO confirm in the implementation)
126 Result
<int64_t> Tell() const override
;
/// \brief Write to the buffer
/// \param[in] data pointer to the source bytes
/// \param[in] nbytes number of bytes to write from \p data
/// \return Status
127 Status
Write(const void* data
, int64_t nbytes
) override
;
/// Re-expose the Write names declared in Writable; declaring Write()
/// in this class would otherwise hide them.
129 using Writable::Write
;
/// \brief Write at an explicit position
/// \param[in] position the position to write at
/// \param[in] data pointer to the source bytes
/// \param[in] nbytes number of bytes to write from \p data
/// \return Status
132 Status
WriteAt(int64_t position
, const void* data
, int64_t nbytes
) override
;
// NOTE(review): the three setters below presumably tune how the writer
// copies memory (thread count, block size, size threshold); their exact
// semantics are defined in the implementation -- TODO confirm.
134 void set_memcopy_threads(int num_threads
);
135 void set_memcopy_blocksize(int64_t blocksize
);
136 void set_memcopy_threshold(int64_t threshold
);
// Forward declaration only; the definition is not part of this header section.
139 class FixedSizeBufferWriterImpl
;
// Owning pointer to the forward-declared implementation object.
140 std::unique_ptr
<FixedSizeBufferWriterImpl
> impl_
;
143 /// \class BufferReader
144 /// \brief Random access zero-copy reads on an arrow::Buffer
///
/// The class derives from
/// internal::RandomAccessFileConcurrencyWrapper<BufferReader> and befriends
/// it, and declares a set of Do* methods (DoRead, DoReadAt, DoPeek, DoTell,
/// DoSeek, DoGetSize) -- presumably the hooks the wrapper forwards to;
/// TODO confirm against arrow/io/concurrency.h.
145 class ARROW_EXPORT BufferReader
146 : public internal::RandomAccessFileConcurrencyWrapper
<BufferReader
> {
/// \brief Instantiate from a shared buffer (the shared_ptr is taken by value)
148 explicit BufferReader(std::shared_ptr
<Buffer
> buffer
);
/// \brief Instantiate from a reference to an existing Buffer
149 explicit BufferReader(const Buffer
& buffer
);
/// \brief Instantiate from a raw pointer and a size
/// \param[in] data pointer to the bytes to read
/// \param[in] size number of bytes available at \p data
150 BufferReader(const uint8_t* data
, int64_t size
);
152 /// \brief Instantiate from std::string or arrow::util::string_view.
/// NOTE(review): a string_view is a non-owning view; presumably the reader
/// does not take ownership of the viewed bytes -- TODO confirm.
154 explicit BufferReader(const util::string_view
& data
);
/// \return whether the reader is closed
156 bool closed() const override
;
/// \return whether zero-copy reads are supported
158 bool supports_zero_copy() const override
;
/// \return a copy of the buffer_ shared pointer
160 std::shared_ptr
<Buffer
> buffer() const { return buffer_
; }
162 // Synchronous ReadAsync override
/// \param[in] position the position to read from
/// \param[in] nbytes the number of bytes to read
/// \return a Future yielding the bytes read as a Buffer
163 Future
<std::shared_ptr
<Buffer
>> ReadAsync(const IOContext
&, int64_t position
,
164 int64_t nbytes
) override
;
/// \param[in] ranges the byte ranges concerned
/// NOTE(review): presumably a hint that \p ranges will be read soon --
/// TODO confirm against the base interface.
165 Status
WillNeed(const std::vector
<ReadRange
>& ranges
) override
;
// Gives the concurrency wrapper base class access to this class's
// non-public members.
168 friend RandomAccessFileConcurrencyWrapper
<BufferReader
>;
/// Read \p nbytes into the caller-provided \p buffer;
/// \return an int64_t count (presumably bytes actually read), or an error
172 Result
<int64_t> DoRead(int64_t nbytes
, void* buffer
);
/// Read \p nbytes and return the result as a Buffer.
173 Result
<std::shared_ptr
<Buffer
>> DoRead(int64_t nbytes
);
/// Read \p nbytes at \p position into the caller-provided \p out;
/// \return an int64_t count (presumably bytes actually read), or an error
174 Result
<int64_t> DoReadAt(int64_t position
, int64_t nbytes
, void* out
);
/// Read \p nbytes at \p position and return the result as a Buffer.
175 Result
<std::shared_ptr
<Buffer
>> DoReadAt(int64_t position
, int64_t nbytes
);
/// \return a string_view for a peek of \p nbytes, or an error.
/// Unlike the other Do* methods, this one is declared `override`.
176 Result
<util::string_view
> DoPeek(int64_t nbytes
) override
;
/// \return an int64_t position, or an error
178 Result
<int64_t> DoTell() const;
/// \param[in] position the position to seek to
179 Status
DoSeek(int64_t position
);
/// \return an int64_t size, or an error
180 Result
<int64_t> DoGetSize();
/// Produces the Status::Invalid error that rejects operations on a closed
/// reader.
182 Status
CheckClosed() const {
184 return Status::Invalid("Operation forbidden on closed BufferReader");
// The buffer returned by buffer().
189 std::shared_ptr
<Buffer
> buffer_
;
// Read-only byte pointer; presumably the start of the data being read
// (from buffer_ or from the pointer constructor) -- TODO confirm.
190 const uint8_t* data_
;