]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados_test_stub/TestRadosClient.cc
2d75a4b2aa9390cf9141e5ab803afefef0755602
[ceph.git] / ceph / src / test / librados_test_stub / TestRadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "test/librados_test_stub/TestRadosClient.h"
#include "test/librados_test_stub/TestIoCtxImpl.h"
#include "librados/AioCompletionImpl.h"
#include "include/ceph_assert.h"
#include "common/ceph_json.h"
#include "common/Finisher.h"
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <errno.h>
#include <stdlib.h>

#include <atomic>
#include <sstream>
#include <thread>
16
/**
 * Pick the number of worker threads to use.
 *
 * The LIBRADOS_CONCURRENCY environment variable takes precedence. When it is
 * unset, unparsable, zero or negative, the hardware concurrency hint is used
 * instead, and if that hint is unavailable the result falls back to 1.
 *
 * @return a strictly positive thread count
 */
static int get_concurrency() {
  int concurrency = 0;
  const char *env = getenv("LIBRADOS_CONCURRENCY");
  if (env != nullptr) {
    concurrency = atoi(env);
  }
  // a non-positive request (e.g. "-4") must not leak through: callers size
  // their worker pool from this value and divide by the pool size
  if (concurrency <= 0) {
    concurrency = static_cast<int>(std::thread::hardware_concurrency());
  }
  if (concurrency <= 0) {
    concurrency = 1;
  }
  return concurrency;
}
31
32 namespace librados {
33
34 static void finish_aio_completion(AioCompletionImpl *c, int r) {
35 c->lock.lock();
36 c->complete = true;
37 c->rval = r;
38 c->lock.unlock();
39
40 rados_callback_t cb_complete = c->callback_complete;
41 void *cb_complete_arg = c->callback_complete_arg;
42 if (cb_complete) {
43 cb_complete(c, cb_complete_arg);
44 }
45
46 rados_callback_t cb_safe = c->callback_safe;
47 void *cb_safe_arg = c->callback_safe_arg;
48 if (cb_safe) {
49 cb_safe(c, cb_safe_arg);
50 }
51
52 c->lock.lock();
53 c->callback_complete = NULL;
54 c->callback_safe = NULL;
55 c->cond.notify_all();
56 c->put_unlock();
57 }
58
59 class AioFunctionContext : public Context {
60 public:
61 AioFunctionContext(const TestRadosClient::AioFunction &callback,
62 Finisher *finisher, AioCompletionImpl *c)
63 : m_callback(callback), m_finisher(finisher), m_comp(c)
64 {
65 if (m_comp != NULL) {
66 m_comp->get();
67 }
68 }
69
70 void finish(int r) override {
71 int ret = m_callback();
72 if (m_comp != NULL) {
73 if (m_finisher != NULL) {
74 m_finisher->queue(new LambdaContext(boost::bind(
75 &finish_aio_completion, m_comp, ret)));
76 } else {
77 finish_aio_completion(m_comp, ret);
78 }
79 }
80 }
81 private:
82 TestRadosClient::AioFunction m_callback;
83 Finisher *m_finisher;
84 AioCompletionImpl *m_comp;
85 };
86
87 TestRadosClient::TestRadosClient(CephContext *cct,
88 TestWatchNotify *watch_notify)
89 : m_cct(cct->get()), m_watch_notify(watch_notify),
90 m_aio_finisher(new Finisher(m_cct))
91 {
92 get();
93
94 // simulate multiple OSDs
95 int concurrency = get_concurrency();
96 for (int i = 0; i < concurrency; ++i) {
97 m_finishers.push_back(new Finisher(m_cct));
98 m_finishers.back()->start();
99 }
100
101 // replicate AIO callback processing
102 m_aio_finisher->start();
103 }
104
105 TestRadosClient::~TestRadosClient() {
106 flush_aio_operations();
107
108 for (size_t i = 0; i < m_finishers.size(); ++i) {
109 m_finishers[i]->stop();
110 delete m_finishers[i];
111 }
112 m_aio_finisher->stop();
113 delete m_aio_finisher;
114
115 m_cct->put();
116 m_cct = NULL;
117 }
118
119 void TestRadosClient::get() {
120 m_refcount++;
121 }
122
123 void TestRadosClient::put() {
124 if (--m_refcount == 0) {
125 shutdown();
126 delete this;
127 }
128 }
129
130 CephContext *TestRadosClient::cct() {
131 return m_cct;
132 }
133
134 int TestRadosClient::connect() {
135 return 0;
136 }
137
138 void TestRadosClient::shutdown() {
139 }
140
141 int TestRadosClient::wait_for_latest_osdmap() {
142 return 0;
143 }
144
145 int TestRadosClient::mon_command(const std::vector<std::string>& cmd,
146 const bufferlist &inbl,
147 bufferlist *outbl, std::string *outs) {
148 for (std::vector<std::string>::const_iterator it = cmd.begin();
149 it != cmd.end(); ++it) {
150 JSONParser parser;
151 if (!parser.parse(it->c_str(), it->length())) {
152 return -EINVAL;
153 }
154
155 JSONObjIter j_it = parser.find("prefix");
156 if (j_it.end()) {
157 return -EINVAL;
158 }
159
160 if ((*j_it)->get_data() == "osd tier add") {
161 return 0;
162 } else if ((*j_it)->get_data() == "osd tier cache-mode") {
163 return 0;
164 } else if ((*j_it)->get_data() == "osd tier set-overlay") {
165 return 0;
166 } else if ((*j_it)->get_data() == "osd tier remove-overlay") {
167 return 0;
168 } else if ((*j_it)->get_data() == "osd tier remove") {
169 return 0;
170 } else if ((*j_it)->get_data() == "config-key rm") {
171 return 0;
172 } else if ((*j_it)->get_data() == "df") {
173 std::stringstream str;
174 str << R"({"pools": [)";
175
176 std::list<std::pair<int64_t, std::string>> pools;
177 pool_list(pools);
178 for (auto& pool : pools) {
179 if (pools.begin()->first != pool.first) {
180 str << ",";
181 }
182 str << R"({"name": ")" << pool.second << R"(", "stats": )"
183 << R"({"percent_used": 1.0, "bytes_used": 0, "max_avail": 0}})";
184 }
185
186 str << "]}";
187 outbl->append(str.str());
188 return 0;
189 }
190 }
191 return -ENOSYS;
192 }
193
194 void TestRadosClient::add_aio_operation(const std::string& oid,
195 bool queue_callback,
196 const AioFunction &aio_function,
197 AioCompletionImpl *c) {
198 AioFunctionContext *ctx = new AioFunctionContext(
199 aio_function, queue_callback ? m_aio_finisher : NULL, c);
200 get_finisher(oid)->queue(ctx);
201 }
202
203 struct WaitForFlush {
204 int flushed() {
205 if (--count == 0) {
206 aio_finisher->queue(new LambdaContext(boost::bind(
207 &finish_aio_completion, c, 0)));
208 delete this;
209 }
210 return 0;
211 }
212
213 std::atomic<int64_t> count = { 0 };
214 Finisher *aio_finisher;
215 AioCompletionImpl *c;
216 };
217
218 void TestRadosClient::flush_aio_operations() {
219 AioCompletionImpl *comp = new AioCompletionImpl();
220 flush_aio_operations(comp);
221 comp->wait_for_complete();
222 comp->put();
223 }
224
225 void TestRadosClient::flush_aio_operations(AioCompletionImpl *c) {
226 c->get();
227
228 WaitForFlush *wait_for_flush = new WaitForFlush();
229 wait_for_flush->count = m_finishers.size();
230 wait_for_flush->aio_finisher = m_aio_finisher;
231 wait_for_flush->c = c;
232
233 for (size_t i = 0; i < m_finishers.size(); ++i) {
234 AioFunctionContext *ctx = new AioFunctionContext(
235 boost::bind(&WaitForFlush::flushed, wait_for_flush),
236 nullptr, nullptr);
237 m_finishers[i]->queue(ctx);
238 }
239 }
240
241 int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
242 c->get();
243 Context *ctx = new LambdaContext(boost::bind(
244 &TestRadosClient::finish_aio_completion, this, c, _1));
245 get_watch_notify()->aio_flush(this, ctx);
246 return 0;
247 }
248
249 void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
250 librados::finish_aio_completion(c, r);
251 }
252
253 Finisher *TestRadosClient::get_finisher(const std::string &oid) {
254 std::size_t h = m_hash(oid);
255 return m_finishers[h % m_finishers.size()];
256 }
257
258 } // namespace librados