]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file | |
3 | // distributed with this work for additional information | |
4 | // regarding copyright ownership. The ASF licenses this file | |
5 | // to you under the Apache License, Version 2.0 (the | |
6 | // "License"); you may not use this file except in compliance | |
7 | // with the License. You may obtain a copy of the License at | |
8 | // | |
9 | // http://www.apache.org/licenses/LICENSE-2.0 | |
10 | // | |
11 | // Unless required by applicable law or agreed to in writing, | |
12 | // software distributed under the License is distributed on an | |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | // KIND, either express or implied. See the License for the | |
15 | // specific language governing permissions and limitations | |
16 | // under the License. | |
17 | ||
18 | #include "benchmark/benchmark.h" | |
19 | ||
20 | #include <algorithm> | |
21 | #include <cstdint> | |
22 | #include <cstring> | |
23 | #include <memory> | |
24 | #include <random> | |
25 | #include <string> | |
26 | #include <vector> | |
27 | ||
28 | #include "arrow/result.h" | |
29 | #include "arrow/util/compression.h" | |
30 | #include "arrow/util/logging.h" | |
31 | #include "arrow/util/macros.h" | |
32 | ||
33 | namespace arrow { | |
34 | namespace util { | |
35 | ||
36 | #ifdef ARROW_WITH_BENCHMARKS_REFERENCE | |
37 | ||
// Builds a deterministic buffer of `data_size` bytes for the benchmarks:
// a short English sentence repeated back to back, with pseudo-randomly
// chosen bytes overwritten by pseudo-random values (fixed seed).
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  // XXX This is a synthetic corpus, not a real-world one, so it does not
  // really represent the comparative qualities of the algorithms.

  // Step 1: tile the sentence to get highly compressible data.
  const std::string pattern =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const int copies = static_cast<int>(1 + data_size / pattern.size());

  std::vector<uint8_t> bytes(pattern.size() * copies);
  auto write_pos = bytes.begin();
  for (int k = 0; k < copies; ++k) {
    write_pos = std::copy(pattern.begin(), pattern.end(), write_pos);
  }
  // The tiling overshoots by up to one sentence; trim to the requested size.
  bytes.resize(data_size);

  // Step 2: overwrite scattered bytes so as to make things harder.
  std::mt19937 rng(42);
  std::exponential_distribution<> gap(0.05);
  std::uniform_int_distribution<> byte_value(0, 255);

  // Each step draws the replacement value first and the jump second; the
  // jump is truncated to an integer, so it may be zero.
  for (int64_t cursor = 0; cursor < data_size;
       cursor += static_cast<int64_t>(gap(rng))) {
    bytes[cursor] = static_cast<uint8_t>(byte_value(rng));
  }

  return bytes;
}
66 | ||
67 | int64_t StreamingCompress(Codec* codec, const std::vector<uint8_t>& data, | |
68 | std::vector<uint8_t>* compressed_data = nullptr) { | |
69 | if (compressed_data != nullptr) { | |
70 | compressed_data->clear(); | |
71 | compressed_data->shrink_to_fit(); | |
72 | } | |
73 | auto compressor = *codec->MakeCompressor(); | |
74 | ||
75 | const uint8_t* input = data.data(); | |
76 | int64_t input_len = data.size(); | |
77 | int64_t compressed_size = 0; | |
78 | ||
79 | std::vector<uint8_t> output_buffer(1 << 20); // 1 MB | |
80 | ||
81 | while (input_len > 0) { | |
82 | auto result = *compressor->Compress(input_len, input, output_buffer.size(), | |
83 | output_buffer.data()); | |
84 | input += result.bytes_read; | |
85 | input_len -= result.bytes_read; | |
86 | compressed_size += result.bytes_written; | |
87 | if (compressed_data != nullptr && result.bytes_written > 0) { | |
88 | compressed_data->resize(compressed_data->size() + result.bytes_written); | |
89 | memcpy(compressed_data->data() + compressed_data->size() - result.bytes_written, | |
90 | output_buffer.data(), result.bytes_written); | |
91 | } | |
92 | if (result.bytes_read == 0) { | |
93 | // Need to enlarge output buffer | |
94 | output_buffer.resize(output_buffer.size() * 2); | |
95 | } | |
96 | } | |
97 | while (true) { | |
98 | auto result = *compressor->End(output_buffer.size(), output_buffer.data()); | |
99 | compressed_size += result.bytes_written; | |
100 | if (compressed_data != nullptr && result.bytes_written > 0) { | |
101 | compressed_data->resize(compressed_data->size() + result.bytes_written); | |
102 | memcpy(compressed_data->data() + compressed_data->size() - result.bytes_written, | |
103 | output_buffer.data(), result.bytes_written); | |
104 | } | |
105 | if (result.should_retry) { | |
106 | // Need to enlarge output buffer | |
107 | output_buffer.resize(output_buffer.size() * 2); | |
108 | } else { | |
109 | break; | |
110 | } | |
111 | } | |
112 | return compressed_size; | |
113 | } | |
114 | ||
115 | static void StreamingCompression(Compression::type compression, | |
116 | const std::vector<uint8_t>& data, | |
117 | benchmark::State& state) { // NOLINT non-const reference | |
118 | auto codec = *Codec::Create(compression); | |
119 | ||
120 | while (state.KeepRunning()) { | |
121 | int64_t compressed_size = StreamingCompress(codec.get(), data); | |
122 | state.counters["ratio"] = | |
123 | static_cast<double>(data.size()) / static_cast<double>(compressed_size); | |
124 | } | |
125 | state.SetBytesProcessed(state.iterations() * data.size()); | |
126 | } | |
127 | ||
128 | template <Compression::type COMPRESSION> | |
129 | static void ReferenceStreamingCompression( | |
130 | benchmark::State& state) { // NOLINT non-const reference | |
131 | auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB | |
132 | ||
133 | StreamingCompression(COMPRESSION, data, state); | |
134 | } | |
135 | ||
136 | static void StreamingDecompression( | |
137 | Compression::type compression, const std::vector<uint8_t>& data, | |
138 | benchmark::State& state) { // NOLINT non-const reference | |
139 | auto codec = *Codec::Create(compression); | |
140 | ||
141 | std::vector<uint8_t> compressed_data; | |
142 | ARROW_UNUSED(StreamingCompress(codec.get(), data, &compressed_data)); | |
143 | state.counters["ratio"] = | |
144 | static_cast<double>(data.size()) / static_cast<double>(compressed_data.size()); | |
145 | ||
146 | while (state.KeepRunning()) { | |
147 | auto decompressor = *codec->MakeDecompressor(); | |
148 | ||
149 | const uint8_t* input = compressed_data.data(); | |
150 | int64_t input_len = compressed_data.size(); | |
151 | int64_t decompressed_size = 0; | |
152 | ||
153 | std::vector<uint8_t> output_buffer(1 << 20); // 1 MB | |
154 | while (!decompressor->IsFinished()) { | |
155 | auto result = *decompressor->Decompress(input_len, input, output_buffer.size(), | |
156 | output_buffer.data()); | |
157 | input += result.bytes_read; | |
158 | input_len -= result.bytes_read; | |
159 | decompressed_size += result.bytes_written; | |
160 | if (result.need_more_output) { | |
161 | // Enlarge output buffer | |
162 | output_buffer.resize(output_buffer.size() * 2); | |
163 | } | |
164 | } | |
165 | ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size())); | |
166 | } | |
167 | state.SetBytesProcessed(state.iterations() * data.size()); | |
168 | } | |
169 | ||
170 | template <Compression::type COMPRESSION> | |
171 | static void ReferenceStreamingDecompression( | |
172 | benchmark::State& state) { // NOLINT non-const reference | |
173 | auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB | |
174 | ||
175 | StreamingDecompression(COMPRESSION, data, state); | |
176 | } | |
177 | ||
// Register the compression/decompression benchmark pair for every codec
// whose ARROW_WITH_* macro is defined.
#if defined(ARROW_WITH_ZLIB)
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::GZIP);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::GZIP);
#endif  // ARROW_WITH_ZLIB

#if defined(ARROW_WITH_BROTLI)
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::BROTLI);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::BROTLI);
#endif  // ARROW_WITH_BROTLI

#if defined(ARROW_WITH_ZSTD)
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::ZSTD);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::ZSTD);
#endif  // ARROW_WITH_ZSTD

#if defined(ARROW_WITH_LZ4)
BENCHMARK_TEMPLATE(ReferenceStreamingCompression, Compression::LZ4_FRAME);
BENCHMARK_TEMPLATE(ReferenceStreamingDecompression, Compression::LZ4_FRAME);
#endif  // ARROW_WITH_LZ4
197 | ||
198 | #endif | |
199 | ||
200 | } // namespace util | |
201 | } // namespace arrow |