]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "test/librados_test_stub/TestRadosClient.h" | |
5 | #include "test/librados_test_stub/TestIoCtxImpl.h" | |
6 | #include "librados/AioCompletionImpl.h" | |
11fdf7f2 | 7 | #include "include/ceph_assert.h" |
7c673cae FG |
8 | #include "common/ceph_json.h" |
9 | #include "common/Finisher.h" | |
f67539c2 TL |
10 | #include "common/async/context_pool.h" |
11 | #include <boost/lexical_cast.hpp> | |
7c673cae FG |
12 | #include <boost/thread.hpp> |
13 | #include <errno.h> | |
14 | ||
31f18b77 | 15 | #include <atomic> |
f67539c2 | 16 | #include <functional> |
11fdf7f2 | 17 | #include <sstream> |
31f18b77 | 18 | |
7c673cae FG |
19 | static int get_concurrency() { |
20 | int concurrency = 0; | |
21 | char *env = getenv("LIBRADOS_CONCURRENCY"); | |
22 | if (env != NULL) { | |
23 | concurrency = atoi(env); | |
24 | } | |
25 | if (concurrency == 0) { | |
26 | concurrency = boost::thread::thread::hardware_concurrency(); | |
27 | } | |
28 | if (concurrency == 0) { | |
29 | concurrency = 1; | |
30 | } | |
31 | return concurrency; | |
32 | } | |
33 | ||
f67539c2 TL |
34 | using namespace std::placeholders; |
35 | ||
7c673cae FG |
36 | namespace librados { |
37 | ||
f67539c2 TL |
namespace {

// Null-terminated list of configuration keys; returned verbatim by
// TestRadosClient::get_tracked_conf_keys().
const char *config_keys[] = {
  "librados_thread_count",
  nullptr
};

} // anonymous namespace
46 | ||
7c673cae | 47 | static void finish_aio_completion(AioCompletionImpl *c, int r) { |
9f95a23c | 48 | c->lock.lock(); |
7c673cae FG |
49 | c->complete = true; |
50 | c->rval = r; | |
9f95a23c | 51 | c->lock.unlock(); |
7c673cae FG |
52 | |
53 | rados_callback_t cb_complete = c->callback_complete; | |
54 | void *cb_complete_arg = c->callback_complete_arg; | |
55 | if (cb_complete) { | |
56 | cb_complete(c, cb_complete_arg); | |
57 | } | |
58 | ||
59 | rados_callback_t cb_safe = c->callback_safe; | |
60 | void *cb_safe_arg = c->callback_safe_arg; | |
61 | if (cb_safe) { | |
62 | cb_safe(c, cb_safe_arg); | |
63 | } | |
64 | ||
9f95a23c | 65 | c->lock.lock(); |
7c673cae FG |
66 | c->callback_complete = NULL; |
67 | c->callback_safe = NULL; | |
9f95a23c | 68 | c->cond.notify_all(); |
7c673cae FG |
69 | c->put_unlock(); |
70 | } | |
71 | ||
72 | class AioFunctionContext : public Context { | |
73 | public: | |
74 | AioFunctionContext(const TestRadosClient::AioFunction &callback, | |
75 | Finisher *finisher, AioCompletionImpl *c) | |
76 | : m_callback(callback), m_finisher(finisher), m_comp(c) | |
77 | { | |
78 | if (m_comp != NULL) { | |
79 | m_comp->get(); | |
80 | } | |
81 | } | |
82 | ||
83 | void finish(int r) override { | |
84 | int ret = m_callback(); | |
85 | if (m_comp != NULL) { | |
86 | if (m_finisher != NULL) { | |
f67539c2 | 87 | m_finisher->queue(new LambdaContext(std::bind( |
7c673cae FG |
88 | &finish_aio_completion, m_comp, ret))); |
89 | } else { | |
90 | finish_aio_completion(m_comp, ret); | |
91 | } | |
92 | } | |
93 | } | |
94 | private: | |
95 | TestRadosClient::AioFunction m_callback; | |
96 | Finisher *m_finisher; | |
97 | AioCompletionImpl *m_comp; | |
98 | }; | |
99 | ||
100 | TestRadosClient::TestRadosClient(CephContext *cct, | |
101 | TestWatchNotify *watch_notify) | |
102 | : m_cct(cct->get()), m_watch_notify(watch_notify), | |
f67539c2 TL |
103 | m_aio_finisher(new Finisher(m_cct)), |
104 | m_io_context_pool(std::make_unique<ceph::async::io_context_pool>()) | |
7c673cae FG |
105 | { |
106 | get(); | |
107 | ||
108 | // simulate multiple OSDs | |
109 | int concurrency = get_concurrency(); | |
110 | for (int i = 0; i < concurrency; ++i) { | |
111 | m_finishers.push_back(new Finisher(m_cct)); | |
112 | m_finishers.back()->start(); | |
113 | } | |
114 | ||
115 | // replicate AIO callback processing | |
116 | m_aio_finisher->start(); | |
f67539c2 TL |
117 | |
118 | // replicate neorados callback processing | |
119 | m_cct->_conf.add_observer(this); | |
120 | m_io_context_pool->start(m_cct->_conf.get_val<uint64_t>( | |
121 | "librados_thread_count")); | |
7c673cae FG |
122 | } |
123 | ||
124 | TestRadosClient::~TestRadosClient() { | |
125 | flush_aio_operations(); | |
126 | ||
127 | for (size_t i = 0; i < m_finishers.size(); ++i) { | |
128 | m_finishers[i]->stop(); | |
129 | delete m_finishers[i]; | |
130 | } | |
131 | m_aio_finisher->stop(); | |
132 | delete m_aio_finisher; | |
133 | ||
f67539c2 TL |
134 | m_cct->_conf.remove_observer(this); |
135 | m_io_context_pool->stop(); | |
136 | ||
7c673cae FG |
137 | m_cct->put(); |
138 | m_cct = NULL; | |
139 | } | |
140 | ||
f67539c2 TL |
141 | boost::asio::io_context& TestRadosClient::get_io_context() { |
142 | return m_io_context_pool->get_io_context(); | |
143 | } | |
144 | ||
145 | const char** TestRadosClient::get_tracked_conf_keys() const { | |
146 | return config_keys; | |
147 | } | |
148 | ||
149 | void TestRadosClient::handle_conf_change( | |
150 | const ConfigProxy& conf, const std::set<std::string> &changed) { | |
151 | if (changed.count("librados_thread_count")) { | |
152 | m_io_context_pool->stop(); | |
153 | m_io_context_pool->start(conf.get_val<std::uint64_t>( | |
154 | "librados_thread_count")); | |
155 | } | |
156 | } | |
157 | ||
7c673cae | 158 | void TestRadosClient::get() { |
31f18b77 | 159 | m_refcount++; |
7c673cae FG |
160 | } |
161 | ||
162 | void TestRadosClient::put() { | |
31f18b77 | 163 | if (--m_refcount == 0) { |
7c673cae FG |
164 | shutdown(); |
165 | delete this; | |
166 | } | |
167 | } | |
168 | ||
169 | CephContext *TestRadosClient::cct() { | |
170 | return m_cct; | |
171 | } | |
172 | ||
173 | int TestRadosClient::connect() { | |
174 | return 0; | |
175 | } | |
176 | ||
177 | void TestRadosClient::shutdown() { | |
178 | } | |
179 | ||
180 | int TestRadosClient::wait_for_latest_osdmap() { | |
181 | return 0; | |
182 | } | |
183 | ||
184 | int TestRadosClient::mon_command(const std::vector<std::string>& cmd, | |
185 | const bufferlist &inbl, | |
186 | bufferlist *outbl, std::string *outs) { | |
187 | for (std::vector<std::string>::const_iterator it = cmd.begin(); | |
188 | it != cmd.end(); ++it) { | |
189 | JSONParser parser; | |
190 | if (!parser.parse(it->c_str(), it->length())) { | |
191 | return -EINVAL; | |
192 | } | |
193 | ||
194 | JSONObjIter j_it = parser.find("prefix"); | |
195 | if (j_it.end()) { | |
196 | return -EINVAL; | |
197 | } | |
198 | ||
199 | if ((*j_it)->get_data() == "osd tier add") { | |
200 | return 0; | |
201 | } else if ((*j_it)->get_data() == "osd tier cache-mode") { | |
202 | return 0; | |
203 | } else if ((*j_it)->get_data() == "osd tier set-overlay") { | |
204 | return 0; | |
205 | } else if ((*j_it)->get_data() == "osd tier remove-overlay") { | |
206 | return 0; | |
207 | } else if ((*j_it)->get_data() == "osd tier remove") { | |
208 | return 0; | |
11fdf7f2 TL |
209 | } else if ((*j_it)->get_data() == "config-key rm") { |
210 | return 0; | |
f6b5b4d7 TL |
211 | } else if ((*j_it)->get_data() == "config set") { |
212 | return 0; | |
11fdf7f2 TL |
213 | } else if ((*j_it)->get_data() == "df") { |
214 | std::stringstream str; | |
215 | str << R"({"pools": [)"; | |
216 | ||
217 | std::list<std::pair<int64_t, std::string>> pools; | |
218 | pool_list(pools); | |
219 | for (auto& pool : pools) { | |
220 | if (pools.begin()->first != pool.first) { | |
221 | str << ","; | |
222 | } | |
223 | str << R"({"name": ")" << pool.second << R"(", "stats": )" | |
224 | << R"({"percent_used": 1.0, "bytes_used": 0, "max_avail": 0}})"; | |
225 | } | |
226 | ||
227 | str << "]}"; | |
228 | outbl->append(str.str()); | |
229 | return 0; | |
f67539c2 TL |
230 | } else if ((*j_it)->get_data() == "osd blocklist") { |
231 | auto op_it = parser.find("blocklistop"); | |
232 | if (!op_it.end() && (*op_it)->get_data() == "add") { | |
233 | uint32_t expire = 0; | |
234 | auto expire_it = parser.find("expire"); | |
235 | if (!expire_it.end()) { | |
236 | expire = boost::lexical_cast<uint32_t>((*expire_it)->get_data()); | |
237 | } | |
238 | ||
239 | auto addr_it = parser.find("addr"); | |
240 | return blocklist_add((*addr_it)->get_data(), expire); | |
241 | } | |
7c673cae FG |
242 | } |
243 | } | |
244 | return -ENOSYS; | |
245 | } | |
246 | ||
247 | void TestRadosClient::add_aio_operation(const std::string& oid, | |
248 | bool queue_callback, | |
249 | const AioFunction &aio_function, | |
250 | AioCompletionImpl *c) { | |
251 | AioFunctionContext *ctx = new AioFunctionContext( | |
252 | aio_function, queue_callback ? m_aio_finisher : NULL, c); | |
253 | get_finisher(oid)->queue(ctx); | |
254 | } | |
255 | ||
256 | struct WaitForFlush { | |
257 | int flushed() { | |
31f18b77 | 258 | if (--count == 0) { |
f67539c2 | 259 | aio_finisher->queue(new LambdaContext(std::bind( |
7c673cae FG |
260 | &finish_aio_completion, c, 0))); |
261 | delete this; | |
262 | } | |
263 | return 0; | |
264 | } | |
265 | ||
31f18b77 | 266 | std::atomic<int64_t> count = { 0 }; |
7c673cae FG |
267 | Finisher *aio_finisher; |
268 | AioCompletionImpl *c; | |
269 | }; | |
270 | ||
271 | void TestRadosClient::flush_aio_operations() { | |
272 | AioCompletionImpl *comp = new AioCompletionImpl(); | |
273 | flush_aio_operations(comp); | |
9f95a23c | 274 | comp->wait_for_complete(); |
7c673cae FG |
275 | comp->put(); |
276 | } | |
277 | ||
278 | void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) { | |
279 | c->get(); | |
280 | ||
281 | WaitForFlush *wait_for_flush = new WaitForFlush(); | |
31f18b77 | 282 | wait_for_flush->count = m_finishers.size(); |
7c673cae FG |
283 | wait_for_flush->aio_finisher = m_aio_finisher; |
284 | wait_for_flush->c = c; | |
285 | ||
286 | for (size_t i = 0; i < m_finishers.size(); ++i) { | |
287 | AioFunctionContext *ctx = new AioFunctionContext( | |
f67539c2 | 288 | std::bind(&WaitForFlush::flushed, wait_for_flush), |
7c673cae FG |
289 | nullptr, nullptr); |
290 | m_finishers[i]->queue(ctx); | |
291 | } | |
292 | } | |
293 | ||
294 | int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) { | |
295 | c->get(); | |
f67539c2 TL |
296 | Context *ctx = new LambdaContext(std::bind( |
297 | &TestRadosClient::finish_aio_completion, this, c, std::placeholders::_1)); | |
7c673cae FG |
298 | get_watch_notify()->aio_flush(this, ctx); |
299 | return 0; | |
300 | } | |
301 | ||
302 | void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) { | |
303 | librados::finish_aio_completion(c, r); | |
304 | } | |
305 | ||
306 | Finisher *TestRadosClient::get_finisher(const std::string &oid) { | |
307 | std::size_t h = m_hash(oid); | |
308 | return m_finishers[h % m_finishers.size()]; | |
309 | } | |
310 | ||
311 | } // namespace librados |