// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstring>
#include <memory>

#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/types.h"
34 } // namespace BitUtil
// Forward declarations to keep this header's include surface small.
// DataPage, DictionaryPage and Encryptor are added because they are used
// below (PageWriter::Open / WriteDataPage / WriteDictionaryPage) but had no
// visible declaration in this file.
struct ArrowWriteContext;

class ColumnDescriptor;
class ColumnChunkMetaDataBuilder;
class DataPage;
class DictionaryPage;
class Encryptor;
class WriterProperties;
// Encoder for definition/repetition levels. Holds one Arrow RLE encoder and
// one bit-packed encoder (members below); the RLE check below suggests only
// the RLE path is fully implemented for length reporting.
// NOTE(review): this extract is missing several lines of the class —
// ctor/dtor, access specifiers, the signature of the member function that
// contains the RLE-only check, some members, and the closing brace. Confirm
// against the complete header before relying on this view.
52 class PARQUET_EXPORT LevelEncoder
{
// Upper bound, in bytes, on the buffer needed to encode
// `num_buffered_values` levels no larger than `max_level` with `encoding`.
57 static int MaxBufferSize(Encoding::type encoding
, int16_t max_level
,
58 int num_buffered_values
);
60 // Initialize the LevelEncoder.
// `data`/`data_size` presumably describe the caller-owned output buffer the
// encoder writes into — TODO confirm against the implementation.
61 void Init(Encoding::type encoding
, int16_t max_level
, int num_buffered_values
,
62 uint8_t* data
, int data_size
);
64 // Encodes a batch of levels from an array and returns the number of levels encoded
65 int Encode(int batch_size
, const int16_t* levels
);
// Fragment of a member function whose signature is not visible in this
// extract: anything other than RLE encoding throws here.
68 if (encoding_
!= Encoding::RLE
) {
69 throw ParquetException("Only implemented for RLE encoding");
// Encoding selected at Init() time.
77 Encoding::type encoding_
;
// Presumably used when encoding_ == Encoding::RLE — confirm in the .cc.
78 std::unique_ptr
<::arrow::util::RleEncoder
> rle_encoder_
;
// Presumably used for the bit-packed encoding path — confirm in the .cc.
79 std::unique_ptr
<::arrow::BitUtil::BitWriter
> bit_packed_encoder_
;
// Abstract interface that serializes data and dictionary pages of one column
// chunk to an output sink, with optional compression and page encryption.
// NOTE(review): access specifiers and the closing brace of this class are
// missing from this extract.
82 class PARQUET_EXPORT PageWriter
{
84 virtual ~PageWriter() {}
// Factory: create a PageWriter that writes to `sink`, compressing with
// `codec`/`compression_level` and recording metadata into `metadata`.
// Ordinals default to -1; header/data encryptors default to NULLPTR
// (unencrypted).
86 static std::unique_ptr
<PageWriter
> Open(
87 std::shared_ptr
<ArrowOutputStream
> sink
, Compression::type codec
,
88 int compression_level
, ColumnChunkMetaDataBuilder
* metadata
,
89 int16_t row_group_ordinal
= -1, int16_t column_chunk_ordinal
= -1,
90 ::arrow::MemoryPool
* pool
= ::arrow::default_memory_pool(),
91 bool buffered_row_group
= false,
92 std::shared_ptr
<Encryptor
> header_encryptor
= NULLPTR
,
93 std::shared_ptr
<Encryptor
> data_encryptor
= NULLPTR
);
95 // The Column Writer decides if dictionary encoding is used if set and
96 // if the dictionary encoding has fallen back to default encoding on reaching dictionary
98 virtual void Close(bool has_dictionary
, bool fallback
) = 0;
100 // Return the number of uncompressed bytes written (including header size)
101 virtual int64_t WriteDataPage(const DataPage
& page
) = 0;
103 // Return the number of uncompressed bytes written (including header size)
104 virtual int64_t WriteDictionaryPage(const DictionaryPage
& page
) = 0;
// True when a compressor is available — presumably when `codec` passed to
// Open() is not UNCOMPRESSED; TODO confirm against the implementation.
106 virtual bool has_compressor() = 0;
// Compress `src_buffer` into `dest_buffer`; NOTE(review): codec selection is
// not visible here, presumably the one configured at Open() time.
108 virtual void Compress(const Buffer
& src_buffer
, ResizableBuffer
* dest_buffer
) = 0;
// Number of values processed per batch by column writer implementations.
static constexpr int WRITE_BATCH_SIZE = 1000;
// Abstract writer for a single column chunk; concrete, type-specialized
// implementations are obtained through Make().
// NOTE(review): access specifiers and the closing brace of this class are
// missing from this extract.
112 class PARQUET_EXPORT ColumnWriter
{
114 virtual ~ColumnWriter() = default;
// Factory: build a concrete ColumnWriter writing pages through the given
// PageWriter, recording metadata into the metadata builder, configured by
// `properties`.
116 static std::shared_ptr
<ColumnWriter
> Make(ColumnChunkMetaDataBuilder
*,
117 std::unique_ptr
<PageWriter
>,
118 const WriterProperties
* properties
);
120 /// \brief Closes the ColumnWriter, commits any buffered values to pages.
121 /// \return Total size of the column in bytes
122 virtual int64_t Close() = 0;
124 /// \brief The physical Parquet type of the column
125 virtual Type::type
type() const = 0;
127 /// \brief The schema for the column
128 virtual const ColumnDescriptor
* descr() const = 0;
130 /// \brief The number of rows written so far
131 virtual int64_t rows_written() const = 0;
133 /// \brief The total size of the compressed pages + page headers. Some values
134 /// might be still buffered and not written to a page yet
135 virtual int64_t total_compressed_bytes() const = 0;
137 /// \brief The total number of bytes written as serialized data and
138 /// dictionary pages to the ColumnChunk so far
139 virtual int64_t total_bytes_written() const = 0;
141 /// \brief The file-level writer properties
142 virtual const WriterProperties
* properties() = 0;
144 /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
145 /// error status if the array data type is not compatible with the concrete
148 /// leaf_array is always a primitive (possibly dictionary encoded type).
149 /// Leaf_field_nullable indicates whether the leaf array is considered nullable
150 /// according to its schema in a Table or its parent array.
151 virtual ::arrow::Status
WriteArrow(const int16_t* def_levels
, const int16_t* rep_levels
,
152 int64_t num_levels
, const ::arrow::Array
& leaf_array
,
153 ArrowWriteContext
* ctx
,
154 bool leaf_field_nullable
) = 0;
157 // API to write values to a single column. This is the main client facing API.
// `T` below is the C++ value type matching the physical Parquet type DType
// (e.g. int32_t for Int32Type).
// NOTE(review): access specifiers and the closing brace of this class
// template are missing from this extract.
158 template <typename DType
>
159 class TypedColumnWriter
: public ColumnWriter
{
161 using T
= typename
DType::c_type
;
163 // Write a batch of repetition levels, definition levels, and values to the
165 // `num_values` is the number of logical leaf values.
166 // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level
167 // (resp. max repetition level) is 0.
168 // If not null, each of `def_levels` and `rep_levels` must have at least
171 // The number of physical values written (taken from `values`) is returned.
172 // It can be smaller than `num_values` if there are some undefined values.
173 virtual int64_t WriteBatch(int64_t num_values
, const int16_t* def_levels
,
174 const int16_t* rep_levels
, const T
* values
) = 0;
176 /// Write a batch of repetition levels, definition levels, and values to the
179 /// In comparison to WriteBatch the length of repetition and definition levels
180 /// is the same as of the number of values read for max_definition_level == 1.
181 /// In the case of max_definition_level > 1, the repetition and definition
182 /// levels are larger than the values but the values include the null entries
183 /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
184 /// in the parameters of this function if the input has the length of num_values or the
185 /// _number of rows in the lowest nesting level_.
187 /// In the case that the most inner node in the Parquet is required, the _number of rows
188 /// in the lowest nesting level_ is equal to the number of non-null values. If the
189 /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
190 /// also includes all values with definition_level == (max_definition_level - 1).
192 /// @param num_values number of levels to write.
193 /// @param def_levels The Parquet definition levels, length is num_values
194 /// @param rep_levels The Parquet repetition levels, length is num_values
195 /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
196 /// level. The length is number of rows in the lowest nesting level.
197 /// @param valid_bits_offset The offset in bits of the valid_bits where the
198 /// first relevant bit resides.
199 /// @param values The values in the lowest nested level including
200 /// spacing for nulls on the lowest levels; input has the length
201 /// of the number of rows on the lowest nesting level.
202 virtual void WriteBatchSpaced(int64_t num_values
, const int16_t* def_levels
,
203 const int16_t* rep_levels
, const uint8_t* valid_bits
,
204 int64_t valid_bits_offset
, const T
* values
) = 0;
206 // Estimated size of the values that are not written to a page yet
207 virtual int64_t EstimatedBufferedValueBytes() const = 0;
210 using BoolWriter
= TypedColumnWriter
<BooleanType
>;
211 using Int32Writer
= TypedColumnWriter
<Int32Type
>;
212 using Int64Writer
= TypedColumnWriter
<Int64Type
>;
213 using Int96Writer
= TypedColumnWriter
<Int96Type
>;
214 using FloatWriter
= TypedColumnWriter
<FloatType
>;
215 using DoubleWriter
= TypedColumnWriter
<DoubleType
>;
216 using ByteArrayWriter
= TypedColumnWriter
<ByteArrayType
>;
217 using FixedLenByteArrayWriter
= TypedColumnWriter
<FLBAType
>;
// Timestamp conversion constants
// Julian day number of the Unix epoch (1970-01-01); added to a days-since-
// epoch count to obtain the Julian day stored in an Impala Int96 timestamp.
constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
226 template <int64_t UnitPerDay
, int64_t NanosecondsPerUnit
>
227 inline void ArrowTimestampToImpalaTimestamp(const int64_t time
, Int96
* impala_timestamp
) {
228 int64_t julian_days
= (time
/ UnitPerDay
) + kJulianEpochOffsetDays
;
229 (*impala_timestamp
).value
[2] = (uint32_t)julian_days
;
231 int64_t last_day_units
= time
% UnitPerDay
;
232 auto last_day_nanos
= last_day_units
* NanosecondsPerUnit
;
233 // impala_timestamp will be unaligned every other entry so do memcpy instead
234 // of assign and reinterpret cast to avoid undefined behavior.
235 std::memcpy(impala_timestamp
, &last_day_nanos
, sizeof(int64_t));
// Nanoseconds per second.
constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
240 inline void SecondsToImpalaTimestamp(const int64_t seconds
, Int96
* impala_timestamp
) {
241 ArrowTimestampToImpalaTimestamp
<kSecondsPerDay
, kSecondsInNanos
>(seconds
,
245 constexpr int64_t kMillisecondsInNanos
= kSecondsInNanos
/ INT64_C(1000);
247 inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds
,
248 Int96
* impala_timestamp
) {
249 ArrowTimestampToImpalaTimestamp
<kMillisecondsPerDay
, kMillisecondsInNanos
>(
250 milliseconds
, impala_timestamp
);
253 constexpr int64_t kMicrosecondsInNanos
= kMillisecondsInNanos
/ INT64_C(1000);
255 inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds
,
256 Int96
* impala_timestamp
) {
257 ArrowTimestampToImpalaTimestamp
<kMicrosecondsPerDay
, kMicrosecondsInNanos
>(
258 microseconds
, impala_timestamp
);
// Nanoseconds per nanosecond (identity unit for the conversion template).
constexpr int64_t kNanosecondsInNanos = INT64_C(1);
263 inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds
,
264 Int96
* impala_timestamp
) {
265 ArrowTimestampToImpalaTimestamp
<kNanosecondsPerDay
, kNanosecondsInNanos
>(
266 nanoseconds
, impala_timestamp
);
269 } // namespace internal
270 } // namespace parquet