]> git.proxmox.com Git - ceph.git/blob - ceph/src/rbd_replay/rbd-replay-prep.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rbd_replay / rbd-replay-prep.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 // This code assumes that IO IDs and timestamps are related monotonically.
16 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
17
18 #include "common/errno.h"
19 #include "rbd_replay/ActionTypes.h"
20 #include <babeltrace/babeltrace.h>
21 #include <babeltrace/ctf/events.h>
22 #include <babeltrace/ctf/iterator.h>
23 #include <sys/types.h>
24 #include <fcntl.h>
25 #include <cstdlib>
26 #include <string>
27 #include <assert.h>
28 #include <fstream>
29 #include <set>
30 #include <boost/thread/thread.hpp>
31 #include <boost/scope_exit.hpp>
32 #include "ios.hpp"
33
34 using namespace std;
35 using namespace rbd_replay;
36
// Fatal-error check: when |check| is false, writes |str| to std::cerr and
// terminates the process with exit status 1. |str| may be any stream
// expression, e.g. "Error opening " << path << ": " << reason.
//
// The body is wrapped in do { } while (0) so that `ASSERT_EXIT(...);` is a
// single statement and can safely be the unbraced body of an if/else (a bare
// `if` expansion would let the trailing ';' orphan a following `else`).
#define ASSERT_EXIT(check, str) \
  do { \
    if (!(check)) { \
      std::cerr << str << std::endl; \
      exit(1); \
    } \
  } while (0)
42
43 class Thread {
44 public:
45 typedef boost::shared_ptr<Thread> ptr;
46
47 Thread(thread_id_t id,
48 uint64_t window)
49 : m_id(id),
50 m_window(window),
51 m_latest_io(IO::ptr()),
52 m_max_ts(0) {
53 }
54
55 void insert_ts(uint64_t ts) {
56 if (m_max_ts == 0 || ts > m_max_ts) {
57 m_max_ts = ts;
58 }
59 }
60
61 uint64_t max_ts() const {
62 return m_max_ts;
63 }
64
65 void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
66 assert(io);
67 if (m_latest_io.get() != NULL) {
68 latest_ios->erase(m_latest_io);
69 }
70 m_latest_io = io;
71 latest_ios->insert(io);
72 }
73
74 thread_id_t id() const {
75 return m_id;
76 }
77
78 IO::ptr latest_io() {
79 return m_latest_io;
80 }
81
82 private:
83 thread_id_t m_id;
84 uint64_t m_window;
85 IO::ptr m_latest_io;
86 uint64_t m_max_ts;
87 };
88
// Maps one real image name, and the snapshot names seen for it, to stable
// anonymized names: the image becomes "image<index>" and snapshots become
// "snap1", "snap2", ... in order of first appearance.
class AnonymizedImage {
public:
  // Binds this entry to |image_name| and derives its anonymized name from
  // |index|. Asserts that no non-empty name has been bound yet.
  void init(string image_name, int index) {
    assert(m_image_name == "");
    m_image_name = image_name;
    m_anonymized_image_name = "image" + std::to_string(index);
  }

  string image_name() const {
    return m_image_name;
  }

  // Returns (anonymized image name, anonymized snapshot name). An empty
  // |snap_name| (the image head) stays empty; any other snapshot receives a
  // new "snap<N>" name the first time it is seen and keeps it afterwards.
  pair<string, string> anonymize(string snap_name) {
    if (snap_name.empty()) {
      return {m_anonymized_image_name, string()};
    }
    string &alias = m_snaps[snap_name];
    if (alias.empty()) {
      // operator[] has already inserted the new key, so size() is 1-based.
      alias = "snap" + std::to_string(m_snaps.size());
    }
    return {m_anonymized_image_name, alias};
  }

private:
  string m_image_name;
  string m_anonymized_image_name;
  map<string, string> m_snaps;
};
121
// Prints the command-line synopsis to stdout. The second line is indented
// by the width of "Usage: <prog> " so it lines up under the options.
static void usage(string prog) {
  const std::string prefix = "Usage: " + prog + " ";
  const std::string indent(prefix.size(), ' ');
  std::cout << prefix
            << "[ --window <seconds> ] [ --anonymize ] [ --verbose ]"
            << std::endl
            << indent << "<trace-input> <replay-output>" << std::endl;
}
128
129 __attribute__((noreturn)) static void usage_exit(string prog, string msg) {
130 cerr << msg << endl;
131 usage(prog);
132 exit(1);
133 }
134
135 class Processor {
136 public:
137 Processor()
138 : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
139 m_io_count(0),
140 m_anonymize(false),
141 m_verbose(false) {
142 }
143
144 void run(vector<string> args) {
145 string input_file_name;
146 string output_file_name;
147 bool got_input = false;
148 bool got_output = false;
149 for (int i = 1, nargs = args.size(); i < nargs; i++) {
150 const string& arg(args[i]);
151 if (arg == "--window") {
152 if (i == nargs - 1) {
153 usage_exit(args[0], "--window requires an argument");
154 }
155 m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
156 } else if (arg.compare(0, 9, "--window=") == 0) {
157 m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
158 } else if (arg == "--anonymize") {
159 m_anonymize = true;
160 } else if (arg == "--verbose") {
161 m_verbose = true;
162 } else if (arg == "-h" || arg == "--help") {
163 usage(args[0]);
164 exit(0);
165 } else if (arg.compare(0, 1, "-") == 0) {
166 usage_exit(args[0], "Unrecognized argument: " + arg);
167 } else if (!got_input) {
168 input_file_name = arg;
169 got_input = true;
170 } else if (!got_output) {
171 output_file_name = arg;
172 got_output = true;
173 } else {
174 usage_exit(args[0], "Too many arguments");
175 }
176 }
177 if (!got_output) {
178 usage_exit(args[0], "Not enough arguments");
179 }
180
181 struct bt_context *ctx = bt_context_create();
182 int trace_handle = bt_context_add_trace(ctx,
183 input_file_name.c_str(), // path
184 "ctf", // format
185 NULL, // packet_seek
186 NULL, // stream_list
187 NULL); // metadata
188 ASSERT_EXIT(trace_handle >= 0, "Error loading trace file");
189
190 uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
191 ASSERT_EXIT(start_time_ns != -1ULL,
192 "Error extracting creation time from trace");
193
194 struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
195 NULL, // begin_pos
196 NULL); // end_pos
197 assert(itr);
198
199 struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
200
201 int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
202 ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
203 ": " << cpp_strerror(errno));
204 BOOST_SCOPE_EXIT( (fd) ) {
205 close(fd);
206 } BOOST_SCOPE_EXIT_END;
207
208 write_banner(fd);
209
210 uint64_t trace_start = 0;
211 bool first = true;
212 while(true) {
213 struct bt_ctf_event *evt = bt_ctf_iter_read_event(itr);
214 if(!evt) {
215 break;
216 }
217 uint64_t ts = bt_ctf_get_timestamp(evt);
218 ASSERT_EXIT(ts != -1ULL, "Error extracting event timestamp");
219
220 if (first) {
221 trace_start = ts;
222 first = false;
223 }
224 ts -= trace_start;
225 ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
226
227 IO::ptrs ptrs;
228 process_event(ts, evt, &ptrs);
229 serialize_events(fd, ptrs);
230
231 int r = bt_iter_next(bt_itr);
232 ASSERT_EXIT(r == 0, "Error advancing event iterator");
233 }
234
235 bt_ctf_iter_destroy(itr);
236
237 insert_thread_stops(fd);
238 }
239
240 private:
241 void write_banner(int fd) {
242 bufferlist bl;
243 bl.append(rbd_replay::action::BANNER);
244 int r = bl.write_fd(fd);
245 ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
246 }
247
248 void serialize_events(int fd, const IO::ptrs &ptrs) {
249 for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
250 IO::ptr io(*it);
251
252 bufferlist bl;
253 io->encode(bl);
254
255 int r = bl.write_fd(fd);
256 ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
257
258 if (m_verbose) {
259 io->write_debug(std::cout);
260 std::cout << std::endl;
261 }
262 }
263 }
264
265 void insert_thread_stops(int fd) {
266 IO::ptrs ios;
267 for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
268 end = m_threads.end(); itr != end; ++itr) {
269 Thread::ptr thread(itr->second);
270 ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
271 thread->id(),
272 m_recent_completions)));
273 }
274 serialize_events(fd, ios);
275 }
276
277 void process_event(uint64_t ts, struct bt_ctf_event *evt,
278 IO::ptrs *ios) {
279 const char *event_name = bt_ctf_event_name(evt);
280 const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
281 BT_STREAM_EVENT_CONTEXT);
282 ASSERT_EXIT(scope_context != NULL, "Error retrieving event context");
283
284 const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
285 BT_EVENT_FIELDS);
286 ASSERT_EXIT(scope_fields != NULL, "Error retrieving event fields");
287
288 const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
289 ASSERT_EXIT(pthread_id_field != NULL, "Error retrieving thread id");
290
291 thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
292 Thread::ptr &thread(m_threads[threadID]);
293 if (!thread) {
294 thread.reset(new Thread(threadID, m_window));
295 IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
296 ios->push_back(io);
297 }
298 thread->insert_ts(ts);
299
300 class FieldLookup {
301 public:
302 FieldLookup(struct bt_ctf_event *evt,
303 const struct bt_definition *scope)
304 : m_evt(evt),
305 m_scope(scope) {
306 }
307
308 const char* string(const char* name) {
309 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
310 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
311
312 const char* c = bt_ctf_get_string(field);
313 int err = bt_ctf_field_get_error();
314 ASSERT_EXIT(c && err == 0, "Error retrieving field value '" << name <<
315 "': error=" << err);
316 return c;
317 }
318
319 int64_t int64(const char* name) {
320 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
321 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
322
323 int64_t val = bt_ctf_get_int64(field);
324 int err = bt_ctf_field_get_error();
325 ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
326 "': error=" << err);
327 return val;
328 }
329
330 uint64_t uint64(const char* name) {
331 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
332 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
333
334 uint64_t val = bt_ctf_get_uint64(field);
335 int err = bt_ctf_field_get_error();
336 ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
337 "': error=" << err);
338 return val;
339 }
340
341 private:
342 struct bt_ctf_event *m_evt;
343 const struct bt_definition *m_scope;
344 } fields(evt, scope_fields);
345
346 if (strcmp(event_name, "librbd:open_image_enter") == 0) {
347 string name(fields.string("name"));
348 string snap_name(fields.string("snap_name"));
349 bool readonly = fields.uint64("read_only");
350 imagectx_id_t imagectx = fields.uint64("imagectx");
351 action_id_t ionum = next_id();
352 pair<string, string> aname(map_image_snap(name, snap_name));
353 IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
354 imagectx, aname.first, aname.second,
355 readonly));
356 thread->issued_io(io, &m_latest_ios);
357 ios->push_back(io);
358 } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
359 completed(thread->latest_io());
360 boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
361 assert(io);
362 m_open_images.insert(io->imagectx());
363 } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
364 imagectx_id_t imagectx = fields.uint64("imagectx");
365 action_id_t ionum = next_id();
366 IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
367 imagectx));
368 thread->issued_io(io, &m_latest_ios);
369 ios->push_back(thread->latest_io());
370 } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
371 completed(thread->latest_io());
372 boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
373 assert(io);
374 m_open_images.erase(io->imagectx());
375 } else if (strcmp(event_name, "librbd:aio_open_image_enter") == 0) {
376 string name(fields.string("name"));
377 string snap_name(fields.string("snap_name"));
378 bool readonly = fields.uint64("read_only");
379 imagectx_id_t imagectx = fields.uint64("imagectx");
380 uint64_t completion = fields.uint64("completion");
381 action_id_t ionum = next_id();
382 pair<string, string> aname(map_image_snap(name, snap_name));
383 IO::ptr io(new AioOpenImageIO(ionum, ts, threadID, m_recent_completions,
384 imagectx, aname.first, aname.second,
385 readonly));
386 thread->issued_io(io, &m_latest_ios);
387 ios->push_back(io);
388 m_pending_ios[completion] = io;
389 } else if (strcmp(event_name, "librbd:aio_close_image_enter") == 0) {
390 imagectx_id_t imagectx = fields.uint64("imagectx");
391 uint64_t completion = fields.uint64("completion");
392 action_id_t ionum = next_id();
393 IO::ptr io(new AioCloseImageIO(ionum, ts, threadID, m_recent_completions,
394 imagectx));
395 thread->issued_io(io, &m_latest_ios);
396 ios->push_back(thread->latest_io());
397 m_pending_ios[completion] = io;
398 } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
399 strcmp(event_name, "librbd:read2_enter") == 0) {
400 string name(fields.string("name"));
401 string snap_name(fields.string("snap_name"));
402 bool readonly = fields.int64("read_only");
403 imagectx_id_t imagectx = fields.uint64("imagectx");
404 uint64_t offset = fields.uint64("offset");
405 uint64_t length = fields.uint64("length");
406 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
407 action_id_t ionum = next_id();
408 IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
409 offset, length));
410 thread->issued_io(io, &m_latest_ios);
411 ios->push_back(io);
412 } else if (strcmp(event_name, "librbd:read_exit") == 0) {
413 completed(thread->latest_io());
414 } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
415 strcmp(event_name, "librbd:write2_enter") == 0) {
416 string name(fields.string("name"));
417 string snap_name(fields.string("snap_name"));
418 bool readonly = fields.int64("read_only");
419 uint64_t offset = fields.uint64("off");
420 uint64_t length = fields.uint64("buf_len");
421 imagectx_id_t imagectx = fields.uint64("imagectx");
422 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
423 action_id_t ionum = next_id();
424 IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
425 imagectx, offset, length));
426 thread->issued_io(io, &m_latest_ios);
427 ios->push_back(io);
428 } else if (strcmp(event_name, "librbd:write_exit") == 0) {
429 completed(thread->latest_io());
430 } else if (strcmp(event_name, "librbd:discard_enter") == 0) {
431 string name(fields.string("name"));
432 string snap_name(fields.string("snap_name"));
433 bool readonly = fields.int64("read_only");
434 uint64_t offset = fields.uint64("off");
435 uint64_t length = fields.uint64("len");
436 imagectx_id_t imagectx = fields.uint64("imagectx");
437 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
438 action_id_t ionum = next_id();
439 IO::ptr io(new DiscardIO(ionum, ts, threadID, m_recent_completions,
440 imagectx, offset, length));
441 thread->issued_io(io, &m_latest_ios);
442 ios->push_back(io);
443 } else if (strcmp(event_name, "librbd:discard_exit") == 0) {
444 completed(thread->latest_io());
445 } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
446 strcmp(event_name, "librbd:aio_read2_enter") == 0) {
447 string name(fields.string("name"));
448 string snap_name(fields.string("snap_name"));
449 bool readonly = fields.int64("read_only");
450 uint64_t completion = fields.uint64("completion");
451 imagectx_id_t imagectx = fields.uint64("imagectx");
452 uint64_t offset = fields.uint64("offset");
453 uint64_t length = fields.uint64("length");
454 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
455 action_id_t ionum = next_id();
456 IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
457 imagectx, offset, length));
458 ios->push_back(io);
459 thread->issued_io(io, &m_latest_ios);
460 m_pending_ios[completion] = io;
461 } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
462 strcmp(event_name, "librbd:aio_write2_enter") == 0) {
463 string name(fields.string("name"));
464 string snap_name(fields.string("snap_name"));
465 bool readonly = fields.int64("read_only");
466 uint64_t offset = fields.uint64("off");
467 uint64_t length = fields.uint64("len");
468 uint64_t completion = fields.uint64("completion");
469 imagectx_id_t imagectx = fields.uint64("imagectx");
470 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
471 action_id_t ionum = next_id();
472 IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
473 imagectx, offset, length));
474 thread->issued_io(io, &m_latest_ios);
475 ios->push_back(io);
476 m_pending_ios[completion] = io;
477 } else if (strcmp(event_name, "librbd:aio_discard_enter") == 0) {
478 string name(fields.string("name"));
479 string snap_name(fields.string("snap_name"));
480 bool readonly = fields.int64("read_only");
481 uint64_t offset = fields.uint64("off");
482 uint64_t length = fields.uint64("len");
483 uint64_t completion = fields.uint64("completion");
484 imagectx_id_t imagectx = fields.uint64("imagectx");
485 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
486 action_id_t ionum = next_id();
487 IO::ptr io(new AioDiscardIO(ionum, ts, threadID, m_recent_completions,
488 imagectx, offset, length));
489 thread->issued_io(io, &m_latest_ios);
490 ios->push_back(io);
491 m_pending_ios[completion] = io;
492 } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
493 uint64_t completion = fields.uint64("completion");
494 map<uint64_t, IO::ptr>::iterator itr = m_pending_ios.find(completion);
495 if (itr != m_pending_ios.end()) {
496 IO::ptr completedIO(itr->second);
497 m_pending_ios.erase(itr);
498 completed(completedIO);
499 }
500 }
501 }
502
503 action_id_t next_id() {
504 action_id_t id = m_io_count;
505 m_io_count += 2;
506 return id;
507 }
508
509 void completed(IO::ptr io) {
510 uint64_t limit = (io->start_time() < m_window ?
511 0 : io->start_time() - m_window);
512 for (io_set_t::iterator itr = m_recent_completions.begin();
513 itr != m_recent_completions.end(); ) {
514 IO::ptr recent_comp(*itr);
515 if ((recent_comp->start_time() < limit ||
516 io->dependencies().count(recent_comp) != 0) &&
517 m_latest_ios.count(recent_comp) == 0) {
518 m_recent_completions.erase(itr++);
519 } else {
520 ++itr;
521 }
522 }
523 m_recent_completions.insert(io);
524 }
525
526 pair<string, string> map_image_snap(string image_name, string snap_name) {
527 if (!m_anonymize) {
528 return pair<string, string>(image_name, snap_name);
529 }
530 AnonymizedImage& m(m_anonymized_images[image_name]);
531 if (m.image_name() == "") {
532 m.init(image_name, m_anonymized_images.size());
533 }
534 return m.anonymize(snap_name);
535 }
536
537 void require_image(uint64_t ts,
538 Thread::ptr thread,
539 imagectx_id_t imagectx,
540 const string& name,
541 const string& snap_name,
542 bool readonly,
543 IO::ptrs *ios) {
544 assert(thread);
545 if (m_open_images.count(imagectx) > 0) {
546 return;
547 }
548 action_id_t ionum = next_id();
549 pair<string, string> aname(map_image_snap(name, snap_name));
550 IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
551 m_recent_completions, imagectx, aname.first,
552 aname.second, readonly));
553 thread->issued_io(io, &m_latest_ios);
554 ios->push_back(io);
555 completed(io);
556 m_open_images.insert(imagectx);
557 }
558
559 uint64_t m_window;
560 map<thread_id_t, Thread::ptr> m_threads;
561 uint32_t m_io_count;
562 io_set_t m_recent_completions;
563 set<imagectx_id_t> m_open_images;
564
565 // keyed by completion
566 map<uint64_t, IO::ptr> m_pending_ios;
567 std::set<IO::ptr> m_latest_ios;
568
569 bool m_anonymize;
570 map<string, AnonymizedImage> m_anonymized_images;
571
572 bool m_verbose;
573 };
574
575 int main(int argc, char** argv) {
576 vector<string> args;
577 for (int i = 0; i < argc; i++) {
578 args.push_back(string(argv[i]));
579 }
580
581 Processor p;
582 p.run(args);
583 }