1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include "Replayer.hpp"
16 #include "common/errno.h"
17 #include "rbd_replay/ActionTypes.h"
18 #include "rbd_replay/BufferReader.h"
19 #include <boost/foreach.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/scope_exit.hpp>
23 #include "global/global_context.h"
24 #include "rbd_replay_debug.hpp"
26 #define dout_context g_ceph_context
29 using namespace rbd_replay
;
// Decides whether the replay stream read through `buffer_reader` is in the
// versioned format. From the visible statements: it fetches an iterator from
// the reader, compares the remaining byte count against action::BANNER.size(),
// copies action::BANNER.size() bytes into `banner`, and sets `versioned` to
// whether `banner` equals action::BANNER.
// NOTE(review): several lines of this definition are not visible in this
// listing (the handling of `r`, the declaration of `banner`, the body of the
// short-input branch and the return statements) -- confirm against the full file.
33 bool is_versioned_replay(BufferReader
&buffer_reader
) {
34 bufferlist::iterator
*it
;
35 int r
= buffer_reader
.fetch(&it
);
40 if (it
->get_remaining() < action::BANNER
.size()) {
45 it
->copy(action::BANNER
.size(), banner
);
46 bool versioned
= (banner
== action::BANNER
);
53 } // anonymous namespace
// Constructs a worker bound to the Replayer that owns it; `replayer` is stored
// in m_replayer.
// NOTE(review): the initializer list continues past the trailing comma, but
// the remaining initializers and the constructor body are not visible here.
55 Worker::Worker(Replayer
&replayer
)
56 : m_replayer(replayer
),
61 void Worker::start() {
62 m_thread
= boost::shared_ptr
<boost::thread
>(new boost::thread(boost::bind(&Worker::run
, this)));
65 // Should only be called by StopThreadAction
// Hands `action` to this worker by pushing it onto the front of m_buffer.
// NOTE(review): one line between the signature and the push is not visible in
// this listing -- confirm against the full file.
74 void Worker::send(Action::ptr action
) {
76 m_buffer
.push_front(action
);
// Registers `io` as an in-flight IO. While holding m_pending_ios_mutex it
// asserts that no entry with io->id() exists yet and then stores `io` in
// m_pending_ios under that id.
// NOTE(review): one line right after the signature is not visible in this
// listing -- confirm against the full file.
79 void Worker::add_pending(PendingIO::ptr io
) {
81 boost::mutex::scoped_lock
lock(m_pending_ios_mutex
);
82 assertf(m_pending_ios
.count(io
->id()) == 0, "id = %d", io
->id());
83 m_pending_ios
[io
->id()] = io
;
// NOTE(review): the signature line of this definition is not visible in this
// listing; these statements are presumably the body of Worker::run(), the
// function Worker::start() binds as the thread entry point -- confirm.
//
// Visible behaviour, in order:
//  - logs "Worker thread started";
//  - takes an action from the back of m_buffer, waits (via the Replayer) for
//    the action's predecessors, performs it with this worker as context, and
//    marks it complete in the Replayer (the enclosing loop header and its exit
//    condition are not visible);
//  - then, holding m_pending_ios_mutex, loops while m_pending_ios is non-empty:
//    logs the ids still pending and waits on m_pending_ios_empty with a
//    one-second timeout (how `first_time` gates the logging is not visible);
//  - logs "Worker thread stopped".
87 dout(THREAD_LEVEL
) << "Worker thread started" << dendl
;
90 m_buffer
.pop_back(&action
);
91 m_replayer
.wait_for_actions(action
->predecessors());
92 action
->perform(*this);
93 m_replayer
.set_action_complete(action
->id());
96 boost::mutex::scoped_lock
lock(m_pending_ios_mutex
);
97 bool first_time
= true;
98 while (!m_pending_ios
.empty()) {
100 dout(THREAD_LEVEL
) << "Worker thread trying to stop, still waiting for " << m_pending_ios
.size() << " pending IOs to complete:" << dendl
;
101 pair
<action_id_t
, PendingIO::ptr
> p
;
102 BOOST_FOREACH(p
, m_pending_ios
) {
103 dout(THREAD_LEVEL
) << "> " << p
.first
<< dendl
;
106 m_pending_ios_empty
.timed_wait(lock
, boost::posix_time::seconds(1));
110 dout(THREAD_LEVEL
) << "Worker thread stopped" << dendl
;
// Retires an in-flight IO. First marks io->id() complete in the Replayer
// (before taking the lock), then, holding m_pending_ios_mutex, erases the
// entry from m_pending_ios, asserts exactly one entry was removed, and wakes
// every waiter on m_pending_ios_empty once the map is empty.
// NOTE(review): one line right after the signature is not visible in this
// listing -- confirm against the full file.
114 void Worker::remove_pending(PendingIO::ptr io
) {
116 m_replayer
.set_action_complete(io
->id());
117 boost::mutex::scoped_lock
lock(m_pending_ios_mutex
);
118 size_t num_erased
= m_pending_ios
.erase(io
->id());
119 assertf(num_erased
== 1, "id = %d", io
->id());
120 if (m_pending_ios
.empty()) {
121 m_pending_ios_empty
.notify_all();
126 librbd::Image
* Worker::get_image(imagectx_id_t imagectx_id
) {
127 return m_replayer
.get_image(imagectx_id
);
// Registers `image` under `imagectx_id` by forwarding to the owning Replayer.
// NOTE(review): one line between the signature and the forwarding call is not
// visible in this listing -- confirm against the full file.
131 void Worker::put_image(imagectx_id_t imagectx_id
, librbd::Image
* image
) {
133 m_replayer
.put_image(imagectx_id
, image
);
137 void Worker::erase_image(imagectx_id_t imagectx_id
) {
138 m_replayer
.erase_image(imagectx_id
);
142 librbd::RBD
* Worker::rbd() {
143 return m_replayer
.get_rbd();
147 librados::IoCtx
* Worker::ioctx() {
148 return m_replayer
.get_ioctx();
151 void Worker::set_action_complete(action_id_t id
) {
152 m_replayer
.set_action_complete(id
);
155 bool Worker::readonly() const {
156 return m_replayer
.readonly();
159 rbd_loc
Worker::map_image_name(string image_name
, string snap_name
) const {
160 return m_replayer
.image_name_map().map(rbd_loc("", image_name
, snap_name
));
164 Replayer::Replayer(int num_action_trackers
)
165 : m_rbd(NULL
), m_ioctx(0),
166 m_pool_name("rbd"), m_latency_multiplier(1.0),
167 m_readonly(false), m_dump_perf_counters(false),
168 m_num_action_trackers(num_action_trackers
),
169 m_action_trackers(new action_tracker_d
[m_num_action_trackers
]) {
170 assertf(num_action_trackers
> 0, "num_action_trackers = %d", num_action_trackers
);
173 Replayer::~Replayer() {
174 delete[] m_action_trackers
;
177 Replayer::action_tracker_d
&Replayer::tracker_for(action_id_t id
) {
178 return m_action_trackers
[id
% m_num_action_trackers
];
// Replays the trace stored in `replay_file`.
//
// Visible steps, in order:
//  - initialises a librados::Rados object with g_ceph_context and reports
//    failures on cerr (the conditions guarding each error message, and the
//    cluster-connect call itself, are not visible in this listing);
//  - allocates m_ioctx and opens the pool named m_pool_name on it;
//  - allocates m_rbd;
//  - opens the replay file read-only and registers a scope-exit handler
//    capturing `fd` (the handler body is not visible);
//  - wraps the descriptor in a BufferReader and asks is_versioned_replay()
//    which decode path applies;
//  - reads action entries: fetches an iterator, checks for zero remaining
//    bytes, decodes via decode() or decode_unversioned(), and reports
//    buffer::error on cerr;
//  - builds an Action from each entry; a start-thread action creates a new
//    Worker stored in `workers` under the action's thread id, and the action
//    is sent to the worker registered for its thread id;
//  - finally logs "Waiting for workers to die" and iterates over `workers`
//    (the loop body is not visible).
// NOTE(review): many lines of this definition (branch conditions, returns,
// loop headers, cleanup) are missing from this listing -- confirm every step
// against the full file before relying on this summary.
181 void Replayer::run(const std::string
& replay_file
) {
183 librados::Rados rados
;
185 int r
= rados
.init_with_context(g_ceph_context
);
187 cerr
<< "Failed to initialize RADOS: " << cpp_strerror(r
) << std::endl
;
192 cerr
<< "Failed to connect to cluster: " << cpp_strerror(r
) << std::endl
;
195 m_ioctx
= new librados::IoCtx();
197 r
= rados
.ioctx_create(m_pool_name
.c_str(), *m_ioctx
);
199 cerr
<< "Failed to open pool " << m_pool_name
<< ": "
200 << cpp_strerror(r
) << std::endl
;
203 m_rbd
= new librbd::RBD();
204 map
<thread_id_t
, Worker
*> workers
;
206 int fd
= open(replay_file
.c_str(), O_RDONLY
);
208 std::cerr
<< "Failed to open " << replay_file
<< ": "
209 << cpp_strerror(errno
) << std::endl
;
212 BOOST_SCOPE_EXIT( (fd
) ) {
214 } BOOST_SCOPE_EXIT_END
;
216 BufferReader
buffer_reader(fd
);
217 bool versioned
= is_versioned_replay(buffer_reader
);
219 action::ActionEntry action_entry
;
221 bufferlist::iterator
*it
;
222 int r
= buffer_reader
.fetch(&it
);
224 std::cerr
<< "Failed to read from trace file: " << cpp_strerror(r
)
228 if (it
->get_remaining() == 0) {
233 action_entry
.decode(*it
);
235 action_entry
.decode_unversioned(*it
);
237 } catch (const buffer::error
&err
) {
238 std::cerr
<< "Failed to decode trace action: " << err
.what() << std::endl
;
242 Action::ptr action
= Action::construct(action_entry
);
244 // unknown / unsupported action
248 if (action
->is_start_thread()) {
249 Worker
*worker
= new Worker(*this);
250 workers
[action
->thread_id()] = worker
;
253 workers
[action
->thread_id()]->send(action
);
257 dout(THREAD_LEVEL
) << "Waiting for workers to die" << dendl
;
258 pair
<thread_id_t
, Worker
*> w
;
259 BOOST_FOREACH(w
, workers
) {
276 librbd::Image
* Replayer::get_image(imagectx_id_t imagectx_id
) {
277 boost::shared_lock
<boost::shared_mutex
> lock(m_images_mutex
);
278 return m_images
[imagectx_id
];
// Registers `image` under `imagectx_id`. Holding the exclusive lock on
// m_images_mutex, it asserts that the id is not registered yet and stores the
// pointer in m_images.
// NOTE(review): one line right after the signature is not visible in this
// listing -- confirm against the full file.
281 void Replayer::put_image(imagectx_id_t imagectx_id
, librbd::Image
*image
) {
283 boost::unique_lock
<boost::shared_mutex
> lock(m_images_mutex
);
284 assert(m_images
.count(imagectx_id
) == 0);
285 m_images
[imagectx_id
] = image
;
// Removes the image registered under `imagectx_id`. Holding the exclusive
// lock on m_images_mutex, it reads the stored pointer, and -- when
// m_dump_perf_counters is set -- runs the "perf dump" command on
// g_ceph_context with the "json-pretty" format and writes the output to cout,
// then erases the map entry.
// NOTE(review): several lines are not visible in this listing (the
// declarations of `cmdmap` and `out`, and whatever is done with `image`
// before the entry is erased) -- confirm against the full file.
288 void Replayer::erase_image(imagectx_id_t imagectx_id
) {
289 boost::unique_lock
<boost::shared_mutex
> lock(m_images_mutex
);
290 librbd::Image
* image
= m_images
[imagectx_id
];
291 if (m_dump_perf_counters
) {
292 string command
= "perf dump";
294 string format
= "json-pretty";
296 g_ceph_context
->do_command(command
, cmdmap
, format
, &out
);
297 out
.write_stream(cout
);
302 m_images
.erase(imagectx_id
);
305 void Replayer::set_action_complete(action_id_t id
) {
306 dout(DEPGRAPH_LEVEL
) << "ActionTracker::set_complete(" << id
<< ")" << dendl
;
307 boost::system_time
now(boost::get_system_time());
308 action_tracker_d
&tracker
= tracker_for(id
);
309 boost::unique_lock
<boost::shared_mutex
> lock(tracker
.mutex
);
310 assert(tracker
.actions
.count(id
) == 0);
311 tracker
.actions
[id
] = now
;
312 tracker
.condition
.notify_all();
315 bool Replayer::is_action_complete(action_id_t id
) {
316 action_tracker_d
&tracker
= tracker_for(id
);
317 boost::shared_lock
<boost::shared_mutex
> lock(tracker
.mutex
);
318 return tracker
.actions
.count(id
) > 0;
// Blocks until every dependency in `deps` has completed and its scaled time
// delta has elapsed.
//
// Visible behaviour: for each dependency, take a shared lock on the tracker
// for dep.id and wait on the tracker's condition (one-second timed waits)
// until a completion time is recorded for dep.id. A per-dependency release
// time is computed as completion time + dep.time_delta * m_latency_multiplier
// / 1000 microseconds, and the latest such time is kept in `release_time`
// (initially negative infinity). After the loop, if `release_time` is still
// in the future, the thread sleeps until then.
// NOTE(review): the division by 1000 before constructing microseconds
// suggests dep.time_delta is in nanoseconds -- confirm against its
// declaration. Several lines of this definition (how `first_time` gates the
// "Still waiting" log, closing braces) are not visible in this listing.
321 void Replayer::wait_for_actions(const action::Dependencies
&deps
) {
322 boost::posix_time::ptime
release_time(boost::posix_time::neg_infin
);
323 BOOST_FOREACH(const action::Dependency
&dep
, deps
) {
324 dout(DEPGRAPH_LEVEL
) << "Waiting for " << dep
.id
<< dendl
;
325 boost::system_time
start_time(boost::get_system_time());
326 action_tracker_d
&tracker
= tracker_for(dep
.id
);
327 boost::shared_lock
<boost::shared_mutex
> lock(tracker
.mutex
);
328 bool first_time
= true;
329 while (tracker
.actions
.count(dep
.id
) == 0) {
331 dout(DEPGRAPH_LEVEL
) << "Still waiting for " << dep
.id
<< dendl
;
333 tracker
.condition
.timed_wait(lock
, boost::posix_time::seconds(1));
336 boost::system_time
action_completed_time(tracker
.actions
[dep
.id
]);
338 boost::system_time
end_time(boost::get_system_time());
339 long long micros
= (end_time
- start_time
).total_microseconds();
340 dout(DEPGRAPH_LEVEL
) << "Finished waiting for " << dep
.id
<< " after " << micros
<< " microseconds" << dendl
;
341 // Apparently the nanoseconds constructor is optional:
342 // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
343 boost::system_time
sub_release_time(action_completed_time
+ boost::posix_time::microseconds(dep
.time_delta
* m_latency_multiplier
/ 1000));
344 if (sub_release_time
> release_time
) {
345 release_time
= sub_release_time
;
348 if (release_time
> boost::get_system_time()) {
349 dout(SLEEP_LEVEL
) << "Sleeping for " << (release_time
- boost::get_system_time()).total_microseconds() << " microseconds" << dendl
;
350 boost::this_thread::sleep(release_time
);
// Processes every registered image under the exclusive lock on
// m_images_mutex. When m_dump_perf_counters is set and at least one image is
// registered, it first runs the "perf dump" command on g_ceph_context with
// the "json-pretty" format and writes the output to cout; it then iterates
// over all entries of m_images.
// NOTE(review): the loop body and whatever follows it, as well as the
// declarations of `cmdmap` and `out`, are not visible in this listing --
// confirm what is done with each image against the full file.
354 void Replayer::clear_images() {
355 boost::unique_lock
<boost::shared_mutex
> lock(m_images_mutex
);
356 if (m_dump_perf_counters
&& !m_images
.empty()) {
357 string command
= "perf dump";
359 string format
= "json-pretty";
361 g_ceph_context
->do_command(command
, cmdmap
, format
, &out
);
362 out
.write_stream(cout
);
366 pair
<imagectx_id_t
, librbd::Image
*> p
;
367 BOOST_FOREACH(p
, m_images
) {
373 void Replayer::set_latency_multiplier(float f
) {
374 assertf(f
>= 0, "f = %f", f
);
375 m_latency_multiplier
= f
;
// Accessor for the read-only replay setting.
// NOTE(review): the body is not visible in this listing; presumably it
// returns m_readonly, the field written by set_readonly() -- confirm.
378 bool Replayer::readonly() const {
382 void Replayer::set_readonly(bool readonly
) {
383 m_readonly
= readonly
;
// Accessor for the pool name used by the replayer.
// NOTE(review): the body is not visible in this listing; presumably it
// returns m_pool_name, the field written by set_pool_name() -- confirm.
386 string
Replayer::pool_name() const {
390 void Replayer::set_pool_name(string pool_name
) {
391 m_pool_name
= pool_name
;