]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/trace_replay/trace_replay.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / trace_replay / trace_replay.cc
CommitLineData
11fdf7f2
TL
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
5
#include "trace_replay/trace_replay.h"

#include <chrono>
#include <limits>
#include <sstream>
#include <thread>

#include "db/db_impl/db_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
22
f67539c2 23namespace ROCKSDB_NAMESPACE {
11fdf7f2 24
494da23a
TL
// Magic string that opens the payload of every trace header record. It is
// the first thing Tracer::WriteHeader() emits and the prefix that
// TracerHelper::DecodeHeader() requires.
const std::string kTraceMagic = "feedcafedeadbeef";
26
11fdf7f2 27namespace {
11fdf7f2
TL
28void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
29 Slice buf(buffer);
30 GetFixed32(&buf, cf_id);
31 GetLengthPrefixedSlice(&buf, key);
32}
33} // namespace
34
1e59de90
TL
35Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
36 if (v_string.find_first_of('.') == std::string::npos ||
37 v_string.find_first_of('.') != v_string.find_last_of('.')) {
38 return Status::Corruption(
39 "Corrupted trace file. Incorrect version format.");
40 }
41 int tmp_num = 0;
42 for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
43 if (v_string[i] == '.') {
44 continue;
45 } else if (isdigit(v_string[i])) {
46 tmp_num = tmp_num * 10 + (v_string[i] - '0');
47 } else {
48 return Status::Corruption(
49 "Corrupted trace file. Incorrect version format");
50 }
51 }
52 *v_num = tmp_num;
53 return Status::OK();
54}
55
56Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
57 int* db_version) {
58 std::vector<std::string> s_vec;
59 int begin = 0, end;
60 for (int i = 0; i < 3; i++) {
61 assert(header.payload.find("\t", begin) != std::string::npos);
62 end = static_cast<int>(header.payload.find("\t", begin));
63 s_vec.push_back(header.payload.substr(begin, end - begin));
64 begin = end + 1;
65 }
66
67 std::string t_v_str, db_v_str;
68 assert(s_vec.size() == 3);
69 assert(s_vec[1].find("Trace Version: ") != std::string::npos);
70 t_v_str = s_vec[1].substr(15);
71 assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
72 db_v_str = s_vec[2].substr(17);
73
74 Status s;
75 s = ParseVersionStr(t_v_str, trace_version);
76 if (s != Status::OK()) {
77 return s;
78 }
79 s = ParseVersionStr(db_v_str, db_version);
80 return s;
81}
82
f67539c2
TL
83void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
84 assert(encoded_trace);
85 PutFixed64(encoded_trace, trace.ts);
86 encoded_trace->push_back(trace.type);
87 PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
88 encoded_trace->append(trace.payload);
89}
90
91Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
92 Trace* trace) {
93 assert(trace != nullptr);
94 Slice enc_slice = Slice(encoded_trace);
95 if (!GetFixed64(&enc_slice, &trace->ts)) {
96 return Status::Incomplete("Decode trace string failed");
97 }
98 if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
99 return Status::Incomplete("Decode trace string failed");
100 }
101 trace->type = static_cast<TraceType>(enc_slice[0]);
102 enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
103 trace->payload = enc_slice.ToString();
104 return Status::OK();
105}
106
1e59de90
TL
107Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
108 Trace* header) {
109 Status s = TracerHelper::DecodeTrace(encoded_trace, header);
110
111 if (header->type != kTraceBegin) {
112 return Status::Corruption("Corrupted trace file. Incorrect header.");
113 }
114 if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
115 return Status::Corruption("Corrupted trace file. Incorrect magic.");
116 }
117
118 return s;
119}
120
121bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
122 const TracePayloadType payload_type) {
123 uint64_t old_state = payload_map;
124 uint64_t tmp = 1;
125 payload_map |= (tmp << payload_type);
126 return old_state != payload_map;
127}
128
129Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
130 std::unique_ptr<TraceRecord>* record) {
131 assert(trace != nullptr);
132
133 if (record != nullptr) {
134 record->reset(nullptr);
135 }
136
137 switch (trace->type) {
138 // Write
139 case kTraceWrite: {
140 PinnableSlice rep;
141 if (trace_file_version < 2) {
142 rep.PinSelf(trace->payload);
143 } else {
144 Slice buf(trace->payload);
145 GetFixed64(&buf, &trace->payload_map);
146 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
147 Slice write_batch_data;
148 while (payload_map) {
149 // Find the rightmost set bit.
150 uint32_t set_pos =
151 static_cast<uint32_t>(log2(payload_map & -payload_map));
152 switch (set_pos) {
153 case TracePayloadType::kWriteBatchData: {
154 GetLengthPrefixedSlice(&buf, &write_batch_data);
155 break;
156 }
157 default: {
158 assert(false);
159 }
160 }
161 // unset the rightmost bit.
162 payload_map &= (payload_map - 1);
163 }
164 rep.PinSelf(write_batch_data);
165 }
166
167 if (record != nullptr) {
168 record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
169 }
170
171 return Status::OK();
172 }
173 // Get
174 case kTraceGet: {
175 uint32_t cf_id = 0;
176 Slice get_key;
177
178 if (trace_file_version < 2) {
179 DecodeCFAndKey(trace->payload, &cf_id, &get_key);
180 } else {
181 Slice buf(trace->payload);
182 GetFixed64(&buf, &trace->payload_map);
183 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
184 while (payload_map) {
185 // Find the rightmost set bit.
186 uint32_t set_pos =
187 static_cast<uint32_t>(log2(payload_map & -payload_map));
188 switch (set_pos) {
189 case TracePayloadType::kGetCFID: {
190 GetFixed32(&buf, &cf_id);
191 break;
192 }
193 case TracePayloadType::kGetKey: {
194 GetLengthPrefixedSlice(&buf, &get_key);
195 break;
196 }
197 default: {
198 assert(false);
199 }
200 }
201 // unset the rightmost bit.
202 payload_map &= (payload_map - 1);
203 }
204 }
205
206 if (record != nullptr) {
207 PinnableSlice ps;
208 ps.PinSelf(get_key);
209 record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
210 }
211
212 return Status::OK();
213 }
214 // Iterator Seek and SeekForPrev
215 case kTraceIteratorSeek:
216 case kTraceIteratorSeekForPrev: {
217 uint32_t cf_id = 0;
218 Slice iter_key;
219 Slice lower_bound;
220 Slice upper_bound;
221
222 if (trace_file_version < 2) {
223 DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
224 } else {
225 Slice buf(trace->payload);
226 GetFixed64(&buf, &trace->payload_map);
227 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
228 while (payload_map) {
229 // Find the rightmost set bit.
230 uint32_t set_pos =
231 static_cast<uint32_t>(log2(payload_map & -payload_map));
232 switch (set_pos) {
233 case TracePayloadType::kIterCFID: {
234 GetFixed32(&buf, &cf_id);
235 break;
236 }
237 case TracePayloadType::kIterKey: {
238 GetLengthPrefixedSlice(&buf, &iter_key);
239 break;
240 }
241 case TracePayloadType::kIterLowerBound: {
242 GetLengthPrefixedSlice(&buf, &lower_bound);
243 break;
244 }
245 case TracePayloadType::kIterUpperBound: {
246 GetLengthPrefixedSlice(&buf, &upper_bound);
247 break;
248 }
249 default: {
250 assert(false);
251 }
252 }
253 // unset the rightmost bit.
254 payload_map &= (payload_map - 1);
255 }
256 }
257
258 if (record != nullptr) {
259 PinnableSlice ps_key;
260 ps_key.PinSelf(iter_key);
261 PinnableSlice ps_lower;
262 ps_lower.PinSelf(lower_bound);
263 PinnableSlice ps_upper;
264 ps_upper.PinSelf(upper_bound);
265 record->reset(new IteratorSeekQueryTraceRecord(
266 static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
267 cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
268 trace->ts));
269 }
270
271 return Status::OK();
272 }
273 // MultiGet
274 case kTraceMultiGet: {
275 if (trace_file_version < 2) {
276 return Status::Corruption("MultiGet is not supported.");
277 }
278
279 uint32_t multiget_size = 0;
280 std::vector<uint32_t> cf_ids;
281 std::vector<PinnableSlice> multiget_keys;
282
283 Slice cfids_payload;
284 Slice keys_payload;
285 Slice buf(trace->payload);
286 GetFixed64(&buf, &trace->payload_map);
287 int64_t payload_map = static_cast<int64_t>(trace->payload_map);
288 while (payload_map) {
289 // Find the rightmost set bit.
290 uint32_t set_pos =
291 static_cast<uint32_t>(log2(payload_map & -payload_map));
292 switch (set_pos) {
293 case TracePayloadType::kMultiGetSize: {
294 GetFixed32(&buf, &multiget_size);
295 break;
296 }
297 case TracePayloadType::kMultiGetCFIDs: {
298 GetLengthPrefixedSlice(&buf, &cfids_payload);
299 break;
300 }
301 case TracePayloadType::kMultiGetKeys: {
302 GetLengthPrefixedSlice(&buf, &keys_payload);
303 break;
304 }
305 default: {
306 assert(false);
307 }
308 }
309 // unset the rightmost bit.
310 payload_map &= (payload_map - 1);
311 }
312 if (multiget_size == 0) {
313 return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
314 }
315
316 // Decode the cfids_payload and keys_payload
317 cf_ids.reserve(multiget_size);
318 multiget_keys.reserve(multiget_size);
319 for (uint32_t i = 0; i < multiget_size; i++) {
320 uint32_t tmp_cfid;
321 Slice tmp_key;
322 GetFixed32(&cfids_payload, &tmp_cfid);
323 GetLengthPrefixedSlice(&keys_payload, &tmp_key);
324 cf_ids.push_back(tmp_cfid);
325 Slice s(tmp_key);
326 PinnableSlice ps;
327 ps.PinSelf(s);
328 multiget_keys.push_back(std::move(ps));
329 }
330
331 if (record != nullptr) {
332 record->reset(new MultiGetQueryTraceRecord(
333 std::move(cf_ids), std::move(multiget_keys), trace->ts));
334 }
335
336 return Status::OK();
337 }
338 default:
339 return Status::NotSupported("Unsupported trace type.");
340 }
341}
342
343Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
494da23a 344 std::unique_ptr<TraceWriter>&& trace_writer)
1e59de90 345 : clock_(clock),
494da23a
TL
346 trace_options_(trace_options),
347 trace_writer_(std::move(trace_writer)),
1e59de90 348 trace_request_count_(0) {
20effc67
TL
349 // TODO: What if this fails?
350 WriteHeader().PermitUncheckedError();
11fdf7f2
TL
351}
352
353Tracer::~Tracer() { trace_writer_.reset(); }
354
355Status Tracer::Write(WriteBatch* write_batch) {
494da23a
TL
356 TraceType trace_type = kTraceWrite;
357 if (ShouldSkipTrace(trace_type)) {
358 return Status::OK();
359 }
11fdf7f2 360 Trace trace;
1e59de90 361 trace.ts = clock_->NowMicros();
494da23a 362 trace.type = trace_type;
1e59de90
TL
363 TracerHelper::SetPayloadMap(trace.payload_map,
364 TracePayloadType::kWriteBatchData);
365 PutFixed64(&trace.payload, trace.payload_map);
366 PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
11fdf7f2
TL
367 return WriteTrace(trace);
368}
369
370Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
494da23a
TL
371 TraceType trace_type = kTraceGet;
372 if (ShouldSkipTrace(trace_type)) {
373 return Status::OK();
374 }
11fdf7f2 375 Trace trace;
1e59de90 376 trace.ts = clock_->NowMicros();
494da23a 377 trace.type = trace_type;
1e59de90
TL
378 // Set the payloadmap of the struct member that will be encoded in the
379 // payload.
380 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
381 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
382 // Encode the Get struct members into payload. Make sure add them in order.
383 PutFixed64(&trace.payload, trace.payload_map);
384 PutFixed32(&trace.payload, column_family->GetID());
385 PutLengthPrefixedSlice(&trace.payload, key);
11fdf7f2
TL
386 return WriteTrace(trace);
387}
388
1e59de90
TL
389Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
390 const Slice& lower_bound, const Slice upper_bound) {
494da23a
TL
391 TraceType trace_type = kTraceIteratorSeek;
392 if (ShouldSkipTrace(trace_type)) {
393 return Status::OK();
394 }
11fdf7f2 395 Trace trace;
1e59de90 396 trace.ts = clock_->NowMicros();
494da23a 397 trace.type = trace_type;
1e59de90
TL
398 // Set the payloadmap of the struct member that will be encoded in the
399 // payload.
400 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
401 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
402 if (lower_bound.size() > 0) {
403 TracerHelper::SetPayloadMap(trace.payload_map,
404 TracePayloadType::kIterLowerBound);
405 }
406 if (upper_bound.size() > 0) {
407 TracerHelper::SetPayloadMap(trace.payload_map,
408 TracePayloadType::kIterUpperBound);
409 }
410 // Encode the Iterator struct members into payload. Make sure add them in
411 // order.
412 PutFixed64(&trace.payload, trace.payload_map);
413 PutFixed32(&trace.payload, cf_id);
414 PutLengthPrefixedSlice(&trace.payload, key);
415 if (lower_bound.size() > 0) {
416 PutLengthPrefixedSlice(&trace.payload, lower_bound);
417 }
418 if (upper_bound.size() > 0) {
419 PutLengthPrefixedSlice(&trace.payload, upper_bound);
420 }
11fdf7f2
TL
421 return WriteTrace(trace);
422}
423
1e59de90
TL
424Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
425 const Slice& lower_bound,
426 const Slice upper_bound) {
494da23a
TL
427 TraceType trace_type = kTraceIteratorSeekForPrev;
428 if (ShouldSkipTrace(trace_type)) {
429 return Status::OK();
430 }
11fdf7f2 431 Trace trace;
1e59de90
TL
432 trace.ts = clock_->NowMicros();
433 trace.type = trace_type;
434 // Set the payloadmap of the struct member that will be encoded in the
435 // payload.
436 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
437 TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
438 if (lower_bound.size() > 0) {
439 TracerHelper::SetPayloadMap(trace.payload_map,
440 TracePayloadType::kIterLowerBound);
441 }
442 if (upper_bound.size() > 0) {
443 TracerHelper::SetPayloadMap(trace.payload_map,
444 TracePayloadType::kIterUpperBound);
445 }
446 // Encode the Iterator struct members into payload. Make sure add them in
447 // order.
448 PutFixed64(&trace.payload, trace.payload_map);
449 PutFixed32(&trace.payload, cf_id);
450 PutLengthPrefixedSlice(&trace.payload, key);
451 if (lower_bound.size() > 0) {
452 PutLengthPrefixedSlice(&trace.payload, lower_bound);
453 }
454 if (upper_bound.size() > 0) {
455 PutLengthPrefixedSlice(&trace.payload, upper_bound);
456 }
457 return WriteTrace(trace);
458}
459
460Status Tracer::MultiGet(const size_t num_keys,
461 ColumnFamilyHandle** column_families,
462 const Slice* keys) {
463 if (num_keys == 0) {
464 return Status::OK();
465 }
466 std::vector<ColumnFamilyHandle*> v_column_families;
467 std::vector<Slice> v_keys;
468 v_column_families.resize(num_keys);
469 v_keys.resize(num_keys);
470 for (size_t i = 0; i < num_keys; i++) {
471 v_column_families[i] = column_families[i];
472 v_keys[i] = keys[i];
473 }
474 return MultiGet(v_column_families, v_keys);
475}
476
477Status Tracer::MultiGet(const size_t num_keys,
478 ColumnFamilyHandle* column_family, const Slice* keys) {
479 if (num_keys == 0) {
480 return Status::OK();
481 }
482 std::vector<ColumnFamilyHandle*> column_families;
483 std::vector<Slice> v_keys;
484 column_families.resize(num_keys);
485 v_keys.resize(num_keys);
486 for (size_t i = 0; i < num_keys; i++) {
487 column_families[i] = column_family;
488 v_keys[i] = keys[i];
489 }
490 return MultiGet(column_families, v_keys);
491}
492
493Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
494 const std::vector<Slice>& keys) {
495 if (column_families.size() != keys.size()) {
496 return Status::Corruption("the CFs size and keys size does not match!");
497 }
498 TraceType trace_type = kTraceMultiGet;
499 if (ShouldSkipTrace(trace_type)) {
500 return Status::OK();
501 }
502 uint32_t multiget_size = static_cast<uint32_t>(keys.size());
503 Trace trace;
504 trace.ts = clock_->NowMicros();
494da23a 505 trace.type = trace_type;
1e59de90
TL
506 // Set the payloadmap of the struct member that will be encoded in the
507 // payload.
508 TracerHelper::SetPayloadMap(trace.payload_map,
509 TracePayloadType::kMultiGetSize);
510 TracerHelper::SetPayloadMap(trace.payload_map,
511 TracePayloadType::kMultiGetCFIDs);
512 TracerHelper::SetPayloadMap(trace.payload_map,
513 TracePayloadType::kMultiGetKeys);
514 // Encode the CFIDs inorder
515 std::string cfids_payload;
516 std::string keys_payload;
517 for (uint32_t i = 0; i < multiget_size; i++) {
518 assert(i < column_families.size());
519 assert(i < keys.size());
520 PutFixed32(&cfids_payload, column_families[i]->GetID());
521 PutLengthPrefixedSlice(&keys_payload, keys[i]);
522 }
523 // Encode the Get struct members into payload. Make sure add them in order.
524 PutFixed64(&trace.payload, trace.payload_map);
525 PutFixed32(&trace.payload, multiget_size);
526 PutLengthPrefixedSlice(&trace.payload, cfids_payload);
527 PutLengthPrefixedSlice(&trace.payload, keys_payload);
11fdf7f2
TL
528 return WriteTrace(trace);
529}
530
494da23a
TL
531bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
532 if (IsTraceFileOverMax()) {
533 return true;
534 }
1e59de90
TL
535
536 TraceFilterType filter_mask = kTraceFilterNone;
537 switch (trace_type) {
538 case kTraceNone:
539 case kTraceBegin:
540 case kTraceEnd:
541 filter_mask = kTraceFilterNone;
542 break;
543 case kTraceWrite:
544 filter_mask = kTraceFilterWrite;
545 break;
546 case kTraceGet:
547 filter_mask = kTraceFilterGet;
548 break;
549 case kTraceIteratorSeek:
550 filter_mask = kTraceFilterIteratorSeek;
551 break;
552 case kTraceIteratorSeekForPrev:
553 filter_mask = kTraceFilterIteratorSeekForPrev;
554 break;
555 case kBlockTraceIndexBlock:
556 case kBlockTraceFilterBlock:
557 case kBlockTraceDataBlock:
558 case kBlockTraceUncompressionDictBlock:
559 case kBlockTraceRangeDeletionBlock:
560 case kIOTracer:
561 filter_mask = kTraceFilterNone;
562 break;
563 case kTraceMultiGet:
564 filter_mask = kTraceFilterMultiGet;
565 break;
566 case kTraceMax:
567 assert(false);
568 filter_mask = kTraceFilterNone;
569 break;
570 }
571 if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) {
494da23a
TL
572 return true;
573 }
1e59de90 574
494da23a
TL
575 ++trace_request_count_;
576 if (trace_request_count_ < trace_options_.sampling_frequency) {
577 return true;
578 }
579 trace_request_count_ = 0;
580 return false;
581}
582
583bool Tracer::IsTraceFileOverMax() {
584 uint64_t trace_file_size = trace_writer_->GetFileSize();
585 return (trace_file_size > trace_options_.max_trace_file_size);
586}
587
11fdf7f2
TL
588Status Tracer::WriteHeader() {
589 std::ostringstream s;
590 s << kTraceMagic << "\t"
1e59de90
TL
591 << "Trace Version: " << kTraceFileMajorVersion << "."
592 << kTraceFileMinorVersion << "\t"
11fdf7f2
TL
593 << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
594 << "Format: Timestamp OpType Payload\n";
595 std::string header(s.str());
596
597 Trace trace;
1e59de90 598 trace.ts = clock_->NowMicros();
11fdf7f2
TL
599 trace.type = kTraceBegin;
600 trace.payload = header;
601 return WriteTrace(trace);
602}
603
604Status Tracer::WriteFooter() {
605 Trace trace;
1e59de90 606 trace.ts = clock_->NowMicros();
11fdf7f2 607 trace.type = kTraceEnd;
1e59de90
TL
608 TracerHelper::SetPayloadMap(trace.payload_map,
609 TracePayloadType::kEmptyPayload);
11fdf7f2
TL
610 trace.payload = "";
611 return WriteTrace(trace);
612}
613
614Status Tracer::WriteTrace(const Trace& trace) {
615 std::string encoded_trace;
f67539c2 616 TracerHelper::EncodeTrace(trace, &encoded_trace);
11fdf7f2
TL
617 return trace_writer_->Write(Slice(encoded_trace));
618}
619
620Status Tracer::Close() { return WriteFooter(); }
621
f67539c2 622} // namespace ROCKSDB_NAMESPACE