]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | // Public API for different memory sharing / IO mechanisms | |
19 | ||
20 | #pragma once | |
21 | ||
22 | #include <cstdint> | |
23 | #include <memory> | |
24 | #include <vector> | |
25 | ||
26 | #include "arrow/io/concurrency.h" | |
27 | #include "arrow/io/interfaces.h" | |
28 | #include "arrow/type_fwd.h" | |
29 | #include "arrow/util/string_view.h" | |
30 | #include "arrow/util/visibility.h" | |
31 | ||
32 | namespace arrow { | |
33 | ||
34 | class Status; | |
35 | ||
36 | namespace io { | |
37 | ||
38 | /// \brief An output stream that writes into a ResizableBuffer | |
39 | class ARROW_EXPORT BufferOutputStream : public OutputStream { | |
40 | public: | |
41 | explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer); | |
42 | ||
43 | /// \brief Create an in-memory output stream with the indicated capacity, | |
44 | /// allocating from a memory pool | |
45 | /// \param[in] initial_capacity the initial allocated internal capacity of | |
46 | /// the OutputStream (defaults to 4096) | |
47 | /// \param[in,out] pool the MemoryPool to use for allocations (defaults to default_memory_pool()) | |
48 | /// \return a Result holding the created stream | |
49 | static Result<std::shared_ptr<BufferOutputStream>> Create( | |
50 | int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool()); | |
51 | ||
52 | ~BufferOutputStream() override; | |
53 | ||
54 | // Implementation of the OutputStream interface | |
55 | ||
56 | /// \brief Close the stream, preserving the buffer (retrieve it with Finish()). | |
57 | Status Close() override; | |
58 | bool closed() const override; | |
59 | Result<int64_t> Tell() const override; | |
60 | Status Write(const void* data, int64_t nbytes) override; | |
61 | ||
62 | /// \cond FALSE | |
63 | using OutputStream::Write; | |
64 | /// \endcond | |
65 | ||
66 | /// \brief Close the stream and return the buffer | |
67 | Result<std::shared_ptr<Buffer>> Finish(); | |
68 | ||
69 | /// \brief Initialize the state of the OutputStream with newly allocated | |
70 | /// memory and set the position to 0 | |
71 | /// \param[in] initial_capacity the starting allocated capacity (defaults to 1024; NOTE(review): Create() defaults to 4096 -- confirm the difference is intended) | |
72 | /// \param[in,out] pool the memory pool to use for allocations (defaults to default_memory_pool()) | |
73 | /// \return Status of the (re)initialization | |
74 | Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool()); | |
75 | ||
76 | int64_t capacity() const { return capacity_; } | |
77 | ||
78 | private: | |
79 | BufferOutputStream(); | |
80 | ||
81 | // Ensures there is sufficient space available to write nbytes more bytes | |
82 | Status Reserve(int64_t nbytes); | |
83 | ||
84 | std::shared_ptr<ResizableBuffer> buffer_; | |
85 | bool is_open_; | |
86 | int64_t capacity_; | |
87 | int64_t position_; | |
88 | uint8_t* mutable_data_; | |
89 | }; | |
90 | ||
91 | /// \brief A helper output stream that tracks the size of allocations | |
92 | /// | |
93 | /// Writes to this stream do not copy or retain any data; they only bump | |
94 | /// a size counter that can later be used to know exactly how much data | |
95 | /// needs to be allocated for actual writing (see GetExtentBytesWritten()). | |
96 | class ARROW_EXPORT MockOutputStream : public OutputStream { | |
97 | public: | |
98 | MockOutputStream() : extent_bytes_written_(0), is_open_(true) {} | |
99 | ||
100 | // Implementation of the OutputStream interface | |
101 | Status Close() override; | |
102 | bool closed() const override; | |
103 | Result<int64_t> Tell() const override; | |
104 | Status Write(const void* data, int64_t nbytes) override; | |
105 | /// \cond FALSE | |
106 | using Writable::Write; | |
107 | /// \endcond | |
108 | ||
109 | int64_t GetExtentBytesWritten() const { return extent_bytes_written_; } | |
110 | ||
111 | private: | |
112 | int64_t extent_bytes_written_; | |
113 | bool is_open_; | |
114 | }; | |
115 | ||
116 | /// \brief An output stream that writes into a fixed-size mutable buffer, with support for Seek() and positioned writes via WriteAt() | |
117 | class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile { | |
118 | public: | |
119 | /// \param[in] buffer the destination buffer; it must be mutable, construction aborts otherwise | |
120 | explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer); | |
121 | ~FixedSizeBufferWriter() override; | |
122 | ||
123 | Status Close() override; | |
124 | bool closed() const override; | |
125 | Status Seek(int64_t position) override; | |
126 | Result<int64_t> Tell() const override; | |
127 | Status Write(const void* data, int64_t nbytes) override; | |
128 | /// \cond FALSE | |
129 | using Writable::Write; | |
130 | /// \endcond | |
131 | ||
132 | Status WriteAt(int64_t position, const void* data, int64_t nbytes) override; | |
133 | ||
134 | void set_memcopy_threads(int num_threads); | |
135 | void set_memcopy_blocksize(int64_t blocksize); | |
136 | void set_memcopy_threshold(int64_t threshold); | |
137 | ||
138 | protected: | |
139 | class FixedSizeBufferWriterImpl; | |
140 | std::unique_ptr<FixedSizeBufferWriterImpl> impl_; | |
141 | }; | |
142 | ||
143 | /// \class BufferReader | |
144 | /// \brief Random access zero-copy reads on an arrow::Buffer | |
145 | class ARROW_EXPORT BufferReader | |
146 | : public internal::RandomAccessFileConcurrencyWrapper<BufferReader> { | |
147 | public: | |
148 | explicit BufferReader(std::shared_ptr<Buffer> buffer); | |
149 | explicit BufferReader(const Buffer& buffer); | |
150 | BufferReader(const uint8_t* data, int64_t size); | |
151 | ||
152 | /// \brief Instantiate from a std::string or arrow::util::string_view. The | |
153 | /// reader does not own the data (presumably the caller must keep it alive while reading -- TODO confirm) | |
154 | explicit BufferReader(const util::string_view& data); | |
155 | ||
156 | bool closed() const override; | |
157 | ||
158 | bool supports_zero_copy() const override; | |
159 | ||
160 | std::shared_ptr<Buffer> buffer() const { return buffer_; } | |
161 | ||
162 | // Synchronous override of ReadAsync | |
163 | Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position, | |
164 | int64_t nbytes) override; | |
165 | Status WillNeed(const std::vector<ReadRange>& ranges) override; | |
166 | ||
167 | protected: | |
168 | friend RandomAccessFileConcurrencyWrapper<BufferReader>; | |
169 | ||
170 | Status DoClose(); | |
171 | ||
172 | Result<int64_t> DoRead(int64_t nbytes, void* buffer); | |
173 | Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); | |
174 | Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out); | |
175 | Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes); | |
176 | Result<util::string_view> DoPeek(int64_t nbytes) override; | |
177 | ||
178 | Result<int64_t> DoTell() const; | |
179 | Status DoSeek(int64_t position); | |
180 | Result<int64_t> DoGetSize(); | |
181 | ||
182 | Status CheckClosed() const { | |
183 | if (!is_open_) { | |
184 | return Status::Invalid("Operation forbidden on closed BufferReader"); | |
185 | } | |
186 | return Status::OK(); | |
187 | } | |
188 | ||
189 | std::shared_ptr<Buffer> buffer_; | |
190 | const uint8_t* data_; | |
191 | int64_t size_; | |
192 | int64_t position_; | |
193 | bool is_open_; | |
194 | }; | |
195 | ||
196 | } // namespace io | |
197 | } // namespace arrow |