]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/librados_test_stub/TestRadosClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / librados_test_stub / TestRadosClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
#include "test/librados_test_stub/TestRadosClient.h"
#include "test/librados_test_stub/TestIoCtxImpl.h"
#include "librados/AioCompletionImpl.h"
#include "include/ceph_assert.h"
#include "common/ceph_json.h"
#include "common/Finisher.h"
#include "common/async/context_pool.h"
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <errno.h>

#include <atomic>
#include <cstdlib>
#include <functional>
#include <sstream>
#include <thread>
31f18b77 18
7c673cae
FG
// Number of finisher threads to create.
//
// Reads the LIBRADOS_CONCURRENCY environment variable; when it is unset,
// unparsable, zero or negative, falls back to the hardware concurrency
// reported by the standard library, and finally to 1.  The result is
// therefore always >= 1, so callers can safely use it as a divisor or
// container size.
static int get_concurrency() {
  int concurrency = 0;
  const char *env = getenv("LIBRADOS_CONCURRENCY");
  if (env != nullptr) {
    concurrency = atoi(env);
  }
  // reject negative overrides as well as "not set" / "not a number"
  if (concurrency <= 0) {
    concurrency = static_cast<int>(std::thread::hardware_concurrency());
  }
  // hardware_concurrency() may report 0 when the value is not computable
  if (concurrency <= 0) {
    concurrency = 1;
  }
  return concurrency;
}
33
f67539c2
TL
34using namespace std::placeholders;
35
7c673cae
FG
36namespace librados {
37
f67539c2
TL
namespace {

// Null-terminated list of configuration keys returned by
// TestRadosClient::get_tracked_conf_keys().
const char *config_keys[] = {"librados_thread_count", nullptr};

} // anonymous namespace
46
7c673cae 47static void finish_aio_completion(AioCompletionImpl *c, int r) {
9f95a23c 48 c->lock.lock();
7c673cae
FG
49 c->complete = true;
50 c->rval = r;
9f95a23c 51 c->lock.unlock();
7c673cae
FG
52
53 rados_callback_t cb_complete = c->callback_complete;
54 void *cb_complete_arg = c->callback_complete_arg;
55 if (cb_complete) {
56 cb_complete(c, cb_complete_arg);
57 }
58
59 rados_callback_t cb_safe = c->callback_safe;
60 void *cb_safe_arg = c->callback_safe_arg;
61 if (cb_safe) {
62 cb_safe(c, cb_safe_arg);
63 }
64
9f95a23c 65 c->lock.lock();
7c673cae
FG
66 c->callback_complete = NULL;
67 c->callback_safe = NULL;
9f95a23c 68 c->cond.notify_all();
7c673cae
FG
69 c->put_unlock();
70}
71
72class AioFunctionContext : public Context {
73public:
74 AioFunctionContext(const TestRadosClient::AioFunction &callback,
75 Finisher *finisher, AioCompletionImpl *c)
76 : m_callback(callback), m_finisher(finisher), m_comp(c)
77 {
78 if (m_comp != NULL) {
79 m_comp->get();
80 }
81 }
82
83 void finish(int r) override {
84 int ret = m_callback();
85 if (m_comp != NULL) {
86 if (m_finisher != NULL) {
f67539c2 87 m_finisher->queue(new LambdaContext(std::bind(
7c673cae
FG
88 &finish_aio_completion, m_comp, ret)));
89 } else {
90 finish_aio_completion(m_comp, ret);
91 }
92 }
93 }
94private:
95 TestRadosClient::AioFunction m_callback;
96 Finisher *m_finisher;
97 AioCompletionImpl *m_comp;
98};
99
100TestRadosClient::TestRadosClient(CephContext *cct,
101 TestWatchNotify *watch_notify)
102 : m_cct(cct->get()), m_watch_notify(watch_notify),
f67539c2
TL
103 m_aio_finisher(new Finisher(m_cct)),
104 m_io_context_pool(std::make_unique<ceph::async::io_context_pool>())
7c673cae
FG
105{
106 get();
107
108 // simulate multiple OSDs
109 int concurrency = get_concurrency();
110 for (int i = 0; i < concurrency; ++i) {
111 m_finishers.push_back(new Finisher(m_cct));
112 m_finishers.back()->start();
113 }
114
115 // replicate AIO callback processing
116 m_aio_finisher->start();
f67539c2
TL
117
118 // replicate neorados callback processing
119 m_cct->_conf.add_observer(this);
120 m_io_context_pool->start(m_cct->_conf.get_val<uint64_t>(
121 "librados_thread_count"));
7c673cae
FG
122}
123
124TestRadosClient::~TestRadosClient() {
125 flush_aio_operations();
126
127 for (size_t i = 0; i < m_finishers.size(); ++i) {
128 m_finishers[i]->stop();
129 delete m_finishers[i];
130 }
131 m_aio_finisher->stop();
132 delete m_aio_finisher;
133
f67539c2
TL
134 m_cct->_conf.remove_observer(this);
135 m_io_context_pool->stop();
136
7c673cae
FG
137 m_cct->put();
138 m_cct = NULL;
139}
140
f67539c2
TL
141boost::asio::io_context& TestRadosClient::get_io_context() {
142 return m_io_context_pool->get_io_context();
143}
144
145const char** TestRadosClient::get_tracked_conf_keys() const {
146 return config_keys;
147}
148
149void TestRadosClient::handle_conf_change(
150 const ConfigProxy& conf, const std::set<std::string> &changed) {
151 if (changed.count("librados_thread_count")) {
152 m_io_context_pool->stop();
153 m_io_context_pool->start(conf.get_val<std::uint64_t>(
154 "librados_thread_count"));
155 }
156}
157
7c673cae 158void TestRadosClient::get() {
31f18b77 159 m_refcount++;
7c673cae
FG
160}
161
162void TestRadosClient::put() {
31f18b77 163 if (--m_refcount == 0) {
7c673cae
FG
164 shutdown();
165 delete this;
166 }
167}
168
169CephContext *TestRadosClient::cct() {
170 return m_cct;
171}
172
173int TestRadosClient::connect() {
174 return 0;
175}
176
177void TestRadosClient::shutdown() {
178}
179
180int TestRadosClient::wait_for_latest_osdmap() {
181 return 0;
182}
183
184int TestRadosClient::mon_command(const std::vector<std::string>& cmd,
185 const bufferlist &inbl,
186 bufferlist *outbl, std::string *outs) {
187 for (std::vector<std::string>::const_iterator it = cmd.begin();
188 it != cmd.end(); ++it) {
189 JSONParser parser;
190 if (!parser.parse(it->c_str(), it->length())) {
191 return -EINVAL;
192 }
193
194 JSONObjIter j_it = parser.find("prefix");
195 if (j_it.end()) {
196 return -EINVAL;
197 }
198
199 if ((*j_it)->get_data() == "osd tier add") {
200 return 0;
201 } else if ((*j_it)->get_data() == "osd tier cache-mode") {
202 return 0;
203 } else if ((*j_it)->get_data() == "osd tier set-overlay") {
204 return 0;
205 } else if ((*j_it)->get_data() == "osd tier remove-overlay") {
206 return 0;
207 } else if ((*j_it)->get_data() == "osd tier remove") {
208 return 0;
11fdf7f2
TL
209 } else if ((*j_it)->get_data() == "config-key rm") {
210 return 0;
f6b5b4d7
TL
211 } else if ((*j_it)->get_data() == "config set") {
212 return 0;
11fdf7f2
TL
213 } else if ((*j_it)->get_data() == "df") {
214 std::stringstream str;
215 str << R"({"pools": [)";
216
217 std::list<std::pair<int64_t, std::string>> pools;
218 pool_list(pools);
219 for (auto& pool : pools) {
220 if (pools.begin()->first != pool.first) {
221 str << ",";
222 }
223 str << R"({"name": ")" << pool.second << R"(", "stats": )"
224 << R"({"percent_used": 1.0, "bytes_used": 0, "max_avail": 0}})";
225 }
226
227 str << "]}";
228 outbl->append(str.str());
229 return 0;
f67539c2
TL
230 } else if ((*j_it)->get_data() == "osd blocklist") {
231 auto op_it = parser.find("blocklistop");
232 if (!op_it.end() && (*op_it)->get_data() == "add") {
233 uint32_t expire = 0;
234 auto expire_it = parser.find("expire");
235 if (!expire_it.end()) {
236 expire = boost::lexical_cast<uint32_t>((*expire_it)->get_data());
237 }
238
239 auto addr_it = parser.find("addr");
240 return blocklist_add((*addr_it)->get_data(), expire);
241 }
7c673cae
FG
242 }
243 }
244 return -ENOSYS;
245}
246
247void TestRadosClient::add_aio_operation(const std::string& oid,
248 bool queue_callback,
249 const AioFunction &aio_function,
250 AioCompletionImpl *c) {
251 AioFunctionContext *ctx = new AioFunctionContext(
252 aio_function, queue_callback ? m_aio_finisher : NULL, c);
253 get_finisher(oid)->queue(ctx);
254}
255
256struct WaitForFlush {
257 int flushed() {
31f18b77 258 if (--count == 0) {
f67539c2 259 aio_finisher->queue(new LambdaContext(std::bind(
7c673cae
FG
260 &finish_aio_completion, c, 0)));
261 delete this;
262 }
263 return 0;
264 }
265
31f18b77 266 std::atomic<int64_t> count = { 0 };
7c673cae
FG
267 Finisher *aio_finisher;
268 AioCompletionImpl *c;
269};
270
271void TestRadosClient::flush_aio_operations() {
272 AioCompletionImpl *comp = new AioCompletionImpl();
273 flush_aio_operations(comp);
9f95a23c 274 comp->wait_for_complete();
7c673cae
FG
275 comp->put();
276}
277
278void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
279 c->get();
280
281 WaitForFlush *wait_for_flush = new WaitForFlush();
31f18b77 282 wait_for_flush->count = m_finishers.size();
7c673cae
FG
283 wait_for_flush->aio_finisher = m_aio_finisher;
284 wait_for_flush->c = c;
285
286 for (size_t i = 0; i < m_finishers.size(); ++i) {
287 AioFunctionContext *ctx = new AioFunctionContext(
f67539c2 288 std::bind(&WaitForFlush::flushed, wait_for_flush),
7c673cae
FG
289 nullptr, nullptr);
290 m_finishers[i]->queue(ctx);
291 }
292}
293
294int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
295 c->get();
f67539c2
TL
296 Context *ctx = new LambdaContext(std::bind(
297 &TestRadosClient::finish_aio_completion, this, c, std::placeholders::_1));
7c673cae
FG
298 get_watch_notify()->aio_flush(this, ctx);
299 return 0;
300}
301
302void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
303 librados::finish_aio_completion(c, r);
304}
305
306Finisher *TestRadosClient::get_finisher(const std::string &oid) {
307 std::size_t h = m_hash(oid);
308 return m_finishers[h % m_finishers.size()];
309}
310
311} // namespace librados