]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | // C++ object model and user API for interprocess schema messaging | |
19 | ||
20 | #pragma once | |
21 | ||
22 | #include <cstdint> | |
23 | #include <memory> | |
24 | #include <string> | |
25 | #include <utility> | |
26 | ||
27 | #include "arrow/io/type_fwd.h" | |
28 | #include "arrow/ipc/type_fwd.h" | |
29 | #include "arrow/result.h" | |
30 | #include "arrow/status.h" | |
31 | #include "arrow/type_fwd.h" | |
32 | #include "arrow/util/macros.h" | |
33 | #include "arrow/util/visibility.h" | |
34 | ||
35 | namespace arrow { | |
36 | namespace ipc { | |
37 | ||
38 | struct IpcWriteOptions; | |
39 | ||
// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual fields' metadata can be retrieved from very large schemas without
// deserializing the entire schema.
43 | ||
44 | /// \class Message | |
45 | /// \brief An IPC message including metadata and body | |
46 | class ARROW_EXPORT Message { | |
47 | public: | |
48 | /// \brief Construct message, but do not validate | |
49 | /// | |
50 | /// Use at your own risk; Message::Open has more metadata validation | |
51 | Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body); | |
52 | ||
53 | ~Message(); | |
54 | ||
55 | /// \brief Create and validate a Message instance from two buffers | |
56 | /// | |
57 | /// \param[in] metadata a buffer containing the Flatbuffer metadata | |
58 | /// \param[in] body a buffer containing the message body, which may be null | |
59 | /// \return the created message | |
60 | static Result<std::unique_ptr<Message>> Open(std::shared_ptr<Buffer> metadata, | |
61 | std::shared_ptr<Buffer> body); | |
62 | ||
63 | /// \brief Read message body and create Message given Flatbuffer metadata | |
64 | /// \param[in] metadata containing a serialized Message flatbuffer | |
65 | /// \param[in] stream an InputStream | |
66 | /// \return the created Message | |
67 | /// | |
68 | /// \note If stream supports zero-copy, this is zero-copy | |
69 | static Result<std::unique_ptr<Message>> ReadFrom(std::shared_ptr<Buffer> metadata, | |
70 | io::InputStream* stream); | |
71 | ||
72 | /// \brief Read message body from position in file, and create Message given | |
73 | /// the Flatbuffer metadata | |
74 | /// \param[in] offset the position in the file where the message body starts. | |
75 | /// \param[in] metadata containing a serialized Message flatbuffer | |
76 | /// \param[in] file the seekable file interface to read from | |
77 | /// \return the created Message | |
78 | /// | |
79 | /// \note If file supports zero-copy, this is zero-copy | |
80 | static Result<std::unique_ptr<Message>> ReadFrom(const int64_t offset, | |
81 | std::shared_ptr<Buffer> metadata, | |
82 | io::RandomAccessFile* file); | |
83 | ||
84 | /// \brief Return true if message type and contents are equal | |
85 | /// | |
86 | /// \param other another message | |
87 | /// \return true if contents equal | |
88 | bool Equals(const Message& other) const; | |
89 | ||
90 | /// \brief the Message metadata | |
91 | /// | |
92 | /// \return buffer | |
93 | std::shared_ptr<Buffer> metadata() const; | |
94 | ||
95 | /// \brief Custom metadata serialized in metadata Flatbuffer. Returns nullptr | |
96 | /// when none set | |
97 | const std::shared_ptr<const KeyValueMetadata>& custom_metadata() const; | |
98 | ||
99 | /// \brief the Message body, if any | |
100 | /// | |
101 | /// \return buffer is null if no body | |
102 | std::shared_ptr<Buffer> body() const; | |
103 | ||
104 | /// \brief The expected body length according to the metadata, for | |
105 | /// verification purposes | |
106 | int64_t body_length() const; | |
107 | ||
108 | /// \brief The Message type | |
109 | MessageType type() const; | |
110 | ||
111 | /// \brief The Message metadata version | |
112 | MetadataVersion metadata_version() const; | |
113 | ||
114 | const void* header() const; | |
115 | ||
116 | /// \brief Write length-prefixed metadata and body to output stream | |
117 | /// | |
118 | /// \param[in] file output stream to write to | |
119 | /// \param[in] options IPC writing options including alignment | |
120 | /// \param[out] output_length the number of bytes written | |
121 | /// \return Status | |
122 | Status SerializeTo(io::OutputStream* file, const IpcWriteOptions& options, | |
123 | int64_t* output_length) const; | |
124 | ||
125 | /// \brief Return true if the Message metadata passes Flatbuffer validation | |
126 | bool Verify() const; | |
127 | ||
128 | /// \brief Whether a given message type needs a body. | |
129 | static bool HasBody(MessageType type) { | |
130 | return type != MessageType::NONE && type != MessageType::SCHEMA; | |
131 | } | |
132 | ||
133 | private: | |
134 | // Hide serialization details from user API | |
135 | class MessageImpl; | |
136 | std::unique_ptr<MessageImpl> impl_; | |
137 | ||
138 | ARROW_DISALLOW_COPY_AND_ASSIGN(Message); | |
139 | }; | |
140 | ||
141 | ARROW_EXPORT std::string FormatMessageType(MessageType type); | |
142 | ||
143 | /// \class MessageDecoderListener | |
144 | /// \brief An abstract class to listen events from MessageDecoder. | |
145 | /// | |
146 | /// This API is EXPERIMENTAL. | |
147 | /// | |
148 | /// \since 0.17.0 | |
149 | class ARROW_EXPORT MessageDecoderListener { | |
150 | public: | |
151 | virtual ~MessageDecoderListener() = default; | |
152 | ||
153 | /// \brief Called when a message is decoded. | |
154 | /// | |
155 | /// MessageDecoder calls this method when it decodes a message. This | |
156 | /// method is called multiple times when the target stream has | |
157 | /// multiple messages. | |
158 | /// | |
159 | /// \param[in] message a decoded message | |
160 | /// \return Status | |
161 | virtual Status OnMessageDecoded(std::unique_ptr<Message> message) = 0; | |
162 | ||
163 | /// \brief Called when the decoder state is changed to | |
164 | /// MessageDecoder::State::INITIAL. | |
165 | /// | |
166 | /// The default implementation just returns arrow::Status::OK(). | |
167 | /// | |
168 | /// \return Status | |
169 | virtual Status OnInitial(); | |
170 | ||
171 | /// \brief Called when the decoder state is changed to | |
172 | /// MessageDecoder::State::METADATA_LENGTH. | |
173 | /// | |
174 | /// The default implementation just returns arrow::Status::OK(). | |
175 | /// | |
176 | /// \return Status | |
177 | virtual Status OnMetadataLength(); | |
178 | ||
179 | /// \brief Called when the decoder state is changed to | |
180 | /// MessageDecoder::State::METADATA. | |
181 | /// | |
182 | /// The default implementation just returns arrow::Status::OK(). | |
183 | /// | |
184 | /// \return Status | |
185 | virtual Status OnMetadata(); | |
186 | ||
187 | /// \brief Called when the decoder state is changed to | |
188 | /// MessageDecoder::State::BODY. | |
189 | /// | |
190 | /// The default implementation just returns arrow::Status::OK(). | |
191 | /// | |
192 | /// \return Status | |
193 | virtual Status OnBody(); | |
194 | ||
195 | /// \brief Called when the decoder state is changed to | |
196 | /// MessageDecoder::State::EOS. | |
197 | /// | |
198 | /// The default implementation just returns arrow::Status::OK(). | |
199 | /// | |
200 | /// \return Status | |
201 | virtual Status OnEOS(); | |
202 | }; | |
203 | ||
204 | /// \class AssignMessageDecoderListener | |
205 | /// \brief Assign a message decoded by MessageDecoder. | |
206 | /// | |
207 | /// This API is EXPERIMENTAL. | |
208 | /// | |
209 | /// \since 0.17.0 | |
210 | class ARROW_EXPORT AssignMessageDecoderListener : public MessageDecoderListener { | |
211 | public: | |
212 | /// \brief Construct a listener that assigns a decoded message to the | |
213 | /// specified location. | |
214 | /// | |
215 | /// \param[in] message a location to store the received message | |
216 | explicit AssignMessageDecoderListener(std::unique_ptr<Message>* message) | |
217 | : message_(message) {} | |
218 | ||
219 | virtual ~AssignMessageDecoderListener() = default; | |
220 | ||
221 | Status OnMessageDecoded(std::unique_ptr<Message> message) override { | |
222 | *message_ = std::move(message); | |
223 | return Status::OK(); | |
224 | } | |
225 | ||
226 | private: | |
227 | std::unique_ptr<Message>* message_; | |
228 | ||
229 | ARROW_DISALLOW_COPY_AND_ASSIGN(AssignMessageDecoderListener); | |
230 | }; | |
231 | ||
232 | /// \class MessageDecoder | |
233 | /// \brief Push style message decoder that receives data from user. | |
234 | /// | |
235 | /// This API is EXPERIMENTAL. | |
236 | /// | |
237 | /// \since 0.17.0 | |
238 | class ARROW_EXPORT MessageDecoder { | |
239 | public: | |
240 | /// \brief State for reading a message | |
241 | enum State { | |
242 | /// The initial state. It requires one of the followings as the next data: | |
243 | /// | |
244 | /// * int32_t continuation token | |
245 | /// * int32_t end-of-stream mark (== 0) | |
246 | /// * int32_t metadata length (backward compatibility for | |
247 | /// reading old IPC messages produced prior to version 0.15.0 | |
248 | INITIAL, | |
249 | ||
250 | /// It requires int32_t metadata length. | |
251 | METADATA_LENGTH, | |
252 | ||
253 | /// It requires metadata. | |
254 | METADATA, | |
255 | ||
256 | /// It requires message body. | |
257 | BODY, | |
258 | ||
259 | /// The end-of-stream state. No more data is processed. | |
260 | EOS, | |
261 | }; | |
262 | ||
263 | /// \brief Construct a message decoder. | |
264 | /// | |
265 | /// \param[in] listener a MessageDecoderListener that responds events from | |
266 | /// the decoder | |
267 | /// \param[in] pool an optional MemoryPool to copy metadata on the | |
268 | /// CPU, if required | |
269 | explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, | |
270 | MemoryPool* pool = default_memory_pool()); | |
271 | ||
272 | /// \brief Construct a message decoder with the specified state. | |
273 | /// | |
274 | /// This is a construct for advanced users that know how to decode | |
275 | /// Message. | |
276 | /// | |
277 | /// \param[in] listener a MessageDecoderListener that responds events from | |
278 | /// the decoder | |
279 | /// \param[in] initial_state an initial state of the decode | |
280 | /// \param[in] initial_next_required_size the number of bytes needed | |
281 | /// to run the next action | |
282 | /// \param[in] pool an optional MemoryPool to copy metadata on the | |
283 | /// CPU, if required | |
284 | MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state, | |
285 | int64_t initial_next_required_size, | |
286 | MemoryPool* pool = default_memory_pool()); | |
287 | ||
288 | virtual ~MessageDecoder(); | |
289 | ||
290 | /// \brief Feed data to the decoder as a raw data. | |
291 | /// | |
292 | /// If the decoder can decode one or more messages by the data, the | |
293 | /// decoder calls listener->OnMessageDecoded() with a decoded | |
294 | /// message multiple times. | |
295 | /// | |
296 | /// If the state of the decoder is changed, corresponding callbacks | |
297 | /// on listener is called: | |
298 | /// | |
299 | /// * MessageDecoder::State::INITIAL: listener->OnInitial() | |
300 | /// * MessageDecoder::State::METADATA_LENGTH: listener->OnMetadataLength() | |
301 | /// * MessageDecoder::State::METADATA: listener->OnMetadata() | |
302 | /// * MessageDecoder::State::BODY: listener->OnBody() | |
303 | /// * MessageDecoder::State::EOS: listener->OnEOS() | |
304 | /// | |
305 | /// \param[in] data a raw data to be processed. This data isn't | |
306 | /// copied. The passed memory must be kept alive through message | |
307 | /// processing. | |
308 | /// \param[in] size raw data size. | |
309 | /// \return Status | |
310 | Status Consume(const uint8_t* data, int64_t size); | |
311 | ||
312 | /// \brief Feed data to the decoder as a Buffer. | |
313 | /// | |
314 | /// If the decoder can decode one or more messages by the Buffer, | |
315 | /// the decoder calls listener->OnMessageDecoded() with a decoded | |
316 | /// message multiple times. | |
317 | /// | |
318 | /// \param[in] buffer a Buffer to be processed. | |
319 | /// \return Status | |
320 | Status Consume(std::shared_ptr<Buffer> buffer); | |
321 | ||
322 | /// \brief Return the number of bytes needed to advance the state of | |
323 | /// the decoder. | |
324 | /// | |
325 | /// This method is provided for users who want to optimize performance. | |
326 | /// Normal users don't need to use this method. | |
327 | /// | |
328 | /// Here is an example usage for normal users: | |
329 | /// | |
330 | /// ~~~{.cpp} | |
331 | /// decoder.Consume(buffer1); | |
332 | /// decoder.Consume(buffer2); | |
333 | /// decoder.Consume(buffer3); | |
334 | /// ~~~ | |
335 | /// | |
336 | /// Decoder has internal buffer. If consumed data isn't enough to | |
337 | /// advance the state of the decoder, consumed data is buffered to | |
338 | /// the internal buffer. It causes performance overhead. | |
339 | /// | |
340 | /// If you pass next_required_size() size data to each Consume() | |
341 | /// call, the decoder doesn't use its internal buffer. It improves | |
342 | /// performance. | |
343 | /// | |
344 | /// Here is an example usage to avoid using internal buffer: | |
345 | /// | |
346 | /// ~~~{.cpp} | |
347 | /// buffer1 = get_data(decoder.next_required_size()); | |
348 | /// decoder.Consume(buffer1); | |
349 | /// buffer2 = get_data(decoder.next_required_size()); | |
350 | /// decoder.Consume(buffer2); | |
351 | /// ~~~ | |
352 | /// | |
353 | /// Users can use this method to avoid creating small | |
354 | /// chunks. Message body must be contiguous data. If users pass | |
355 | /// small chunks to the decoder, the decoder needs concatenate small | |
356 | /// chunks internally. It causes performance overhead. | |
357 | /// | |
358 | /// Here is an example usage to reduce small chunks: | |
359 | /// | |
360 | /// ~~~{.cpp} | |
361 | /// buffer = AllocateResizableBuffer(); | |
362 | /// while ((small_chunk = get_data(&small_chunk_size))) { | |
363 | /// auto current_buffer_size = buffer->size(); | |
364 | /// buffer->Resize(current_buffer_size + small_chunk_size); | |
365 | /// memcpy(buffer->mutable_data() + current_buffer_size, | |
366 | /// small_chunk, | |
367 | /// small_chunk_size); | |
368 | /// if (buffer->size() < decoder.next_required_size()) { | |
369 | /// continue; | |
370 | /// } | |
371 | /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); | |
372 | /// decoder.Consume(chunk); | |
373 | /// buffer = AllocateResizableBuffer(); | |
374 | /// } | |
375 | /// if (buffer->size() > 0) { | |
376 | /// std::shared_ptr<arrow::Buffer> chunk(buffer.release()); | |
377 | /// decoder.Consume(chunk); | |
378 | /// } | |
379 | /// ~~~ | |
380 | /// | |
381 | /// \return the number of bytes needed to advance the state of the | |
382 | /// decoder | |
383 | int64_t next_required_size() const; | |
384 | ||
385 | /// \brief Return the current state of the decoder. | |
386 | /// | |
387 | /// This method is provided for users who want to optimize performance. | |
388 | /// Normal users don't need to use this method. | |
389 | /// | |
390 | /// Decoder doesn't need Buffer to process data on the | |
391 | /// MessageDecoder::State::INITIAL state and the | |
392 | /// MessageDecoder::State::METADATA_LENGTH. Creating Buffer has | |
393 | /// performance overhead. Advanced users can avoid creating Buffer | |
394 | /// by checking the current state of the decoder: | |
395 | /// | |
396 | /// ~~~{.cpp} | |
397 | /// switch (decoder.state()) { | |
398 | /// MessageDecoder::State::INITIAL: | |
399 | /// MessageDecoder::State::METADATA_LENGTH: | |
400 | /// { | |
401 | /// uint8_t data[sizeof(int32_t)]; | |
402 | /// auto data_size = input->Read(decoder.next_required_size(), data); | |
403 | /// decoder.Consume(data, data_size); | |
404 | /// } | |
405 | /// break; | |
406 | /// default: | |
407 | /// { | |
408 | /// auto buffer = input->Read(decoder.next_required_size()); | |
409 | /// decoder.Consume(buffer); | |
410 | /// } | |
411 | /// break; | |
412 | /// } | |
413 | /// ~~~ | |
414 | /// | |
415 | /// \return the current state | |
416 | State state() const; | |
417 | ||
418 | private: | |
419 | class MessageDecoderImpl; | |
420 | std::unique_ptr<MessageDecoderImpl> impl_; | |
421 | ||
422 | ARROW_DISALLOW_COPY_AND_ASSIGN(MessageDecoder); | |
423 | }; | |
424 | ||
425 | /// \brief Abstract interface for a sequence of messages | |
426 | /// \since 0.5.0 | |
427 | class ARROW_EXPORT MessageReader { | |
428 | public: | |
429 | virtual ~MessageReader() = default; | |
430 | ||
431 | /// \brief Create MessageReader that reads from InputStream | |
432 | static std::unique_ptr<MessageReader> Open(io::InputStream* stream); | |
433 | ||
434 | /// \brief Create MessageReader that reads from owned InputStream | |
435 | static std::unique_ptr<MessageReader> Open( | |
436 | const std::shared_ptr<io::InputStream>& owned_stream); | |
437 | ||
438 | /// \brief Read next Message from the interface | |
439 | /// | |
440 | /// \return an arrow::ipc::Message instance | |
441 | virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0; | |
442 | }; | |
443 | ||
444 | /// \brief Read encapsulated RPC message from position in file | |
445 | /// | |
446 | /// Read a length-prefixed message flatbuffer starting at the indicated file | |
447 | /// offset. If the message has a body with non-zero length, it will also be | |
448 | /// read | |
449 | /// | |
450 | /// The metadata_length includes at least the length prefix and the flatbuffer | |
451 | /// | |
452 | /// \param[in] offset the position in the file where the message starts. The | |
453 | /// first 4 bytes after the offset are the message length | |
454 | /// \param[in] metadata_length the total number of bytes to read from file | |
455 | /// \param[in] file the seekable file interface to read from | |
456 | /// \return the message read | |
457 | ARROW_EXPORT | |
458 | Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset, | |
459 | const int32_t metadata_length, | |
460 | io::RandomAccessFile* file); | |
461 | ||
462 | ARROW_EXPORT | |
463 | Future<std::shared_ptr<Message>> ReadMessageAsync( | |
464 | const int64_t offset, const int32_t metadata_length, const int64_t body_length, | |
465 | io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context()); | |
466 | ||
467 | /// \brief Advance stream to an 8-byte offset if its position is not a multiple | |
468 | /// of 8 already | |
469 | /// \param[in] stream an input stream | |
470 | /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 | |
471 | /// or 64, to ensure the body starts on a multiple of that alignment | |
472 | /// \return Status | |
473 | ARROW_EXPORT | |
474 | Status AlignStream(io::InputStream* stream, int32_t alignment = 8); | |
475 | ||
476 | /// \brief Advance stream to an 8-byte offset if its position is not a multiple | |
477 | /// of 8 already | |
478 | /// \param[in] stream an output stream | |
479 | /// \param[in] alignment the byte multiple for the metadata prefix, usually 8 | |
480 | /// or 64, to ensure the body starts on a multiple of that alignment | |
481 | /// \return Status | |
482 | ARROW_EXPORT | |
483 | Status AlignStream(io::OutputStream* stream, int32_t alignment = 8); | |
484 | ||
485 | /// \brief Return error Status if file position is not a multiple of the | |
486 | /// indicated alignment | |
487 | ARROW_EXPORT | |
488 | Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8); | |
489 | ||
490 | /// \brief Read encapsulated IPC message (metadata and body) from InputStream | |
491 | /// | |
492 | /// Returns null if there are not enough bytes available or the | |
493 | /// message length is 0 (e.g. EOS in a stream) | |
494 | /// | |
495 | /// \param[in] stream an input stream | |
496 | /// \param[in] pool an optional MemoryPool to copy metadata on the CPU, if required | |
497 | /// \return Message | |
498 | ARROW_EXPORT | |
499 | Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* stream, | |
500 | MemoryPool* pool = default_memory_pool()); | |
501 | ||
502 | /// \brief Feed data from InputStream to MessageDecoder to decode an | |
503 | /// encapsulated IPC message (metadata and body) | |
504 | /// | |
505 | /// This API is EXPERIMENTAL. | |
506 | /// | |
507 | /// \param[in] decoder a decoder | |
508 | /// \param[in] stream an input stream | |
509 | /// \return Status | |
510 | /// | |
511 | /// \since 0.17.0 | |
512 | ARROW_EXPORT | |
513 | Status DecodeMessage(MessageDecoder* decoder, io::InputStream* stream); | |
514 | ||
515 | /// Write encapsulated IPC message Does not make assumptions about | |
516 | /// whether the stream is aligned already. Can write legacy (pre | |
517 | /// version 0.15.0) IPC message if option set | |
518 | /// | |
519 | /// continuation: 0xFFFFFFFF | |
520 | /// message_size: int32 | |
521 | /// message: const void* | |
522 | /// padding | |
523 | /// | |
524 | /// | |
525 | /// \param[in] message a buffer containing the metadata to write | |
526 | /// \param[in] options IPC writing options, including alignment and | |
527 | /// legacy message support | |
528 | /// \param[in,out] file the OutputStream to write to | |
529 | /// \param[out] message_length the total size of the payload written including | |
530 | /// padding | |
531 | /// \return Status | |
532 | Status WriteMessage(const Buffer& message, const IpcWriteOptions& options, | |
533 | io::OutputStream* file, int32_t* message_length); | |
534 | ||
535 | } // namespace ipc | |
536 | } // namespace arrow |