]> git.proxmox.com Git - ceph.git/blame - ceph/src/rbd_replay/Replayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rbd_replay / Replayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Replayer.hpp"
16#include "common/errno.h"
11fdf7f2 17#include "include/scope_guard.h"
7c673cae
FG
18#include "rbd_replay/ActionTypes.h"
19#include "rbd_replay/BufferReader.h"
20#include <boost/foreach.hpp>
11fdf7f2
TL
21#include <chrono>
22#include <condition_variable>
23#include <thread>
7c673cae
FG
24#include <fstream>
25#include "global/global_context.h"
26#include "rbd_replay_debug.hpp"
27
28#define dout_context g_ceph_context
29
7c673cae
FG
30using namespace rbd_replay;
31
32namespace {
33
34bool is_versioned_replay(BufferReader &buffer_reader) {
11fdf7f2 35 bufferlist::const_iterator *it;
7c673cae
FG
36 int r = buffer_reader.fetch(&it);
37 if (r < 0) {
38 return false;
39 }
40
41 if (it->get_remaining() < action::BANNER.size()) {
42 return false;
43 }
44
45 std::string banner;
46 it->copy(action::BANNER.size(), banner);
47 bool versioned = (banner == action::BANNER);
48 if (!versioned) {
49 it->seek(0);
50 }
51 return versioned;
52}
53
54} // anonymous namespace
55
56Worker::Worker(Replayer &replayer)
57 : m_replayer(replayer),
58 m_buffer(100),
59 m_done(false) {
60}
61
62void Worker::start() {
11fdf7f2 63 m_thread = std::make_shared<std::thread>(&Worker::run, this);
7c673cae
FG
64}
65
66// Should only be called by StopThreadAction
67void Worker::stop() {
68 m_done = true;
69}
70
71void Worker::join() {
72 m_thread->join();
73}
74
75void Worker::send(Action::ptr action) {
11fdf7f2 76 ceph_assert(action);
7c673cae
FG
77 m_buffer.push_front(action);
78}
79
80void Worker::add_pending(PendingIO::ptr io) {
11fdf7f2
TL
81 ceph_assert(io);
82 std::scoped_lock lock{m_pending_ios_mutex};
7c673cae
FG
83 assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
84 m_pending_ios[io->id()] = io;
85}
86
87void Worker::run() {
88 dout(THREAD_LEVEL) << "Worker thread started" << dendl;
89 while (!m_done) {
90 Action::ptr action;
91 m_buffer.pop_back(&action);
92 m_replayer.wait_for_actions(action->predecessors());
93 action->perform(*this);
94 m_replayer.set_action_complete(action->id());
95 }
96 {
11fdf7f2 97 std::unique_lock lock{m_pending_ios_mutex};
7c673cae
FG
98 bool first_time = true;
99 while (!m_pending_ios.empty()) {
100 if (!first_time) {
101 dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl;
102 pair<action_id_t, PendingIO::ptr> p;
103 BOOST_FOREACH(p, m_pending_ios) {
104 dout(THREAD_LEVEL) << "> " << p.first << dendl;
105 }
106 }
11fdf7f2 107 m_pending_ios_empty.wait_for(lock, std::chrono::seconds(1));
7c673cae
FG
108 first_time = false;
109 }
110 }
111 dout(THREAD_LEVEL) << "Worker thread stopped" << dendl;
112}
113
114
115void Worker::remove_pending(PendingIO::ptr io) {
11fdf7f2 116 ceph_assert(io);
7c673cae 117 m_replayer.set_action_complete(io->id());
11fdf7f2 118 std::scoped_lock lock{m_pending_ios_mutex};
7c673cae
FG
119 size_t num_erased = m_pending_ios.erase(io->id());
120 assertf(num_erased == 1, "id = %d", io->id());
121 if (m_pending_ios.empty()) {
122 m_pending_ios_empty.notify_all();
123 }
124}
125
126
127librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
128 return m_replayer.get_image(imagectx_id);
129}
130
131
132void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
11fdf7f2 133 ceph_assert(image);
7c673cae
FG
134 m_replayer.put_image(imagectx_id, image);
135}
136
137
138void Worker::erase_image(imagectx_id_t imagectx_id) {
139 m_replayer.erase_image(imagectx_id);
140}
141
142
143librbd::RBD* Worker::rbd() {
144 return m_replayer.get_rbd();
145}
146
147
148librados::IoCtx* Worker::ioctx() {
149 return m_replayer.get_ioctx();
150}
151
152void Worker::set_action_complete(action_id_t id) {
153 m_replayer.set_action_complete(id);
154}
155
156bool Worker::readonly() const {
157 return m_replayer.readonly();
158}
159
160rbd_loc Worker::map_image_name(string image_name, string snap_name) const {
161 return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name));
162}
163
164
165Replayer::Replayer(int num_action_trackers)
31f18b77
FG
166 : m_rbd(NULL), m_ioctx(0),
167 m_latency_multiplier(1.0),
7c673cae
FG
168 m_readonly(false), m_dump_perf_counters(false),
169 m_num_action_trackers(num_action_trackers),
170 m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
171 assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers);
172}
173
174Replayer::~Replayer() {
175 delete[] m_action_trackers;
176}
177
178Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
179 return m_action_trackers[id % m_num_action_trackers];
180}
181
182void Replayer::run(const std::string& replay_file) {
183 {
184 librados::Rados rados;
185 rados.init(NULL);
186 int r = rados.init_with_context(g_ceph_context);
187 if (r) {
188 cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl;
189 goto out;
190 }
191 r = rados.connect();
192 if (r) {
193 cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl;
194 goto out;
195 }
31f18b77
FG
196
197 if (m_pool_name.empty()) {
198 r = rados.conf_get("rbd_default_pool", m_pool_name);
199 if (r < 0) {
200 cerr << "Failed to retrieve default pool: " << cpp_strerror(r)
201 << std::endl;
202 goto out;
203 }
204 }
205
7c673cae
FG
206 m_ioctx = new librados::IoCtx();
207 {
208 r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx);
209 if (r) {
210 cerr << "Failed to open pool " << m_pool_name << ": "
211 << cpp_strerror(r) << std::endl;
212 goto out2;
213 }
214 m_rbd = new librbd::RBD();
215 map<thread_id_t, Worker*> workers;
216
217 int fd = open(replay_file.c_str(), O_RDONLY);
218 if (fd < 0) {
219 std::cerr << "Failed to open " << replay_file << ": "
220 << cpp_strerror(errno) << std::endl;
221 exit(1);
222 }
11fdf7f2 223 auto close_fd = make_scope_guard([fd] { close(fd); });
7c673cae
FG
224
225 BufferReader buffer_reader(fd);
226 bool versioned = is_versioned_replay(buffer_reader);
227 while (true) {
228 action::ActionEntry action_entry;
229 try {
11fdf7f2 230 bufferlist::const_iterator *it;
7c673cae
FG
231 int r = buffer_reader.fetch(&it);
232 if (r < 0) {
233 std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
234 << std::endl;
235 exit(-r);
236 }
237 if (it->get_remaining() == 0) {
238 break;
239 }
240
241 if (versioned) {
242 action_entry.decode(*it);
243 } else {
244 action_entry.decode_unversioned(*it);
245 }
246 } catch (const buffer::error &err) {
247 std::cerr << "Failed to decode trace action: " << err.what() << std::endl;
248 exit(1);
249 }
250
251 Action::ptr action = Action::construct(action_entry);
252 if (!action) {
253 // unknown / unsupported action
254 continue;
255 }
256
257 if (action->is_start_thread()) {
258 Worker *worker = new Worker(*this);
259 workers[action->thread_id()] = worker;
260 worker->start();
261 } else {
262 workers[action->thread_id()]->send(action);
263 }
264 }
265
266 dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl;
267 pair<thread_id_t, Worker*> w;
268 BOOST_FOREACH(w, workers) {
269 w.second->join();
270 delete w.second;
271 }
272 clear_images();
273 delete m_rbd;
274 m_rbd = NULL;
275 }
276 out2:
277 delete m_ioctx;
278 m_ioctx = NULL;
279 }
280 out:
281 ;
282}
283
284
285librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
11fdf7f2 286 std::scoped_lock lock{m_images_mutex};
7c673cae
FG
287 return m_images[imagectx_id];
288}
289
290void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
11fdf7f2
TL
291 ceph_assert(image);
292 std::unique_lock lock{m_images_mutex};
293 ceph_assert(m_images.count(imagectx_id) == 0);
7c673cae
FG
294 m_images[imagectx_id] = image;
295}
296
297void Replayer::erase_image(imagectx_id_t imagectx_id) {
11fdf7f2 298 std::unique_lock lock{m_images_mutex};
7c673cae
FG
299 librbd::Image* image = m_images[imagectx_id];
300 if (m_dump_perf_counters) {
301 string command = "perf dump";
302 cmdmap_t cmdmap;
9f95a23c 303 JSONFormatter jf(true);
7c673cae 304 bufferlist out;
9f95a23c
TL
305 stringstream ss;
306 g_ceph_context->do_command(command, cmdmap, &jf, ss, &out);
307 jf.flush(cout);
7c673cae
FG
308 cout << std::endl;
309 cout.flush();
310 }
311 delete image;
312 m_images.erase(imagectx_id);
313}
314
315void Replayer::set_action_complete(action_id_t id) {
316 dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
11fdf7f2 317 auto now = std::chrono::system_clock::now();
7c673cae 318 action_tracker_d &tracker = tracker_for(id);
11fdf7f2
TL
319 std::unique_lock lock{tracker.mutex};
320 ceph_assert(tracker.actions.count(id) == 0);
7c673cae
FG
321 tracker.actions[id] = now;
322 tracker.condition.notify_all();
323}
324
325bool Replayer::is_action_complete(action_id_t id) {
326 action_tracker_d &tracker = tracker_for(id);
11fdf7f2 327 std::shared_lock lock{tracker.mutex};
7c673cae
FG
328 return tracker.actions.count(id) > 0;
329}
330
331void Replayer::wait_for_actions(const action::Dependencies &deps) {
11fdf7f2
TL
332 auto release_time = std::chrono::time_point<std::chrono::system_clock>::min();
333 for(auto& dep : deps) {
7c673cae 334 dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
11fdf7f2 335 auto start_time = std::chrono::system_clock::now();
7c673cae 336 action_tracker_d &tracker = tracker_for(dep.id);
11fdf7f2 337 std::unique_lock lock{tracker.mutex};
7c673cae
FG
338 bool first_time = true;
339 while (tracker.actions.count(dep.id) == 0) {
340 if (!first_time) {
341 dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
342 }
11fdf7f2 343 tracker.condition.wait_for(lock, std::chrono::seconds(1));
7c673cae
FG
344 first_time = false;
345 }
11fdf7f2 346 auto action_completed_time(tracker.actions[dep.id]);
7c673cae 347 lock.unlock();
11fdf7f2
TL
348 auto end_time = std::chrono::system_clock::now();
349 auto micros = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
7c673cae
FG
350 dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
351 // Apparently the nanoseconds constructor is optional:
352 // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
11fdf7f2
TL
353 auto sub_release_time{action_completed_time +
354 std::chrono::microseconds{static_cast<long long>(dep.time_delta * m_latency_multiplier / 1000)}};
7c673cae
FG
355 if (sub_release_time > release_time) {
356 release_time = sub_release_time;
357 }
358 }
11fdf7f2
TL
359 if (release_time > std::chrono::system_clock::now()) {
360 auto sleep_for = release_time - std::chrono::system_clock::now();
361 dout(SLEEP_LEVEL) << "Sleeping for "
362 << std::chrono::duration_cast<std::chrono::microseconds>(sleep_for).count()
363 << " microseconds" << dendl;
364 std::this_thread::sleep_until(release_time);
7c673cae
FG
365 }
366}
367
368void Replayer::clear_images() {
11fdf7f2 369 std::shared_lock lock{m_images_mutex};
7c673cae
FG
370 if (m_dump_perf_counters && !m_images.empty()) {
371 string command = "perf dump";
372 cmdmap_t cmdmap;
9f95a23c 373 JSONFormatter jf(true);
7c673cae 374 bufferlist out;
9f95a23c
TL
375 stringstream ss;
376 g_ceph_context->do_command(command, cmdmap, &jf, ss, &out);
377 jf.flush(cout);
7c673cae 378 cout << std::endl;
9f95a23c 379 cout.flush();
7c673cae 380 }
11fdf7f2 381 for (auto& p : m_images) {
7c673cae
FG
382 delete p.second;
383 }
384 m_images.clear();
385}
386
387void Replayer::set_latency_multiplier(float f) {
388 assertf(f >= 0, "f = %f", f);
389 m_latency_multiplier = f;
390}
391
392bool Replayer::readonly() const {
393 return m_readonly;
394}
395
396void Replayer::set_readonly(bool readonly) {
397 m_readonly = readonly;
398}
399
400string Replayer::pool_name() const {
401 return m_pool_name;
402}
403
404void Replayer::set_pool_name(string pool_name) {
405 m_pool_name = pool_name;
406}