]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "Replayer.hpp" | |
16 | #include "common/errno.h" | |
17 | #include "rbd_replay/ActionTypes.h" | |
18 | #include "rbd_replay/BufferReader.h" | |
19 | #include <boost/foreach.hpp> | |
20 | #include <boost/thread/thread.hpp> | |
21 | #include <boost/scope_exit.hpp> | |
22 | #include <fstream> | |
23 | #include "global/global_context.h" | |
24 | #include "rbd_replay_debug.hpp" | |
25 | ||
26 | #define dout_context g_ceph_context | |
27 | ||
28 | using namespace std; | |
29 | using namespace rbd_replay; | |
30 | ||
31 | namespace { | |
32 | ||
33 | bool is_versioned_replay(BufferReader &buffer_reader) { | |
34 | bufferlist::iterator *it; | |
35 | int r = buffer_reader.fetch(&it); | |
36 | if (r < 0) { | |
37 | return false; | |
38 | } | |
39 | ||
40 | if (it->get_remaining() < action::BANNER.size()) { | |
41 | return false; | |
42 | } | |
43 | ||
44 | std::string banner; | |
45 | it->copy(action::BANNER.size(), banner); | |
46 | bool versioned = (banner == action::BANNER); | |
47 | if (!versioned) { | |
48 | it->seek(0); | |
49 | } | |
50 | return versioned; | |
51 | } | |
52 | ||
53 | } // anonymous namespace | |
54 | ||
55 | Worker::Worker(Replayer &replayer) | |
56 | : m_replayer(replayer), | |
57 | m_buffer(100), | |
58 | m_done(false) { | |
59 | } | |
60 | ||
61 | void Worker::start() { | |
62 | m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this))); | |
63 | } | |
64 | ||
65 | // Should only be called by StopThreadAction | |
66 | void Worker::stop() { | |
67 | m_done = true; | |
68 | } | |
69 | ||
70 | void Worker::join() { | |
71 | m_thread->join(); | |
72 | } | |
73 | ||
74 | void Worker::send(Action::ptr action) { | |
75 | assert(action); | |
76 | m_buffer.push_front(action); | |
77 | } | |
78 | ||
79 | void Worker::add_pending(PendingIO::ptr io) { | |
80 | assert(io); | |
81 | boost::mutex::scoped_lock lock(m_pending_ios_mutex); | |
82 | assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id()); | |
83 | m_pending_ios[io->id()] = io; | |
84 | } | |
85 | ||
86 | void Worker::run() { | |
87 | dout(THREAD_LEVEL) << "Worker thread started" << dendl; | |
88 | while (!m_done) { | |
89 | Action::ptr action; | |
90 | m_buffer.pop_back(&action); | |
91 | m_replayer.wait_for_actions(action->predecessors()); | |
92 | action->perform(*this); | |
93 | m_replayer.set_action_complete(action->id()); | |
94 | } | |
95 | { | |
96 | boost::mutex::scoped_lock lock(m_pending_ios_mutex); | |
97 | bool first_time = true; | |
98 | while (!m_pending_ios.empty()) { | |
99 | if (!first_time) { | |
100 | dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl; | |
101 | pair<action_id_t, PendingIO::ptr> p; | |
102 | BOOST_FOREACH(p, m_pending_ios) { | |
103 | dout(THREAD_LEVEL) << "> " << p.first << dendl; | |
104 | } | |
105 | } | |
106 | m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1)); | |
107 | first_time = false; | |
108 | } | |
109 | } | |
110 | dout(THREAD_LEVEL) << "Worker thread stopped" << dendl; | |
111 | } | |
112 | ||
113 | ||
114 | void Worker::remove_pending(PendingIO::ptr io) { | |
115 | assert(io); | |
116 | m_replayer.set_action_complete(io->id()); | |
117 | boost::mutex::scoped_lock lock(m_pending_ios_mutex); | |
118 | size_t num_erased = m_pending_ios.erase(io->id()); | |
119 | assertf(num_erased == 1, "id = %d", io->id()); | |
120 | if (m_pending_ios.empty()) { | |
121 | m_pending_ios_empty.notify_all(); | |
122 | } | |
123 | } | |
124 | ||
125 | ||
126 | librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) { | |
127 | return m_replayer.get_image(imagectx_id); | |
128 | } | |
129 | ||
130 | ||
131 | void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) { | |
132 | assert(image); | |
133 | m_replayer.put_image(imagectx_id, image); | |
134 | } | |
135 | ||
136 | ||
137 | void Worker::erase_image(imagectx_id_t imagectx_id) { | |
138 | m_replayer.erase_image(imagectx_id); | |
139 | } | |
140 | ||
141 | ||
142 | librbd::RBD* Worker::rbd() { | |
143 | return m_replayer.get_rbd(); | |
144 | } | |
145 | ||
146 | ||
147 | librados::IoCtx* Worker::ioctx() { | |
148 | return m_replayer.get_ioctx(); | |
149 | } | |
150 | ||
151 | void Worker::set_action_complete(action_id_t id) { | |
152 | m_replayer.set_action_complete(id); | |
153 | } | |
154 | ||
155 | bool Worker::readonly() const { | |
156 | return m_replayer.readonly(); | |
157 | } | |
158 | ||
159 | rbd_loc Worker::map_image_name(string image_name, string snap_name) const { | |
160 | return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name)); | |
161 | } | |
162 | ||
163 | ||
164 | Replayer::Replayer(int num_action_trackers) | |
31f18b77 FG |
165 | : m_rbd(NULL), m_ioctx(0), |
166 | m_latency_multiplier(1.0), | |
7c673cae FG |
167 | m_readonly(false), m_dump_perf_counters(false), |
168 | m_num_action_trackers(num_action_trackers), | |
169 | m_action_trackers(new action_tracker_d[m_num_action_trackers]) { | |
170 | assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers); | |
171 | } | |
172 | ||
173 | Replayer::~Replayer() { | |
174 | delete[] m_action_trackers; | |
175 | } | |
176 | ||
177 | Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) { | |
178 | return m_action_trackers[id % m_num_action_trackers]; | |
179 | } | |
180 | ||
181 | void Replayer::run(const std::string& replay_file) { | |
182 | { | |
183 | librados::Rados rados; | |
184 | rados.init(NULL); | |
185 | int r = rados.init_with_context(g_ceph_context); | |
186 | if (r) { | |
187 | cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl; | |
188 | goto out; | |
189 | } | |
190 | r = rados.connect(); | |
191 | if (r) { | |
192 | cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl; | |
193 | goto out; | |
194 | } | |
31f18b77 FG |
195 | |
196 | if (m_pool_name.empty()) { | |
197 | r = rados.conf_get("rbd_default_pool", m_pool_name); | |
198 | if (r < 0) { | |
199 | cerr << "Failed to retrieve default pool: " << cpp_strerror(r) | |
200 | << std::endl; | |
201 | goto out; | |
202 | } | |
203 | } | |
204 | ||
7c673cae FG |
205 | m_ioctx = new librados::IoCtx(); |
206 | { | |
207 | r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx); | |
208 | if (r) { | |
209 | cerr << "Failed to open pool " << m_pool_name << ": " | |
210 | << cpp_strerror(r) << std::endl; | |
211 | goto out2; | |
212 | } | |
213 | m_rbd = new librbd::RBD(); | |
214 | map<thread_id_t, Worker*> workers; | |
215 | ||
216 | int fd = open(replay_file.c_str(), O_RDONLY); | |
217 | if (fd < 0) { | |
218 | std::cerr << "Failed to open " << replay_file << ": " | |
219 | << cpp_strerror(errno) << std::endl; | |
220 | exit(1); | |
221 | } | |
222 | BOOST_SCOPE_EXIT( (fd) ) { | |
223 | close(fd); | |
224 | } BOOST_SCOPE_EXIT_END; | |
225 | ||
226 | BufferReader buffer_reader(fd); | |
227 | bool versioned = is_versioned_replay(buffer_reader); | |
228 | while (true) { | |
229 | action::ActionEntry action_entry; | |
230 | try { | |
231 | bufferlist::iterator *it; | |
232 | int r = buffer_reader.fetch(&it); | |
233 | if (r < 0) { | |
234 | std::cerr << "Failed to read from trace file: " << cpp_strerror(r) | |
235 | << std::endl; | |
236 | exit(-r); | |
237 | } | |
238 | if (it->get_remaining() == 0) { | |
239 | break; | |
240 | } | |
241 | ||
242 | if (versioned) { | |
243 | action_entry.decode(*it); | |
244 | } else { | |
245 | action_entry.decode_unversioned(*it); | |
246 | } | |
247 | } catch (const buffer::error &err) { | |
248 | std::cerr << "Failed to decode trace action: " << err.what() << std::endl; | |
249 | exit(1); | |
250 | } | |
251 | ||
252 | Action::ptr action = Action::construct(action_entry); | |
253 | if (!action) { | |
254 | // unknown / unsupported action | |
255 | continue; | |
256 | } | |
257 | ||
258 | if (action->is_start_thread()) { | |
259 | Worker *worker = new Worker(*this); | |
260 | workers[action->thread_id()] = worker; | |
261 | worker->start(); | |
262 | } else { | |
263 | workers[action->thread_id()]->send(action); | |
264 | } | |
265 | } | |
266 | ||
267 | dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl; | |
268 | pair<thread_id_t, Worker*> w; | |
269 | BOOST_FOREACH(w, workers) { | |
270 | w.second->join(); | |
271 | delete w.second; | |
272 | } | |
273 | clear_images(); | |
274 | delete m_rbd; | |
275 | m_rbd = NULL; | |
276 | } | |
277 | out2: | |
278 | delete m_ioctx; | |
279 | m_ioctx = NULL; | |
280 | } | |
281 | out: | |
282 | ; | |
283 | } | |
284 | ||
285 | ||
286 | librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) { | |
287 | boost::shared_lock<boost::shared_mutex> lock(m_images_mutex); | |
288 | return m_images[imagectx_id]; | |
289 | } | |
290 | ||
291 | void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) { | |
292 | assert(image); | |
293 | boost::unique_lock<boost::shared_mutex> lock(m_images_mutex); | |
294 | assert(m_images.count(imagectx_id) == 0); | |
295 | m_images[imagectx_id] = image; | |
296 | } | |
297 | ||
298 | void Replayer::erase_image(imagectx_id_t imagectx_id) { | |
299 | boost::unique_lock<boost::shared_mutex> lock(m_images_mutex); | |
300 | librbd::Image* image = m_images[imagectx_id]; | |
301 | if (m_dump_perf_counters) { | |
302 | string command = "perf dump"; | |
303 | cmdmap_t cmdmap; | |
304 | string format = "json-pretty"; | |
305 | bufferlist out; | |
306 | g_ceph_context->do_command(command, cmdmap, format, &out); | |
307 | out.write_stream(cout); | |
308 | cout << std::endl; | |
309 | cout.flush(); | |
310 | } | |
311 | delete image; | |
312 | m_images.erase(imagectx_id); | |
313 | } | |
314 | ||
315 | void Replayer::set_action_complete(action_id_t id) { | |
316 | dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl; | |
317 | boost::system_time now(boost::get_system_time()); | |
318 | action_tracker_d &tracker = tracker_for(id); | |
319 | boost::unique_lock<boost::shared_mutex> lock(tracker.mutex); | |
320 | assert(tracker.actions.count(id) == 0); | |
321 | tracker.actions[id] = now; | |
322 | tracker.condition.notify_all(); | |
323 | } | |
324 | ||
325 | bool Replayer::is_action_complete(action_id_t id) { | |
326 | action_tracker_d &tracker = tracker_for(id); | |
327 | boost::shared_lock<boost::shared_mutex> lock(tracker.mutex); | |
328 | return tracker.actions.count(id) > 0; | |
329 | } | |
330 | ||
331 | void Replayer::wait_for_actions(const action::Dependencies &deps) { | |
332 | boost::posix_time::ptime release_time(boost::posix_time::neg_infin); | |
333 | BOOST_FOREACH(const action::Dependency &dep, deps) { | |
334 | dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl; | |
335 | boost::system_time start_time(boost::get_system_time()); | |
336 | action_tracker_d &tracker = tracker_for(dep.id); | |
337 | boost::shared_lock<boost::shared_mutex> lock(tracker.mutex); | |
338 | bool first_time = true; | |
339 | while (tracker.actions.count(dep.id) == 0) { | |
340 | if (!first_time) { | |
341 | dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl; | |
342 | } | |
343 | tracker.condition.timed_wait(lock, boost::posix_time::seconds(1)); | |
344 | first_time = false; | |
345 | } | |
346 | boost::system_time action_completed_time(tracker.actions[dep.id]); | |
347 | lock.unlock(); | |
348 | boost::system_time end_time(boost::get_system_time()); | |
349 | long long micros = (end_time - start_time).total_microseconds(); | |
350 | dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl; | |
351 | // Apparently the nanoseconds constructor is optional: | |
352 | // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options | |
353 | boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000)); | |
354 | if (sub_release_time > release_time) { | |
355 | release_time = sub_release_time; | |
356 | } | |
357 | } | |
358 | if (release_time > boost::get_system_time()) { | |
359 | dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl; | |
360 | boost::this_thread::sleep(release_time); | |
361 | } | |
362 | } | |
363 | ||
364 | void Replayer::clear_images() { | |
365 | boost::unique_lock<boost::shared_mutex> lock(m_images_mutex); | |
366 | if (m_dump_perf_counters && !m_images.empty()) { | |
367 | string command = "perf dump"; | |
368 | cmdmap_t cmdmap; | |
369 | string format = "json-pretty"; | |
370 | bufferlist out; | |
371 | g_ceph_context->do_command(command, cmdmap, format, &out); | |
372 | out.write_stream(cout); | |
373 | cout << std::endl; | |
374 | cout.flush(); | |
375 | } | |
376 | pair<imagectx_id_t, librbd::Image*> p; | |
377 | BOOST_FOREACH(p, m_images) { | |
378 | delete p.second; | |
379 | } | |
380 | m_images.clear(); | |
381 | } | |
382 | ||
383 | void Replayer::set_latency_multiplier(float f) { | |
384 | assertf(f >= 0, "f = %f", f); | |
385 | m_latency_multiplier = f; | |
386 | } | |
387 | ||
388 | bool Replayer::readonly() const { | |
389 | return m_readonly; | |
390 | } | |
391 | ||
392 | void Replayer::set_readonly(bool readonly) { | |
393 | m_readonly = readonly; | |
394 | } | |
395 | ||
396 | string Replayer::pool_name() const { | |
397 | return m_pool_name; | |
398 | } | |
399 | ||
400 | void Replayer::set_pool_name(string pool_name) { | |
401 | m_pool_name = pool_name; | |
402 | } |