1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/librados_test_stub/TestRadosClient.h"
5 #include "test/librados_test_stub/TestIoCtxImpl.h"
6 #include "librados/AioCompletionImpl.h"
7 #include "include/ceph_assert.h"
8 #include "common/ceph_json.h"
9 #include "common/Finisher.h"
10 #include "common/async/context_pool.h"
11 #include <boost/lexical_cast.hpp>
12 #include <boost/thread.hpp>
19 static int get_concurrency() {
21 char *env
= getenv("LIBRADOS_CONCURRENCY");
23 concurrency
= atoi(env
);
25 if (concurrency
== 0) {
26 concurrency
= boost::thread::thread::hardware_concurrency();
28 if (concurrency
== 0) {
34 using namespace std::placeholders
;
40 const char *config_keys
[] = {
41 "librados_thread_count",
45 } // anonymous namespace
47 static void finish_aio_completion(AioCompletionImpl
*c
, int r
) {
53 rados_callback_t cb_complete
= c
->callback_complete
;
54 void *cb_complete_arg
= c
->callback_complete_arg
;
56 cb_complete(c
, cb_complete_arg
);
59 rados_callback_t cb_safe
= c
->callback_safe
;
60 void *cb_safe_arg
= c
->callback_safe_arg
;
62 cb_safe(c
, cb_safe_arg
);
66 c
->callback_complete
= NULL
;
67 c
->callback_safe
= NULL
;
72 class AioFunctionContext
: public Context
{
74 AioFunctionContext(const TestRadosClient::AioFunction
&callback
,
75 Finisher
*finisher
, AioCompletionImpl
*c
)
76 : m_callback(callback
), m_finisher(finisher
), m_comp(c
)
83 void finish(int r
) override
{
84 int ret
= m_callback();
86 if (m_finisher
!= NULL
) {
87 m_finisher
->queue(new LambdaContext(std::bind(
88 &finish_aio_completion
, m_comp
, ret
)));
90 finish_aio_completion(m_comp
, ret
);
95 TestRadosClient::AioFunction m_callback
;
97 AioCompletionImpl
*m_comp
;
100 TestRadosClient::TestRadosClient(CephContext
*cct
,
101 TestWatchNotify
*watch_notify
)
102 : m_cct(cct
->get()), m_watch_notify(watch_notify
),
103 m_aio_finisher(new Finisher(m_cct
)),
104 m_io_context_pool(std::make_unique
<ceph::async::io_context_pool
>())
108 // simulate multiple OSDs
109 int concurrency
= get_concurrency();
110 for (int i
= 0; i
< concurrency
; ++i
) {
111 m_finishers
.push_back(new Finisher(m_cct
));
112 m_finishers
.back()->start();
115 // replicate AIO callback processing
116 m_aio_finisher
->start();
118 // replicate neorados callback processing
119 m_cct
->_conf
.add_observer(this);
120 m_io_context_pool
->start(m_cct
->_conf
.get_val
<uint64_t>(
121 "librados_thread_count"));
124 TestRadosClient::~TestRadosClient() {
125 flush_aio_operations();
127 for (size_t i
= 0; i
< m_finishers
.size(); ++i
) {
128 m_finishers
[i
]->stop();
129 delete m_finishers
[i
];
131 m_aio_finisher
->stop();
132 delete m_aio_finisher
;
134 m_cct
->_conf
.remove_observer(this);
135 m_io_context_pool
->stop();
141 boost::asio::io_context
& TestRadosClient::get_io_context() {
142 return m_io_context_pool
->get_io_context();
145 const char** TestRadosClient::get_tracked_conf_keys() const {
149 void TestRadosClient::handle_conf_change(
150 const ConfigProxy
& conf
, const std::set
<std::string
> &changed
) {
151 if (changed
.count("librados_thread_count")) {
152 m_io_context_pool
->stop();
153 m_io_context_pool
->start(conf
.get_val
<std::uint64_t>(
154 "librados_thread_count"));
158 void TestRadosClient::get() {
162 void TestRadosClient::put() {
163 if (--m_refcount
== 0) {
169 CephContext
*TestRadosClient::cct() {
173 int TestRadosClient::connect() {
177 void TestRadosClient::shutdown() {
180 int TestRadosClient::wait_for_latest_osdmap() {
184 int TestRadosClient::mon_command(const std::vector
<std::string
>& cmd
,
185 const bufferlist
&inbl
,
186 bufferlist
*outbl
, std::string
*outs
) {
187 for (std::vector
<std::string
>::const_iterator it
= cmd
.begin();
188 it
!= cmd
.end(); ++it
) {
190 if (!parser
.parse(it
->c_str(), it
->length())) {
194 JSONObjIter j_it
= parser
.find("prefix");
199 if ((*j_it
)->get_data() == "osd tier add") {
201 } else if ((*j_it
)->get_data() == "osd tier cache-mode") {
203 } else if ((*j_it
)->get_data() == "osd tier set-overlay") {
205 } else if ((*j_it
)->get_data() == "osd tier remove-overlay") {
207 } else if ((*j_it
)->get_data() == "osd tier remove") {
209 } else if ((*j_it
)->get_data() == "config-key rm") {
211 } else if ((*j_it
)->get_data() == "config set") {
213 } else if ((*j_it
)->get_data() == "df") {
214 std::stringstream str
;
215 str
<< R
"({"pools
": [)";
217 std::list
<std::pair
<int64_t, std::string
>> pools
;
219 for (auto& pool
: pools
) {
220 if (pools
.begin()->first
!= pool
.first
) {
223 str
<< R
"({"name
": ")" << pool.second << R"(", "stats
": )"
224 << R
"({"percent_used
": 1.0, "bytes_used
": 0, "max_avail
": 0}})";
228 outbl
->append(str
.str());
230 } else if ((*j_it
)->get_data() == "osd blocklist") {
231 auto op_it
= parser
.find("blocklistop");
232 if (!op_it
.end() && (*op_it
)->get_data() == "add") {
234 auto expire_it
= parser
.find("expire");
235 if (!expire_it
.end()) {
236 expire
= boost::lexical_cast
<uint32_t>((*expire_it
)->get_data());
239 auto addr_it
= parser
.find("addr");
240 return blocklist_add((*addr_it
)->get_data(), expire
);
247 void TestRadosClient::add_aio_operation(const std::string
& oid
,
249 const AioFunction
&aio_function
,
250 AioCompletionImpl
*c
) {
251 AioFunctionContext
*ctx
= new AioFunctionContext(
252 aio_function
, queue_callback
? m_aio_finisher
: NULL
, c
);
253 get_finisher(oid
)->queue(ctx
);
256 struct WaitForFlush
{
259 aio_finisher
->queue(new LambdaContext(std::bind(
260 &finish_aio_completion
, c
, 0)));
266 std::atomic
<int64_t> count
= { 0 };
267 Finisher
*aio_finisher
;
268 AioCompletionImpl
*c
;
271 void TestRadosClient::flush_aio_operations() {
272 AioCompletionImpl
*comp
= new AioCompletionImpl();
273 flush_aio_operations(comp
);
274 comp
->wait_for_complete();
278 void TestRadosClient::flush_aio_operations(AioCompletionImpl
*c
) {
281 WaitForFlush
*wait_for_flush
= new WaitForFlush();
282 wait_for_flush
->count
= m_finishers
.size();
283 wait_for_flush
->aio_finisher
= m_aio_finisher
;
284 wait_for_flush
->c
= c
;
286 for (size_t i
= 0; i
< m_finishers
.size(); ++i
) {
287 AioFunctionContext
*ctx
= new AioFunctionContext(
288 std::bind(&WaitForFlush::flushed
, wait_for_flush
),
290 m_finishers
[i
]->queue(ctx
);
294 int TestRadosClient::aio_watch_flush(AioCompletionImpl
*c
) {
296 Context
*ctx
= new LambdaContext(std::bind(
297 &TestRadosClient::finish_aio_completion
, this, c
, std::placeholders::_1
));
298 get_watch_notify()->aio_flush(this, ctx
);
302 void TestRadosClient::finish_aio_completion(AioCompletionImpl
*c
, int r
) {
303 librados::finish_aio_completion(c
, r
);
306 Finisher
*TestRadosClient::get_finisher(const std::string
&oid
) {
307 std::size_t h
= m_hash(oid
);
308 return m_finishers
[h
% m_finishers
.size()];
311 } // namespace librados