]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "Replayer.hpp" | |
16 | #include "common/errno.h" | |
11fdf7f2 | 17 | #include "include/scope_guard.h" |
7c673cae FG |
18 | #include "rbd_replay/ActionTypes.h" |
19 | #include "rbd_replay/BufferReader.h" | |
20 | #include <boost/foreach.hpp> | |
11fdf7f2 TL |
21 | #include <chrono> |
22 | #include <condition_variable> | |
23 | #include <thread> | |
7c673cae FG |
24 | #include <fstream> |
25 | #include "global/global_context.h" | |
26 | #include "rbd_replay_debug.hpp" | |
27 | ||
28 | #define dout_context g_ceph_context | |
29 | ||
7c673cae FG |
30 | using namespace rbd_replay; |
31 | ||
32 | namespace { | |
33 | ||
34 | bool is_versioned_replay(BufferReader &buffer_reader) { | |
11fdf7f2 | 35 | bufferlist::const_iterator *it; |
7c673cae FG |
36 | int r = buffer_reader.fetch(&it); |
37 | if (r < 0) { | |
38 | return false; | |
39 | } | |
40 | ||
41 | if (it->get_remaining() < action::BANNER.size()) { | |
42 | return false; | |
43 | } | |
44 | ||
45 | std::string banner; | |
46 | it->copy(action::BANNER.size(), banner); | |
47 | bool versioned = (banner == action::BANNER); | |
48 | if (!versioned) { | |
49 | it->seek(0); | |
50 | } | |
51 | return versioned; | |
52 | } | |
53 | ||
54 | } // anonymous namespace | |
55 | ||
56 | Worker::Worker(Replayer &replayer) | |
57 | : m_replayer(replayer), | |
58 | m_buffer(100), | |
59 | m_done(false) { | |
60 | } | |
61 | ||
62 | void Worker::start() { | |
11fdf7f2 | 63 | m_thread = std::make_shared<std::thread>(&Worker::run, this); |
7c673cae FG |
64 | } |
65 | ||
66 | // Should only be called by StopThreadAction | |
67 | void Worker::stop() { | |
68 | m_done = true; | |
69 | } | |
70 | ||
71 | void Worker::join() { | |
72 | m_thread->join(); | |
73 | } | |
74 | ||
75 | void Worker::send(Action::ptr action) { | |
11fdf7f2 | 76 | ceph_assert(action); |
7c673cae FG |
77 | m_buffer.push_front(action); |
78 | } | |
79 | ||
80 | void Worker::add_pending(PendingIO::ptr io) { | |
11fdf7f2 TL |
81 | ceph_assert(io); |
82 | std::scoped_lock lock{m_pending_ios_mutex}; | |
7c673cae FG |
83 | assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id()); |
84 | m_pending_ios[io->id()] = io; | |
85 | } | |
86 | ||
87 | void Worker::run() { | |
88 | dout(THREAD_LEVEL) << "Worker thread started" << dendl; | |
89 | while (!m_done) { | |
90 | Action::ptr action; | |
91 | m_buffer.pop_back(&action); | |
92 | m_replayer.wait_for_actions(action->predecessors()); | |
93 | action->perform(*this); | |
94 | m_replayer.set_action_complete(action->id()); | |
95 | } | |
96 | { | |
11fdf7f2 | 97 | std::unique_lock lock{m_pending_ios_mutex}; |
7c673cae FG |
98 | bool first_time = true; |
99 | while (!m_pending_ios.empty()) { | |
100 | if (!first_time) { | |
101 | dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl; | |
102 | pair<action_id_t, PendingIO::ptr> p; | |
103 | BOOST_FOREACH(p, m_pending_ios) { | |
104 | dout(THREAD_LEVEL) << "> " << p.first << dendl; | |
105 | } | |
106 | } | |
11fdf7f2 | 107 | m_pending_ios_empty.wait_for(lock, std::chrono::seconds(1)); |
7c673cae FG |
108 | first_time = false; |
109 | } | |
110 | } | |
111 | dout(THREAD_LEVEL) << "Worker thread stopped" << dendl; | |
112 | } | |
113 | ||
114 | ||
115 | void Worker::remove_pending(PendingIO::ptr io) { | |
11fdf7f2 | 116 | ceph_assert(io); |
7c673cae | 117 | m_replayer.set_action_complete(io->id()); |
11fdf7f2 | 118 | std::scoped_lock lock{m_pending_ios_mutex}; |
7c673cae FG |
119 | size_t num_erased = m_pending_ios.erase(io->id()); |
120 | assertf(num_erased == 1, "id = %d", io->id()); | |
121 | if (m_pending_ios.empty()) { | |
122 | m_pending_ios_empty.notify_all(); | |
123 | } | |
124 | } | |
125 | ||
126 | ||
127 | librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) { | |
128 | return m_replayer.get_image(imagectx_id); | |
129 | } | |
130 | ||
131 | ||
132 | void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) { | |
11fdf7f2 | 133 | ceph_assert(image); |
7c673cae FG |
134 | m_replayer.put_image(imagectx_id, image); |
135 | } | |
136 | ||
137 | ||
138 | void Worker::erase_image(imagectx_id_t imagectx_id) { | |
139 | m_replayer.erase_image(imagectx_id); | |
140 | } | |
141 | ||
142 | ||
143 | librbd::RBD* Worker::rbd() { | |
144 | return m_replayer.get_rbd(); | |
145 | } | |
146 | ||
147 | ||
148 | librados::IoCtx* Worker::ioctx() { | |
149 | return m_replayer.get_ioctx(); | |
150 | } | |
151 | ||
152 | void Worker::set_action_complete(action_id_t id) { | |
153 | m_replayer.set_action_complete(id); | |
154 | } | |
155 | ||
156 | bool Worker::readonly() const { | |
157 | return m_replayer.readonly(); | |
158 | } | |
159 | ||
160 | rbd_loc Worker::map_image_name(string image_name, string snap_name) const { | |
161 | return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name)); | |
162 | } | |
163 | ||
164 | ||
165 | Replayer::Replayer(int num_action_trackers) | |
31f18b77 FG |
166 | : m_rbd(NULL), m_ioctx(0), |
167 | m_latency_multiplier(1.0), | |
7c673cae FG |
168 | m_readonly(false), m_dump_perf_counters(false), |
169 | m_num_action_trackers(num_action_trackers), | |
170 | m_action_trackers(new action_tracker_d[m_num_action_trackers]) { | |
171 | assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers); | |
172 | } | |
173 | ||
174 | Replayer::~Replayer() { | |
175 | delete[] m_action_trackers; | |
176 | } | |
177 | ||
178 | Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) { | |
179 | return m_action_trackers[id % m_num_action_trackers]; | |
180 | } | |
181 | ||
182 | void Replayer::run(const std::string& replay_file) { | |
183 | { | |
184 | librados::Rados rados; | |
185 | rados.init(NULL); | |
186 | int r = rados.init_with_context(g_ceph_context); | |
187 | if (r) { | |
188 | cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl; | |
189 | goto out; | |
190 | } | |
191 | r = rados.connect(); | |
192 | if (r) { | |
193 | cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl; | |
194 | goto out; | |
195 | } | |
31f18b77 FG |
196 | |
197 | if (m_pool_name.empty()) { | |
198 | r = rados.conf_get("rbd_default_pool", m_pool_name); | |
199 | if (r < 0) { | |
200 | cerr << "Failed to retrieve default pool: " << cpp_strerror(r) | |
201 | << std::endl; | |
202 | goto out; | |
203 | } | |
204 | } | |
205 | ||
7c673cae FG |
206 | m_ioctx = new librados::IoCtx(); |
207 | { | |
208 | r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx); | |
209 | if (r) { | |
210 | cerr << "Failed to open pool " << m_pool_name << ": " | |
211 | << cpp_strerror(r) << std::endl; | |
212 | goto out2; | |
213 | } | |
214 | m_rbd = new librbd::RBD(); | |
215 | map<thread_id_t, Worker*> workers; | |
216 | ||
217 | int fd = open(replay_file.c_str(), O_RDONLY); | |
218 | if (fd < 0) { | |
219 | std::cerr << "Failed to open " << replay_file << ": " | |
220 | << cpp_strerror(errno) << std::endl; | |
221 | exit(1); | |
222 | } | |
11fdf7f2 | 223 | auto close_fd = make_scope_guard([fd] { close(fd); }); |
7c673cae FG |
224 | |
225 | BufferReader buffer_reader(fd); | |
226 | bool versioned = is_versioned_replay(buffer_reader); | |
227 | while (true) { | |
228 | action::ActionEntry action_entry; | |
229 | try { | |
11fdf7f2 | 230 | bufferlist::const_iterator *it; |
7c673cae FG |
231 | int r = buffer_reader.fetch(&it); |
232 | if (r < 0) { | |
233 | std::cerr << "Failed to read from trace file: " << cpp_strerror(r) | |
234 | << std::endl; | |
235 | exit(-r); | |
236 | } | |
237 | if (it->get_remaining() == 0) { | |
238 | break; | |
239 | } | |
240 | ||
241 | if (versioned) { | |
242 | action_entry.decode(*it); | |
243 | } else { | |
244 | action_entry.decode_unversioned(*it); | |
245 | } | |
246 | } catch (const buffer::error &err) { | |
247 | std::cerr << "Failed to decode trace action: " << err.what() << std::endl; | |
248 | exit(1); | |
249 | } | |
250 | ||
251 | Action::ptr action = Action::construct(action_entry); | |
252 | if (!action) { | |
253 | // unknown / unsupported action | |
254 | continue; | |
255 | } | |
256 | ||
257 | if (action->is_start_thread()) { | |
258 | Worker *worker = new Worker(*this); | |
259 | workers[action->thread_id()] = worker; | |
260 | worker->start(); | |
261 | } else { | |
262 | workers[action->thread_id()]->send(action); | |
263 | } | |
264 | } | |
265 | ||
266 | dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl; | |
267 | pair<thread_id_t, Worker*> w; | |
268 | BOOST_FOREACH(w, workers) { | |
269 | w.second->join(); | |
270 | delete w.second; | |
271 | } | |
272 | clear_images(); | |
273 | delete m_rbd; | |
274 | m_rbd = NULL; | |
275 | } | |
276 | out2: | |
277 | delete m_ioctx; | |
278 | m_ioctx = NULL; | |
279 | } | |
280 | out: | |
281 | ; | |
282 | } | |
283 | ||
284 | ||
285 | librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) { | |
11fdf7f2 | 286 | std::scoped_lock lock{m_images_mutex}; |
7c673cae FG |
287 | return m_images[imagectx_id]; |
288 | } | |
289 | ||
290 | void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) { | |
11fdf7f2 TL |
291 | ceph_assert(image); |
292 | std::unique_lock lock{m_images_mutex}; | |
293 | ceph_assert(m_images.count(imagectx_id) == 0); | |
7c673cae FG |
294 | m_images[imagectx_id] = image; |
295 | } | |
296 | ||
297 | void Replayer::erase_image(imagectx_id_t imagectx_id) { | |
11fdf7f2 | 298 | std::unique_lock lock{m_images_mutex}; |
7c673cae FG |
299 | librbd::Image* image = m_images[imagectx_id]; |
300 | if (m_dump_perf_counters) { | |
301 | string command = "perf dump"; | |
302 | cmdmap_t cmdmap; | |
9f95a23c | 303 | JSONFormatter jf(true); |
7c673cae | 304 | bufferlist out; |
9f95a23c TL |
305 | stringstream ss; |
306 | g_ceph_context->do_command(command, cmdmap, &jf, ss, &out); | |
307 | jf.flush(cout); | |
7c673cae FG |
308 | cout << std::endl; |
309 | cout.flush(); | |
310 | } | |
311 | delete image; | |
312 | m_images.erase(imagectx_id); | |
313 | } | |
314 | ||
315 | void Replayer::set_action_complete(action_id_t id) { | |
316 | dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl; | |
11fdf7f2 | 317 | auto now = std::chrono::system_clock::now(); |
7c673cae | 318 | action_tracker_d &tracker = tracker_for(id); |
11fdf7f2 TL |
319 | std::unique_lock lock{tracker.mutex}; |
320 | ceph_assert(tracker.actions.count(id) == 0); | |
7c673cae FG |
321 | tracker.actions[id] = now; |
322 | tracker.condition.notify_all(); | |
323 | } | |
324 | ||
325 | bool Replayer::is_action_complete(action_id_t id) { | |
326 | action_tracker_d &tracker = tracker_for(id); | |
11fdf7f2 | 327 | std::shared_lock lock{tracker.mutex}; |
7c673cae FG |
328 | return tracker.actions.count(id) > 0; |
329 | } | |
330 | ||
331 | void Replayer::wait_for_actions(const action::Dependencies &deps) { | |
11fdf7f2 TL |
332 | auto release_time = std::chrono::time_point<std::chrono::system_clock>::min(); |
333 | for(auto& dep : deps) { | |
7c673cae | 334 | dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl; |
11fdf7f2 | 335 | auto start_time = std::chrono::system_clock::now(); |
7c673cae | 336 | action_tracker_d &tracker = tracker_for(dep.id); |
11fdf7f2 | 337 | std::unique_lock lock{tracker.mutex}; |
7c673cae FG |
338 | bool first_time = true; |
339 | while (tracker.actions.count(dep.id) == 0) { | |
340 | if (!first_time) { | |
341 | dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl; | |
342 | } | |
11fdf7f2 | 343 | tracker.condition.wait_for(lock, std::chrono::seconds(1)); |
7c673cae FG |
344 | first_time = false; |
345 | } | |
11fdf7f2 | 346 | auto action_completed_time(tracker.actions[dep.id]); |
7c673cae | 347 | lock.unlock(); |
11fdf7f2 TL |
348 | auto end_time = std::chrono::system_clock::now(); |
349 | auto micros = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count(); | |
7c673cae FG |
350 | dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl; |
351 | // Apparently the nanoseconds constructor is optional: | |
352 | // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options | |
11fdf7f2 TL |
353 | auto sub_release_time{action_completed_time + |
354 | std::chrono::microseconds{static_cast<long long>(dep.time_delta * m_latency_multiplier / 1000)}}; | |
7c673cae FG |
355 | if (sub_release_time > release_time) { |
356 | release_time = sub_release_time; | |
357 | } | |
358 | } | |
11fdf7f2 TL |
359 | if (release_time > std::chrono::system_clock::now()) { |
360 | auto sleep_for = release_time - std::chrono::system_clock::now(); | |
361 | dout(SLEEP_LEVEL) << "Sleeping for " | |
362 | << std::chrono::duration_cast<std::chrono::microseconds>(sleep_for).count() | |
363 | << " microseconds" << dendl; | |
364 | std::this_thread::sleep_until(release_time); | |
7c673cae FG |
365 | } |
366 | } | |
367 | ||
368 | void Replayer::clear_images() { | |
11fdf7f2 | 369 | std::shared_lock lock{m_images_mutex}; |
7c673cae FG |
370 | if (m_dump_perf_counters && !m_images.empty()) { |
371 | string command = "perf dump"; | |
372 | cmdmap_t cmdmap; | |
9f95a23c | 373 | JSONFormatter jf(true); |
7c673cae | 374 | bufferlist out; |
9f95a23c TL |
375 | stringstream ss; |
376 | g_ceph_context->do_command(command, cmdmap, &jf, ss, &out); | |
377 | jf.flush(cout); | |
7c673cae | 378 | cout << std::endl; |
9f95a23c | 379 | cout.flush(); |
7c673cae | 380 | } |
11fdf7f2 | 381 | for (auto& p : m_images) { |
7c673cae FG |
382 | delete p.second; |
383 | } | |
384 | m_images.clear(); | |
385 | } | |
386 | ||
387 | void Replayer::set_latency_multiplier(float f) { | |
388 | assertf(f >= 0, "f = %f", f); | |
389 | m_latency_multiplier = f; | |
390 | } | |
391 | ||
392 | bool Replayer::readonly() const { | |
393 | return m_readonly; | |
394 | } | |
395 | ||
396 | void Replayer::set_readonly(bool readonly) { | |
397 | m_readonly = readonly; | |
398 | } | |
399 | ||
400 | string Replayer::pool_name() const { | |
401 | return m_pool_name; | |
402 | } | |
403 | ||
404 | void Replayer::set_pool_name(string pool_name) { | |
405 | m_pool_name = pool_name; | |
406 | } |