]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/util/compression_benchmark.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / util / compression_benchmark.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "benchmark/benchmark.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstring>
23#include <memory>
24#include <random>
25#include <string>
26#include <vector>
27
28#include "arrow/result.h"
29#include "arrow/util/compression.h"
30#include "arrow/util/logging.h"
31#include "arrow/util/macros.h"
32
33namespace arrow {
34namespace util {
35
36#ifdef ARROW_WITH_BENCHMARKS_REFERENCE
37
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  // Build a synthetic corpus: a highly repetitive text buffer with random
  // byte mutations sprinkled in.  This isn't a real-world corpus, so it
  // doesn't really represent the comparative qualities of the algorithms.

  const std::string base =
      "Apache Arrow is a cross-language development platform for in-memory data";

  // Tile the base string until we have at least data_size bytes, then trim.
  std::vector<uint8_t> buf;
  buf.reserve(base.size() * (1 + data_size / base.size()));
  while (buf.size() < static_cast<size_t>(data_size)) {
    buf.insert(buf.end(), base.begin(), base.end());
  }
  buf.resize(data_size);

  // Deterministically (fixed seed) overwrite bytes at exponentially
  // distributed gaps so the data isn't trivially compressible.
  std::mt19937 engine(42);
  std::exponential_distribution<> offsets(0.05);
  std::uniform_int_distribution<> values(0, 255);

  for (int64_t pos = 0; pos < data_size;) {
    buf[pos] = static_cast<uint8_t>(values(engine));
    pos += static_cast<int64_t>(offsets(engine));
  }

  return buf;
}
66
67int64_t StreamingCompress(Codec* codec, const std::vector<uint8_t>& data,
68 std::vector<uint8_t>* compressed_data = nullptr) {
69 if (compressed_data != nullptr) {
70 compressed_data->clear();
71 compressed_data->shrink_to_fit();
72 }
73 auto compressor = *codec->MakeCompressor();
74
75 const uint8_t* input = data.data();
76 int64_t input_len = data.size();
77 int64_t compressed_size = 0;
78
79 std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
80
81 while (input_len > 0) {
82 auto result = *compressor->Compress(input_len, input, output_buffer.size(),
83 output_buffer.data());
84 input += result.bytes_read;
85 input_len -= result.bytes_read;
86 compressed_size += result.bytes_written;
87 if (compressed_data != nullptr && result.bytes_written > 0) {
88 compressed_data->resize(compressed_data->size() + result.bytes_written);
89 memcpy(compressed_data->data() + compressed_data->size() - result.bytes_written,
90 output_buffer.data(), result.bytes_written);
91 }
92 if (result.bytes_read == 0) {
93 // Need to enlarge output buffer
94 output_buffer.resize(output_buffer.size() * 2);
95 }
96 }
97 while (true) {
98 auto result = *compressor->End(output_buffer.size(), output_buffer.data());
99 compressed_size += result.bytes_written;
100 if (compressed_data != nullptr && result.bytes_written > 0) {
101 compressed_data->resize(compressed_data->size() + result.bytes_written);
102 memcpy(compressed_data->data() + compressed_data->size() - result.bytes_written,
103 output_buffer.data(), result.bytes_written);
104 }
105 if (result.should_retry) {
106 // Need to enlarge output buffer
107 output_buffer.resize(output_buffer.size() * 2);
108 } else {
109 break;
110 }
111 }
112 return compressed_size;
113}
114
115static void StreamingCompression(Compression::type compression,
116 const std::vector<uint8_t>& data,
117 benchmark::State& state) { // NOLINT non-const reference
118 auto codec = *Codec::Create(compression);
119
120 while (state.KeepRunning()) {
121 int64_t compressed_size = StreamingCompress(codec.get(), data);
122 state.counters["ratio"] =
123 static_cast<double>(data.size()) / static_cast<double>(compressed_size);
124 }
125 state.SetBytesProcessed(state.iterations() * data.size());
126}
127
128template <Compression::type COMPRESSION>
129static void ReferenceStreamingCompression(
130 benchmark::State& state) { // NOLINT non-const reference
131 auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB
132
133 StreamingCompression(COMPRESSION, data, state);
134}
135
136static void StreamingDecompression(
137 Compression::type compression, const std::vector<uint8_t>& data,
138 benchmark::State& state) { // NOLINT non-const reference
139 auto codec = *Codec::Create(compression);
140
141 std::vector<uint8_t> compressed_data;
142 ARROW_UNUSED(StreamingCompress(codec.get(), data, &compressed_data));
143 state.counters["ratio"] =
144 static_cast<double>(data.size()) / static_cast<double>(compressed_data.size());
145
146 while (state.KeepRunning()) {
147 auto decompressor = *codec->MakeDecompressor();
148
149 const uint8_t* input = compressed_data.data();
150 int64_t input_len = compressed_data.size();
151 int64_t decompressed_size = 0;
152
153 std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
154 while (!decompressor->IsFinished()) {
155 auto result = *decompressor->Decompress(input_len, input, output_buffer.size(),
156 output_buffer.data());
157 input += result.bytes_read;
158 input_len -= result.bytes_read;
159 decompressed_size += result.bytes_written;
160 if (result.need_more_output) {
161 // Enlarge output buffer
162 output_buffer.resize(output_buffer.size() * 2);
163 }
164 }
165 ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size()));
166 }
167 state.SetBytesProcessed(state.iterations() * data.size());
168}
169
170template <Compression::type COMPRESSION>
171static void ReferenceStreamingDecompression(
172 benchmark::State& state) { // NOLINT non-const reference
173 auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB
174
175 StreamingDecompression(COMPRESSION, data, state);
176}
177
// Register the reference streaming (de)compression benchmarks for every
// codec that was compiled into this build.
#ifdef ARROW_WITH_ZLIB
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::GZIP);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::GZIP);
#endif

#ifdef ARROW_WITH_BROTLI
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::BROTLI);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::BROTLI);
#endif

#ifdef ARROW_WITH_ZSTD
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::ZSTD);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::ZSTD);
#endif

#ifdef ARROW_WITH_LZ4
// LZ4_FRAME is the streaming-capable LZ4 format (as opposed to raw LZ4).
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::LZ4_FRAME);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::LZ4_FRAME);
#endif
197
198#endif
199
200} // namespace util
201} // namespace arrow