--- /dev/null
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding_test
+
+import (
+ "encoding/binary"
+ "strconv"
+ "testing"
+
+ "github.com/apache/arrow/go/v6/arrow"
+ "github.com/apache/arrow/go/v6/arrow/memory"
+ "github.com/apache/arrow/go/v6/parquet"
+ "github.com/apache/arrow/go/v6/parquet/internal/encoding"
+ "github.com/apache/arrow/go/v6/parquet/internal/utils"
+ "github.com/stretchr/testify/assert"
+)
+
+func generateLevels(minRepeat, maxRepeat int, maxLevel int16) []int16 {
+ // for each repetition count up to max repeat
+ ret := make([]int16, 0)
+ for rep := minRepeat; rep <= maxRepeat; rep++ {
+ var (
+ repCount = 1 << rep
+ val int16 = 0
+ bwidth = 0
+ )
+ // generate levels for repetition count up to max level
+ for val <= maxLevel {
+ for i := 0; i < repCount; i++ {
+ ret = append(ret, val)
+ }
+ val = int16((2 << bwidth) - 1)
+ bwidth++
+ }
+ }
+ return ret
+}
+
+func encodeLevels(t *testing.T, enc parquet.Encoding, maxLvl int16, numLevels int, input []int16) []byte {
+ var (
+ encoder encoding.LevelEncoder
+ lvlCount = 0
+ buf = encoding.NewBufferWriter(2*numLevels, memory.DefaultAllocator)
+ )
+
+ if enc == parquet.Encodings.RLE {
+ buf.SetOffset(arrow.Int32SizeBytes)
+ // leave space to write the rle length value
+ encoder.Init(enc, maxLvl, buf)
+ lvlCount, _ = encoder.Encode(input)
+ buf.SetOffset(0)
+ arrow.Int32Traits.CastFromBytes(buf.Bytes())[0] = utils.ToLEInt32(int32(encoder.Len()))
+ } else {
+ encoder.Init(enc, maxLvl, buf)
+ lvlCount, _ = encoder.Encode(input)
+ }
+
+ assert.Equal(t, numLevels, lvlCount)
+ return buf.Bytes()
+}
+
+func verifyDecodingLvls(t *testing.T, enc parquet.Encoding, maxLvl int16, input []int16, buf []byte) {
+ var (
+ decoder encoding.LevelDecoder
+ lvlCount = 0
+ numLevels = len(input)
+ output = make([]int16, numLevels)
+ decodeCount = 4
+ numInnerLevels = numLevels / decodeCount
+ )
+
+ // decode levels and test with multiple decode calls
+ _, err := decoder.SetData(enc, maxLvl, numLevels, buf)
+ assert.NoError(t, err)
+ // try multiple decoding on a single setdata call
+ for ct := 0; ct < decodeCount; ct++ {
+ offset := ct * numInnerLevels
+ lvlCount, _ = decoder.Decode(output[:numInnerLevels])
+ assert.Equal(t, numInnerLevels, lvlCount)
+ assert.Equal(t, input[offset:offset+numInnerLevels], output[:numInnerLevels])
+ }
+
+ // check the remaining levels
+ var (
+ levelsCompleted = decodeCount * (numLevels / decodeCount)
+ remaining = numLevels - levelsCompleted
+ )
+
+ if remaining > 0 {
+ lvlCount, _ = decoder.Decode(output[:remaining])
+ assert.Equal(t, remaining, lvlCount)
+ assert.Equal(t, input[levelsCompleted:], output[:remaining])
+ }
+ // test decode zero values
+ lvlCount, _ = decoder.Decode(output[:1])
+ assert.Zero(t, lvlCount)
+}
+
+func verifyDecodingMultipleSetData(t *testing.T, enc parquet.Encoding, max int16, input []int16, buf [][]byte) {
+ var (
+ decoder encoding.LevelDecoder
+ lvlCount = 0
+ setdataCount = len(buf)
+ numLevels = len(input) / setdataCount
+ output = make([]int16, numLevels)
+ )
+
+ for ct := 0; ct < setdataCount; ct++ {
+ offset := ct * numLevels
+ assert.Len(t, output, numLevels)
+ _, err := decoder.SetData(enc, max, numLevels, buf[ct])
+ assert.NoError(t, err)
+ lvlCount, _ = decoder.Decode(output)
+ assert.Equal(t, numLevels, lvlCount)
+ assert.Equal(t, input[offset:offset+numLevels], output)
+ }
+}
+
+func TestLevelsDecodeMultipleBitWidth(t *testing.T) {
+ t.Parallel()
+ // Test levels with maximum bit-width from 1 to 8
+ // increase the repetition count for each iteration by a factor of 2
+ var (
+ minRepeat = 0
+ maxRepeat = 7 // 128
+ maxBitWidth = 8
+ input []int16
+ buf []byte
+ encodings = [2]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked}
+ )
+
+ for _, enc := range encodings {
+ t.Run(enc.String(), func(t *testing.T) {
+ // bitpacked requires a sequence of at least 8
+ if enc == parquet.Encodings.BitPacked {
+ minRepeat = 3
+ }
+ // for each max bit width
+ for bitWidth := 1; bitWidth <= maxBitWidth; bitWidth++ {
+ t.Run(strconv.Itoa(bitWidth), func(t *testing.T) {
+ max := int16((1 << bitWidth) - 1)
+ // generate levels
+ input = generateLevels(minRepeat, maxRepeat, max)
+ assert.NotPanics(t, func() {
+ buf = encodeLevels(t, enc, max, len(input), input)
+ })
+ assert.NotPanics(t, func() {
+ verifyDecodingLvls(t, enc, max, input, buf)
+ })
+ })
+ }
+ })
+ }
+}
+
+func TestLevelsDecodeMultipleSetData(t *testing.T) {
+ t.Parallel()
+
+ var (
+ minRepeat = 3
+ maxRepeat = 7
+ bitWidth = 8
+ maxLevel = int16((1 << bitWidth) - 1)
+ encodings = [2]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked}
+ )
+
+ input := generateLevels(minRepeat, maxRepeat, maxLevel)
+
+ var (
+ numLevels = len(input)
+ setdataFactor = 8
+ splitLevelSize = numLevels / setdataFactor
+ buf = make([][]byte, setdataFactor)
+ )
+
+ for _, enc := range encodings {
+ t.Run(enc.String(), func(t *testing.T) {
+ for rf := 0; rf < setdataFactor; rf++ {
+ offset := rf * splitLevelSize
+ assert.NotPanics(t, func() {
+ buf[rf] = encodeLevels(t, enc, maxLevel, splitLevelSize, input[offset:offset+splitLevelSize])
+ })
+ }
+ assert.NotPanics(t, func() {
+ verifyDecodingMultipleSetData(t, enc, maxLevel, input, buf)
+ })
+ })
+ }
+}
+
+func TestMinimumBufferSize(t *testing.T) {
+ t.Parallel()
+
+ const numToEncode = 1024
+ levels := make([]int16, numToEncode)
+
+ for idx := range levels {
+ if idx%9 == 0 {
+ levels[idx] = 0
+ } else {
+ levels[idx] = 1
+ }
+ }
+
+ output := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+
+ var encoder encoding.LevelEncoder
+ encoder.Init(parquet.Encodings.RLE, 1, output)
+ count, _ := encoder.Encode(levels)
+ assert.Equal(t, numToEncode, count)
+}
+
+func TestMinimumBufferSize2(t *testing.T) {
+ t.Parallel()
+
+ // test the worst case for bit_width=2 consisting of
+ // LiteralRun(size=8)
+ // RepeatedRun(size=8)
+ // LiteralRun(size=8)
+ // ...
+ const numToEncode = 1024
+ levels := make([]int16, numToEncode)
+
+ for idx := range levels {
+ // This forces a literal run of 00000001
+ // followed by eight 1s
+ if (idx % 16) < 7 {
+ levels[idx] = 0
+ } else {
+ levels[idx] = 1
+ }
+ }
+
+ for bitWidth := int16(1); bitWidth <= 8; bitWidth++ {
+ output := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+
+ var encoder encoding.LevelEncoder
+ encoder.Init(parquet.Encodings.RLE, bitWidth, output)
+ count, _ := encoder.Encode(levels)
+ assert.Equal(t, numToEncode, count)
+ }
+}
+
+func TestEncodeDecodeLevels(t *testing.T) {
+ t.Parallel()
+ const numToEncode = 2048
+ levels := make([]int16, numToEncode)
+ numones := 0
+ for idx := range levels {
+ if (idx % 16) < 7 {
+ levels[idx] = 0
+ } else {
+ levels[idx] = 1
+ numones++
+ }
+ }
+
+ output := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+
+ var encoder encoding.LevelEncoder
+ encoder.Init(parquet.Encodings.RLE, 1, output)
+ count, _ := encoder.Encode(levels)
+ assert.Equal(t, numToEncode, count)
+ encoder.Flush()
+
+ buf := output.Bytes()
+ var prefix [4]byte
+ binary.LittleEndian.PutUint32(prefix[:], uint32(len(buf)))
+
+ var decoder encoding.LevelDecoder
+ _, err := decoder.SetData(parquet.Encodings.RLE, 1, numToEncode, append(prefix[:], buf...))
+ assert.NoError(t, err)
+
+ var levelOut [numToEncode]int16
+ total, vals := decoder.Decode(levelOut[:])
+ assert.EqualValues(t, numToEncode, total)
+ assert.EqualValues(t, numones, vals)
+ assert.Equal(t, levels, levelOut[:])
+}