]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/ipc/message.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / ipc / message.h
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// C++ object model and user API for interprocess schema messaging
19
20#pragma once
21
22#include <cstdint>
23#include <memory>
24#include <string>
25#include <utility>
26
27#include "arrow/io/type_fwd.h"
28#include "arrow/ipc/type_fwd.h"
29#include "arrow/result.h"
30#include "arrow/status.h"
31#include "arrow/type_fwd.h"
32#include "arrow/util/macros.h"
33#include "arrow/util/visibility.h"
34
35namespace arrow {
36namespace ipc {
37
38struct IpcWriteOptions;
39
// Read interface classes. We do not fully deserialize the flatbuffers so that
// metadata for individual fields can be retrieved from a very large schema
// without deserializing the entire schema.
43
44/// \class Message
45/// \brief An IPC message including metadata and body
46class ARROW_EXPORT Message {
47 public:
48 /// \brief Construct message, but do not validate
49 ///
50 /// Use at your own risk; Message::Open has more metadata validation
51 Message(std::shared_ptr<Buffer> metadata, std::shared_ptr<Buffer> body);
52
53 ~Message();
54
55 /// \brief Create and validate a Message instance from two buffers
56 ///
57 /// \param[in] metadata a buffer containing the Flatbuffer metadata
58 /// \param[in] body a buffer containing the message body, which may be null
59 /// \return the created message
60 static Result<std::unique_ptr<Message>> Open(std::shared_ptr<Buffer> metadata,
61 std::shared_ptr<Buffer> body);
62
63 /// \brief Read message body and create Message given Flatbuffer metadata
64 /// \param[in] metadata containing a serialized Message flatbuffer
65 /// \param[in] stream an InputStream
66 /// \return the created Message
67 ///
68 /// \note If stream supports zero-copy, this is zero-copy
69 static Result<std::unique_ptr<Message>> ReadFrom(std::shared_ptr<Buffer> metadata,
70 io::InputStream* stream);
71
72 /// \brief Read message body from position in file, and create Message given
73 /// the Flatbuffer metadata
74 /// \param[in] offset the position in the file where the message body starts.
75 /// \param[in] metadata containing a serialized Message flatbuffer
76 /// \param[in] file the seekable file interface to read from
77 /// \return the created Message
78 ///
79 /// \note If file supports zero-copy, this is zero-copy
80 static Result<std::unique_ptr<Message>> ReadFrom(const int64_t offset,
81 std::shared_ptr<Buffer> metadata,
82 io::RandomAccessFile* file);
83
84 /// \brief Return true if message type and contents are equal
85 ///
86 /// \param other another message
87 /// \return true if contents equal
88 bool Equals(const Message& other) const;
89
90 /// \brief the Message metadata
91 ///
92 /// \return buffer
93 std::shared_ptr<Buffer> metadata() const;
94
95 /// \brief Custom metadata serialized in metadata Flatbuffer. Returns nullptr
96 /// when none set
97 const std::shared_ptr<const KeyValueMetadata>& custom_metadata() const;
98
99 /// \brief the Message body, if any
100 ///
101 /// \return buffer is null if no body
102 std::shared_ptr<Buffer> body() const;
103
104 /// \brief The expected body length according to the metadata, for
105 /// verification purposes
106 int64_t body_length() const;
107
108 /// \brief The Message type
109 MessageType type() const;
110
111 /// \brief The Message metadata version
112 MetadataVersion metadata_version() const;
113
114 const void* header() const;
115
116 /// \brief Write length-prefixed metadata and body to output stream
117 ///
118 /// \param[in] file output stream to write to
119 /// \param[in] options IPC writing options including alignment
120 /// \param[out] output_length the number of bytes written
121 /// \return Status
122 Status SerializeTo(io::OutputStream* file, const IpcWriteOptions& options,
123 int64_t* output_length) const;
124
125 /// \brief Return true if the Message metadata passes Flatbuffer validation
126 bool Verify() const;
127
128 /// \brief Whether a given message type needs a body.
129 static bool HasBody(MessageType type) {
130 return type != MessageType::NONE && type != MessageType::SCHEMA;
131 }
132
133 private:
134 // Hide serialization details from user API
135 class MessageImpl;
136 std::unique_ptr<MessageImpl> impl_;
137
138 ARROW_DISALLOW_COPY_AND_ASSIGN(Message);
139};
140
141ARROW_EXPORT std::string FormatMessageType(MessageType type);
142
143/// \class MessageDecoderListener
144/// \brief An abstract class to listen events from MessageDecoder.
145///
146/// This API is EXPERIMENTAL.
147///
148/// \since 0.17.0
149class ARROW_EXPORT MessageDecoderListener {
150 public:
151 virtual ~MessageDecoderListener() = default;
152
153 /// \brief Called when a message is decoded.
154 ///
155 /// MessageDecoder calls this method when it decodes a message. This
156 /// method is called multiple times when the target stream has
157 /// multiple messages.
158 ///
159 /// \param[in] message a decoded message
160 /// \return Status
161 virtual Status OnMessageDecoded(std::unique_ptr<Message> message) = 0;
162
163 /// \brief Called when the decoder state is changed to
164 /// MessageDecoder::State::INITIAL.
165 ///
166 /// The default implementation just returns arrow::Status::OK().
167 ///
168 /// \return Status
169 virtual Status OnInitial();
170
171 /// \brief Called when the decoder state is changed to
172 /// MessageDecoder::State::METADATA_LENGTH.
173 ///
174 /// The default implementation just returns arrow::Status::OK().
175 ///
176 /// \return Status
177 virtual Status OnMetadataLength();
178
179 /// \brief Called when the decoder state is changed to
180 /// MessageDecoder::State::METADATA.
181 ///
182 /// The default implementation just returns arrow::Status::OK().
183 ///
184 /// \return Status
185 virtual Status OnMetadata();
186
187 /// \brief Called when the decoder state is changed to
188 /// MessageDecoder::State::BODY.
189 ///
190 /// The default implementation just returns arrow::Status::OK().
191 ///
192 /// \return Status
193 virtual Status OnBody();
194
195 /// \brief Called when the decoder state is changed to
196 /// MessageDecoder::State::EOS.
197 ///
198 /// The default implementation just returns arrow::Status::OK().
199 ///
200 /// \return Status
201 virtual Status OnEOS();
202};
203
204/// \class AssignMessageDecoderListener
205/// \brief Assign a message decoded by MessageDecoder.
206///
207/// This API is EXPERIMENTAL.
208///
209/// \since 0.17.0
210class ARROW_EXPORT AssignMessageDecoderListener : public MessageDecoderListener {
211 public:
212 /// \brief Construct a listener that assigns a decoded message to the
213 /// specified location.
214 ///
215 /// \param[in] message a location to store the received message
216 explicit AssignMessageDecoderListener(std::unique_ptr<Message>* message)
217 : message_(message) {}
218
219 virtual ~AssignMessageDecoderListener() = default;
220
221 Status OnMessageDecoded(std::unique_ptr<Message> message) override {
222 *message_ = std::move(message);
223 return Status::OK();
224 }
225
226 private:
227 std::unique_ptr<Message>* message_;
228
229 ARROW_DISALLOW_COPY_AND_ASSIGN(AssignMessageDecoderListener);
230};
231
232/// \class MessageDecoder
233/// \brief Push style message decoder that receives data from user.
234///
235/// This API is EXPERIMENTAL.
236///
237/// \since 0.17.0
238class ARROW_EXPORT MessageDecoder {
239 public:
240 /// \brief State for reading a message
241 enum State {
242 /// The initial state. It requires one of the followings as the next data:
243 ///
244 /// * int32_t continuation token
245 /// * int32_t end-of-stream mark (== 0)
246 /// * int32_t metadata length (backward compatibility for
247 /// reading old IPC messages produced prior to version 0.15.0
248 INITIAL,
249
250 /// It requires int32_t metadata length.
251 METADATA_LENGTH,
252
253 /// It requires metadata.
254 METADATA,
255
256 /// It requires message body.
257 BODY,
258
259 /// The end-of-stream state. No more data is processed.
260 EOS,
261 };
262
263 /// \brief Construct a message decoder.
264 ///
265 /// \param[in] listener a MessageDecoderListener that responds events from
266 /// the decoder
267 /// \param[in] pool an optional MemoryPool to copy metadata on the
268 /// CPU, if required
269 explicit MessageDecoder(std::shared_ptr<MessageDecoderListener> listener,
270 MemoryPool* pool = default_memory_pool());
271
272 /// \brief Construct a message decoder with the specified state.
273 ///
274 /// This is a construct for advanced users that know how to decode
275 /// Message.
276 ///
277 /// \param[in] listener a MessageDecoderListener that responds events from
278 /// the decoder
279 /// \param[in] initial_state an initial state of the decode
280 /// \param[in] initial_next_required_size the number of bytes needed
281 /// to run the next action
282 /// \param[in] pool an optional MemoryPool to copy metadata on the
283 /// CPU, if required
284 MessageDecoder(std::shared_ptr<MessageDecoderListener> listener, State initial_state,
285 int64_t initial_next_required_size,
286 MemoryPool* pool = default_memory_pool());
287
288 virtual ~MessageDecoder();
289
290 /// \brief Feed data to the decoder as a raw data.
291 ///
292 /// If the decoder can decode one or more messages by the data, the
293 /// decoder calls listener->OnMessageDecoded() with a decoded
294 /// message multiple times.
295 ///
296 /// If the state of the decoder is changed, corresponding callbacks
297 /// on listener is called:
298 ///
299 /// * MessageDecoder::State::INITIAL: listener->OnInitial()
300 /// * MessageDecoder::State::METADATA_LENGTH: listener->OnMetadataLength()
301 /// * MessageDecoder::State::METADATA: listener->OnMetadata()
302 /// * MessageDecoder::State::BODY: listener->OnBody()
303 /// * MessageDecoder::State::EOS: listener->OnEOS()
304 ///
305 /// \param[in] data a raw data to be processed. This data isn't
306 /// copied. The passed memory must be kept alive through message
307 /// processing.
308 /// \param[in] size raw data size.
309 /// \return Status
310 Status Consume(const uint8_t* data, int64_t size);
311
312 /// \brief Feed data to the decoder as a Buffer.
313 ///
314 /// If the decoder can decode one or more messages by the Buffer,
315 /// the decoder calls listener->OnMessageDecoded() with a decoded
316 /// message multiple times.
317 ///
318 /// \param[in] buffer a Buffer to be processed.
319 /// \return Status
320 Status Consume(std::shared_ptr<Buffer> buffer);
321
322 /// \brief Return the number of bytes needed to advance the state of
323 /// the decoder.
324 ///
325 /// This method is provided for users who want to optimize performance.
326 /// Normal users don't need to use this method.
327 ///
328 /// Here is an example usage for normal users:
329 ///
330 /// ~~~{.cpp}
331 /// decoder.Consume(buffer1);
332 /// decoder.Consume(buffer2);
333 /// decoder.Consume(buffer3);
334 /// ~~~
335 ///
336 /// Decoder has internal buffer. If consumed data isn't enough to
337 /// advance the state of the decoder, consumed data is buffered to
338 /// the internal buffer. It causes performance overhead.
339 ///
340 /// If you pass next_required_size() size data to each Consume()
341 /// call, the decoder doesn't use its internal buffer. It improves
342 /// performance.
343 ///
344 /// Here is an example usage to avoid using internal buffer:
345 ///
346 /// ~~~{.cpp}
347 /// buffer1 = get_data(decoder.next_required_size());
348 /// decoder.Consume(buffer1);
349 /// buffer2 = get_data(decoder.next_required_size());
350 /// decoder.Consume(buffer2);
351 /// ~~~
352 ///
353 /// Users can use this method to avoid creating small
354 /// chunks. Message body must be contiguous data. If users pass
355 /// small chunks to the decoder, the decoder needs concatenate small
356 /// chunks internally. It causes performance overhead.
357 ///
358 /// Here is an example usage to reduce small chunks:
359 ///
360 /// ~~~{.cpp}
361 /// buffer = AllocateResizableBuffer();
362 /// while ((small_chunk = get_data(&small_chunk_size))) {
363 /// auto current_buffer_size = buffer->size();
364 /// buffer->Resize(current_buffer_size + small_chunk_size);
365 /// memcpy(buffer->mutable_data() + current_buffer_size,
366 /// small_chunk,
367 /// small_chunk_size);
368 /// if (buffer->size() < decoder.next_required_size()) {
369 /// continue;
370 /// }
371 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release());
372 /// decoder.Consume(chunk);
373 /// buffer = AllocateResizableBuffer();
374 /// }
375 /// if (buffer->size() > 0) {
376 /// std::shared_ptr<arrow::Buffer> chunk(buffer.release());
377 /// decoder.Consume(chunk);
378 /// }
379 /// ~~~
380 ///
381 /// \return the number of bytes needed to advance the state of the
382 /// decoder
383 int64_t next_required_size() const;
384
385 /// \brief Return the current state of the decoder.
386 ///
387 /// This method is provided for users who want to optimize performance.
388 /// Normal users don't need to use this method.
389 ///
390 /// Decoder doesn't need Buffer to process data on the
391 /// MessageDecoder::State::INITIAL state and the
392 /// MessageDecoder::State::METADATA_LENGTH. Creating Buffer has
393 /// performance overhead. Advanced users can avoid creating Buffer
394 /// by checking the current state of the decoder:
395 ///
396 /// ~~~{.cpp}
397 /// switch (decoder.state()) {
398 /// MessageDecoder::State::INITIAL:
399 /// MessageDecoder::State::METADATA_LENGTH:
400 /// {
401 /// uint8_t data[sizeof(int32_t)];
402 /// auto data_size = input->Read(decoder.next_required_size(), data);
403 /// decoder.Consume(data, data_size);
404 /// }
405 /// break;
406 /// default:
407 /// {
408 /// auto buffer = input->Read(decoder.next_required_size());
409 /// decoder.Consume(buffer);
410 /// }
411 /// break;
412 /// }
413 /// ~~~
414 ///
415 /// \return the current state
416 State state() const;
417
418 private:
419 class MessageDecoderImpl;
420 std::unique_ptr<MessageDecoderImpl> impl_;
421
422 ARROW_DISALLOW_COPY_AND_ASSIGN(MessageDecoder);
423};
424
425/// \brief Abstract interface for a sequence of messages
426/// \since 0.5.0
427class ARROW_EXPORT MessageReader {
428 public:
429 virtual ~MessageReader() = default;
430
431 /// \brief Create MessageReader that reads from InputStream
432 static std::unique_ptr<MessageReader> Open(io::InputStream* stream);
433
434 /// \brief Create MessageReader that reads from owned InputStream
435 static std::unique_ptr<MessageReader> Open(
436 const std::shared_ptr<io::InputStream>& owned_stream);
437
438 /// \brief Read next Message from the interface
439 ///
440 /// \return an arrow::ipc::Message instance
441 virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0;
442};
443
444/// \brief Read encapsulated RPC message from position in file
445///
446/// Read a length-prefixed message flatbuffer starting at the indicated file
447/// offset. If the message has a body with non-zero length, it will also be
448/// read
449///
450/// The metadata_length includes at least the length prefix and the flatbuffer
451///
452/// \param[in] offset the position in the file where the message starts. The
453/// first 4 bytes after the offset are the message length
454/// \param[in] metadata_length the total number of bytes to read from file
455/// \param[in] file the seekable file interface to read from
456/// \return the message read
457ARROW_EXPORT
458Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
459 const int32_t metadata_length,
460 io::RandomAccessFile* file);
461
462ARROW_EXPORT
463Future<std::shared_ptr<Message>> ReadMessageAsync(
464 const int64_t offset, const int32_t metadata_length, const int64_t body_length,
465 io::RandomAccessFile* file, const io::IOContext& context = io::default_io_context());
466
467/// \brief Advance stream to an 8-byte offset if its position is not a multiple
468/// of 8 already
469/// \param[in] stream an input stream
470/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
471/// or 64, to ensure the body starts on a multiple of that alignment
472/// \return Status
473ARROW_EXPORT
474Status AlignStream(io::InputStream* stream, int32_t alignment = 8);
475
476/// \brief Advance stream to an 8-byte offset if its position is not a multiple
477/// of 8 already
478/// \param[in] stream an output stream
479/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
480/// or 64, to ensure the body starts on a multiple of that alignment
481/// \return Status
482ARROW_EXPORT
483Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
484
485/// \brief Return error Status if file position is not a multiple of the
486/// indicated alignment
487ARROW_EXPORT
488Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
489
490/// \brief Read encapsulated IPC message (metadata and body) from InputStream
491///
492/// Returns null if there are not enough bytes available or the
493/// message length is 0 (e.g. EOS in a stream)
494///
495/// \param[in] stream an input stream
496/// \param[in] pool an optional MemoryPool to copy metadata on the CPU, if required
497/// \return Message
498ARROW_EXPORT
499Result<std::unique_ptr<Message>> ReadMessage(io::InputStream* stream,
500 MemoryPool* pool = default_memory_pool());
501
502/// \brief Feed data from InputStream to MessageDecoder to decode an
503/// encapsulated IPC message (metadata and body)
504///
505/// This API is EXPERIMENTAL.
506///
507/// \param[in] decoder a decoder
508/// \param[in] stream an input stream
509/// \return Status
510///
511/// \since 0.17.0
512ARROW_EXPORT
513Status DecodeMessage(MessageDecoder* decoder, io::InputStream* stream);
514
515/// Write encapsulated IPC message Does not make assumptions about
516/// whether the stream is aligned already. Can write legacy (pre
517/// version 0.15.0) IPC message if option set
518///
519/// continuation: 0xFFFFFFFF
520/// message_size: int32
521/// message: const void*
522/// padding
523///
524///
525/// \param[in] message a buffer containing the metadata to write
526/// \param[in] options IPC writing options, including alignment and
527/// legacy message support
528/// \param[in,out] file the OutputStream to write to
529/// \param[out] message_length the total size of the payload written including
530/// padding
531/// \return Status
532Status WriteMessage(const Buffer& message, const IpcWriteOptions& options,
533 io::OutputStream* file, int32_t* message_length);
534
535} // namespace ipc
536} // namespace arrow