// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17 | ||
18 | #pragma once | |
19 | ||
20 | #include <cstdint> | |
21 | #include <cstring> | |
22 | #include <memory> | |
23 | ||
24 | #include "parquet/exception.h" | |
25 | #include "parquet/platform.h" | |
26 | #include "parquet/types.h" | |
27 | ||
// Forward declarations of the Arrow types referenced below, so that this
// header does not need to include the corresponding Arrow headers.
namespace arrow {

class Array;

namespace BitUtil {
class BitWriter;
} // namespace BitUtil

namespace util {
class RleEncoder;
} // namespace util

} // namespace arrow
41 | ||
42 | namespace parquet { | |
43 | ||
// Forward declarations of parquet-internal types used only by pointer or
// reference in this header, keeping its include footprint small.
struct ArrowWriteContext;
class ColumnDescriptor;
class DataPage;
class DictionaryPage;
class ColumnChunkMetaDataBuilder;
class Encryptor;
class WriterProperties;
51 | ||
52 | class PARQUET_EXPORT LevelEncoder { | |
53 | public: | |
54 | LevelEncoder(); | |
55 | ~LevelEncoder(); | |
56 | ||
57 | static int MaxBufferSize(Encoding::type encoding, int16_t max_level, | |
58 | int num_buffered_values); | |
59 | ||
60 | // Initialize the LevelEncoder. | |
61 | void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, | |
62 | uint8_t* data, int data_size); | |
63 | ||
64 | // Encodes a batch of levels from an array and returns the number of levels encoded | |
65 | int Encode(int batch_size, const int16_t* levels); | |
66 | ||
67 | int32_t len() { | |
68 | if (encoding_ != Encoding::RLE) { | |
69 | throw ParquetException("Only implemented for RLE encoding"); | |
70 | } | |
71 | return rle_length_; | |
72 | } | |
73 | ||
74 | private: | |
75 | int bit_width_; | |
76 | int rle_length_; | |
77 | Encoding::type encoding_; | |
78 | std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_; | |
79 | std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_; | |
80 | }; | |
81 | ||
82 | class PARQUET_EXPORT PageWriter { | |
83 | public: | |
84 | virtual ~PageWriter() {} | |
85 | ||
86 | static std::unique_ptr<PageWriter> Open( | |
87 | std::shared_ptr<ArrowOutputStream> sink, Compression::type codec, | |
88 | int compression_level, ColumnChunkMetaDataBuilder* metadata, | |
89 | int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1, | |
90 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), | |
91 | bool buffered_row_group = false, | |
92 | std::shared_ptr<Encryptor> header_encryptor = NULLPTR, | |
93 | std::shared_ptr<Encryptor> data_encryptor = NULLPTR); | |
94 | ||
95 | // The Column Writer decides if dictionary encoding is used if set and | |
96 | // if the dictionary encoding has fallen back to default encoding on reaching dictionary | |
97 | // page limit | |
98 | virtual void Close(bool has_dictionary, bool fallback) = 0; | |
99 | ||
100 | // Return the number of uncompressed bytes written (including header size) | |
101 | virtual int64_t WriteDataPage(const DataPage& page) = 0; | |
102 | ||
103 | // Return the number of uncompressed bytes written (including header size) | |
104 | virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; | |
105 | ||
106 | virtual bool has_compressor() = 0; | |
107 | ||
108 | virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; | |
109 | }; | |
110 | ||
// Size, in values, of the batches used when splitting up large writes.
// NOTE(review): presumably consumed by the writer implementation to chunk
// work -- confirm against column_writer.cc.
static constexpr int WRITE_BATCH_SIZE = 1000;

/// \brief Abstract writer for one Parquet column chunk.
class PARQUET_EXPORT ColumnWriter {
 public:
  virtual ~ColumnWriter() = default;

  /// \brief Create a writer for the column described by `metadata`, emitting
  /// pages through the given PageWriter and configured by `properties`.
  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                            std::unique_ptr<PageWriter>,
                                            const WriterProperties* properties);

  /// \brief Closes the ColumnWriter, commits any buffered values to pages.
  /// \return Total size of the column in bytes
  virtual int64_t Close() = 0;

  /// \brief The physical Parquet type of the column
  virtual Type::type type() const = 0;

  /// \brief The schema for the column
  virtual const ColumnDescriptor* descr() const = 0;

  /// \brief The number of rows written so far
  virtual int64_t rows_written() const = 0;

  /// \brief The total size of the compressed pages + page headers. Some values
  /// might be still buffered and not written to a page yet
  virtual int64_t total_compressed_bytes() const = 0;

  /// \brief The total number of bytes written as serialized data and
  /// dictionary pages to the ColumnChunk so far
  virtual int64_t total_bytes_written() const = 0;

  /// \brief The file-level writer properties
  virtual const WriterProperties* properties() = 0;

  /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
  /// error status if the array data type is not compatible with the concrete
  /// writer type.
  ///
  /// leaf_array is always a primitive (possibly dictionary encoded type).
  /// Leaf_field_nullable indicates whether the leaf array is considered nullable
  /// according to its schema in a Table or its parent array.
  virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                                     int64_t num_levels, const ::arrow::Array& leaf_array,
                                     ArrowWriteContext* ctx,
                                     bool leaf_field_nullable) = 0;
};
156 | ||
// API to write values to a single column. This is the main client facing API.
template <typename DType>
class TypedColumnWriter : public ColumnWriter {
 public:
  // C type corresponding to the column's physical Parquet type.
  using T = typename DType::c_type;

  // Write a batch of repetition levels, definition levels, and values to the
  // column.
  // `num_values` is the number of logical leaf values.
  // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level
  // (resp. max repetition level) is 0.
  // If not null, each of `def_levels` and `rep_levels` must have at least
  // `num_values` elements.
  //
  // The number of physical values written (taken from `values`) is returned.
  // It can be smaller than `num_values` if there are some undefined values.
  virtual int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
                             const int16_t* rep_levels, const T* values) = 0;

  /// Write a batch of repetition levels, definition levels, and values to the
  /// column.
  ///
  /// In comparison to WriteBatch the length of repetition and definition levels
  /// is the same as of the number of values read for max_definition_level == 1.
  /// In the case of max_definition_level > 1, the repetition and definition
  /// levels are larger than the values but the values include the null entries
  /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
  /// in the parameters of this function if the input has the length of num_values or the
  /// _number of rows in the lowest nesting level_.
  ///
  /// In the case that the most inner node in the Parquet is required, the _number of rows
  /// in the lowest nesting level_ is equal to the number of non-null values. If the
  /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
  /// also includes all values with definition_level == (max_definition_level - 1).
  ///
  /// @param num_values number of levels to write.
  /// @param def_levels The Parquet definition levels, length is num_values
  /// @param rep_levels The Parquet repetition levels, length is num_values
  /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
  ///   level. The length is number of rows in the lowest nesting level.
  /// @param valid_bits_offset The offset in bits of the valid_bits where the
  ///   first relevant bit resides.
  /// @param values The values in the lowest nested level including
  ///   spacing for nulls on the lowest levels; input has the length
  ///   of the number of rows on the lowest nesting level.
  virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                                const int16_t* rep_levels, const uint8_t* valid_bits,
                                int64_t valid_bits_offset, const T* values) = 0;

  // Estimated size, in bytes, of the values that are buffered but not yet
  // written to a page.
  virtual int64_t EstimatedBufferedValueBytes() const = 0;
};
209 | ||
// Convenience aliases mapping each physical Parquet type to its typed writer.
using BoolWriter = TypedColumnWriter<BooleanType>;
using Int32Writer = TypedColumnWriter<Int32Type>;
using Int64Writer = TypedColumnWriter<Int64Type>;
using Int96Writer = TypedColumnWriter<Int96Type>;
using FloatWriter = TypedColumnWriter<FloatType>;
using DoubleWriter = TypedColumnWriter<DoubleType>;
using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
218 | ||
219 | namespace internal { | |
220 | ||
221 | /** | |
222 | * Timestamp conversion constants | |
223 | */ | |
224 | constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); | |
225 | ||
226 | template <int64_t UnitPerDay, int64_t NanosecondsPerUnit> | |
227 | inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { | |
228 | int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; | |
229 | (*impala_timestamp).value[2] = (uint32_t)julian_days; | |
230 | ||
231 | int64_t last_day_units = time % UnitPerDay; | |
232 | auto last_day_nanos = last_day_units * NanosecondsPerUnit; | |
233 | // impala_timestamp will be unaligned every other entry so do memcpy instead | |
234 | // of assign and reinterpret cast to avoid undefined behavior. | |
235 | std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t)); | |
236 | } | |
237 | ||
// Nanoseconds per second.
constexpr int64_t kSecondsInNanos = INT64_C(1000000000);

/// \brief Convert seconds since the Unix epoch to an Impala INT96 timestamp.
/// NOTE(review): kSecondsPerDay is supplied by an included header (likely
/// parquet/types.h) -- confirm.
inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
                                                                   impala_timestamp);
}
244 | ||
// Nanoseconds per millisecond.
constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);

/// \brief Convert milliseconds since the Unix epoch to an Impala INT96
/// timestamp.
inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
                                          Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
      milliseconds, impala_timestamp);
}
252 | ||
// Nanoseconds per microsecond.
constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);

/// \brief Convert microseconds since the Unix epoch to an Impala INT96
/// timestamp.
inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
                                          Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
      microseconds, impala_timestamp);
}
260 | ||
// Identity unit: one nanosecond per nanosecond.
constexpr int64_t kNanosecondsInNanos = INT64_C(1);

/// \brief Convert nanoseconds since the Unix epoch to an Impala INT96
/// timestamp.
inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
                                         Int96* impala_timestamp) {
  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
      nanoseconds, impala_timestamp);
}
268 | ||
269 | } // namespace internal | |
270 | } // namespace parquet |