// git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/io/memory.h
// import quincy 17.2.0
// [ceph.git] / ceph / src / arrow / cpp / src / arrow / io / memory.h
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Public API for different memory sharing / IO mechanisms

20 #pragma once
21
22 #include <cstdint>
23 #include <memory>
24 #include <vector>
25
26 #include "arrow/io/concurrency.h"
27 #include "arrow/io/interfaces.h"
28 #include "arrow/type_fwd.h"
29 #include "arrow/util/string_view.h"
30 #include "arrow/util/visibility.h"
31
32 namespace arrow {
33
34 class Status;
35
36 namespace io {
37
38 /// \brief An output stream that writes to a resizable buffer
39 class ARROW_EXPORT BufferOutputStream : public OutputStream {
40 public:
41 explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
42
43 /// \brief Create in-memory output stream with indicated capacity using a
44 /// memory pool
45 /// \param[in] initial_capacity the initial allocated internal capacity of
46 /// the OutputStream
47 /// \param[in,out] pool a MemoryPool to use for allocations
48 /// \return the created stream
49 static Result<std::shared_ptr<BufferOutputStream>> Create(
50 int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool());
51
52 ~BufferOutputStream() override;
53
54 // Implement the OutputStream interface
55
56 /// Close the stream, preserving the buffer (retrieve it with Finish()).
57 Status Close() override;
58 bool closed() const override;
59 Result<int64_t> Tell() const override;
60 Status Write(const void* data, int64_t nbytes) override;
61
62 /// \cond FALSE
63 using OutputStream::Write;
64 /// \endcond
65
66 /// Close the stream and return the buffer
67 Result<std::shared_ptr<Buffer>> Finish();
68
69 /// \brief Initialize state of OutputStream with newly allocated memory and
70 /// set position to 0
71 /// \param[in] initial_capacity the starting allocated capacity
72 /// \param[in,out] pool the memory pool to use for allocations
73 /// \return Status
74 Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
75
76 int64_t capacity() const { return capacity_; }
77
78 private:
79 BufferOutputStream();
80
81 // Ensures there is sufficient space available to write nbytes
82 Status Reserve(int64_t nbytes);
83
84 std::shared_ptr<ResizableBuffer> buffer_;
85 bool is_open_;
86 int64_t capacity_;
87 int64_t position_;
88 uint8_t* mutable_data_;
89 };
90
91 /// \brief A helper class to track the size of allocations
92 ///
93 /// Writes to this stream do not copy or retain any data, they just bump
94 /// a size counter that can be later used to know exactly which data size
95 /// needs to be allocated for actual writing.
96 class ARROW_EXPORT MockOutputStream : public OutputStream {
97 public:
98 MockOutputStream() : extent_bytes_written_(0), is_open_(true) {}
99
100 // Implement the OutputStream interface
101 Status Close() override;
102 bool closed() const override;
103 Result<int64_t> Tell() const override;
104 Status Write(const void* data, int64_t nbytes) override;
105 /// \cond FALSE
106 using Writable::Write;
107 /// \endcond
108
109 int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
110
111 private:
112 int64_t extent_bytes_written_;
113 bool is_open_;
114 };
115
116 /// \brief An output stream that writes into a fixed-size mutable buffer
117 class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile {
118 public:
119 /// Input buffer must be mutable, will abort if not
120 explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
121 ~FixedSizeBufferWriter() override;
122
123 Status Close() override;
124 bool closed() const override;
125 Status Seek(int64_t position) override;
126 Result<int64_t> Tell() const override;
127 Status Write(const void* data, int64_t nbytes) override;
128 /// \cond FALSE
129 using Writable::Write;
130 /// \endcond
131
132 Status WriteAt(int64_t position, const void* data, int64_t nbytes) override;
133
134 void set_memcopy_threads(int num_threads);
135 void set_memcopy_blocksize(int64_t blocksize);
136 void set_memcopy_threshold(int64_t threshold);
137
138 protected:
139 class FixedSizeBufferWriterImpl;
140 std::unique_ptr<FixedSizeBufferWriterImpl> impl_;
141 };
142
143 /// \class BufferReader
144 /// \brief Random access zero-copy reads on an arrow::Buffer
145 class ARROW_EXPORT BufferReader
146 : public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
147 public:
148 explicit BufferReader(std::shared_ptr<Buffer> buffer);
149 explicit BufferReader(const Buffer& buffer);
150 BufferReader(const uint8_t* data, int64_t size);
151
152 /// \brief Instantiate from std::string or arrow::util::string_view. Does not
153 /// own data
154 explicit BufferReader(const util::string_view& data);
155
156 bool closed() const override;
157
158 bool supports_zero_copy() const override;
159
160 std::shared_ptr<Buffer> buffer() const { return buffer_; }
161
162 // Synchronous ReadAsync override
163 Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
164 int64_t nbytes) override;
165 Status WillNeed(const std::vector<ReadRange>& ranges) override;
166
167 protected:
168 friend RandomAccessFileConcurrencyWrapper<BufferReader>;
169
170 Status DoClose();
171
172 Result<int64_t> DoRead(int64_t nbytes, void* buffer);
173 Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
174 Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
175 Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
176 Result<util::string_view> DoPeek(int64_t nbytes) override;
177
178 Result<int64_t> DoTell() const;
179 Status DoSeek(int64_t position);
180 Result<int64_t> DoGetSize();
181
182 Status CheckClosed() const {
183 if (!is_open_) {
184 return Status::Invalid("Operation forbidden on closed BufferReader");
185 }
186 return Status::OK();
187 }
188
189 std::shared_ptr<Buffer> buffer_;
190 const uint8_t* data_;
191 int64_t size_;
192 int64_t position_;
193 bool is_open_;
194 };
195
196 } // namespace io
197 } // namespace arrow