// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <cstring>
#include <memory>

#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/types.h"

namespace arrow {

class Array;

namespace BitUtil {
class BitWriter;
}  // namespace BitUtil

namespace util {
class RleEncoder;
}  // namespace util

}  // namespace arrow

namespace parquet {

struct ArrowWriteContext;
class ColumnDescriptor;
class DataPage;
class DictionaryPage;
class ColumnChunkMetaDataBuilder;
class Encryptor;
class WriterProperties;

class PARQUET_EXPORT LevelEncoder {
 public:
  LevelEncoder();
  ~LevelEncoder();

  static int MaxBufferSize(Encoding::type encoding, int16_t max_level,
                           int num_buffered_values);

  // Initialize the LevelEncoder.
  void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
            uint8_t* data, int data_size);

  // Encodes a batch of levels from an array and returns the number of levels encoded
  int Encode(int batch_size, const int16_t* levels);

  int32_t len() {
    if (encoding_ != Encoding::RLE) {
      throw ParquetException("Only implemented for RLE encoding");
    }
    return rle_length_;
  }

 private:
  int bit_width_;
  int rle_length_;
  Encoding::type encoding_;
  std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_;
  std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_;
};
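
// A minimal usage sketch (illustrative, not part of the original header):
// RLE-encoding a batch of definition levels into a caller-owned buffer.
//
//   const int16_t def_levels[] = {1, 0, 1, 1};
//   constexpr int16_t kMaxLevel = 1;
//   constexpr int kNumLevels = 4;
//   int size = LevelEncoder::MaxBufferSize(Encoding::RLE, kMaxLevel, kNumLevels);
//   std::vector<uint8_t> buffer(size);  // requires <vector>
//   LevelEncoder encoder;
//   encoder.Init(Encoding::RLE, kMaxLevel, kNumLevels, buffer.data(), size);
//   int encoded = encoder.Encode(kNumLevels, def_levels);
//   // encoded == 4; encoder.len() now reports the RLE-encoded byte length.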

class PARQUET_EXPORT PageWriter {
 public:
  virtual ~PageWriter() {}

  static std::unique_ptr<PageWriter> Open(
      std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
      int compression_level, ColumnChunkMetaDataBuilder* metadata,
      int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
      bool buffered_row_group = false,
      std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
      std::shared_ptr<Encryptor> data_encryptor = NULLPTR);

  // The Column Writer decides whether dictionary encoding is used and whether
  // it has fallen back to plain encoding on reaching the dictionary page limit;
  // it reports both outcomes here.
  virtual void Close(bool has_dictionary, bool fallback) = 0;

  // Return the number of uncompressed bytes written (including header size)
  virtual int64_t WriteDataPage(const DataPage& page) = 0;

  // Return the number of uncompressed bytes written (including header size)
  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;

  virtual bool has_compressor() = 0;

  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
};
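
// A hedged sketch of opening a PageWriter directly (normally the file writer
// machinery does this; `sink`, `metadata` and `properties` are assumed to
// exist, and the default compression level comes from Arrow's codec API):
//
//   std::unique_ptr<PageWriter> pager = PageWriter::Open(
//       sink, Compression::SNAPPY,
//       ::arrow::util::Codec::UseDefaultCompressionLevel(), metadata);
//   std::shared_ptr<ColumnWriter> writer =
//       ColumnWriter::Make(metadata, std::move(pager), properties);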

static constexpr int WRITE_BATCH_SIZE = 1000;

class PARQUET_EXPORT ColumnWriter {
 public:
  virtual ~ColumnWriter() = default;

  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                            std::unique_ptr<PageWriter>,
                                            const WriterProperties* properties);

  /// \brief Closes the ColumnWriter, commits any buffered values to pages.
  /// \return Total size of the column in bytes
  virtual int64_t Close() = 0;

  /// \brief The physical Parquet type of the column
  virtual Type::type type() const = 0;

  /// \brief The schema for the column
  virtual const ColumnDescriptor* descr() const = 0;

  /// \brief The number of rows written so far
  virtual int64_t rows_written() const = 0;

  /// \brief The total size of the compressed pages + page headers. Some values
  /// might still be buffered and not written to a page yet
  virtual int64_t total_compressed_bytes() const = 0;

  /// \brief The total number of bytes written as serialized data and
  /// dictionary pages to the ColumnChunk so far
  virtual int64_t total_bytes_written() const = 0;

  /// \brief The file-level writer properties
  virtual const WriterProperties* properties() = 0;

  /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
  /// error status if the array data type is not compatible with the concrete
  /// writer type.
  ///
148 /// leaf_array is always a primitive (possibly dictionary encoded type).
149 /// Leaf_field_nullable indicates whether the leaf array is considered nullable
150 /// according to its schema in a Table or its parent array.
  virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                                     int64_t num_levels, const ::arrow::Array& leaf_array,
                                     ArrowWriteContext* ctx,
                                     bool leaf_field_nullable) = 0;
};
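
// An illustrative sketch of feeding Arrow data through WriteArrow (not part of
// the original header; `writer`, `ctx` and `array` are assumed to exist, with
// `array` a flat nullable leaf without nulls and max definition level 1):
//
//   std::vector<int16_t> def_levels(array.length(), 1);  // every value defined
//   PARQUET_THROW_NOT_OK(writer->WriteArrow(def_levels.data(),
//                                           /*rep_levels=*/NULLPTR,
//                                           array.length(), array, ctx,
//                                           /*leaf_field_nullable=*/true));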

// API to write values to a single column. This is the main client-facing API.
template <typename DType>
class TypedColumnWriter : public ColumnWriter {
 public:
  using T = typename DType::c_type;

  // Write a batch of repetition levels, definition levels, and values to the
  // column.
  // `num_values` is the number of logical leaf values.
  // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level
  // (resp. max repetition level) is 0.
  // If not null, each of `def_levels` and `rep_levels` must have at least
  // `num_values` entries.
  //
  // The number of physical values written (taken from `values`) is returned.
  // It can be smaller than `num_values` if there are some undefined values.
  virtual int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
                             const int16_t* rep_levels, const T* values) = 0;
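
  // Example (a sketch, not part of the original header): for an optional INT64
  // leaf with max_definition_level == 1 and no repetition, writing the logical
  // column {1, null, 3} passes only the defined entries in `values`:
  //
  //   int16_t def_levels[] = {1, 0, 1};
  //   int64_t values[] = {1, 3};
  //   int64_t written =
  //       writer->WriteBatch(3, def_levels, /*rep_levels=*/NULLPTR, values);
  //   // written == 2: the null entry consumed a level but no value.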

  /// Write a batch of repetition levels, definition levels, and values to the
  /// column.
  ///
  /// In comparison to WriteBatch, the length of the repetition and definition
  /// levels equals the number of values for max_definition_level == 1. For
  /// max_definition_level > 1, there are more levels than values, but the
  /// values still include the null entries with
  /// definition_level == (max_definition_level - 1). Thus the parameters of
  /// this function must state whether an input has the length of num_values or
  /// of the _number of rows in the lowest nesting level_.
  ///
  /// If the innermost node of the Parquet schema is required, the _number of
  /// rows in the lowest nesting level_ equals the number of non-null values. If
  /// the innermost schema node is optional, the _number of rows in the lowest
  /// nesting level_ also includes all values with
  /// definition_level == (max_definition_level - 1).
  ///
  /// @param num_values number of levels to write.
  /// @param def_levels The Parquet definition levels, length is num_values
  /// @param rep_levels The Parquet repetition levels, length is num_values
  /// @param valid_bits Bitmap that indicates if the row is null on the lowest
  ///   nesting level. The length is the number of rows in the lowest nesting level.
  /// @param valid_bits_offset The offset in bits of the valid_bits where the
  ///   first relevant bit resides.
  /// @param values The values in the lowest nested level including
  ///   spacing for nulls on the lowest levels; input has the length
  ///   of the number of rows on the lowest nesting level.
  virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                                const int16_t* rep_levels, const uint8_t* valid_bits,
                                int64_t valid_bits_offset, const T* values) = 0;
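
  // Example (a sketch, not part of the original header): the same logical
  // column {1, null, 3} written spaced; `values` keeps a slot for the null and
  // `valid_bits` marks which rows are valid (bit i corresponds to row i):
  //
  //   int16_t def_levels[] = {1, 0, 1};
  //   int64_t values[] = {1, 0, 3};   // the null slot keeps its position
  //   uint8_t valid_bits[] = {0x05};  // rows 0 and 2 valid, row 1 null
  //   writer->WriteBatchSpaced(3, def_levels, /*rep_levels=*/NULLPTR, valid_bits,
  //                            /*valid_bits_offset=*/0, values);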

  // Estimated size of the values that are not written to a page yet
  virtual int64_t EstimatedBufferedValueBytes() const = 0;
};

using BoolWriter = TypedColumnWriter<BooleanType>;
using Int32Writer = TypedColumnWriter<Int32Type>;
using Int64Writer = TypedColumnWriter<Int64Type>;
using Int96Writer = TypedColumnWriter<Int96Type>;
using FloatWriter = TypedColumnWriter<FloatType>;
using DoubleWriter = TypedColumnWriter<DoubleType>;
using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
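
// A hedged end-to-end sketch (illustrative; ParquetFileWriter and
// RowGroupWriter live in parquet/file_writer.h, which this header does not
// include, and `file_writer` is assumed to exist):
//
//   parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
//   auto* int64_writer =
//       static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
//   int64_t values[] = {1, 2, 3};
//   int64_writer->WriteBatch(3, /*def_levels=*/NULLPTR,
//                            /*rep_levels=*/NULLPTR, values);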

namespace internal {

/**
 * Timestamp conversion constants
 */
constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);

template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
  int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
  impala_timestamp->value[2] = static_cast<uint32_t>(julian_days);

  int64_t last_day_units = time % UnitPerDay;
  auto last_day_nanos = last_day_units * NanosecondsPerUnit;
  // impala_timestamp will be unaligned every other entry so do memcpy instead
  // of assign and reinterpret cast to avoid undefined behavior.
  std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
}
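
// Worked example (illustrative): with UnitPerDay = kMillisecondsPerDay
// (86'400'000) and time = 90'000'000 ms (25 hours past the Unix epoch),
// julian_days = 1 + 2440588 = 2440589 and
// last_day_nanos = 3'600'000 ms * 1'000'000 ns/ms, i.e. one hour in nanoseconds.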

constexpr int64_t kSecondsInNanos = INT64_C(1000000000);

inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
                                                                   impala_timestamp);
}

constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);

inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
                                          Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
      milliseconds, impala_timestamp);
}

constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);

inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
                                          Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
      microseconds, impala_timestamp);
}

constexpr int64_t kNanosecondsInNanos = INT64_C(1);

inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
                                         Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
      nanoseconds, impala_timestamp);
}
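
// Usage sketch (illustrative, not part of the original header):
//
//   Int96 ts;
//   MillisecondsToImpalaTimestamp(INT64_C(90000000), &ts);
//   // ts.value[2] == 2440589 (the Julian day); the first eight bytes of `ts`
//   // hold one hour expressed in nanoseconds.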

}  // namespace internal
}  // namespace parquet