]> git.proxmox.com Git - ceph.git/blame - ceph/src/rbd_replay/Replayer.cc
update sources to v12.1.0
[ceph.git] / ceph / src / rbd_replay / Replayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Replayer.hpp"
16#include "common/errno.h"
17#include "rbd_replay/ActionTypes.h"
18#include "rbd_replay/BufferReader.h"
19#include <boost/foreach.hpp>
20#include <boost/thread/thread.hpp>
21#include <boost/scope_exit.hpp>
22#include <fstream>
23#include "global/global_context.h"
24#include "rbd_replay_debug.hpp"
25
26#define dout_context g_ceph_context
27
28using namespace std;
29using namespace rbd_replay;
30
31namespace {
32
33bool is_versioned_replay(BufferReader &buffer_reader) {
34 bufferlist::iterator *it;
35 int r = buffer_reader.fetch(&it);
36 if (r < 0) {
37 return false;
38 }
39
40 if (it->get_remaining() < action::BANNER.size()) {
41 return false;
42 }
43
44 std::string banner;
45 it->copy(action::BANNER.size(), banner);
46 bool versioned = (banner == action::BANNER);
47 if (!versioned) {
48 it->seek(0);
49 }
50 return versioned;
51}
52
53} // anonymous namespace
54
55Worker::Worker(Replayer &replayer)
56 : m_replayer(replayer),
57 m_buffer(100),
58 m_done(false) {
59}
60
61void Worker::start() {
62 m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this)));
63}
64
65// Should only be called by StopThreadAction
66void Worker::stop() {
67 m_done = true;
68}
69
70void Worker::join() {
71 m_thread->join();
72}
73
74void Worker::send(Action::ptr action) {
75 assert(action);
76 m_buffer.push_front(action);
77}
78
79void Worker::add_pending(PendingIO::ptr io) {
80 assert(io);
81 boost::mutex::scoped_lock lock(m_pending_ios_mutex);
82 assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
83 m_pending_ios[io->id()] = io;
84}
85
86void Worker::run() {
87 dout(THREAD_LEVEL) << "Worker thread started" << dendl;
88 while (!m_done) {
89 Action::ptr action;
90 m_buffer.pop_back(&action);
91 m_replayer.wait_for_actions(action->predecessors());
92 action->perform(*this);
93 m_replayer.set_action_complete(action->id());
94 }
95 {
96 boost::mutex::scoped_lock lock(m_pending_ios_mutex);
97 bool first_time = true;
98 while (!m_pending_ios.empty()) {
99 if (!first_time) {
100 dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl;
101 pair<action_id_t, PendingIO::ptr> p;
102 BOOST_FOREACH(p, m_pending_ios) {
103 dout(THREAD_LEVEL) << "> " << p.first << dendl;
104 }
105 }
106 m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1));
107 first_time = false;
108 }
109 }
110 dout(THREAD_LEVEL) << "Worker thread stopped" << dendl;
111}
112
113
114void Worker::remove_pending(PendingIO::ptr io) {
115 assert(io);
116 m_replayer.set_action_complete(io->id());
117 boost::mutex::scoped_lock lock(m_pending_ios_mutex);
118 size_t num_erased = m_pending_ios.erase(io->id());
119 assertf(num_erased == 1, "id = %d", io->id());
120 if (m_pending_ios.empty()) {
121 m_pending_ios_empty.notify_all();
122 }
123}
124
125
126librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
127 return m_replayer.get_image(imagectx_id);
128}
129
130
131void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
132 assert(image);
133 m_replayer.put_image(imagectx_id, image);
134}
135
136
137void Worker::erase_image(imagectx_id_t imagectx_id) {
138 m_replayer.erase_image(imagectx_id);
139}
140
141
142librbd::RBD* Worker::rbd() {
143 return m_replayer.get_rbd();
144}
145
146
147librados::IoCtx* Worker::ioctx() {
148 return m_replayer.get_ioctx();
149}
150
151void Worker::set_action_complete(action_id_t id) {
152 m_replayer.set_action_complete(id);
153}
154
155bool Worker::readonly() const {
156 return m_replayer.readonly();
157}
158
159rbd_loc Worker::map_image_name(string image_name, string snap_name) const {
160 return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name));
161}
162
163
164Replayer::Replayer(int num_action_trackers)
31f18b77
FG
165 : m_rbd(NULL), m_ioctx(0),
166 m_latency_multiplier(1.0),
7c673cae
FG
167 m_readonly(false), m_dump_perf_counters(false),
168 m_num_action_trackers(num_action_trackers),
169 m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
170 assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers);
171}
172
173Replayer::~Replayer() {
174 delete[] m_action_trackers;
175}
176
177Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
178 return m_action_trackers[id % m_num_action_trackers];
179}
180
181void Replayer::run(const std::string& replay_file) {
182 {
183 librados::Rados rados;
184 rados.init(NULL);
185 int r = rados.init_with_context(g_ceph_context);
186 if (r) {
187 cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl;
188 goto out;
189 }
190 r = rados.connect();
191 if (r) {
192 cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl;
193 goto out;
194 }
31f18b77
FG
195
196 if (m_pool_name.empty()) {
197 r = rados.conf_get("rbd_default_pool", m_pool_name);
198 if (r < 0) {
199 cerr << "Failed to retrieve default pool: " << cpp_strerror(r)
200 << std::endl;
201 goto out;
202 }
203 }
204
7c673cae
FG
205 m_ioctx = new librados::IoCtx();
206 {
207 r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx);
208 if (r) {
209 cerr << "Failed to open pool " << m_pool_name << ": "
210 << cpp_strerror(r) << std::endl;
211 goto out2;
212 }
213 m_rbd = new librbd::RBD();
214 map<thread_id_t, Worker*> workers;
215
216 int fd = open(replay_file.c_str(), O_RDONLY);
217 if (fd < 0) {
218 std::cerr << "Failed to open " << replay_file << ": "
219 << cpp_strerror(errno) << std::endl;
220 exit(1);
221 }
222 BOOST_SCOPE_EXIT( (fd) ) {
223 close(fd);
224 } BOOST_SCOPE_EXIT_END;
225
226 BufferReader buffer_reader(fd);
227 bool versioned = is_versioned_replay(buffer_reader);
228 while (true) {
229 action::ActionEntry action_entry;
230 try {
231 bufferlist::iterator *it;
232 int r = buffer_reader.fetch(&it);
233 if (r < 0) {
234 std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
235 << std::endl;
236 exit(-r);
237 }
238 if (it->get_remaining() == 0) {
239 break;
240 }
241
242 if (versioned) {
243 action_entry.decode(*it);
244 } else {
245 action_entry.decode_unversioned(*it);
246 }
247 } catch (const buffer::error &err) {
248 std::cerr << "Failed to decode trace action: " << err.what() << std::endl;
249 exit(1);
250 }
251
252 Action::ptr action = Action::construct(action_entry);
253 if (!action) {
254 // unknown / unsupported action
255 continue;
256 }
257
258 if (action->is_start_thread()) {
259 Worker *worker = new Worker(*this);
260 workers[action->thread_id()] = worker;
261 worker->start();
262 } else {
263 workers[action->thread_id()]->send(action);
264 }
265 }
266
267 dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl;
268 pair<thread_id_t, Worker*> w;
269 BOOST_FOREACH(w, workers) {
270 w.second->join();
271 delete w.second;
272 }
273 clear_images();
274 delete m_rbd;
275 m_rbd = NULL;
276 }
277 out2:
278 delete m_ioctx;
279 m_ioctx = NULL;
280 }
281 out:
282 ;
283}
284
285
286librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
287 boost::shared_lock<boost::shared_mutex> lock(m_images_mutex);
288 return m_images[imagectx_id];
289}
290
291void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
292 assert(image);
293 boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
294 assert(m_images.count(imagectx_id) == 0);
295 m_images[imagectx_id] = image;
296}
297
298void Replayer::erase_image(imagectx_id_t imagectx_id) {
299 boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
300 librbd::Image* image = m_images[imagectx_id];
301 if (m_dump_perf_counters) {
302 string command = "perf dump";
303 cmdmap_t cmdmap;
304 string format = "json-pretty";
305 bufferlist out;
306 g_ceph_context->do_command(command, cmdmap, format, &out);
307 out.write_stream(cout);
308 cout << std::endl;
309 cout.flush();
310 }
311 delete image;
312 m_images.erase(imagectx_id);
313}
314
315void Replayer::set_action_complete(action_id_t id) {
316 dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
317 boost::system_time now(boost::get_system_time());
318 action_tracker_d &tracker = tracker_for(id);
319 boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);
320 assert(tracker.actions.count(id) == 0);
321 tracker.actions[id] = now;
322 tracker.condition.notify_all();
323}
324
325bool Replayer::is_action_complete(action_id_t id) {
326 action_tracker_d &tracker = tracker_for(id);
327 boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
328 return tracker.actions.count(id) > 0;
329}
330
331void Replayer::wait_for_actions(const action::Dependencies &deps) {
332 boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
333 BOOST_FOREACH(const action::Dependency &dep, deps) {
334 dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
335 boost::system_time start_time(boost::get_system_time());
336 action_tracker_d &tracker = tracker_for(dep.id);
337 boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
338 bool first_time = true;
339 while (tracker.actions.count(dep.id) == 0) {
340 if (!first_time) {
341 dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
342 }
343 tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
344 first_time = false;
345 }
346 boost::system_time action_completed_time(tracker.actions[dep.id]);
347 lock.unlock();
348 boost::system_time end_time(boost::get_system_time());
349 long long micros = (end_time - start_time).total_microseconds();
350 dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
351 // Apparently the nanoseconds constructor is optional:
352 // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
353 boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
354 if (sub_release_time > release_time) {
355 release_time = sub_release_time;
356 }
357 }
358 if (release_time > boost::get_system_time()) {
359 dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl;
360 boost::this_thread::sleep(release_time);
361 }
362}
363
364void Replayer::clear_images() {
365 boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
366 if (m_dump_perf_counters && !m_images.empty()) {
367 string command = "perf dump";
368 cmdmap_t cmdmap;
369 string format = "json-pretty";
370 bufferlist out;
371 g_ceph_context->do_command(command, cmdmap, format, &out);
372 out.write_stream(cout);
373 cout << std::endl;
374 cout.flush();
375 }
376 pair<imagectx_id_t, librbd::Image*> p;
377 BOOST_FOREACH(p, m_images) {
378 delete p.second;
379 }
380 m_images.clear();
381}
382
383void Replayer::set_latency_multiplier(float f) {
384 assertf(f >= 0, "f = %f", f);
385 m_latency_multiplier = f;
386}
387
388bool Replayer::readonly() const {
389 return m_readonly;
390}
391
392void Replayer::set_readonly(bool readonly) {
393 m_readonly = readonly;
394}
395
396string Replayer::pool_name() const {
397 return m_pool_name;
398}
399
400void Replayer::set_pool_name(string pool_name) {
401 m_pool_name = pool_name;
402}