]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include <time.h> | |
17 | #include <boost/random/mersenne_twister.hpp> | |
18 | #include <boost/random/uniform_int.hpp> | |
19 | #include <boost/random/binomial_distribution.hpp> | |
20 | #include <gtest/gtest.h> | |
21 | #include "common/ceph_argparse.h" | |
22 | #include "compressor/AsyncCompressor.h" | |
23 | #include "global/global_init.h" | |
24 | ||
25 | typedef boost::mt11213b gen_type; | |
26 | ||
27 | class AsyncCompressorTest : public ::testing::Test { | |
28 | public: | |
29 | AsyncCompressor *async_compressor; | |
30 | void SetUp() override { | |
31 | cerr << __func__ << " start set up " << std::endl; | |
32 | async_compressor = new AsyncCompressor(g_ceph_context); | |
33 | async_compressor->init(); | |
34 | } | |
35 | void TearDown() override { | |
36 | async_compressor->terminate(); | |
37 | delete async_compressor; | |
38 | } | |
39 | ||
40 | void generate_random_data(bufferlist &bl, uint64_t len = 0) { | |
41 | static const char *base= "znvm,x12399zasdfjkl1209zxcvjlkasjdfljwqelrjzx,cvn,m123#*(@)"; | |
42 | if (!len) { | |
43 | boost::uniform_int<> kb(16, 4096); | |
44 | gen_type rng(time(NULL)); | |
45 | len = kb(rng) * 1024; | |
46 | } | |
47 | ||
48 | while (bl.length() < len) | |
49 | bl.append(base, sizeof(base)-1); | |
50 | } | |
51 | }; | |
52 | ||
53 | TEST_F(AsyncCompressorTest, SimpleTest) { | |
54 | bufferlist compress_data, decompress_data, rawdata; | |
55 | generate_random_data(rawdata, 1<<22); | |
56 | bool finished; | |
57 | uint64_t id = async_compressor->async_compress(rawdata); | |
58 | ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); | |
59 | ASSERT_TRUE(finished == true); | |
60 | id = async_compressor->async_decompress(compress_data); | |
61 | do { | |
62 | ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, false, &finished)); | |
63 | } while (!finished); | |
64 | ASSERT_TRUE(finished == true); | |
65 | ASSERT_TRUE(rawdata.contents_equal(decompress_data)); | |
66 | ASSERT_EQ(-ENOENT, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); | |
67 | } | |
68 | ||
69 | TEST_F(AsyncCompressorTest, GrubWaitTest) { | |
70 | async_compressor->terminate(); | |
71 | bufferlist compress_data, decompress_data, rawdata; | |
72 | generate_random_data(rawdata, 1<<22); | |
73 | bool finished; | |
74 | uint64_t id = async_compressor->async_compress(rawdata); | |
75 | ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); | |
76 | ASSERT_TRUE(finished == true); | |
77 | id = async_compressor->async_decompress(compress_data); | |
78 | ASSERT_EQ(0, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); | |
79 | ASSERT_TRUE(finished == true); | |
80 | ASSERT_TRUE(rawdata.contents_equal(decompress_data)); | |
81 | async_compressor->init(); | |
82 | } | |
83 | ||
84 | TEST_F(AsyncCompressorTest, DecompressInjectTest) { | |
85 | bufferlist compress_data, decompress_data, rawdata; | |
86 | generate_random_data(rawdata, 1<<22); | |
87 | bool finished; | |
88 | uint64_t id = async_compressor->async_compress(rawdata); | |
89 | ASSERT_EQ(0, async_compressor->get_compress_data(id, compress_data, true, &finished)); | |
90 | ASSERT_TRUE(finished == true); | |
91 | char error[] = "asjdfkwejrljqwaelrj"; | |
92 | memcpy(compress_data.c_str()+1024, error, sizeof(error)-1); | |
93 | id = async_compressor->async_decompress(compress_data); | |
94 | ASSERT_EQ(-EIO, async_compressor->get_decompress_data(id, decompress_data, true, &finished)); | |
95 | } | |
96 | ||
97 | class SyntheticWorkload { | |
98 | set<pair<uint64_t, uint64_t> > compress_jobs, decompress_jobs; | |
99 | AsyncCompressor *async_compressor; | |
100 | vector<bufferlist> rand_data, compress_data; | |
101 | gen_type rng; | |
102 | static const uint64_t MAX_INFLIGHT = 128; | |
103 | ||
104 | public: | |
105 | explicit SyntheticWorkload(AsyncCompressor *ac): async_compressor(ac), rng(time(NULL)) { | |
106 | for (int i = 0; i < 100; i++) { | |
107 | bufferlist bl; | |
108 | boost::uniform_int<> u(4096, 1<<24); | |
109 | uint64_t value_len = u(rng); | |
110 | bufferptr bp(value_len); | |
111 | bp.zero(); | |
112 | for (uint64_t j = 0; j < value_len-sizeof(i); ) { | |
113 | memcpy(bp.c_str()+j, &i, sizeof(i)); | |
114 | j += 4096; | |
115 | } | |
116 | ||
117 | bl.append(bp); | |
118 | rand_data.push_back(bl); | |
119 | compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[i]), i)); | |
120 | if (!(i % 10)) cerr << "seeding compress data " << i << std::endl; | |
121 | } | |
122 | compress_data.resize(100); | |
123 | reap(true); | |
124 | } | |
125 | void do_compress() { | |
126 | boost::uniform_int<> u(0, rand_data.size()-1); | |
127 | uint64_t index = u(rng); | |
128 | compress_jobs.insert(make_pair(async_compressor->async_compress(rand_data[index]), index)); | |
129 | } | |
130 | void do_decompress() { | |
131 | boost::uniform_int<> u(0, compress_data.size()-1); | |
132 | uint64_t index = u(rng); | |
133 | if (compress_data[index].length()) | |
134 | decompress_jobs.insert(make_pair(async_compressor->async_decompress(compress_data[index]), index)); | |
135 | } | |
136 | void reap(bool blocking) { | |
137 | bufferlist data; | |
138 | bool finished; | |
139 | set<pair<uint64_t, uint64_t> >::iterator prev; | |
140 | uint64_t c_reap = 0, d_reap = 0; | |
141 | do { | |
142 | for (set<pair<uint64_t, uint64_t> >::iterator it = compress_jobs.begin(); | |
143 | it != compress_jobs.end();) { | |
144 | prev = it; | |
145 | ++it; | |
146 | ASSERT_EQ(0, async_compressor->get_compress_data(prev->first, data, blocking, &finished)); | |
147 | if (finished) { | |
148 | c_reap++; | |
149 | if (compress_data[prev->second].length()) | |
150 | ASSERT_TRUE(compress_data[prev->second].contents_equal(data)); | |
151 | else | |
152 | compress_data[prev->second].swap(data); | |
153 | compress_jobs.erase(prev); | |
154 | } | |
155 | } | |
156 | ||
157 | for (set<pair<uint64_t, uint64_t> >::iterator it = decompress_jobs.begin(); | |
158 | it != decompress_jobs.end();) { | |
159 | prev = it; | |
160 | ++it; | |
161 | ASSERT_EQ(0, async_compressor->get_decompress_data(prev->first, data, blocking, &finished)); | |
162 | if (finished) { | |
163 | d_reap++; | |
164 | ASSERT_TRUE(rand_data[prev->second].contents_equal(data)); | |
165 | decompress_jobs.erase(prev); | |
166 | } | |
167 | } | |
168 | usleep(1000 * 500); | |
169 | } while (compress_jobs.size() + decompress_jobs.size() > MAX_INFLIGHT); | |
170 | cerr << " reap compress jobs " << c_reap << " decompress jobs " << d_reap << std::endl; | |
171 | } | |
172 | void print_internal_state() { | |
173 | cerr << "inlfight compress jobs: " << compress_jobs.size() | |
174 | << " inflight decompress jobs: " << decompress_jobs.size() << std::endl; | |
175 | } | |
176 | bool empty() const { return compress_jobs.empty() && decompress_jobs.empty(); } | |
177 | }; | |
178 | ||
179 | TEST_F(AsyncCompressorTest, SyntheticTest) { | |
180 | SyntheticWorkload test_ac(async_compressor); | |
181 | gen_type rng(time(NULL)); | |
182 | boost::uniform_int<> true_false(0, 99); | |
183 | int val; | |
184 | for (int i = 0; i < 3000; ++i) { | |
185 | if (!(i % 10)) { | |
186 | cerr << "Op " << i << ": "; | |
187 | test_ac.print_internal_state(); | |
188 | } | |
189 | val = true_false(rng); | |
190 | if (val < 45) { | |
191 | test_ac.do_compress(); | |
192 | } else if (val < 95) { | |
193 | test_ac.do_decompress(); | |
194 | } else { | |
195 | test_ac.reap(false); | |
196 | } | |
197 | } | |
198 | while (!test_ac.empty()) { | |
199 | test_ac.reap(false); | |
200 | test_ac.print_internal_state(); | |
201 | usleep(1000*500); | |
202 | } | |
203 | } | |
204 | ||
205 | ||
206 | int main(int argc, char **argv) { | |
207 | vector<const char*> args; | |
208 | argv_to_vec(argc, (const char **)argv, args); | |
209 | ||
210 | auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); | |
211 | common_init_finish(g_ceph_context); | |
212 | ||
213 | const char* env = getenv("CEPH_LIB"); | |
214 | string directory(env ? env : ".libs"); | |
215 | g_conf->set_val("plugin_dir", directory, false); | |
216 | ||
217 | ::testing::InitGoogleTest(&argc, argv); | |
218 | return RUN_ALL_TESTS(); | |
219 | } | |
220 | ||
221 | /* | |
222 | * Local Variables: | |
223 | * compile-command: "cd ../.. ; make -j4 unittest_async_compressor && valgrind --tool=memcheck ./unittest_async_compressor" | |
224 | * End: | |
225 | */ |