]> git.proxmox.com Git - ceph.git/blob - ceph/src/rbd_replay/rbd-replay-prep.cc
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / rbd_replay / rbd-replay-prep.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 // This code assumes that IO IDs and timestamps are related monotonically.
16 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
17
18 #include "include/compat.h"
19 #include "common/errno.h"
20 #include "rbd_replay/ActionTypes.h"
21 #include <babeltrace/babeltrace.h>
22 #include <babeltrace/ctf/events.h>
23 #include <babeltrace/ctf/iterator.h>
24 #include <sys/types.h>
25 #include <fcntl.h>
26 #include <cstdlib>
27 #include <string>
28 #include <assert.h>
29 #include <fstream>
30 #include <set>
31 #include <boost/thread/thread.hpp>
32 #include <boost/scope_exit.hpp>
33 #include "ios.hpp"
34
35 using namespace std;
36 using namespace rbd_replay;
37
// Evaluates `check`; when it is false, streams `str` to std::cerr followed by
// a newline and terminates the process with exit status 1.  `str` may be a
// chain of operands joined with `<<`.
//
// The do/while(0) wrapper turns the expansion into a single statement that
// needs the caller's trailing semicolon, so the macro can be used safely as
// the body of an unbraced if/else.
#define ASSERT_EXIT(check, str)        \
  do {                                 \
    if (!(check)) {                    \
      std::cerr << str << std::endl;   \
      exit(1);                         \
    }                                  \
  } while (0)
43
44 class Thread {
45 public:
46 typedef boost::shared_ptr<Thread> ptr;
47
48 Thread(thread_id_t id,
49 uint64_t window)
50 : m_id(id),
51 m_window(window),
52 m_latest_io(IO::ptr()),
53 m_max_ts(0) {
54 }
55
56 void insert_ts(uint64_t ts) {
57 if (m_max_ts == 0 || ts > m_max_ts) {
58 m_max_ts = ts;
59 }
60 }
61
62 uint64_t max_ts() const {
63 return m_max_ts;
64 }
65
66 void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
67 ceph_assert(io);
68 if (m_latest_io.get() != NULL) {
69 latest_ios->erase(m_latest_io);
70 }
71 m_latest_io = io;
72 latest_ios->insert(io);
73 }
74
75 thread_id_t id() const {
76 return m_id;
77 }
78
79 IO::ptr latest_io() {
80 return m_latest_io;
81 }
82
83 private:
84 thread_id_t m_id;
85 uint64_t m_window;
86 IO::ptr m_latest_io;
87 uint64_t m_max_ts;
88 };
89
90 class AnonymizedImage {
91 public:
92 void init(const string &image_name, int index) {
93 ceph_assert(m_image_name == "");
94 m_image_name = image_name;
95 ostringstream oss;
96 oss << "image" << index;
97 m_anonymized_image_name = oss.str();
98 }
99
100 string image_name() const {
101 return m_image_name;
102 }
103
104 pair<string, string> anonymize(string snap_name) {
105 if (snap_name == "") {
106 return pair<string, string>(m_anonymized_image_name, "");
107 }
108 string& anonymized_snap_name(m_snaps[snap_name]);
109 if (anonymized_snap_name == "") {
110 ostringstream oss;
111 oss << "snap" << m_snaps.size();
112 anonymized_snap_name = oss.str();
113 }
114 return pair<string, string>(m_anonymized_image_name, anonymized_snap_name);
115 }
116
117 private:
118 string m_image_name;
119 string m_anonymized_image_name;
120 map<string, string> m_snaps;
121 };
122
// Prints the two-line command synopsis to standard output.  The second line
// is padded with spaces so that it lines up under the options of the first.
static void usage(const string &prog) {
  const std::string lead = "Usage: " + prog + " ";
  const std::string pad(lead.size(), ' ');
  std::cout << lead << "[ --window <seconds> ] [ --anonymize ] [ --verbose ]" << std::endl;
  std::cout << pad << "<trace-input> <replay-output>" << std::endl;
}
129
130 __attribute__((noreturn)) static void usage_exit(const string &prog, const string &msg) {
131 cerr << msg << endl;
132 usage(prog);
133 exit(1);
134 }
135
136 class Processor {
137 public:
138 Processor()
139 : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
140 m_io_count(0),
141 m_anonymize(false),
142 m_verbose(false) {
143 }
144
145 void run(vector<string> args) {
146 string input_file_name;
147 string output_file_name;
148 bool got_input = false;
149 bool got_output = false;
150 for (int i = 1, nargs = args.size(); i < nargs; i++) {
151 const string& arg(args[i]);
152 if (arg == "--window") {
153 if (i == nargs - 1) {
154 usage_exit(args[0], "--window requires an argument");
155 }
156 m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
157 } else if (arg.compare(0, 9, "--window=") == 0) {
158 m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
159 } else if (arg == "--anonymize") {
160 m_anonymize = true;
161 } else if (arg == "--verbose") {
162 m_verbose = true;
163 } else if (arg == "-h" || arg == "--help") {
164 usage(args[0]);
165 exit(0);
166 } else if (arg.compare(0, 1, "-") == 0) {
167 usage_exit(args[0], "Unrecognized argument: " + arg);
168 } else if (!got_input) {
169 input_file_name = arg;
170 got_input = true;
171 } else if (!got_output) {
172 output_file_name = arg;
173 got_output = true;
174 } else {
175 usage_exit(args[0], "Too many arguments");
176 }
177 }
178 if (!got_output) {
179 usage_exit(args[0], "Not enough arguments");
180 }
181
182 struct bt_context *ctx = bt_context_create();
183 int trace_handle = bt_context_add_trace(ctx,
184 input_file_name.c_str(), // path
185 "ctf", // format
186 NULL, // packet_seek
187 NULL, // stream_list
188 NULL); // metadata
189 ASSERT_EXIT(trace_handle >= 0, "Error loading trace file");
190
191 uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
192 ASSERT_EXIT(start_time_ns != -1ULL,
193 "Error extracting creation time from trace");
194
195 struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
196 NULL, // begin_pos
197 NULL); // end_pos
198 ceph_assert(itr);
199
200 struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
201
202 int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0644);
203 ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
204 ": " << cpp_strerror(errno));
205 BOOST_SCOPE_EXIT( (fd) ) {
206 close(fd);
207 } BOOST_SCOPE_EXIT_END;
208
209 write_banner(fd);
210
211 uint64_t trace_start = 0;
212 bool first = true;
213 while(true) {
214 struct bt_ctf_event *evt = bt_ctf_iter_read_event(itr);
215 if(!evt) {
216 break;
217 }
218 uint64_t ts = bt_ctf_get_timestamp(evt);
219 ASSERT_EXIT(ts != -1ULL, "Error extracting event timestamp");
220
221 if (first) {
222 trace_start = ts;
223 first = false;
224 }
225 ts -= trace_start;
226 ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
227
228 IO::ptrs ptrs;
229 process_event(ts, evt, &ptrs);
230 serialize_events(fd, ptrs);
231
232 int r = bt_iter_next(bt_itr);
233 ASSERT_EXIT(r == 0, "Error advancing event iterator");
234 }
235
236 bt_ctf_iter_destroy(itr);
237
238 insert_thread_stops(fd);
239 }
240
241 private:
242 void write_banner(int fd) {
243 bufferlist bl;
244 bl.append(rbd_replay::action::BANNER);
245 int r = bl.write_fd(fd);
246 ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
247 }
248
249 void serialize_events(int fd, const IO::ptrs &ptrs) {
250 for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
251 IO::ptr io(*it);
252
253 bufferlist bl;
254 io->encode(bl);
255
256 int r = bl.write_fd(fd);
257 ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
258
259 if (m_verbose) {
260 io->write_debug(std::cout);
261 std::cout << std::endl;
262 }
263 }
264 }
265
266 void insert_thread_stops(int fd) {
267 IO::ptrs ios;
268 for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
269 end = m_threads.end(); itr != end; ++itr) {
270 Thread::ptr thread(itr->second);
271 ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
272 thread->id(),
273 m_recent_completions)));
274 }
275 serialize_events(fd, ios);
276 }
277
278 void process_event(uint64_t ts, struct bt_ctf_event *evt,
279 IO::ptrs *ios) {
280 const char *event_name = bt_ctf_event_name(evt);
281 const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
282 BT_STREAM_EVENT_CONTEXT);
283 ASSERT_EXIT(scope_context != NULL, "Error retrieving event context");
284
285 const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
286 BT_EVENT_FIELDS);
287 ASSERT_EXIT(scope_fields != NULL, "Error retrieving event fields");
288
289 const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
290 ASSERT_EXIT(pthread_id_field != NULL, "Error retrieving thread id");
291
292 thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
293 Thread::ptr &thread(m_threads[threadID]);
294 if (!thread) {
295 thread.reset(new Thread(threadID, m_window));
296 IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
297 ios->push_back(io);
298 }
299 thread->insert_ts(ts);
300
301 class FieldLookup {
302 public:
303 FieldLookup(struct bt_ctf_event *evt,
304 const struct bt_definition *scope)
305 : m_evt(evt),
306 m_scope(scope) {
307 }
308
309 const char* string(const char* name) {
310 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
311 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
312
313 const char* c = bt_ctf_get_string(field);
314 int err = bt_ctf_field_get_error();
315 ASSERT_EXIT(c && err == 0, "Error retrieving field value '" << name <<
316 "': error=" << err);
317 return c;
318 }
319
320 int64_t int64(const char* name) {
321 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
322 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
323
324 int64_t val = bt_ctf_get_int64(field);
325 int err = bt_ctf_field_get_error();
326 ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
327 "': error=" << err);
328 return val;
329 }
330
331 uint64_t uint64(const char* name) {
332 const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
333 ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
334
335 uint64_t val = bt_ctf_get_uint64(field);
336 int err = bt_ctf_field_get_error();
337 ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
338 "': error=" << err);
339 return val;
340 }
341
342 private:
343 struct bt_ctf_event *m_evt;
344 const struct bt_definition *m_scope;
345 } fields(evt, scope_fields);
346
347 if (strcmp(event_name, "librbd:open_image_enter") == 0) {
348 string name(fields.string("name"));
349 string snap_name(fields.string("snap_name"));
350 bool readonly = fields.uint64("read_only");
351 imagectx_id_t imagectx = fields.uint64("imagectx");
352 action_id_t ionum = next_id();
353 pair<string, string> aname(map_image_snap(name, snap_name));
354 IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
355 imagectx, aname.first, aname.second,
356 readonly));
357 thread->issued_io(io, &m_latest_ios);
358 ios->push_back(io);
359 } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
360 completed(thread->latest_io());
361 boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
362 ceph_assert(io);
363 m_open_images.insert(io->imagectx());
364 } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
365 imagectx_id_t imagectx = fields.uint64("imagectx");
366 action_id_t ionum = next_id();
367 IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
368 imagectx));
369 thread->issued_io(io, &m_latest_ios);
370 ios->push_back(thread->latest_io());
371 } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
372 completed(thread->latest_io());
373 boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
374 ceph_assert(io);
375 m_open_images.erase(io->imagectx());
376 } else if (strcmp(event_name, "librbd:aio_open_image_enter") == 0) {
377 string name(fields.string("name"));
378 string snap_name(fields.string("snap_name"));
379 bool readonly = fields.uint64("read_only");
380 imagectx_id_t imagectx = fields.uint64("imagectx");
381 uint64_t completion = fields.uint64("completion");
382 action_id_t ionum = next_id();
383 pair<string, string> aname(map_image_snap(name, snap_name));
384 IO::ptr io(new AioOpenImageIO(ionum, ts, threadID, m_recent_completions,
385 imagectx, aname.first, aname.second,
386 readonly));
387 thread->issued_io(io, &m_latest_ios);
388 ios->push_back(io);
389 m_pending_ios[completion] = io;
390 } else if (strcmp(event_name, "librbd:aio_close_image_enter") == 0) {
391 imagectx_id_t imagectx = fields.uint64("imagectx");
392 uint64_t completion = fields.uint64("completion");
393 action_id_t ionum = next_id();
394 IO::ptr io(new AioCloseImageIO(ionum, ts, threadID, m_recent_completions,
395 imagectx));
396 thread->issued_io(io, &m_latest_ios);
397 ios->push_back(thread->latest_io());
398 m_pending_ios[completion] = io;
399 } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
400 strcmp(event_name, "librbd:read2_enter") == 0) {
401 string name(fields.string("name"));
402 string snap_name(fields.string("snap_name"));
403 bool readonly = fields.int64("read_only");
404 imagectx_id_t imagectx = fields.uint64("imagectx");
405 uint64_t offset = fields.uint64("offset");
406 uint64_t length = fields.uint64("length");
407 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
408 action_id_t ionum = next_id();
409 IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
410 offset, length));
411 thread->issued_io(io, &m_latest_ios);
412 ios->push_back(io);
413 } else if (strcmp(event_name, "librbd:read_exit") == 0) {
414 completed(thread->latest_io());
415 } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
416 strcmp(event_name, "librbd:write2_enter") == 0) {
417 string name(fields.string("name"));
418 string snap_name(fields.string("snap_name"));
419 bool readonly = fields.int64("read_only");
420 uint64_t offset = fields.uint64("off");
421 uint64_t length = fields.uint64("buf_len");
422 imagectx_id_t imagectx = fields.uint64("imagectx");
423 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
424 action_id_t ionum = next_id();
425 IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
426 imagectx, offset, length));
427 thread->issued_io(io, &m_latest_ios);
428 ios->push_back(io);
429 } else if (strcmp(event_name, "librbd:write_exit") == 0) {
430 completed(thread->latest_io());
431 } else if (strcmp(event_name, "librbd:discard_enter") == 0) {
432 string name(fields.string("name"));
433 string snap_name(fields.string("snap_name"));
434 bool readonly = fields.int64("read_only");
435 uint64_t offset = fields.uint64("off");
436 uint64_t length = fields.uint64("len");
437 imagectx_id_t imagectx = fields.uint64("imagectx");
438 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
439 action_id_t ionum = next_id();
440 IO::ptr io(new DiscardIO(ionum, ts, threadID, m_recent_completions,
441 imagectx, offset, length));
442 thread->issued_io(io, &m_latest_ios);
443 ios->push_back(io);
444 } else if (strcmp(event_name, "librbd:discard_exit") == 0) {
445 completed(thread->latest_io());
446 } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
447 strcmp(event_name, "librbd:aio_read2_enter") == 0) {
448 string name(fields.string("name"));
449 string snap_name(fields.string("snap_name"));
450 bool readonly = fields.int64("read_only");
451 uint64_t completion = fields.uint64("completion");
452 imagectx_id_t imagectx = fields.uint64("imagectx");
453 uint64_t offset = fields.uint64("offset");
454 uint64_t length = fields.uint64("length");
455 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
456 action_id_t ionum = next_id();
457 IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
458 imagectx, offset, length));
459 ios->push_back(io);
460 thread->issued_io(io, &m_latest_ios);
461 m_pending_ios[completion] = io;
462 } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
463 strcmp(event_name, "librbd:aio_write2_enter") == 0) {
464 string name(fields.string("name"));
465 string snap_name(fields.string("snap_name"));
466 bool readonly = fields.int64("read_only");
467 uint64_t offset = fields.uint64("off");
468 uint64_t length = fields.uint64("len");
469 uint64_t completion = fields.uint64("completion");
470 imagectx_id_t imagectx = fields.uint64("imagectx");
471 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
472 action_id_t ionum = next_id();
473 IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
474 imagectx, offset, length));
475 thread->issued_io(io, &m_latest_ios);
476 ios->push_back(io);
477 m_pending_ios[completion] = io;
478 } else if (strcmp(event_name, "librbd:aio_discard_enter") == 0) {
479 string name(fields.string("name"));
480 string snap_name(fields.string("snap_name"));
481 bool readonly = fields.int64("read_only");
482 uint64_t offset = fields.uint64("off");
483 uint64_t length = fields.uint64("len");
484 uint64_t completion = fields.uint64("completion");
485 imagectx_id_t imagectx = fields.uint64("imagectx");
486 require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
487 action_id_t ionum = next_id();
488 IO::ptr io(new AioDiscardIO(ionum, ts, threadID, m_recent_completions,
489 imagectx, offset, length));
490 thread->issued_io(io, &m_latest_ios);
491 ios->push_back(io);
492 m_pending_ios[completion] = io;
493 } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
494 uint64_t completion = fields.uint64("completion");
495 map<uint64_t, IO::ptr>::iterator itr = m_pending_ios.find(completion);
496 if (itr != m_pending_ios.end()) {
497 IO::ptr completedIO(itr->second);
498 m_pending_ios.erase(itr);
499 completed(completedIO);
500 }
501 }
502 }
503
504 action_id_t next_id() {
505 action_id_t id = m_io_count;
506 m_io_count += 2;
507 return id;
508 }
509
510 void completed(IO::ptr io) {
511 uint64_t limit = (io->start_time() < m_window ?
512 0 : io->start_time() - m_window);
513 for (io_set_t::iterator itr = m_recent_completions.begin();
514 itr != m_recent_completions.end(); ) {
515 IO::ptr recent_comp(*itr);
516 if ((recent_comp->start_time() < limit ||
517 io->dependencies().count(recent_comp) != 0) &&
518 m_latest_ios.count(recent_comp) == 0) {
519 m_recent_completions.erase(itr++);
520 } else {
521 ++itr;
522 }
523 }
524 m_recent_completions.insert(io);
525 }
526
527 pair<string, string> map_image_snap(string image_name, string snap_name) {
528 if (!m_anonymize) {
529 return pair<string, string>(image_name, snap_name);
530 }
531 AnonymizedImage& m(m_anonymized_images[image_name]);
532 if (m.image_name() == "") {
533 m.init(image_name, m_anonymized_images.size());
534 }
535 return m.anonymize(snap_name);
536 }
537
538 void require_image(uint64_t ts,
539 Thread::ptr thread,
540 imagectx_id_t imagectx,
541 const string& name,
542 const string& snap_name,
543 bool readonly,
544 IO::ptrs *ios) {
545 ceph_assert(thread);
546 if (m_open_images.count(imagectx) > 0) {
547 return;
548 }
549 action_id_t ionum = next_id();
550 pair<string, string> aname(map_image_snap(name, snap_name));
551 IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
552 m_recent_completions, imagectx, aname.first,
553 aname.second, readonly));
554 thread->issued_io(io, &m_latest_ios);
555 ios->push_back(io);
556 completed(io);
557 m_open_images.insert(imagectx);
558 }
559
  // Window length in nanoseconds (default one second, set by --window).
  // completed() drops recent completions that started more than this long
  // before the IO being completed.
  uint64_t m_window;
  // Per-thread state, keyed by the pthread_id read from each event.
  map<thread_id_t, Thread::ptr> m_threads;
  // Next action id to hand out; next_id() advances it by 2 per call.
  uint32_t m_io_count;
  // IOs passed to completed() and not yet pruned; handed to the constructor
  // of every new IO except StartThreadIO.
  io_set_t m_recent_completions;
  // Image contexts currently recorded as open.
  set<imagectx_id_t> m_open_images;

  // keyed by completion
  // Asynchronous IOs waiting for their aio_complete_enter event.
  map<uint64_t, IO::ptr> m_pending_ios;
  // The most recent IO issued by each thread (kept by Thread::issued_io()).
  std::set<IO::ptr> m_latest_ios;

  // True when --anonymize was given.
  bool m_anonymize;
  // Anonymization state, keyed by real image name.
  map<string, AnonymizedImage> m_anonymized_images;

  // True when --verbose was given.
  bool m_verbose;
574 };
575
576 int main(int argc, char** argv) {
577 vector<string> args;
578 for (int i = 0; i < argc; i++) {
579 args.push_back(string(argv[i]));
580 }
581
582 Processor p;
583 p.run(args);
584 }