1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "test/librados_test_stub/TestRadosClient.h"
5 #include "test/librados_test_stub/TestIoCtxImpl.h"
6 #include "librados/AioCompletionImpl.h"
7 #include "include/ceph_assert.h"
8 #include "common/ceph_json.h"
9 #include "common/Finisher.h"
10 #include <boost/bind.hpp>
11 #include <boost/thread.hpp>
// Determines the concurrency level used by this test stub.
//
// Visible logic: the LIBRADOS_CONCURRENCY environment variable is read with
// getenv() and parsed with atoi(); if the value is 0 the result of
// boost::thread::hardware_concurrency() is used instead, and a second check
// handles the case where that is 0 as well.
//
// NOTE(review): the declaration of `concurrency`, any NULL check on `env`
// before atoi(), the body of the final `== 0` branch and the return statement
// are not part of this listing -- confirm them against the complete file.
17 static int get_concurrency() {
19 char *env
= getenv("LIBRADOS_CONCURRENCY");
21 concurrency
= atoi(env
);
23 if (concurrency
== 0) {
24 concurrency
= boost::thread::thread::hardware_concurrency();
26 if (concurrency
== 0) {
// Finishes an AIO completion by running its user callbacks.
//
// Visible logic: the "complete" callback and its argument are copied out of
// the completion and the callback is invoked as cb(c, arg); the same is then
// done for the "safe" callback. Afterwards both callback pointers stored in
// the completion are reset to NULL.
//
// @param c  completion whose callbacks are invoked and then cleared
// @param r  result code; NOTE(review): its use is not visible in this
//           listing -- presumably stored on the completion, confirm.
//
// NOTE(review): the statements surrounding the visible ones (any locking,
// NULL checks around the callback calls, waking waiters / dropping the
// reference) are not part of this listing.
34 static void finish_aio_completion(AioCompletionImpl
*c
, int r
) {
40 rados_callback_t cb_complete
= c
->callback_complete
;
41 void *cb_complete_arg
= c
->callback_complete_arg
;
43 cb_complete(c
, cb_complete_arg
);
46 rados_callback_t cb_safe
= c
->callback_safe
;
47 void *cb_safe_arg
= c
->callback_safe_arg
;
49 cb_safe(c
, cb_safe_arg
);
53 c
->callback_complete
= NULL
;
54 c
->callback_safe
= NULL
;
// Context that runs a stored TestRadosClient::AioFunction and then finishes
// the associated AioCompletionImpl with the function's return value.
//
// finish() ignores nothing visible here except that it calls m_callback() and
// keeps its int result; if m_finisher is non-NULL the call to
// finish_aio_completion(m_comp, ret) is wrapped in a LambdaContext and queued
// on that finisher, otherwise finish_aio_completion() is called directly.
//
// NOTE(review): the access specifiers, the constructor body, the declaration
// of m_finisher and any NULL check on m_comp are not part of this listing.
59 class AioFunctionContext
: public Context
{
// Stores the function to run, the finisher used to deliver the completion
// (may be NULL, see finish()) and the completion itself.
61 AioFunctionContext(const TestRadosClient::AioFunction
&callback
,
62 Finisher
*finisher
, AioCompletionImpl
*c
)
63 : m_callback(callback
), m_finisher(finisher
), m_comp(c
)
// Runs the stored function and hands its result to finish_aio_completion().
70 void finish(int r
) override
{
71 int ret
= m_callback();
73 if (m_finisher
!= NULL
) {
74 m_finisher
->queue(new LambdaContext(boost::bind(
75 &finish_aio_completion
, m_comp
, ret
)));
77 finish_aio_completion(m_comp
, ret
);
82 TestRadosClient::AioFunction m_callback
;
84 AioCompletionImpl
*m_comp
;
// Constructs the test client.
//
// Visible logic: m_cct is initialised from cct->get(), m_watch_notify from
// the given pointer, and m_aio_finisher with a newly allocated Finisher.
// The body then creates get_concurrency() Finisher instances, appends each to
// m_finishers and starts it, and finally starts m_aio_finisher.
//
// @param cct           Ceph context; get() is called on it here
// @param watch_notify  stored in m_watch_notify
//
// NOTE(review): further member initialisers and body statements may exist
// between the visible lines -- they are not part of this listing.
87 TestRadosClient::TestRadosClient(CephContext
*cct
,
88 TestWatchNotify
*watch_notify
)
89 : m_cct(cct
->get()), m_watch_notify(watch_notify
),
90 m_aio_finisher(new Finisher(m_cct
))
94 // simulate multiple OSDs
95 int concurrency
= get_concurrency();
96 for (int i
= 0; i
< concurrency
; ++i
) {
97 m_finishers
.push_back(new Finisher(m_cct
));
98 m_finishers
.back()->start();
101 // replicate AIO callback processing
102 m_aio_finisher
->start();
// Tears the client down.
//
// Visible logic: flush_aio_operations() is called first; then every finisher
// in m_finishers is stopped and deleted, and finally m_aio_finisher is
// stopped and deleted.
//
// NOTE(review): statements after `delete m_aio_finisher` (e.g. releasing the
// reference taken on the CephContext in the constructor) are not part of this
// listing -- confirm against the complete file.
105 TestRadosClient::~TestRadosClient() {
106 flush_aio_operations();
108 for (size_t i
= 0; i
< m_finishers
.size(); ++i
) {
109 m_finishers
[i
]->stop();
110 delete m_finishers
[i
];
112 m_aio_finisher
->stop();
113 delete m_aio_finisher
;
// Reference-counting and connection entry points of TestRadosClient.
// Only the signature lines (and the refcount test in put()) are part of this
// listing, so the notes below are limited to what those lines show.

// NOTE(review): presumably takes a reference, mirroring put() -- body not visible.
119 void TestRadosClient::get() {
// Pre-decrements m_refcount and enters a branch when it reaches zero.
// NOTE(review): the contents of that branch are not visible -- confirm.
123 void TestRadosClient::put() {
124 if (--m_refcount
== 0) {
// Returns a CephContext pointer (per the signature).
// NOTE(review): presumably the m_cct member -- body not visible.
130 CephContext
*TestRadosClient::cct() {
// Returns an int status. NOTE(review): body not visible.
134 int TestRadosClient::connect() {
// NOTE(review): body not visible.
138 void TestRadosClient::shutdown() {
// Returns an int status. NOTE(review): body not visible.
141 int TestRadosClient::wait_for_latest_osdmap() {
// Handles a monitor command for the test stub.
//
// Visible logic: every string in `cmd` is parsed as JSON via parser.parse();
// the "prefix" field is looked up with parser.find("prefix") and its data is
// compared against a fixed list of command names:
//   "osd tier add", "osd tier cache-mode", "osd tier set-overlay",
//   "osd tier remove-overlay", "osd tier remove", "config-key rm", "df".
// For "df" a JSON document is assembled in a std::stringstream: it opens a
// "pools" array and, for each (int64_t, std::string) entry of the local
// `pools` list, writes an object with the pool's name and a "stats" object
// holding fixed values (percent_used 1.0, bytes_used 0, max_avail 0). The
// resulting string is appended to *outbl.
//
// @param cmd    command strings, each expected to be a JSON document
// @param inbl   input buffer; NOTE(review): no use visible in this listing
// @param outbl  receives the generated "df" output
// @param outs   NOTE(review): no use visible in this listing
//
// NOTE(review): the declaration of `parser`, the parse-failure and
// missing-prefix paths, the bodies of the other branches, how `pools` is
// populated, what is emitted when an entry is not the first one in `pools`,
// the closing of the JSON document and all return values are not part of
// this listing -- confirm them against the complete file.
145 int TestRadosClient::mon_command(const std::vector
<std::string
>& cmd
,
146 const bufferlist
&inbl
,
147 bufferlist
*outbl
, std::string
*outs
) {
148 for (std::vector
<std::string
>::const_iterator it
= cmd
.begin();
149 it
!= cmd
.end(); ++it
) {
151 if (!parser
.parse(it
->c_str(), it
->length())) {
155 JSONObjIter j_it
= parser
.find("prefix");
160 if ((*j_it
)->get_data() == "osd tier add") {
162 } else if ((*j_it
)->get_data() == "osd tier cache-mode") {
164 } else if ((*j_it
)->get_data() == "osd tier set-overlay") {
166 } else if ((*j_it
)->get_data() == "osd tier remove-overlay") {
168 } else if ((*j_it
)->get_data() == "osd tier remove") {
170 } else if ((*j_it
)->get_data() == "config-key rm") {
172 } else if ((*j_it
)->get_data() == "df") {
173 std::stringstream str
;
174 str
<< R
"({"pools
": [)";
176 std::list
<std::pair
<int64_t, std::string
>> pools
;
178 for (auto& pool
: pools
) {
179 if (pools
.begin()->first
!= pool
.first
) {
182 str
<< R
"({"name
": ")" << pool.second << R"(", "stats
": )"
183 << R
"({"percent_used
": 1.0, "bytes_used
": 0, "max_avail
": 0}})";
187 outbl
->append(str
.str());
// Schedules an asynchronous operation for the given object.
//
// Visible logic: aio_function and the completion `c` are wrapped in a new
// AioFunctionContext, which is queued on the finisher returned by
// get_finisher(oid). The context is given m_aio_finisher when queue_callback
// is true and NULL otherwise.
//
// @param oid           object id used to select the finisher
// @param aio_function  function the queued context will run
// @param c             completion passed to the context
//
// NOTE(review): the parameter line that declares `queue_callback` is not part
// of this listing -- confirm its type and position against the complete file.
194 void TestRadosClient::add_aio_operation(const std::string
& oid
,
196 const AioFunction
&aio_function
,
197 AioCompletionImpl
*c
) {
198 AioFunctionContext
*ctx
= new AioFunctionContext(
199 aio_function
, queue_callback
? m_aio_finisher
: NULL
, c
);
200 get_finisher(oid
)->queue(ctx
);
// Bookkeeping object for a flush across several finishers.
//
// Members visible here: an atomic int64_t `count` (initialised to 0), the
// `aio_finisher` on which the final notification is queued, and the
// completion `c`. The visible statement queues a LambdaContext that calls
// finish_aio_completion(c, 0) on aio_finisher.
//
// NOTE(review): the member function containing that statement and the
// condition under which it runs are not part of this listing -- presumably it
// fires once `count` has been decremented to zero; confirm.
203 struct WaitForFlush
{
206 aio_finisher
->queue(new LambdaContext(boost::bind(
207 &finish_aio_completion
, c
, 0)));
213 std::atomic
<int64_t> count
= { 0 };
214 Finisher
*aio_finisher
;
215 AioCompletionImpl
*c
;
// Blocking flush: allocates a fresh AioCompletionImpl, passes it to the
// flush_aio_operations(AioCompletionImpl*) overload and then waits on it with
// wait_for_complete().
//
// NOTE(review): whatever follows the wait (e.g. releasing the completion) is
// not part of this listing -- confirm against the complete file.
218 void TestRadosClient::flush_aio_operations() {
219 AioCompletionImpl
*comp
= new AioCompletionImpl();
220 flush_aio_operations(comp
);
221 comp
->wait_for_complete();
// Asynchronous flush that is reported through completion `c`.
//
// Visible logic: a WaitForFlush object is allocated and filled in -- `count`
// is set to the number of finishers, `aio_finisher` to m_aio_finisher and `c`
// to the caller's completion. Then, for every finisher in m_finishers, an
// AioFunctionContext bound to WaitForFlush::flushed on that object is created
// and queued on the finisher.
//
// @param c  completion stored in the WaitForFlush object
//
// NOTE(review): the remaining AioFunctionContext constructor arguments and
// any statement before the WaitForFlush allocation are not part of this
// listing.
225 void TestRadosClient::flush_aio_operations(AioCompletionImpl
*c
) {
228 WaitForFlush
*wait_for_flush
= new WaitForFlush();
229 wait_for_flush
->count
= m_finishers
.size();
230 wait_for_flush
->aio_finisher
= m_aio_finisher
;
231 wait_for_flush
->c
= c
;
233 for (size_t i
= 0; i
< m_finishers
.size(); ++i
) {
234 AioFunctionContext
*ctx
= new AioFunctionContext(
235 boost::bind(&WaitForFlush::flushed
, wait_for_flush
),
237 m_finishers
[i
]->queue(ctx
);
// Requests a flush from the watch/notify layer and reports it through `c`.
//
// Visible logic: a LambdaContext is created that calls
// TestRadosClient::finish_aio_completion(c, <result>) on this client (the
// result is the bound placeholder _1), and that context is handed to
// get_watch_notify()->aio_flush(this, ctx).
//
// @param c  completion finished when the context runs
//
// NOTE(review): any statement before the context is created and the return
// value are not part of this listing.
241 int TestRadosClient::aio_watch_flush(AioCompletionImpl
*c
) {
243 Context
*ctx
= new LambdaContext(boost::bind(
244 &TestRadosClient::finish_aio_completion
, this, c
, _1
));
245 get_watch_notify()->aio_flush(this, ctx
);
249 void TestRadosClient::finish_aio_completion(AioCompletionImpl
*c
, int r
) {
250 librados::finish_aio_completion(c
, r
);
253 Finisher
*TestRadosClient::get_finisher(const std::string
&oid
) {
254 std::size_t h
= m_hash(oid
);
255 return m_finishers
[h
% m_finishers
.size()];
258 } // namespace librados