]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados_test_stub/TestRadosClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / librados_test_stub / TestRadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "test/librados_test_stub/TestRadosClient.h"
#include "test/librados_test_stub/TestIoCtxImpl.h"
#include "librados/AioCompletionImpl.h"
#include "include/ceph_assert.h"
#include "common/ceph_json.h"
#include "common/Finisher.h"
#include "common/async/context_pool.h"
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <errno.h>

#include <atomic>
#include <climits>
#include <cstdlib>
#include <functional>
#include <sstream>
#include <thread>
18
// Returns the number of finisher threads used to simulate multiple OSDs.
//
// The LIBRADOS_CONCURRENCY environment variable is honored when it parses to
// a positive value that fits in an int.  Anything else (unset, zero, negative,
// non-numeric or out of range) falls back to the hardware thread count and,
// if that is reported as 0, to 1.  The result is therefore always >= 1; a
// non-positive value would leave the client without any finisher to queue
// operations on.
static int get_concurrency() {
  long requested = 0;
  const char *env = std::getenv("LIBRADOS_CONCURRENCY");
  if (env != nullptr) {
    errno = 0;
    requested = std::strtol(env, nullptr, 10);
    if (errno == ERANGE || requested < 0 || requested > INT_MAX) {
      // ignore unusable requests instead of propagating them
      requested = 0;
    }
  }

  int concurrency = static_cast<int>(requested);
  if (concurrency == 0) {
    const unsigned int hw = std::thread::hardware_concurrency();
    concurrency = (hw > static_cast<unsigned int>(INT_MAX))
                    ? INT_MAX : static_cast<int>(hw);
  }
  if (concurrency == 0) {
    // hardware_concurrency() may report 0 when the value is not computable
    concurrency = 1;
  }
  return concurrency;
}
33
34 using namespace std::placeholders;
35
36 namespace librados {
37
namespace {

// Null-terminated list of the configuration option names this file tracks;
// handed out verbatim by TestRadosClient::get_tracked_conf_keys().
const char *config_keys[] = {"librados_thread_count", nullptr};

} // anonymous namespace
46
47 static void finish_aio_completion(AioCompletionImpl *c, int r) {
48 c->lock.lock();
49 c->complete = true;
50 c->rval = r;
51 c->lock.unlock();
52
53 rados_callback_t cb_complete = c->callback_complete;
54 void *cb_complete_arg = c->callback_complete_arg;
55 if (cb_complete) {
56 cb_complete(c, cb_complete_arg);
57 }
58
59 rados_callback_t cb_safe = c->callback_safe;
60 void *cb_safe_arg = c->callback_safe_arg;
61 if (cb_safe) {
62 cb_safe(c, cb_safe_arg);
63 }
64
65 c->lock.lock();
66 c->callback_complete = NULL;
67 c->callback_safe = NULL;
68 c->cond.notify_all();
69 c->put_unlock();
70 }
71
72 class AioFunctionContext : public Context {
73 public:
74 AioFunctionContext(const TestRadosClient::AioFunction &callback,
75 Finisher *finisher, AioCompletionImpl *c)
76 : m_callback(callback), m_finisher(finisher), m_comp(c)
77 {
78 if (m_comp != NULL) {
79 m_comp->get();
80 }
81 }
82
83 void finish(int r) override {
84 int ret = m_callback();
85 if (m_comp != NULL) {
86 if (m_finisher != NULL) {
87 m_finisher->queue(new LambdaContext(std::bind(
88 &finish_aio_completion, m_comp, ret)));
89 } else {
90 finish_aio_completion(m_comp, ret);
91 }
92 }
93 }
94 private:
95 TestRadosClient::AioFunction m_callback;
96 Finisher *m_finisher;
97 AioCompletionImpl *m_comp;
98 };
99
100 TestRadosClient::TestRadosClient(CephContext *cct,
101 TestWatchNotify *watch_notify)
102 : m_cct(cct->get()), m_watch_notify(watch_notify),
103 m_aio_finisher(new Finisher(m_cct)),
104 m_io_context_pool(std::make_unique<ceph::async::io_context_pool>())
105 {
106 get();
107
108 // simulate multiple OSDs
109 int concurrency = get_concurrency();
110 for (int i = 0; i < concurrency; ++i) {
111 m_finishers.push_back(new Finisher(m_cct));
112 m_finishers.back()->start();
113 }
114
115 // replicate AIO callback processing
116 m_aio_finisher->start();
117
118 // replicate neorados callback processing
119 m_cct->_conf.add_observer(this);
120 m_io_context_pool->start(m_cct->_conf.get_val<uint64_t>(
121 "librados_thread_count"));
122 }
123
124 TestRadosClient::~TestRadosClient() {
125 flush_aio_operations();
126
127 for (size_t i = 0; i < m_finishers.size(); ++i) {
128 m_finishers[i]->stop();
129 delete m_finishers[i];
130 }
131 m_aio_finisher->stop();
132 delete m_aio_finisher;
133
134 m_cct->_conf.remove_observer(this);
135 m_io_context_pool->stop();
136
137 m_cct->put();
138 m_cct = NULL;
139 }
140
141 boost::asio::io_context& TestRadosClient::get_io_context() {
142 return m_io_context_pool->get_io_context();
143 }
144
145 const char** TestRadosClient::get_tracked_conf_keys() const {
146 return config_keys;
147 }
148
149 void TestRadosClient::handle_conf_change(
150 const ConfigProxy& conf, const std::set<std::string> &changed) {
151 if (changed.count("librados_thread_count")) {
152 m_io_context_pool->stop();
153 m_io_context_pool->start(conf.get_val<std::uint64_t>(
154 "librados_thread_count"));
155 }
156 }
157
158 void TestRadosClient::get() {
159 m_refcount++;
160 }
161
162 void TestRadosClient::put() {
163 if (--m_refcount == 0) {
164 shutdown();
165 delete this;
166 }
167 }
168
169 CephContext *TestRadosClient::cct() {
170 return m_cct;
171 }
172
173 int TestRadosClient::connect() {
174 return 0;
175 }
176
177 void TestRadosClient::shutdown() {
178 }
179
180 int TestRadosClient::wait_for_latest_osdmap() {
181 return 0;
182 }
183
184 int TestRadosClient::mon_command(const std::vector<std::string>& cmd,
185 const bufferlist &inbl,
186 bufferlist *outbl, std::string *outs) {
187 for (std::vector<std::string>::const_iterator it = cmd.begin();
188 it != cmd.end(); ++it) {
189 JSONParser parser;
190 if (!parser.parse(it->c_str(), it->length())) {
191 return -EINVAL;
192 }
193
194 JSONObjIter j_it = parser.find("prefix");
195 if (j_it.end()) {
196 return -EINVAL;
197 }
198
199 if ((*j_it)->get_data() == "osd tier add") {
200 return 0;
201 } else if ((*j_it)->get_data() == "osd tier cache-mode") {
202 return 0;
203 } else if ((*j_it)->get_data() == "osd tier set-overlay") {
204 return 0;
205 } else if ((*j_it)->get_data() == "osd tier remove-overlay") {
206 return 0;
207 } else if ((*j_it)->get_data() == "osd tier remove") {
208 return 0;
209 } else if ((*j_it)->get_data() == "config-key rm") {
210 return 0;
211 } else if ((*j_it)->get_data() == "config set") {
212 return 0;
213 } else if ((*j_it)->get_data() == "df") {
214 std::stringstream str;
215 str << R"({"pools": [)";
216
217 std::list<std::pair<int64_t, std::string>> pools;
218 pool_list(pools);
219 for (auto& pool : pools) {
220 if (pools.begin()->first != pool.first) {
221 str << ",";
222 }
223 str << R"({"name": ")" << pool.second << R"(", "stats": )"
224 << R"({"percent_used": 1.0, "bytes_used": 0, "max_avail": 0}})";
225 }
226
227 str << "]}";
228 outbl->append(str.str());
229 return 0;
230 } else if ((*j_it)->get_data() == "osd blocklist") {
231 auto op_it = parser.find("blocklistop");
232 if (!op_it.end() && (*op_it)->get_data() == "add") {
233 uint32_t expire = 0;
234 auto expire_it = parser.find("expire");
235 if (!expire_it.end()) {
236 expire = boost::lexical_cast<uint32_t>((*expire_it)->get_data());
237 }
238
239 auto addr_it = parser.find("addr");
240 return blocklist_add((*addr_it)->get_data(), expire);
241 }
242 }
243 }
244 return -ENOSYS;
245 }
246
247 void TestRadosClient::add_aio_operation(const std::string& oid,
248 bool queue_callback,
249 const AioFunction &aio_function,
250 AioCompletionImpl *c) {
251 AioFunctionContext *ctx = new AioFunctionContext(
252 aio_function, queue_callback ? m_aio_finisher : NULL, c);
253 get_finisher(oid)->queue(ctx);
254 }
255
256 struct WaitForFlush {
257 int flushed() {
258 if (--count == 0) {
259 aio_finisher->queue(new LambdaContext(std::bind(
260 &finish_aio_completion, c, 0)));
261 delete this;
262 }
263 return 0;
264 }
265
266 std::atomic<int64_t> count = { 0 };
267 Finisher *aio_finisher;
268 AioCompletionImpl *c;
269 };
270
271 void TestRadosClient::flush_aio_operations() {
272 AioCompletionImpl *comp = new AioCompletionImpl();
273 flush_aio_operations(comp);
274 comp->wait_for_complete();
275 comp->put();
276 }
277
278 void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
279 c->get();
280
281 WaitForFlush *wait_for_flush = new WaitForFlush();
282 wait_for_flush->count = m_finishers.size();
283 wait_for_flush->aio_finisher = m_aio_finisher;
284 wait_for_flush->c = c;
285
286 for (size_t i = 0; i < m_finishers.size(); ++i) {
287 AioFunctionContext *ctx = new AioFunctionContext(
288 std::bind(&WaitForFlush::flushed, wait_for_flush),
289 nullptr, nullptr);
290 m_finishers[i]->queue(ctx);
291 }
292 }
293
294 int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
295 c->get();
296 Context *ctx = new LambdaContext(std::bind(
297 &TestRadosClient::finish_aio_completion, this, c, std::placeholders::_1));
298 get_watch_notify()->aio_flush(this, ctx);
299 return 0;
300 }
301
302 void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
303 librados::finish_aio_completion(c, r);
304 }
305
306 Finisher *TestRadosClient::get_finisher(const std::string &oid) {
307 std::size_t h = m_hash(oid);
308 return m_finishers[h % m_finishers.size()];
309 }
310
311 } // namespace librados